From 4bd62a18b32a876e5ef2b024e454ccf793849050 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Fri, 23 Dec 2022 12:17:37 +0100 Subject: dashboard: support multiple simultaneous jobs Modify job_poll never to return already started jobs. Introduce a special job_reset API call to clear the "started" flag for the specified list of managers. --- dashboard/app/api.go | 12 +++++ dashboard/app/bisect_test.go | 8 +--- dashboard/app/entities.go | 11 +++-- dashboard/app/index.yaml | 1 + dashboard/app/jobs.go | 49 +++++++++++++++++++- dashboard/app/jobs_test.go | 105 +++++++++++++++++++++++++++++++++++++++++-- dashboard/app/main.go | 4 +- dashboard/dashapi/dashapi.go | 10 +++++ 8 files changed, 183 insertions(+), 17 deletions(-) (limited to 'dashboard') diff --git a/dashboard/app/api.go b/dashboard/app/api.go index 82b90fed8..926e7f90d 100644 --- a/dashboard/app/api.go +++ b/dashboard/app/api.go @@ -38,6 +38,7 @@ func initAPIHandlers() { var apiHandlers = map[string]APIHandler{ "log_error": apiLogError, "job_poll": apiJobPoll, + "job_reset": apiJobReset, "job_done": apiJobDone, "reporting_poll_bugs": apiReportingPollBugs, "reporting_poll_notifs": apiReportingPollNotifications, @@ -366,6 +367,7 @@ func apiJobPoll(c context.Context, r *http.Request, payload []byte) (interface{} return pollPendingJobs(c, req.Managers) } +// nolint: dupl func apiJobDone(c context.Context, r *http.Request, payload []byte) (interface{}, error) { req := new(dashapi.JobDoneReq) if err := json.Unmarshal(payload, req); err != nil { @@ -375,6 +377,16 @@ func apiJobDone(c context.Context, r *http.Request, payload []byte) (interface{} return nil, err } +// nolint: dupl +func apiJobReset(c context.Context, r *http.Request, payload []byte) (interface{}, error) { + req := new(dashapi.JobResetReq) + if err := json.Unmarshal(payload, req); err != nil { + return nil, fmt.Errorf("failed to unmarshal request: %v", err) + } + err := resetJobs(c, req) + return nil, err +} + func apiUploadBuild(c context.Context, ns string, r *http.Request, payload []byte) (interface{}, error) { req := new(dashapi.Build) if err := json.Unmarshal(payload, req); err != nil { diff --git a/dashboard/app/bisect_test.go b/dashboard/app/bisect_test.go index 2f4e821cd..2c345ffec 100644 --- a/dashboard/app/bisect_test.go +++ b/dashboard/app/bisect_test.go @@ -80,11 +80,6 @@ func TestBisectCause(t *testing.T) { "syncfs(3)")) c.expectEQ(pollResp.ReproC, []byte("int main() { return 3; }")) - // Since we did not reply, we should get the same response. - c.advanceTime(5 * 24 * time.Hour) - pollResp2 := c.client2.pollJobs(build.Manager) - c.expectEQ(pollResp, pollResp2) - // Bisection failed with an error. done := &dashapi.JobDoneReq{ ID: pollResp.ID, @@ -95,6 +90,7 @@ func TestBisectCause(t *testing.T) { c.expectNoEmail() // BisectCause #2 + pollResp2 := pollResp pollResp = c.client2.pollJobs(build.Manager) c.expectNE(pollResp.ID, pollResp2.ID) c.expectEQ(pollResp.ReproOpts, []byte("repro opts 2")) @@ -316,8 +312,6 @@ https://goo.gl/tpsmEJ#testing-patches`, c.expectEQ(pollResp.Type, dashapi.JobBisectFix) c.expectEQ(pollResp.ReproOpts, []byte("repro opts 2")) c.advanceTime(5 * 24 * time.Hour) - pollResp2 = c.client2.pollJobs(build.Manager) - c.expectEQ(pollResp, pollResp2) done = &dashapi.JobDoneReq{ ID: pollResp.ID, Log: []byte("bisect log 2"), diff --git a/dashboard/app/entities.go b/dashboard/app/entities.go index 97d4c7012..1395f4346 100644 --- a/dashboard/app/entities.go +++ b/dashboard/app/entities.go @@ -328,9 +328,10 @@ type Job struct { Patch int64 // reference to Patch text entity KernelConfig int64 // reference to the kernel config entity - Attempts int // number of times we tried to execute this job - Started time.Time - Finished time.Time // if set, job is finished + Attempts int // number of times we tried to execute this job + IsRunning bool // the job might have been started, but never finished + LastStarted time.Time `datastore:"Started"` + Finished time.Time // if set, job is finished // Result of execution: CrashTitle string // if empty, we did not hit crash during testing @@ -345,6 +346,10 @@ type Job struct { Reported bool // have we reported result back to user? } +func (job *Job) IsFinished() bool { + return !job.Finished.IsZero() +} + type JobType int const ( diff --git a/dashboard/app/index.yaml b/dashboard/app/index.yaml index a60069178..fb9e7a378 100644 --- a/dashboard/app/index.yaml +++ b/dashboard/app/index.yaml @@ -177,6 +177,7 @@ indexes: - kind: Job properties: - name: Finished + - name: IsRunning - name: Attempts - name: Created diff --git a/dashboard/app/jobs.go b/dashboard/app/jobs.go index d94c08b09..6d2efc75a 100644 --- a/dashboard/app/jobs.go +++ b/dashboard/app/jobs.go @@ -548,7 +548,8 @@ func createJobResp(c context.Context, job *Job, jobKey *db.Key) (*dashapi.JobPol return nil } job.Attempts++ - job.Started = now + job.IsRunning = true + job.LastStarted = now if _, err := db.Put(c, jobKey, job); err != nil { return fmt.Errorf("job %v: failed to put: %v", jobID, err) } @@ -666,6 +667,48 @@ func gatherCrashTitles(req *dashapi.JobDoneReq) []string { return ret } +// resetJobs is called to indicate that, for the specified managers, all started jobs are no longer +// in progress. +func resetJobs(c context.Context, req *dashapi.JobResetReq) error { + var jobs []*Job + keys, err := db.NewQuery("Job"). + Filter("Finished=", time.Time{}). + Filter("IsRunning=", true). + GetAll(c, &jobs) + if err != nil { + return err + } + managerMap := map[string]bool{} + for _, name := range req.Managers { + managerMap[name] = true + } + for idx, job := range jobs { + if !managerMap[job.Manager] { + continue + } + jobKey := keys[idx] + tx := func(c context.Context) error { + job = new(Job) + if err := db.Get(c, jobKey, job); err != nil { + return fmt.Errorf("job %v: failed to get in tx: %v", jobKey, err) + } + if job.IsFinished() { + // Just in case. + return nil + } + job.IsRunning = false + if _, err := db.Put(c, jobKey, job); err != nil { + return fmt.Errorf("job %v: failed to put: %v", jobKey, err) + } + return nil + } + if err := db.RunInTransaction(c, tx, nil); err != nil { + return err + } + } + return nil +} + // doneJob is called by syz-ci to mark completion of a job. func doneJob(c context.Context, req *dashapi.JobDoneReq) error { jobID := req.ID @@ -734,6 +777,7 @@ func doneJob(c context.Context, req *dashapi.JobDoneReq) error { job.BuildID = req.Build.ID job.CrashTitle = req.CrashTitle job.Finished = now + job.IsRunning = false job.Flags = JobFlags(req.Flags) if job.Type == JobBisectCause || job.Type == JobBisectFix { // Update bug.BisectCause/Fix status and also remember current bug reporting to send results. @@ -1066,6 +1110,7 @@ func loadPendingJob(c context.Context, managers map[string]dashapi.ManagerJobs) var jobs []*Job keys, err := db.NewQuery("Job"). Filter("Finished=", time.Time{}). + Filter("IsRunning=", false). Order("Attempts"). Order("Created"). GetAll(c, &jobs) @@ -1089,7 +1134,7 @@ func loadPendingJob(c context.Context, managers map[string]dashapi.ManagerJobs) // and protects from bisection job crashing syz-ci. const bisectRepeat = 3 * 24 * time.Hour if timeSince(c, job.Created) < bisectRepeat || - timeSince(c, job.Started) < bisectRepeat { + timeSince(c, job.LastStarted) < bisectRepeat { continue } default: diff --git a/dashboard/app/jobs_test.go b/dashboard/app/jobs_test.go index 1b5c9603e..a473bc03b 100644 --- a/dashboard/app/jobs_test.go +++ b/dashboard/app/jobs_test.go @@ -12,6 +12,7 @@ import ( "github.com/google/syzkaller/dashboard/dashapi" "github.com/google/syzkaller/pkg/email" + "github.com/stretchr/testify/assert" db "google.golang.org/appengine/v2/datastore" ) @@ -113,9 +114,6 @@ func TestJob(t *testing.T) { "repro syz")) c.expectEQ(pollResp.ReproC, []byte("repro C")) - pollResp2 := client.pollJobs(build.Manager) - c.expectEQ(pollResp2, pollResp) - jobDoneReq := &dashapi.JobDoneReq{ ID: pollResp.ID, Build: *build, @@ -1001,3 +999,104 @@ func TestExternalPatchCompletion(t *testing.T) { c.expectEQ(pollResp.KernelRepo, build.KernelRepo) c.expectEQ(pollResp.KernelBranch, build.KernelBranch) } + +func TestParallelJobs(t *testing.T) { + c := NewCtx(t) + defer c.Close() + + client := c.client + + build := testBuild(1) + client.UploadBuild(build) + + crash := testCrash(build, 2) + crash.Title = testErrorTitle + client.ReportCrash(crash) + + // Confirm the report. + reports, err := client.ReportingPollBugs("test") + origReport := reports.Reports[0] + c.expectOK(err) + c.expectEQ(len(reports.Reports), 1) + + reply, _ := client.ReportingUpdate(&dashapi.BugUpdate{ + ID: origReport.ID, + Status: dashapi.BugStatusOpen, + }) + client.expectEQ(reply.Error, false) + client.expectEQ(reply.OK, true) + + // Create a patch testing job. + const ( + repo1 = "git://git.git/git1.git" + repo2 = "git://git.git/git2.git" + ) + testPatchReq := &dashapi.TestPatchRequest{ + BugID: origReport.ID, + Link: "http://some-link.com/", + User: "developer@kernel.org", + Branch: "kernel-branch", + Repo: repo1, + Patch: []byte(sampleGitPatch), + } + ret, err := client.NewTestJob(testPatchReq) + c.expectOK(err) + c.expectEQ(ret.ErrorText, "") + + // Make sure the job will be passed to the job processor. + pollResp := client.pollJobs(build.Manager) + c.expectEQ(pollResp.Type, dashapi.JobTestPatch) + c.expectEQ(pollResp.KernelRepo, repo1) + + // This job is already taken, there are no other jobs. + emptyPollResp := client.pollJobs(build.Manager) + c.expectEQ(emptyPollResp, &dashapi.JobPollResp{}) + + // Create another job. + testPatchReq.Repo = repo2 + ret, err = client.NewTestJob(testPatchReq) + c.expectOK(err) + c.expectEQ(ret.ErrorText, "") + + // Make sure the new job will be passed to the job processor. + pollResp = client.pollJobs(build.Manager) + c.expectEQ(pollResp.Type, dashapi.JobTestPatch) + c.expectEQ(pollResp.KernelRepo, repo2) + + // .. and then there'll be no other jobs. + emptyPollResp = client.pollJobs(build.Manager) + c.expectEQ(emptyPollResp, &dashapi.JobPollResp{}) + + // Emulate a syz-ci restart. + client.JobReset(&dashapi.JobResetReq{Managers: []string{build.Manager}}) + + // .. and re-query both jobs. + repos := []string{} + for i := 0; i < 2; i++ { + pollResp = client.pollJobs(build.Manager) + c.expectEQ(pollResp.Type, dashapi.JobTestPatch) + repos = append(repos, pollResp.KernelRepo) + } + assert.ElementsMatch(t, repos, []string{repo1, repo2}, "two patch testing requests are expected") + + // .. but nothing else is to be expected. + emptyPollResp = client.pollJobs(build.Manager) + c.expectEQ(emptyPollResp, &dashapi.JobPollResp{}) + + // Emulate the job's completion. + build2 := testBuild(2) + jobDoneReq := &dashapi.JobDoneReq{ + ID: pollResp.ID, + Build: *build2, + CrashTitle: "test crash title", + CrashLog: []byte("test crash log"), + CrashReport: []byte("test crash report"), + } + err = client.JobDone(jobDoneReq) + c.expectOK(err) + client.pollBugs(1) + + // .. and make sure it doesn't appear again. + emptyPollResp = client.pollJobs(build.Manager) + c.expectEQ(emptyPollResp, &dashapi.JobPollResp{}) +} diff --git a/dashboard/app/main.go b/dashboard/app/main.go index 1448796c6..1dab67cf5 100644 --- a/dashboard/app/main.go +++ b/dashboard/app/main.go @@ -1542,7 +1542,7 @@ func makeUIJob(job *Job, jobKey *db.Key, bug *Bug, crash *Crash, build *Build) * KernelCommitLink: vcs.CommitLink(kernelRepo, kernelCommit), PatchLink: textLink(textPatch, job.Patch), Attempts: job.Attempts, - Started: job.Started, + Started: job.LastStarted, Finished: job.Finished, CrashTitle: job.CrashTitle, CrashLogLink: textLink(textCrashLog, job.CrashLog), @@ -1552,7 +1552,7 @@ func makeUIJob(job *Job, jobKey *db.Key, bug *Bug, crash *Crash, build *Build) * Reported: job.Reported, } if !job.Finished.IsZero() { - ui.Duration = job.Finished.Sub(job.Started) + ui.Duration = job.Finished.Sub(job.LastStarted) } if job.Type == JobBisectCause || job.Type == JobBisectFix { // We don't report these yet (or at all), see pollCompletedJobs. diff --git a/dashboard/dashapi/dashapi.go b/dashboard/dashapi/dashapi.go index 348fe92c5..4c46d1582 100644 --- a/dashboard/dashapi/dashapi.go +++ b/dashboard/dashapi/dashapi.go @@ -147,6 +147,8 @@ func (dash *Dashboard) BuilderPoll(manager string) (*BuilderPollResp, error) { } // Jobs workflow: +// - syz-ci sends JobResetReq to indicate that no previously started jobs +// are any longer in progress. // - syz-ci sends JobPollReq periodically to check for new jobs, // request contains list of managers that this syz-ci runs. // - dashboard replies with JobPollResp that contains job details, @@ -155,6 +157,10 @@ func (dash *Dashboard) BuilderPoll(manager string) (*BuilderPollResp, error) { // job execution result (Build, Crash or Error details), // ID must match JobPollResp.ID. +type JobResetReq struct { + Managers []string +} + type JobPollReq struct { Managers map[string]ManagerJobs } @@ -228,6 +234,10 @@ func (dash *Dashboard) JobDone(req *JobDoneReq) error { return dash.Query("job_done", req, nil) } +func (dash *Dashboard) JobReset(req *JobResetReq) error { + return dash.Query("job_reset", req, nil) +} + type BuildErrorReq struct { Build Build Crash Crash -- cgit mrf-deployment