From 5b14b31251d4cc2e33eb313e29b037262c597679 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Wed, 16 Apr 2025 17:18:35 +0200 Subject: syz-cluster: clean up running steps of finished workflows If the workflow step crashed or timed out, we used to have Running status for such steps even though the session itself may be long finished. In order to prevent this inconsistency, on finishing each session go through all remaining running steps and update their status to Error. --- syz-cluster/controller/processor.go | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) (limited to 'syz-cluster/controller/processor.go') diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 5e96f04c7..aa286a149 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -13,6 +13,7 @@ import ( "sync" "time" + "github.com/google/syzkaller/syz-cluster/pkg/api" "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" @@ -24,6 +25,7 @@ type SeriesProcessor struct { blobStorage blob.Storage seriesRepo *db.SeriesRepository sessionRepo *db.SessionRepository + sessionTestRepo *db.SessionTestRepository workflows workflow.Service dbPollInterval time.Duration parallelWorkers int @@ -46,6 +48,7 @@ func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor { blobStorage: env.BlobStorage, seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), dbPollInterval: time.Minute, workflows: workflows, parallelWorkers: parallelWorkers, @@ -169,8 +172,12 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } case workflow.StatusFinished, workflow.StatusFailed: log.Printf("workflow for %q completed, mark the session finished", session.ID) + err := sp.stopRunningTests(ctx, session.ID) + if err != nil { + app.Errorf("failed to check running tests for %s: %v", session.ID, err) + } // TODO: StatusFailed needs a different handling. - err := sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { + err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { session.SetFinishedAt(time.Now()) return nil }) @@ -189,6 +196,33 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } } +// The session steps are expected to report that they are finished themselves. +// If the workflow was aborted for some external reason (or the session step crashed/timed out), +// the step may remain forever in the "Running" state. +// Go through such steps and mark them as finished (with an error). +func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error { + tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID) + if err != nil { + return fmt.Errorf("failed to query session steps: %w", err) + } + for _, test := range tests { + if test.Result != api.TestRunning { + continue + } + log.Printf("session %q is finished, but the test %q is running: marking it stopped", + sessionID, test.TestName) + err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) { + if entity.Result == api.TestRunning { + entity.Result = api.TestError + } + }) + if err != nil { + return fmt.Errorf("failed to update the step %q: %w", test.TestName, err) + } + } + return nil +} + func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error { return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { if session.LogURI == "" { -- cgit mrf-deployment