aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/controller/processor.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-16 17:18:35 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-17 07:57:04 +0000
commit5b14b31251d4cc2e33eb313e29b037262c597679 (patch)
tree82ecbf770db13bfefffa65bf6b6a5c63021a4f98 /syz-cluster/controller/processor.go
parent94b0f8be5c512cccc2d1dbb405dab561b4d23940 (diff)
syz-cluster: clean up running steps of finished workflows
If the workflow step crashed or timed out, we used to have Running status for such steps even though the session itself may be long finished. In order to prevent this inconsistency, on finishing each session go through all remaining running steps and update their status to Error.
Diffstat (limited to 'syz-cluster/controller/processor.go')
-rw-r--r--syz-cluster/controller/processor.go36
1 files changed, 35 insertions, 1 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index 5e96f04c7..aa286a149 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"
+ "github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
@@ -24,6 +25,7 @@ type SeriesProcessor struct {
blobStorage blob.Storage
seriesRepo *db.SeriesRepository
sessionRepo *db.SessionRepository
+ sessionTestRepo *db.SessionTestRepository
workflows workflow.Service
dbPollInterval time.Duration
parallelWorkers int
@@ -46,6 +48,7 @@ func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor {
blobStorage: env.BlobStorage,
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
dbPollInterval: time.Minute,
workflows: workflows,
parallelWorkers: parallelWorkers,
@@ -169,8 +172,12 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
}
case workflow.StatusFinished, workflow.StatusFailed:
log.Printf("workflow for %q completed, mark the session finished", session.ID)
+ err := sp.stopRunningTests(ctx, session.ID)
+ if err != nil {
+ app.Errorf("failed to check running tests for %s: %v", session.ID, err)
+ }
// TODO: StatusFailed needs a different handling.
- err := sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
+ err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
session.SetFinishedAt(time.Now())
return nil
})
@@ -189,6 +196,33 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
}
}
+// The session steps are expected to report that they are finished themselves.
+// If the workflow was aborted for some external reason (or the session step crashed/timed out),
+// the step may remain forever in the "Running" state.
+// Go through such steps and mark them as finished (with an error).
+func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error {
+ tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID)
+ if err != nil {
+ return fmt.Errorf("failed to query session steps: %w", err)
+ }
+ for _, test := range tests {
+ if test.Result != api.TestRunning {
+ continue
+ }
+ log.Printf("session %q is finished, but the test %q is running: marking it stopped",
+ sessionID, test.TestName)
+ err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) {
+ if entity.Result == api.TestRunning {
+ entity.Result = api.TestError
+ }
+ })
+ if err != nil {
+ return fmt.Errorf("failed to update the step %q: %w", test.TestName, err)
+ }
+ }
+ return nil
+}
+
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
if session.LogURI == "" {