aboutsummaryrefslogtreecommitdiffstats
path: root/syz-ci/jobs.go
diff options
context:
space:
mode:
author Aleksandr Nogikh <nogikh@google.com> 2023-01-02 19:34:01 +0100
committer Aleksandr Nogikh <wp32pw@gmail.com> 2023-01-19 11:26:54 +0100
commit e9883646d93ed92b96068f3fad8e8c5dc29bd5bd (patch)
tree 03b291c93e58df9412ed361ee20af4cfe7a4edcf /syz-ci/jobs.go
parent 4ac68196f128fe22ddd1e5b57d021c70f19e4894 (diff)
syz-ci: do jobs in parallel
Introduce a new ParallelJobs configuration parameter. If set, syz-ci will execute the specified kinds of jobs in parallel with the main job processing thread. This should resolve the 7-8 hour patch testing delays that occur when syz-ci is busy doing a bisection.
Diffstat (limited to 'syz-ci/jobs.go')
-rw-r--r-- syz-ci/jobs.go 138
1 files changed, 96 insertions, 42 deletions
diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go
index 7f19a862f..7c3da9cf8 100644
--- a/syz-ci/jobs.go
+++ b/syz-ci/jobs.go
@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strings"
+ "sync"
"time"
"github.com/google/syzkaller/dashboard/dashapi"
@@ -26,68 +27,119 @@ import (
"github.com/google/syzkaller/vm"
)
-type JobProcessor struct {
+type JobManager struct {
cfg *Config
- name string
- managers []*Manager
- knownCommits map[string]bool
- stop chan struct{}
- shutdownPending chan struct{}
dash *dashapi.Dashboard
- syzkallerRepo string
- syzkallerBranch string
+ managers []*Manager
+ shutdownPending <-chan struct{}
+}
+
+type JobProcessor struct {
+ *JobManager
+ name string
+ instanceSuffix string
+ knownCommits map[string]bool
+ baseDir string
+ jobFilter *ManagerJobs
+ jobTicker <-chan time.Time
+ commitTicker <-chan time.Time
}
-func newJobProcessor(cfg *Config, managers []*Manager, stop, shutdownPending chan struct{}) (*JobProcessor, error) {
+func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct{}) (*JobManager, error) {
dash, err := dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey)
if err != nil {
return nil, err
}
- return &JobProcessor{
+ return &JobManager{
cfg: cfg,
- name: fmt.Sprintf("%v-job", cfg.Name),
+ dash: dash,
managers: managers,
- knownCommits: make(map[string]bool),
- stop: stop,
shutdownPending: shutdownPending,
- dash: dash,
- syzkallerRepo: cfg.SyzkallerRepo,
- syzkallerBranch: cfg.SyzkallerBranch,
}, nil
}
-func (jp *JobProcessor) loop() {
- jobTicker := time.NewTicker(time.Duration(jp.cfg.JobPollPeriod) * time.Second)
- commitTicker := time.NewTicker(time.Duration(jp.cfg.CommitPollPeriod) * time.Second)
- defer jobTicker.Stop()
+func (jm *JobManager) loop(stop chan struct{}) {
+ if err := jm.resetJobs(); err != nil {
+ if jm.dash != nil {
+ jm.dash.LogError("syz-ci", "reset jobs failed: %v", err)
+ }
+ return
+ }
+ commitTicker := time.NewTicker(time.Duration(jm.cfg.CommitPollPeriod) * time.Second)
defer commitTicker.Stop()
+ jobTicker := time.NewTicker(time.Duration(jm.cfg.JobPollPeriod) * time.Second)
+ defer jobTicker.Stop()
+ var wg sync.WaitGroup
+ for main := true; ; main = false {
+ jp := &JobProcessor{
+ JobManager: jm,
+ jobTicker: jobTicker.C,
+ }
+ if main {
+ jp.instanceSuffix = "-job"
+ jp.baseDir = osutil.Abs("jobs")
+ jp.commitTicker = commitTicker.C
+ jp.knownCommits = make(map[string]bool)
+ } else {
+ jp.instanceSuffix = "-job-parallel"
+ jp.baseDir = osutil.Abs("jobs-2")
+ // For now let's only parallelize patch testing requests.
+ jp.jobFilter = &ManagerJobs{TestPatches: true}
+ }
+ jp.name = fmt.Sprintf("%v%v", jm.cfg.Name, jp.instanceSuffix)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ jp.loop(stop)
+ }()
+ if main != jm.cfg.ParallelJobs {
+ break
+ }
+ }
+ wg.Wait()
+}
+
+func (jm *JobManager) resetJobs() error {
+ managerNames := []string{}
+ for _, mgr := range jm.managers {
+ if mgr.mgrcfg.Jobs.AnyEnabled() {
+ managerNames = append(managerNames, mgr.name)
+ }
+ }
+ if len(managerNames) > 0 {
+ return jm.dash.JobReset(&dashapi.JobResetReq{Managers: managerNames})
+ }
+ return nil
+}
+
+func (jp *JobProcessor) loop(stop chan struct{}) {
loop:
for {
// Check the stop channel separately first, otherwise if stop signal arrives during a job execution,
// we can still grab the next job with 50% probability.
select {
- case <-jp.stop:
+ case <-stop:
break loop
default:
}
// Similar for commit polling: if we grab 2-3 bisect jobs in a row,
// it can delay commit polling by days.
select {
- case <-commitTicker.C:
+ case <-jp.commitTicker:
jp.pollCommits()
default:
}
select {
- case <-jobTicker.C:
+ case <-jp.jobTicker:
if buildSem.Available() == 0 {
// If normal kernel build is in progress (usually on start), don't query jobs.
// Otherwise we claim a job, but can't start it for a while.
continue loop
}
jp.pollJobs()
- case <-commitTicker.C:
+ case <-jp.commitTicker:
jp.pollCommits()
- case <-jp.stop:
+ case <-stop:
break loop
}
}
@@ -174,7 +226,7 @@ func (jp *JobProcessor) pollManagerCommits(mgr *Manager) error {
}
func (jp *JobProcessor) pollRepo(mgr *Manager, URL, branch, reportEmail string) ([]*vcs.Commit, error) {
- dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS, "kernel"))
+ dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS, "kernel")
repo, err := vcs.NewRepo(mgr.managercfg.TargetOS, mgr.managercfg.Type, dir)
if err != nil {
return nil, fmt.Errorf("failed to create kernel repo: %v", err)
@@ -186,7 +238,7 @@ func (jp *JobProcessor) pollRepo(mgr *Manager, URL, branch, reportEmail string)
}
func (jp *JobProcessor) getCommitInfo(mgr *Manager, URL, branch string, commits []string) ([]*vcs.Commit, error) {
- dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS, "kernel"))
+ dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS, "kernel")
repo, err := vcs.NewRepo(mgr.managercfg.TargetOS, mgr.managercfg.Type, dir)
if err != nil {
return nil, fmt.Errorf("failed to create kernel repo: %v", err)
@@ -209,15 +261,17 @@ func (jp *JobProcessor) pollJobs() {
Managers: make(map[string]dashapi.ManagerJobs),
}
for _, mgr := range jp.managers {
- if !mgr.mgrcfg.Jobs.TestPatches &&
- !mgr.mgrcfg.Jobs.BisectCause &&
- !mgr.mgrcfg.Jobs.BisectFix {
- continue
+ jobs := &mgr.mgrcfg.Jobs
+ if jp.jobFilter != nil {
+ jobs = jobs.Filter(jp.jobFilter)
+ }
+ apiJobs := dashapi.ManagerJobs{
+ TestPatches: jobs.TestPatches,
+ BisectCause: jobs.BisectCause,
+ BisectFix: jobs.BisectFix,
}
- poll.Managers[mgr.name] = dashapi.ManagerJobs{
- TestPatches: mgr.mgrcfg.Jobs.TestPatches,
- BisectCause: mgr.mgrcfg.Jobs.BisectCause,
- BisectFix: mgr.mgrcfg.Jobs.BisectFix,
+ if apiJobs.TestPatches || apiJobs.BisectCause || apiJobs.BisectFix {
+ poll.Managers[mgr.name] = apiJobs
}
}
if len(poll.Managers) == 0 {
@@ -280,7 +334,7 @@ type Job struct {
func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq {
req, mgr := job.req, job.mgr
- dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS))
+ dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS)
mgrcfg := new(mgrconfig.Config)
*mgrcfg = *mgr.managercfg
mgrcfg.Workdir = filepath.Join(dir, "workdir")
@@ -303,12 +357,12 @@ func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq {
job.resp = resp
switch req.Type {
case dashapi.JobTestPatch:
- mgrcfg.Name += "-test-job"
+ mgrcfg.Name += "-test" + jp.instanceSuffix
resp.Build.KernelRepo = req.KernelRepo
resp.Build.KernelBranch = req.KernelBranch
resp.Build.KernelCommit = "[unknown]"
case dashapi.JobBisectCause, dashapi.JobBisectFix:
- mgrcfg.Name += "-bisect-job"
+ mgrcfg.Name += "-bisect" + jp.instanceSuffix
resp.Build.KernelRepo = mgr.mgrcfg.Repo
resp.Build.KernelBranch = mgr.mgrcfg.Branch
resp.Build.KernelCommit = req.KernelCommit
@@ -351,10 +405,10 @@ func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq {
var err error
switch req.Type {
case dashapi.JobTestPatch:
- mgrcfg.Name += "-test-job"
+ mgrcfg.Name += "-test" + jp.instanceSuffix
err = jp.testPatch(job, mgrcfg)
case dashapi.JobBisectCause, dashapi.JobBisectFix:
- mgrcfg.Name += "-bisect-job"
+ mgrcfg.Name += "-bisect" + jp.instanceSuffix
err = jp.bisect(job, mgrcfg)
}
if err != nil {
@@ -424,7 +478,7 @@ func (jp *JobProcessor) bisect(job *Job, mgrcfg *mgrconfig.Config) error {
Userspace: mgr.mgrcfg.Userspace,
},
Syzkaller: bisect.SyzkallerConfig{
- Repo: jp.syzkallerRepo,
+ Repo: jp.cfg.SyzkallerRepo,
Commit: req.SyzkallerCommit,
},
Repro: bisect.ReproConfig{
@@ -498,7 +552,7 @@ func (jp *JobProcessor) testPatch(job *Job, mgrcfg *mgrconfig.Config) error {
return err
}
log.Logf(0, "job: building syzkaller on %v...", req.SyzkallerCommit)
- syzBuildLog, syzBuildErr := env.BuildSyzkaller(jp.syzkallerRepo, req.SyzkallerCommit)
+ syzBuildLog, syzBuildErr := env.BuildSyzkaller(jp.cfg.SyzkallerRepo, req.SyzkallerCommit)
if syzBuildErr != nil {
return syzBuildErr
}