diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-04-16 17:18:35 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-04-17 07:57:04 +0000 |
| commit | 5b14b31251d4cc2e33eb313e29b037262c597679 (patch) | |
| tree | 82ecbf770db13bfefffa65bf6b6a5c63021a4f98 | |
| parent | 94b0f8be5c512cccc2d1dbb405dab561b4d23940 (diff) | |
syz-cluster: clean up running steps of finished workflows
If the workflow step crashed or timed out, we used to have Running
status for such steps even though the session itself may be long
finished.
In order to prevent this inconsistency, on finishing each session go
through all remaining running steps and update their status to Error.
| -rw-r--r-- | syz-cluster/controller/processor.go | 36 | ||||
| -rw-r--r-- | syz-cluster/controller/processor_test.go | 45 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/session_test_repo.go | 20 |
3 files changed, 92 insertions, 9 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 5e96f04c7..aa286a149 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/syzkaller/syz-cluster/pkg/api" "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" @@ -24,6 +25,7 @@ type SeriesProcessor struct { blobStorage blob.Storage seriesRepo *db.SeriesRepository sessionRepo *db.SessionRepository + sessionTestRepo *db.SessionTestRepository workflows workflow.Service dbPollInterval time.Duration parallelWorkers int @@ -46,6 +48,7 @@ func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor { blobStorage: env.BlobStorage, seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), dbPollInterval: time.Minute, workflows: workflows, parallelWorkers: parallelWorkers, @@ -169,8 +172,12 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } case workflow.StatusFinished, workflow.StatusFailed: log.Printf("workflow for %q completed, mark the session finished", session.ID) + err := sp.stopRunningTests(ctx, session.ID) + if err != nil { + app.Errorf("failed to check running tests for %s: %v", session.ID, err) + } // TODO: StatusFailed needs a different handling. - err := sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { + err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { session.SetFinishedAt(time.Now()) return nil }) @@ -189,6 +196,33 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } } +// The session steps are expected to report that they are finished themselves. +// If the workflow was aborted for some external reason (or the session step crashed/timed out), +// the step may remain forever in the "Running" state. +// Go through such steps and mark them as finished (with an error). +func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error { + tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID) + if err != nil { + return fmt.Errorf("failed to query session steps: %w", err) + } + for _, test := range tests { + if test.Result != api.TestRunning { + continue + } + log.Printf("session %q is finished, but the test %q is running: marking it stopped", + sessionID, test.TestName) + err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) { + if entity.Result == api.TestRunning { + entity.Result = api.TestError + } + }) + if err != nil { + return fmt.Errorf("failed to update the step %q: %w", test.TestName, err) + } + } + return nil +} + func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error { return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { if session.LogURI == "" { diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go index f6f6abbbe..9f5e04d9b 100644 --- a/syz-cluster/controller/processor_test.go +++ b/syz-cluster/controller/processor_test.go @@ -80,6 +80,50 @@ func TestProcessor(t *testing.T) { cancel() } +func TestFinishRunningSteps(t *testing.T) { + workflows := newMockedWorkflows() + processor, client, ctx := prepareProcessorTest(t, workflows) + + // Start the loop. + var wg sync.WaitGroup + ctx2, cancel := context.WithCancel(ctx) + wg.Add(1) + go func() { + processor.Loop(ctx2) + wg.Done() + }() + + series := &api.Series{ + ExtID: "ext-id", + Title: "title", + } + _, sessionID := controller.UploadTestSeries(t, ctx, client, series) + buildResp := controller.UploadTestBuild(t, ctx, client, &api.Build{ + Arch: "amd64", + TreeName: "mainline", + ConfigName: "config", + CommitHash: "abcd", + }) + err := client.UploadTestResult(ctx, &api.TestResult{ + SessionID: sessionID, + BaseBuildID: buildResp.ID, + TestName: "test", + Result: api.TestRunning, + }) + assert.NoError(t, err) + + // Let the workflow finish. + workflows.finish <- struct{}{} + awaitFinishedSessions(t, processor.seriesRepo, 1) + cancel() + + // Verify that the session test is finished. + // A bit hacky, but it works. + list, err := processor.sessionTestRepo.BySessionRaw(ctx, sessionID) + assert.NoError(t, err) + assert.Equal(t, api.TestError, list[0].Result) +} + func awaitFinishedSessions(t *testing.T, seriesRepo *db.SeriesRepository, wantFinished int) { t.Logf("awaiting %d finished sessions", wantFinished) deadline := time.Second * 2 @@ -149,6 +193,7 @@ func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProc return &SeriesProcessor{ seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), workflows: workflows, dbPollInterval: time.Second / 10, parallelWorkers: 2, diff --git a/syz-cluster/pkg/db/session_test_repo.go b/syz-cluster/pkg/db/session_test_repo.go index e5b923fd4..7043b8389 100644 --- a/syz-cluster/pkg/db/session_test_repo.go +++ b/syz-cluster/pkg/db/session_test_repo.go @@ -85,14 +85,7 @@ type FullSessionTest struct { } func (repo *SessionTestRepository) BySession(ctx context.Context, sessionID string) ([]*FullSessionTest, error) { - stmt := spanner.Statement{ - SQL: "SELECT * FROM `SessionTests` WHERE `SessionID` = @session" + - " ORDER BY `UpdatedAt`", - Params: map[string]interface{}{"session": sessionID}, - } - iter := repo.client.Single().Query(ctx, stmt) - defer iter.Stop() - list, err := readEntities[SessionTest](iter) + list, err := repo.BySessionRaw(ctx, sessionID) if err != nil { return nil, err } @@ -131,3 +124,14 @@ func (repo *SessionTestRepository) BySession(ctx context.Context, sessionID stri } return ret, nil } + +func (repo *SessionTestRepository) BySessionRaw(ctx context.Context, sessionID string) ([]*SessionTest, error) { + stmt := spanner.Statement{ + SQL: "SELECT * FROM `SessionTests` WHERE `SessionID` = @session" + + " ORDER BY `UpdatedAt`", + Params: map[string]interface{}{"session": sessionID}, + } + iter := repo.client.Single().Query(ctx, stmt) + defer iter.Stop() + return readEntities[SessionTest](iter) +} |
