diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-04-16 17:18:35 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-04-17 07:57:04 +0000 |
| commit | 5b14b31251d4cc2e33eb313e29b037262c597679 (patch) | |
| tree | 82ecbf770db13bfefffa65bf6b6a5c63021a4f98 /syz-cluster/controller/processor.go | |
| parent | 94b0f8be5c512cccc2d1dbb405dab561b4d23940 (diff) | |
syz-cluster: clean up running steps of finished workflows
If the workflow step crashed or timed out, we used to have Running
status for such steps even though the session itself may be long
finished.
In order to prevent this inconsistency, on finishing each session go
through all remaining running steps and update their status to Error.
Diffstat (limited to 'syz-cluster/controller/processor.go')
| -rw-r--r-- | syz-cluster/controller/processor.go | 36 |
1 files changed, 35 insertions, 1 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 5e96f04c7..aa286a149 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/syzkaller/syz-cluster/pkg/api" "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" @@ -24,6 +25,7 @@ type SeriesProcessor struct { blobStorage blob.Storage seriesRepo *db.SeriesRepository sessionRepo *db.SessionRepository + sessionTestRepo *db.SessionTestRepository workflows workflow.Service dbPollInterval time.Duration parallelWorkers int @@ -46,6 +48,7 @@ func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor { blobStorage: env.BlobStorage, seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), dbPollInterval: time.Minute, workflows: workflows, parallelWorkers: parallelWorkers, @@ -169,8 +172,12 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } case workflow.StatusFinished, workflow.StatusFailed: log.Printf("workflow for %q completed, mark the session finished", session.ID) + err := sp.stopRunningTests(ctx, session.ID) + if err != nil { + app.Errorf("failed to check running tests for %s: %v", session.ID, err) + } // TODO: StatusFailed needs a different handling. - err := sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { + err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { session.SetFinishedAt(time.Now()) return nil }) @@ -189,6 +196,33 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } } +// The session steps are expected to report that they are finished themselves. +// If the workflow was aborted for some external reason (or the session step crashed/timed out), +// the step may remain forever in the "Running" state. +// Go through such steps and mark them as finished (with an error). +func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error { + tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID) + if err != nil { + return fmt.Errorf("failed to query session steps: %w", err) + } + for _, test := range tests { + if test.Result != api.TestRunning { + continue + } + log.Printf("session %q is finished, but the test %q is running: marking it stopped", + sessionID, test.TestName) + err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) { + if entity.Result == api.TestRunning { + entity.Result = api.TestError + } + }) + if err != nil { + return fmt.Errorf("failed to update the step %q: %w", test.TestName, err) + } + } + return nil +} + func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error { return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { if session.LogURI == "" { |
