diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2023-01-02 19:34:01 +0100 |
|---|---|---|
| committer | Aleksandr Nogikh <wp32pw@gmail.com> | 2023-01-19 11:26:54 +0100 |
| commit | e9883646d93ed92b96068f3fad8e8c5dc29bd5bd (patch) | |
| tree | 03b291c93e58df9412ed361ee20af4cfe7a4edcf /syz-ci/jobs.go | |
| parent | 4ac68196f128fe22ddd1e5b57d021c70f19e4894 (diff) | |
syz-ci: do jobs in parallel
Introduce a new ParallelJobs configuration parameter. If set, syz-ci
will execute the specified kinds of jobs in parallel to the main job
processing thread.
This should resolve the 7-8 hour patch testing delays when syz-ci is
busy doing a bisection.
Diffstat (limited to 'syz-ci/jobs.go')
| -rw-r--r-- | syz-ci/jobs.go | 138 |
1 files changed, 96 insertions, 42 deletions
diff --git a/syz-ci/jobs.go b/syz-ci/jobs.go index 7f19a862f..7c3da9cf8 100644 --- a/syz-ci/jobs.go +++ b/syz-ci/jobs.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/google/syzkaller/dashboard/dashapi" @@ -26,68 +27,119 @@ import ( "github.com/google/syzkaller/vm" ) -type JobProcessor struct { +type JobManager struct { cfg *Config - name string - managers []*Manager - knownCommits map[string]bool - stop chan struct{} - shutdownPending chan struct{} dash *dashapi.Dashboard - syzkallerRepo string - syzkallerBranch string + managers []*Manager + shutdownPending <-chan struct{} +} + +type JobProcessor struct { + *JobManager + name string + instanceSuffix string + knownCommits map[string]bool + baseDir string + jobFilter *ManagerJobs + jobTicker <-chan time.Time + commitTicker <-chan time.Time } -func newJobProcessor(cfg *Config, managers []*Manager, stop, shutdownPending chan struct{}) (*JobProcessor, error) { +func newJobManager(cfg *Config, managers []*Manager, shutdownPending chan struct{}) (*JobManager, error) { dash, err := dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey) if err != nil { return nil, err } - return &JobProcessor{ + return &JobManager{ cfg: cfg, - name: fmt.Sprintf("%v-job", cfg.Name), + dash: dash, managers: managers, - knownCommits: make(map[string]bool), - stop: stop, shutdownPending: shutdownPending, - dash: dash, - syzkallerRepo: cfg.SyzkallerRepo, - syzkallerBranch: cfg.SyzkallerBranch, }, nil } -func (jp *JobProcessor) loop() { - jobTicker := time.NewTicker(time.Duration(jp.cfg.JobPollPeriod) * time.Second) - commitTicker := time.NewTicker(time.Duration(jp.cfg.CommitPollPeriod) * time.Second) - defer jobTicker.Stop() +func (jm *JobManager) loop(stop chan struct{}) { + if err := jm.resetJobs(); err != nil { + if jm.dash != nil { + jm.dash.LogError("syz-ci", "reset jobs failed: %v", err) + } + return + } + commitTicker := time.NewTicker(time.Duration(jm.cfg.CommitPollPeriod) * time.Second) defer commitTicker.Stop() + jobTicker := time.NewTicker(time.Duration(jm.cfg.JobPollPeriod) * time.Second) + defer jobTicker.Stop() + var wg sync.WaitGroup + for main := true; ; main = false { + jp := &JobProcessor{ + JobManager: jm, + jobTicker: jobTicker.C, + } + if main { + jp.instanceSuffix = "-job" + jp.baseDir = osutil.Abs("jobs") + jp.commitTicker = commitTicker.C + jp.knownCommits = make(map[string]bool) + } else { + jp.instanceSuffix = "-job-parallel" + jp.baseDir = osutil.Abs("jobs-2") + // For now let's only parallelize patch testing requests. + jp.jobFilter = &ManagerJobs{TestPatches: true} + } + jp.name = fmt.Sprintf("%v%v", jm.cfg.Name, jp.instanceSuffix) + wg.Add(1) + go func() { + defer wg.Done() + jp.loop(stop) + }() + if main != jm.cfg.ParallelJobs { + break + } + } + wg.Wait() +} + +func (jm *JobManager) resetJobs() error { + managerNames := []string{} + for _, mgr := range jm.managers { + if mgr.mgrcfg.Jobs.AnyEnabled() { + managerNames = append(managerNames, mgr.name) + } + } + if len(managerNames) > 0 { + return jm.dash.JobReset(&dashapi.JobResetReq{Managers: managerNames}) + } + return nil +} + +func (jp *JobProcessor) loop(stop chan struct{}) { loop: for { // Check jp.stop separately first, otherwise if stop signal arrives during a job execution, // we can still grab the next job with 50% probability. select { - case <-jp.stop: + case <-stop: break loop default: } // Similar for commit polling: if we grab 2-3 bisect jobs in a row, // it can delay commit polling by days. select { - case <-commitTicker.C: + case <-jp.commitTicker: jp.pollCommits() default: } select { - case <-jobTicker.C: + case <-jp.jobTicker: if buildSem.Available() == 0 { // If normal kernel build is in progress (usually on start), don't query jobs. // Otherwise we claim a job, but can't start it for a while. continue loop } jp.pollJobs() - case <-commitTicker.C: + case <-jp.commitTicker: jp.pollCommits() - case <-jp.stop: + case <-stop: break loop } } @@ -174,7 +226,7 @@ func (jp *JobProcessor) pollManagerCommits(mgr *Manager) error { } func (jp *JobProcessor) pollRepo(mgr *Manager, URL, branch, reportEmail string) ([]*vcs.Commit, error) { - dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS, "kernel")) + dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS, "kernel") repo, err := vcs.NewRepo(mgr.managercfg.TargetOS, mgr.managercfg.Type, dir) if err != nil { return nil, fmt.Errorf("failed to create kernel repo: %v", err) @@ -186,7 +238,7 @@ func (jp *JobProcessor) pollRepo(mgr *Manager, URL, branch, reportEmail string) } func (jp *JobProcessor) getCommitInfo(mgr *Manager, URL, branch string, commits []string) ([]*vcs.Commit, error) { - dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS, "kernel")) + dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS, "kernel") repo, err := vcs.NewRepo(mgr.managercfg.TargetOS, mgr.managercfg.Type, dir) if err != nil { return nil, fmt.Errorf("failed to create kernel repo: %v", err) @@ -209,15 +261,17 @@ func (jp *JobProcessor) pollJobs() { Managers: make(map[string]dashapi.ManagerJobs), } for _, mgr := range jp.managers { - if !mgr.mgrcfg.Jobs.TestPatches && - !mgr.mgrcfg.Jobs.BisectCause && - !mgr.mgrcfg.Jobs.BisectFix { - continue + jobs := &mgr.mgrcfg.Jobs + if jp.jobFilter != nil { + jobs = jobs.Filter(jp.jobFilter) + } + apiJobs := dashapi.ManagerJobs{ + TestPatches: jobs.TestPatches, + BisectCause: jobs.BisectCause, + BisectFix: jobs.BisectFix, } - poll.Managers[mgr.name] = dashapi.ManagerJobs{ - TestPatches: mgr.mgrcfg.Jobs.TestPatches, - BisectCause: mgr.mgrcfg.Jobs.BisectCause, - BisectFix: mgr.mgrcfg.Jobs.BisectFix, + if apiJobs.TestPatches || apiJobs.BisectCause || apiJobs.BisectFix { + poll.Managers[mgr.name] = apiJobs } } if len(poll.Managers) == 0 { @@ -280,7 +334,7 @@ type Job struct { func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq { req, mgr := job.req, job.mgr - dir := osutil.Abs(filepath.Join("jobs", mgr.managercfg.TargetOS)) + dir := filepath.Join(jp.baseDir, mgr.managercfg.TargetOS) mgrcfg := new(mgrconfig.Config) *mgrcfg = *mgr.managercfg mgrcfg.Workdir = filepath.Join(dir, "workdir") @@ -303,12 +357,12 @@ func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq { job.resp = resp switch req.Type { case dashapi.JobTestPatch: - mgrcfg.Name += "-test-job" + mgrcfg.Name += "-test" + jp.instanceSuffix resp.Build.KernelRepo = req.KernelRepo resp.Build.KernelBranch = req.KernelBranch resp.Build.KernelCommit = "[unknown]" case dashapi.JobBisectCause, dashapi.JobBisectFix: - mgrcfg.Name += "-bisect-job" + mgrcfg.Name += "-bisect" + jp.instanceSuffix resp.Build.KernelRepo = mgr.mgrcfg.Repo resp.Build.KernelBranch = mgr.mgrcfg.Branch resp.Build.KernelCommit = req.KernelCommit @@ -351,10 +405,10 @@ func (jp *JobProcessor) process(job *Job) *dashapi.JobDoneReq { var err error switch req.Type { case dashapi.JobTestPatch: - mgrcfg.Name += "-test-job" + mgrcfg.Name += "-test" + jp.instanceSuffix err = jp.testPatch(job, mgrcfg) case dashapi.JobBisectCause, dashapi.JobBisectFix: - mgrcfg.Name += "-bisect-job" + mgrcfg.Name += "-bisect" + jp.instanceSuffix err = jp.bisect(job, mgrcfg) } if err != nil { @@ -424,7 +478,7 @@ func (jp *JobProcessor) bisect(job *Job, mgrcfg *mgrconfig.Config) error { Userspace: mgr.mgrcfg.Userspace, }, Syzkaller: bisect.SyzkallerConfig{ - Repo: jp.syzkallerRepo, + Repo: jp.cfg.SyzkallerRepo, Commit: req.SyzkallerCommit, }, Repro: bisect.ReproConfig{ @@ -498,7 +552,7 @@ func (jp *JobProcessor) testPatch(job *Job, mgrcfg *mgrconfig.Config) error { return err } log.Logf(0, "job: building syzkaller on %v...", req.SyzkallerCommit) - syzBuildLog, syzBuildErr := env.BuildSyzkaller(jp.syzkallerRepo, req.SyzkallerCommit) + syzBuildLog, syzBuildErr := env.BuildSyzkaller(jp.cfg.SyzkallerRepo, req.SyzkallerCommit) if syzBuildErr != nil { return syzBuildErr } |
