diff options
Diffstat (limited to 'syz-ci/jobs.go')
| -rw-r--r-- | syz-ci/jobs.go | 25 |
1 files changed, 9 insertions, 16 deletions
diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go index 0c2df74d4..a9bb23abc 100644 --- a/syz-ci/jobs.go +++ b/syz-ci/jobs.go @@ -5,6 +5,7 @@ package main import ( "bytes" + "context" "errors" "fmt" "io" @@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct }, nil } -// startLoop starts a job loop in parallel and returns a blocking function -// to gracefully stop job processing. -func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() { - stop := make(chan struct{}) - done := make(chan struct{}, 1) +// startLoop starts a job loop in parallel. +func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() - jm.loop(stop) - done <- struct{}{} + jm.loop(ctx) }() - return func() { - close(stop) - <-done - } } -func (jm *JobManager) loop(stop chan struct{}) { +func (jm *JobManager) loop(ctx context.Context) { if err := jm.resetJobs(); err != nil { if jm.dash != nil { jm.dash.LogError("syz-ci", "reset jobs failed: %v", err) @@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) { wg.Add(1) go func() { defer wg.Done() - jp.loop(stop) + jp.loop(ctx) }() if !main || !jm.needParallelProcessor() { break @@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error { return nil } -func (jp *JobProcessor) loop(stop chan struct{}) { +func (jp *JobProcessor) loop(ctx context.Context) { jp.Logf(0, "job loop started") loop: for { // Check jp.stop separately first, otherwise if stop signal arrives during a job execution, // we can still grab the next job with 50% probability. select { - case <-stop: + case <-ctx.Done(): break loop default: } @@ -166,7 +159,7 @@ loop: jp.pollJobs() case <-jp.commitTicker: jp.pollCommits() - case <-stop: + case <-ctx.Done(): break loop } } |
