aboutsummaryrefslogtreecommitdiffstats
path: root/syz-ci/manager.go
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2025-05-28 16:03:17 +0200
committerTaras Madan <tarasmadan@google.com>2025-06-04 11:15:51 +0000
commitfd5e6e6163762c69d56c2cf63b53ec292c2f23ea (patch)
tree709ff0f1ec07dec823c12bff50af376296beb4d8 /syz-ci/manager.go
parente565f08d1cb546967f255601a17a0c958d06ce6d (diff)
syz-ci: use context for termination
1. Use context for the goroutines termination. 2. uploadCoverJSONLToGCS reached 8 params, refactor.
Diffstat (limited to 'syz-ci/manager.go')
-rw-r--r--syz-ci/manager.go77
1 files changed, 42 insertions, 35 deletions
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index 297828d79..2036805c9 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -89,7 +89,6 @@ type Manager struct {
dash ManagerDashapi
debugStorage bool
storage *asset.Storage
- stop chan struct{}
debug bool
lastBuild *dashapi.Build
buildFailed bool
@@ -105,8 +104,7 @@ type ManagerDashapi interface {
UploadCommits(commits []dashapi.Commit) error
}
-func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
- debug bool) (*Manager, error) {
+func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
err := osutil.MkdirAll(dir)
if err != nil {
@@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
managercfg: mgrcfg.managercfg,
storage: assetStorage,
debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
- stop: stop,
debug: debug,
}
// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
@@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
const fuzzingMinutesBeforeCover = 360
const benchUploadPeriod = 30 * time.Minute
-func (mgr *Manager) loop() {
+func (mgr *Manager) loop(ctx context.Context) {
lastCommit := ""
nextBuildTime := time.Now()
var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
@@ -210,20 +207,20 @@ loop:
for {
if time.Since(nextBuildTime) >= 0 {
var rebuildAfter time.Duration
- lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
+ lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
nextBuildTime = time.Now().Add(rebuildAfter)
}
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
artifactUploadTime = time.Time{}
- if err := mgr.uploadCoverReport(); err != nil {
+ if err := mgr.uploadCoverReport(ctx); err != nil {
mgr.Errorf("failed to upload cover report: %v", err)
}
- if err := mgr.uploadProgramsWithCoverage(); err != nil {
+ if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
mgr.Errorf("failed to upload programs with coverage: %v", err)
}
// Function uploadCoverStat also forces manager to drop the coverage structures to reduce memory usage.
// Should be the last request touching the coverage data.
- if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
+ if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
mgr.Errorf("failed to upload coverage stat: %v", err)
}
if err := mgr.uploadCorpus(); err != nil {
@@ -232,13 +229,13 @@ loop:
}
if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
benchUploadTime = time.Now().Add(benchUploadPeriod)
- if err := mgr.uploadBenchData(); err != nil {
+ if err := mgr.uploadBenchData(ctx); err != nil {
mgr.Errorf("failed to upload bench: %v", err)
}
}
select {
- case <-mgr.stop:
+ case <-ctx.Done():
break loop
default:
}
@@ -253,7 +250,7 @@ loop:
select {
case <-ticker.C:
- case <-mgr.stop:
+ case <-ctx.Done():
break loop
}
}
@@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
}
}
-func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
+func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
string, *BuildInfo, time.Duration) {
rebuildAfter := buildRetryPeriod
commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
@@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
}
}
buildSem.Signal()
- case <-mgr.stop:
+ case <-ctx.Done():
}
}
}
@@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
return client.Do(req)
}
-func (mgr *Manager) uploadCoverReport() error {
+func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
if mgr.storage == nil && !directUpload {
// Cover report uploading is disabled.
@@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
// Report generation can consume lots of memory. Generate one at a time.
select {
case <-buildSem.WaitC():
- case <-mgr.stop:
+ case <-ctx.Done():
return nil
}
defer buildSem.Signal()
@@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}
-func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
- publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
+type uploadOptions struct {
+ nameSuffix string
+ publish bool
+ compress bool
+}
+
+func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string,
+ opts uploadOptions, f func(io.Writer, *json.Decoder) error) error {
if !mgr.managercfg.Cover || gcsDest == "" {
return nil
}
@@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
// TODO: remove it once #4585 (symbolization tuning) is closed
select {
case <-buildSem.WaitC():
- case <-mgr.stop:
+ case <-ctx.Done():
return nil
}
defer buildSem.Signal()
- eg, egCtx := errgroup.WithContext(context.Background())
+ eg, egCtx := errgroup.WithContext(ctx)
resp, err := mgr.httpGET(egCtx, mgrSrc)
if err != nil {
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
@@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
defer pw.Close()
var w io.Writer
w = pw
- if compress {
+ if opts.compress {
gzw := gzip.NewWriter(pw)
defer gzw.Close()
w = gzw
@@ -934,8 +937,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
})
eg.Go(func() error {
defer pr.Close()
- fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix)
- if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
+ fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix)
+ if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil {
return fmt.Errorf("uploadFile: %w", err)
}
return nil
@@ -943,17 +946,19 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
return eg.Wait()
}
-func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
+func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
// Coverage report generation consumes and caches lots of memory.
// In the syz-ci context report generation won't be used after this point,
// so tell manager to flush report generator.
curTime := time.Now()
- if err := mgr.uploadCoverJSONLToGCS(nil,
+ if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/cover?jsonl=1&flush=1",
mgr.cfg.CoverPipelinePath,
- time.Now().Format("-2006-01-02-15-04"),
- false,
- false,
+ uploadOptions{
+ nameSuffix: time.Now().Format("-2006-01-02-15-04"),
+ publish: false,
+ compress: false,
+ },
func(w io.Writer, dec *json.Decoder) error {
var covInfo cover.CoverageInfo
if err := dec.Decode(&covInfo); err != nil {
@@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
return nil
}
-func (mgr *Manager) uploadProgramsWithCoverage() error {
- if err := mgr.uploadCoverJSONLToGCS(nil,
+func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
+ if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/coverprogs?jsonl=1",
mgr.cfg.CoverProgramsPath,
- "",
- mgr.cfg.PublishGCS,
- true,
+ uploadOptions{
+ nameSuffix: "",
+ publish: mgr.cfg.PublishGCS,
+ compress: true,
+ },
func(w io.Writer, dec *json.Decoder) error {
var programCoverage cover.ProgramCoverage
if err := dec.Decode(&programCoverage); err != nil {
@@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error {
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
}
-func (mgr *Manager) uploadBenchData() error {
+func (mgr *Manager) uploadBenchData(ctx context.Context) error {
if mgr.lastRestarted.IsZero() {
return nil
}
@@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error {
return fmt.Errorf("failed to open bench file: %w", err)
}
defer f.Close()
- err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
+ err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
if err != nil {
return fmt.Errorf("failed to upload the bench file: %w", err)