diff options
| author | Taras Madan <tarasmadan@google.com> | 2025-05-28 16:03:17 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2025-06-04 11:15:51 +0000 |
| commit | fd5e6e6163762c69d56c2cf63b53ec292c2f23ea (patch) | |
| tree | 709ff0f1ec07dec823c12bff50af376296beb4d8 /syz-ci/manager.go | |
| parent | e565f08d1cb546967f255601a17a0c958d06ce6d (diff) | |
syz-ci: use context for termination
1. Use context for the goroutines termination.
2. uploadCoverJSONLToGCS reached 8 params, refactor.
Diffstat (limited to 'syz-ci/manager.go')
| -rw-r--r-- | syz-ci/manager.go | 77 |
1 files changed, 42 insertions, 35 deletions
diff --git a/syz-ci/manager.go b/syz-ci/manager.go index 297828d79..2036805c9 100644 --- a/syz-ci/manager.go +++ b/syz-ci/manager.go @@ -89,7 +89,6 @@ type Manager struct { dash ManagerDashapi debugStorage bool storage *asset.Storage - stop chan struct{} debug bool lastBuild *dashapi.Build buildFailed bool @@ -105,8 +104,7 @@ type ManagerDashapi interface { UploadCommits(commits []dashapi.Commit) error } -func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{}, - debug bool) (*Manager, error) { +func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) { dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name)) err := osutil.MkdirAll(dir) if err != nil { @@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{}, managercfg: mgrcfg.managercfg, storage: assetStorage, debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug, - stop: stop, debug: debug, } // Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer. @@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1) const fuzzingMinutesBeforeCover = 360 const benchUploadPeriod = 30 * time.Minute -func (mgr *Manager) loop() { +func (mgr *Manager) loop(ctx context.Context) { lastCommit := "" nextBuildTime := time.Now() var managerRestartTime, artifactUploadTime, benchUploadTime time.Time @@ -210,20 +207,20 @@ loop: for { if time.Since(nextBuildTime) >= 0 { var rebuildAfter time.Duration - lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo) + lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo) nextBuildTime = time.Now().Add(rebuildAfter) } if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) { artifactUploadTime = time.Time{} - if err := mgr.uploadCoverReport(); err != nil { + if err := mgr.uploadCoverReport(ctx); err != nil { mgr.Errorf("failed to upload cover report: %v", err) } - if err := mgr.uploadProgramsWithCoverage(); err != nil { + if err := mgr.uploadProgramsWithCoverage(ctx); err != nil { mgr.Errorf("failed to upload programs with coverage: %v", err) } // Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage. // Should be the last request touching the coverage data. - if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil { + if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil { mgr.Errorf("failed to upload coverage stat: %v", err) } if err := mgr.uploadCorpus(); err != nil { @@ -232,13 +229,13 @@ loop: } if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) { benchUploadTime = time.Now().Add(benchUploadPeriod) - if err := mgr.uploadBenchData(); err != nil { + if err := mgr.uploadBenchData(ctx); err != nil { mgr.Errorf("failed to upload bench: %v", err) } } select { - case <-mgr.stop: + case <-ctx.Done(): break loop default: } @@ -253,7 +250,7 @@ loop: select { case <-ticker.C: - case <-mgr.stop: + case <-ctx.Done(): break loop } } @@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) { } } -func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) ( +func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) ( string, *BuildInfo, time.Duration) { rebuildAfter := buildRetryPeriod commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch) @@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) ( } } buildSem.Signal() - case <-mgr.stop: + case <-ctx.Done(): } } } @@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon return client.Do(req) } -func (mgr *Manager) uploadCoverReport() error { +func (mgr *Manager) uploadCoverReport(ctx context.Context) error { directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != "" if mgr.storage == nil && !directUpload { // Cover report uploading is disabled. @@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error { // Report generation can consume lots of memory. Generate one at a time. select { case <-buildSem.WaitC(): - case <-mgr.stop: + case <-ctx.Done(): return nil } defer buildSem.Signal() @@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error { return nil } -func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string, - publish, compress bool, f func(io.Writer, *json.Decoder) error) error { +type uploadOptions struct { + nameSuffix string + publish bool + compress bool +} + +func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string, + opts uploadOptions, f func(io.Writer, *json.Decoder) error) error { if !mgr.managercfg.Cover || gcsDest == "" { return nil } @@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, // TODO: remove it once #4585 (symbolization tuning) is closed select { case <-buildSem.WaitC(): - case <-mgr.stop: + case <-ctx.Done(): return nil } defer buildSem.Signal() - eg, egCtx := errgroup.WithContext(context.Background()) + eg, egCtx := errgroup.WithContext(ctx) resp, err := mgr.httpGET(egCtx, mgrSrc) if err != nil { return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err) @@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, defer pw.Close() var w io.Writer w = pw - if compress { + if opts.compress { gzw := gzip.NewWriter(pw) defer gzw.Close() w = gzw @@ -934,8 +937,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, }) eg.Go(func() error { defer pr.Close() - fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix) - if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil { + fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix) + if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil { return fmt.Errorf("uploadFile: %w", err) } return nil @@ -943,17 +946,19 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, return eg.Wait() } -func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { +func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error { // Coverage report generation consumes and caches lots of memory. // In the syz-ci context report generation won't be used after this point, // so tell manager to flush report generator. curTime := time.Now() - if err := mgr.uploadCoverJSONLToGCS(nil, + if err := mgr.uploadCoverJSONLToGCS(ctx, nil, "/cover?jsonl=1&flush=1", mgr.cfg.CoverPipelinePath, - time.Now().Format("-2006-01-02-15-04"), - false, - false, + uploadOptions{ + nameSuffix: time.Now().Format("-2006-01-02-15-04"), + publish: false, + compress: false, + }, func(w io.Writer, dec *json.Decoder) error { var covInfo cover.CoverageInfo if err := dec.Decode(&covInfo); err != nil { @@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { return nil } -func (mgr *Manager) uploadProgramsWithCoverage() error { - if err := mgr.uploadCoverJSONLToGCS(nil, +func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error { + if err := mgr.uploadCoverJSONLToGCS(ctx, nil, "/coverprogs?jsonl=1", mgr.cfg.CoverProgramsPath, - "", - mgr.cfg.PublishGCS, - true, + uploadOptions{ + nameSuffix: "", + publish: mgr.cfg.PublishGCS, + compress: true, + }, func(w io.Writer, dec *json.Decoder) error { var programCoverage cover.ProgramCoverage if err := dec.Decode(&programCoverage); err != nil { @@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error { return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS) } -func (mgr *Manager) uploadBenchData() error { +func (mgr *Manager) uploadBenchData(ctx context.Context) error { if mgr.lastRestarted.IsZero() { return nil } @@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error { return fmt.Errorf("failed to open bench file: %w", err) } defer f.Close() - err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name, + err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name, mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false) if err != nil { return fmt.Errorf("failed to upload the bench file: %w", err) |
