author    Taras Madan <tarasmadan@google.com>    2025-05-28 16:03:17 +0200
committer Taras Madan <tarasmadan@google.com>    2025-06-04 11:15:51 +0000
commit    fd5e6e6163762c69d56c2cf63b53ec292c2f23ea (patch)
tree      709ff0f1ec07dec823c12bff50af376296beb4d8 /syz-ci
parent    e565f08d1cb546967f255601a17a0c958d06ce6d (diff)
syz-ci: use context for termination
1. Use a context for goroutine termination. 2. uploadCoverJSONLToGCS reached 8 parameters; refactor it to take an uploadOptions struct.
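For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the migration this change makes: a goroutine that used to be stopped by closing a dedicated stop channel and waiting on a done channel instead takes a context.Context, so cancellation becomes ctx.Done() and the shutdown handshake collapses into stop() plus a WaitGroup. All names below are illustrative, not taken from syz-ci.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// run loops until its context is cancelled, the shape this commit
// gives JobManager.startLoop and Manager.loop. All names here are
// illustrative, not syz-ci code.
func run(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("working")
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, stop := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	wg.Add(1)
	go run(ctx, &wg)
	time.Sleep(300 * time.Millisecond)
	stop()    // replaces close(stopChan)
	wg.Wait() // replaces the done-channel handshake
}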
Diffstat (limited to 'syz-ci')
-rw-r--r--  syz-ci/jobs.go          25
-rw-r--r--  syz-ci/manager.go       77
-rw-r--r--  syz-ci/manager_test.go  11
-rw-r--r--  syz-ci/syz-ci.go        24
4 files changed, 73 insertions, 64 deletions
diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go
index 0c2df74d4..a9bb23abc 100644
--- a/syz-ci/jobs.go
+++ b/syz-ci/jobs.go
@@ -5,6 +5,7 @@ package main
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
@@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
}, nil
}
-// startLoop starts a job loop in parallel and returns a blocking function
-// to gracefully stop job processing.
-func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
- stop := make(chan struct{})
- done := make(chan struct{}, 1)
+// startLoop starts a job loop in parallel.
+func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
- jm.loop(stop)
- done <- struct{}{}
+ jm.loop(ctx)
}()
- return func() {
- close(stop)
- <-done
- }
}
-func (jm *JobManager) loop(stop chan struct{}) {
+func (jm *JobManager) loop(ctx context.Context) {
if err := jm.resetJobs(); err != nil {
if jm.dash != nil {
jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
@@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
wg.Add(1)
go func() {
defer wg.Done()
- jp.loop(stop)
+ jp.loop(ctx)
}()
if !main || !jm.needParallelProcessor() {
break
@@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
return nil
}
-func (jp *JobProcessor) loop(stop chan struct{}) {
+func (jp *JobProcessor) loop(ctx context.Context) {
jp.Logf(0, "job loop started")
loop:
for {
// Check ctx separately first, otherwise if the stop signal arrives during a job execution,
// we can still grab the next job with 50% probability.
select {
- case <-stop:
+ case <-ctx.Done():
break loop
default:
}
@@ -166,7 +159,7 @@ loop:
jp.pollJobs()
case <-jp.commitTicker:
jp.pollCommits()
- case <-stop:
+ case <-ctx.Done():
break loop
}
}
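One detail of JobProcessor.loop worth spelling out: a blocking select chooses pseudo-randomly among ready cases, so if cancellation arrives while a job is executing, the next blocking select could still pick the poll channel roughly half the time. The non-blocking check on ctx.Done() at the top of the loop closes that window. A standalone sketch of the pattern (the ticker interval and pollJobs are illustrative, not syz-ci code):

package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop mirrors the select structure of JobProcessor.loop.
func pollLoop(ctx context.Context, pollJobs func()) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		// Non-blocking check first: a blocking select picks uniformly
		// among ready cases, so after cancellation it could still
		// choose ticker.C about 50% of the time and grab one more job.
		select {
		case <-ctx.Done():
			return
		default:
		}
		select {
		case <-ticker.C:
			pollJobs()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	pollLoop(ctx, func() { fmt.Println("poll") })
}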
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index 297828d79..2036805c9 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -89,7 +89,6 @@ type Manager struct {
dash ManagerDashapi
debugStorage bool
storage *asset.Storage
- stop chan struct{}
debug bool
lastBuild *dashapi.Build
buildFailed bool
@@ -105,8 +104,7 @@ type ManagerDashapi interface {
UploadCommits(commits []dashapi.Commit) error
}
-func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
- debug bool) (*Manager, error) {
+func createManager(cfg *Config, mgrcfg *ManagerConfig, debug bool) (*Manager, error) {
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
err := osutil.MkdirAll(dir)
if err != nil {
@@ -157,7 +155,6 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
managercfg: mgrcfg.managercfg,
storage: assetStorage,
debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
- stop: stop,
debug: debug,
}
// Leave the dashboard interface value as nil if it does not wrap a valid dashboard pointer.
@@ -182,7 +179,7 @@ var testSem = instance.NewSemaphore(1)
const fuzzingMinutesBeforeCover = 360
const benchUploadPeriod = 30 * time.Minute
-func (mgr *Manager) loop() {
+func (mgr *Manager) loop(ctx context.Context) {
lastCommit := ""
nextBuildTime := time.Now()
var managerRestartTime, artifactUploadTime, benchUploadTime time.Time
@@ -210,20 +207,20 @@ loop:
for {
if time.Since(nextBuildTime) >= 0 {
var rebuildAfter time.Duration
- lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(lastCommit, latestInfo)
+ lastCommit, latestInfo, rebuildAfter = mgr.pollAndBuild(ctx, lastCommit, latestInfo)
nextBuildTime = time.Now().Add(rebuildAfter)
}
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
artifactUploadTime = time.Time{}
- if err := mgr.uploadCoverReport(); err != nil {
+ if err := mgr.uploadCoverReport(ctx); err != nil {
mgr.Errorf("failed to upload cover report: %v", err)
}
- if err := mgr.uploadProgramsWithCoverage(); err != nil {
+ if err := mgr.uploadProgramsWithCoverage(ctx); err != nil {
mgr.Errorf("failed to upload programs with coverage: %v", err)
}
// The uploadCoverStat call also forces the manager to drop the coverage structures to reduce memory usage.
// It should be the last request touching the coverage data.
- if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
+ if err := mgr.uploadCoverStat(ctx, fuzzingMinutesBeforeCover); err != nil {
mgr.Errorf("failed to upload coverage stat: %v", err)
}
if err := mgr.uploadCorpus(); err != nil {
@@ -232,13 +229,13 @@ loop:
}
if mgr.cfg.BenchUploadPath != "" && time.Now().After(benchUploadTime) {
benchUploadTime = time.Now().Add(benchUploadPeriod)
- if err := mgr.uploadBenchData(); err != nil {
+ if err := mgr.uploadBenchData(ctx); err != nil {
mgr.Errorf("failed to upload bench: %v", err)
}
}
select {
- case <-mgr.stop:
+ case <-ctx.Done():
break loop
default:
}
@@ -253,7 +250,7 @@ loop:
select {
case <-ticker.C:
- case <-mgr.stop:
+ case <-ctx.Done():
break loop
}
}
@@ -275,7 +272,7 @@ func (mgr *Manager) archiveCommit(commit string) {
}
}
-func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
+func (mgr *Manager) pollAndBuild(ctx context.Context, lastCommit string, latestInfo *BuildInfo) (
string, *BuildInfo, time.Duration) {
rebuildAfter := buildRetryPeriod
commit, err := mgr.repo.Poll(mgr.mgrcfg.Repo, mgr.mgrcfg.Branch)
@@ -307,7 +304,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
}
}
buildSem.Signal()
- case <-mgr.stop:
+ case <-ctx.Done():
}
}
}
@@ -848,7 +845,7 @@ func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Respon
return client.Do(req)
}
-func (mgr *Manager) uploadCoverReport() error {
+func (mgr *Manager) uploadCoverReport(ctx context.Context) error {
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
if mgr.storage == nil && !directUpload {
// Cover report uploading is disabled.
@@ -860,7 +857,7 @@ func (mgr *Manager) uploadCoverReport() error {
// Report generation can consume lots of memory. Generate one at a time.
select {
case <-buildSem.WaitC():
- case <-mgr.stop:
+ case <-ctx.Done():
return nil
}
defer buildSem.Signal()
@@ -886,8 +883,14 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}
-func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest, nameSuffix string,
- publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
+type uploadOptions struct {
+ nameSuffix string
+ publish bool
+ compress bool
+}
+
+func (mgr *Manager) uploadCoverJSONLToGCS(ctx context.Context, gcsClient gcs.Client, mgrSrc, gcsDest string,
+ opts uploadOptions, f func(io.Writer, *json.Decoder) error) error {
if !mgr.managercfg.Cover || gcsDest == "" {
return nil
}
@@ -896,12 +899,12 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
// TODO: remove it once #4585 (symbolization tuning) is closed
select {
case <-buildSem.WaitC():
- case <-mgr.stop:
+ case <-ctx.Done():
return nil
}
defer buildSem.Signal()
- eg, egCtx := errgroup.WithContext(context.Background())
+ eg, egCtx := errgroup.WithContext(ctx)
resp, err := mgr.httpGET(egCtx, mgrSrc)
if err != nil {
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
@@ -919,7 +922,7 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
defer pw.Close()
var w io.Writer
w = pw
- if compress {
+ if opts.compress {
gzw := gzip.NewWriter(pw)
defer gzw.Close()
w = gzw
@@ -934,8 +937,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
})
eg.Go(func() error {
defer pr.Close()
- fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, nameSuffix)
- if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
+ fileName := fmt.Sprintf("%s/%s%s.jsonl", mgr.mgrcfg.DashboardClient, mgr.name, opts.nameSuffix)
+ if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, opts.publish); err != nil {
return fmt.Errorf("uploadFile: %w", err)
}
return nil
@@ -943,17 +946,19 @@ func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest,
return eg.Wait()
}
-func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
+func (mgr *Manager) uploadCoverStat(ctx context.Context, fuzzingMinutes int) error {
// Coverage report generation consumes and caches lots of memory.
// In the syz-ci context, report generation won't be used after this point,
// so tell the manager to flush its report generator.
curTime := time.Now()
- if err := mgr.uploadCoverJSONLToGCS(nil,
+ if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/cover?jsonl=1&flush=1",
mgr.cfg.CoverPipelinePath,
- time.Now().Format("-2006-01-02-15-04"),
- false,
- false,
+ uploadOptions{
+ nameSuffix: time.Now().Format("-2006-01-02-15-04"),
+ publish: false,
+ compress: false,
+ },
func(w io.Writer, dec *json.Decoder) error {
var covInfo cover.CoverageInfo
if err := dec.Decode(&covInfo); err != nil {
@@ -979,13 +984,15 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
return nil
}
-func (mgr *Manager) uploadProgramsWithCoverage() error {
- if err := mgr.uploadCoverJSONLToGCS(nil,
+func (mgr *Manager) uploadProgramsWithCoverage(ctx context.Context) error {
+ if err := mgr.uploadCoverJSONLToGCS(ctx, nil,
"/coverprogs?jsonl=1",
mgr.cfg.CoverProgramsPath,
- "",
- mgr.cfg.PublishGCS,
- true,
+ uploadOptions{
+ nameSuffix: "",
+ publish: mgr.cfg.PublishGCS,
+ compress: true,
+ },
func(w io.Writer, dec *json.Decoder) error {
var programCoverage cover.ProgramCoverage
if err := dec.Decode(&programCoverage); err != nil {
@@ -1015,7 +1022,7 @@ func (mgr *Manager) uploadCorpus() error {
return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
}
-func (mgr *Manager) uploadBenchData() error {
+func (mgr *Manager) uploadBenchData(ctx context.Context) error {
if mgr.lastRestarted.IsZero() {
return nil
}
@@ -1030,7 +1037,7 @@ func (mgr *Manager) uploadBenchData() error {
return fmt.Errorf("failed to open bench file: %w", err)
}
defer f.Close()
- err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
+ err = uploadFile(ctx, nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
if err != nil {
return fmt.Errorf("failed to upload the bench file: %w", err)
diff --git a/syz-ci/manager_test.go b/syz-ci/manager_test.go
index 9fd980ed2..e9ca7071d 100644
--- a/syz-ci/manager_test.go
+++ b/syz-ci/manager_test.go
@@ -6,6 +6,7 @@ package main
import (
"bytes"
"compress/gzip"
+ "context"
"encoding/json"
"fmt"
"io"
@@ -200,11 +201,15 @@ func TestUploadCoverJSONLToGCS(t *testing.T) {
gcsMock.On("Publish", test.wantGCSFileName).
Return(nil).Once()
}
- err := mgr.uploadCoverJSONLToGCS(gcsMock,
+ err := mgr.uploadCoverJSONLToGCS(context.Background(), gcsMock,
"/teststream&jsonl=1",
"gs://test-bucket",
- test.inputNameSuffix,
- test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
+ uploadOptions{
+ nameSuffix: test.inputNameSuffix,
+ publish: test.inputPublish,
+ compress: test.inputCompress,
+ },
+ func(w io.Writer, dec *json.Decoder) error {
var v any
if err := dec.Decode(&v); err != nil {
return fmt.Errorf("cover.ProgramCoverage: %w", err)
diff --git a/syz-ci/syz-ci.go b/syz-ci/syz-ci.go
index 74afa2b89..3cefab1bc 100644
--- a/syz-ci/syz-ci.go
+++ b/syz-ci/syz-ci.go
@@ -52,6 +52,7 @@ package main
// modification time allows us to understand if we need to rebuild after a restart.
import (
+ "context"
"encoding/json"
"flag"
"fmt"
@@ -281,10 +282,10 @@ func main() {
}()
}
- stop := make(chan struct{})
+ ctx, stop := context.WithCancel(context.Background())
var managers []*Manager
for _, mgrcfg := range cfg.Managers {
- mgr, err := createManager(cfg, mgrcfg, stop, *flagDebug)
+ mgr, err := createManager(cfg, mgrcfg, *flagDebug)
if err != nil {
log.Errorf("failed to create manager %v: %v", mgrcfg.Name, err)
continue
@@ -300,7 +301,7 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()
- mgr.loop()
+ mgr.loop(ctx)
}()
}
}
@@ -308,12 +309,14 @@ func main() {
if err != nil {
log.Fatalf("failed to create dashapi connection %v", err)
}
- stopJobs := jp.startLoop(&wg)
+ ctxJobs, stopJobs := context.WithCancel(ctx)
+ wgJobs := sync.WaitGroup{}
+ jp.startLoop(ctxJobs, &wgJobs)
// For testing. Racy. Use with care.
http.HandleFunc("/upload_cover", func(w http.ResponseWriter, r *http.Request) {
for _, mgr := range managers {
- if err := mgr.uploadCoverReport(); err != nil {
+ if err := mgr.uploadCoverReport(ctx); err != nil {
w.Write([]byte(fmt.Sprintf("failed for %v: %v <br>\n", mgr.name, err)))
return
}
@@ -322,14 +325,15 @@ func main() {
})
wg.Add(1)
- go deprecateAssets(cfg, stop, &wg)
+ go deprecateAssets(ctx, cfg, &wg)
select {
case <-shutdownPending:
case <-updatePending:
}
- stopJobs() // Gracefully wait for the running jobs to finish.
- close(stop)
+ stopJobs()
+ wgJobs.Wait()
+ stop()
wg.Wait()
select {
@@ -339,7 +343,7 @@ func main() {
}
}
-func deprecateAssets(cfg *Config, stop chan struct{}, wg *sync.WaitGroup) {
+func deprecateAssets(ctx context.Context, cfg *Config, wg *sync.WaitGroup) {
defer wg.Done()
if cfg.DashboardAddr == "" || cfg.AssetStorage.IsEmpty() ||
!cfg.AssetStorage.DoDeprecation {
@@ -359,7 +363,7 @@ loop:
for {
const sleepDuration = 6 * time.Hour
select {
- case <-stop:
+ case <-ctx.Done():
break loop
case <-time.After(sleepDuration):
}