aboutsummaryrefslogtreecommitdiffstats
path: root/syz-ci/jobs.go
diff options
context:
space:
mode:
Diffstat (limited to 'syz-ci/jobs.go')
-rw-r--r--syz-ci/jobs.go25
1 files changed, 9 insertions, 16 deletions
diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go
index 0c2df74d4..a9bb23abc 100644
--- a/syz-ci/jobs.go
+++ b/syz-ci/jobs.go
@@ -5,6 +5,7 @@ package main
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
@@ -61,24 +62,16 @@ func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct
}, nil
}
-// startLoop starts a job loop in parallel and returns a blocking function
-// to gracefully stop job processing.
-func (jm *JobManager) startLoop(wg *sync.WaitGroup) func() {
- stop := make(chan struct{})
- done := make(chan struct{}, 1)
+// startLoop starts a job loop in parallel.
+func (jm *JobManager) startLoop(ctx context.Context, wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
- jm.loop(stop)
- done <- struct{}{}
+ jm.loop(ctx)
}()
- return func() {
- close(stop)
- <-done
- }
}
-func (jm *JobManager) loop(stop chan struct{}) {
+func (jm *JobManager) loop(ctx context.Context) {
if err := jm.resetJobs(); err != nil {
if jm.dash != nil {
jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
@@ -109,7 +102,7 @@ func (jm *JobManager) loop(stop chan struct{}) {
wg.Add(1)
go func() {
defer wg.Done()
- jp.loop(stop)
+ jp.loop(ctx)
}()
if !main || !jm.needParallelProcessor() {
break
@@ -143,14 +136,14 @@ func (jm *JobManager) resetJobs() error {
return nil
}
-func (jp *JobProcessor) loop(stop chan struct{}) {
+func (jp *JobProcessor) loop(ctx context.Context) {
jp.Logf(0, "job loop started")
loop:
for {
// Check jp.stop separately first, otherwise if stop signal arrives during a job execution,
// we can still grab the next job with 50% probability.
select {
- case <-stop:
+ case <-ctx.Done():
break loop
default:
}
@@ -166,7 +159,7 @@ loop:
jp.pollJobs()
case <-jp.commitTicker:
jp.pollCommits()
- case <-stop:
+ case <-ctx.Done():
break loop
}
}