aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-16 17:18:35 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-17 07:57:04 +0000
commit5b14b31251d4cc2e33eb313e29b037262c597679 (patch)
tree82ecbf770db13bfefffa65bf6b6a5c63021a4f98
parent94b0f8be5c512cccc2d1dbb405dab561b4d23940 (diff)
syz-cluster: clean up running steps of finished workflows
If the workflow step crashed or timed out, we used to have Running status for such steps even though the session itself may be long finished. In order to prevent this inconsistency, on finishing each session go through all remaining running steps and update their status to Error.
-rw-r--r--syz-cluster/controller/processor.go36
-rw-r--r--syz-cluster/controller/processor_test.go45
-rw-r--r--syz-cluster/pkg/db/session_test_repo.go20
3 files changed, 92 insertions, 9 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index 5e96f04c7..aa286a149 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -13,6 +13,7 @@ import (
"sync"
"time"
+ "github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
@@ -24,6 +25,7 @@ type SeriesProcessor struct {
blobStorage blob.Storage
seriesRepo *db.SeriesRepository
sessionRepo *db.SessionRepository
+ sessionTestRepo *db.SessionTestRepository
workflows workflow.Service
dbPollInterval time.Duration
parallelWorkers int
@@ -46,6 +48,7 @@ func NewSeriesProcessor(env *app.AppEnvironment) *SeriesProcessor {
blobStorage: env.BlobStorage,
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
dbPollInterval: time.Minute,
workflows: workflows,
parallelWorkers: parallelWorkers,
@@ -169,8 +172,12 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
}
case workflow.StatusFinished, workflow.StatusFailed:
log.Printf("workflow for %q completed, mark the session finished", session.ID)
+ err := sp.stopRunningTests(ctx, session.ID)
+ if err != nil {
+ app.Errorf("failed to check running tests for %s: %v", session.ID, err)
+ }
// TODO: StatusFailed needs a different handling.
- err := sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
+ err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
session.SetFinishedAt(time.Now())
return nil
})
@@ -189,6 +196,33 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
}
}
+// The session steps are expected to report that they are finished themselves.
+// If the workflow was aborted for some external reason (or the session step crashed/timed out),
+// the step may remain forever in the "Running" state.
+// Go through such steps and mark them as finished (with an error).
+func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error {
+ tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID)
+ if err != nil {
+ return fmt.Errorf("failed to query session steps: %w", err)
+ }
+ for _, test := range tests {
+ if test.Result != api.TestRunning {
+ continue
+ }
+ log.Printf("session %q is finished, but the test %q is running: marking it stopped",
+ sessionID, test.TestName)
+ err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) {
+ if entity.Result == api.TestRunning {
+ entity.Result = api.TestError
+ }
+ })
+ if err != nil {
+ return fmt.Errorf("failed to update the step %q: %w", test.TestName, err)
+ }
+ }
+ return nil
+}
+
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
if session.LogURI == "" {
diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go
index f6f6abbbe..9f5e04d9b 100644
--- a/syz-cluster/controller/processor_test.go
+++ b/syz-cluster/controller/processor_test.go
@@ -80,6 +80,50 @@ func TestProcessor(t *testing.T) {
cancel()
}
+func TestFinishRunningSteps(t *testing.T) {
+ workflows := newMockedWorkflows()
+ processor, client, ctx := prepareProcessorTest(t, workflows)
+
+ // Start the loop.
+ var wg sync.WaitGroup
+ ctx2, cancel := context.WithCancel(ctx)
+ wg.Add(1)
+ go func() {
+ processor.Loop(ctx2)
+ wg.Done()
+ }()
+
+ series := &api.Series{
+ ExtID: "ext-id",
+ Title: "title",
+ }
+ _, sessionID := controller.UploadTestSeries(t, ctx, client, series)
+ buildResp := controller.UploadTestBuild(t, ctx, client, &api.Build{
+ Arch: "amd64",
+ TreeName: "mainline",
+ ConfigName: "config",
+ CommitHash: "abcd",
+ })
+ err := client.UploadTestResult(ctx, &api.TestResult{
+ SessionID: sessionID,
+ BaseBuildID: buildResp.ID,
+ TestName: "test",
+ Result: api.TestRunning,
+ })
+ assert.NoError(t, err)
+
+ // Let the workflow finish.
+ workflows.finish <- struct{}{}
+ awaitFinishedSessions(t, processor.seriesRepo, 1)
+ cancel()
+
+ // Verify that the session test is finished.
+ // A bit hacky, but it works.
+ list, err := processor.sessionTestRepo.BySessionRaw(ctx, sessionID)
+ assert.NoError(t, err)
+ assert.Equal(t, api.TestError, list[0].Result)
+}
+
func awaitFinishedSessions(t *testing.T, seriesRepo *db.SeriesRepository, wantFinished int) {
t.Logf("awaiting %d finished sessions", wantFinished)
deadline := time.Second * 2
@@ -149,6 +193,7 @@ func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProc
return &SeriesProcessor{
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
workflows: workflows,
dbPollInterval: time.Second / 10,
parallelWorkers: 2,
diff --git a/syz-cluster/pkg/db/session_test_repo.go b/syz-cluster/pkg/db/session_test_repo.go
index e5b923fd4..7043b8389 100644
--- a/syz-cluster/pkg/db/session_test_repo.go
+++ b/syz-cluster/pkg/db/session_test_repo.go
@@ -85,14 +85,7 @@ type FullSessionTest struct {
}
func (repo *SessionTestRepository) BySession(ctx context.Context, sessionID string) ([]*FullSessionTest, error) {
- stmt := spanner.Statement{
- SQL: "SELECT * FROM `SessionTests` WHERE `SessionID` = @session" +
- " ORDER BY `UpdatedAt`",
- Params: map[string]interface{}{"session": sessionID},
- }
- iter := repo.client.Single().Query(ctx, stmt)
- defer iter.Stop()
- list, err := readEntities[SessionTest](iter)
+ list, err := repo.BySessionRaw(ctx, sessionID)
if err != nil {
return nil, err
}
@@ -131,3 +124,14 @@ func (repo *SessionTestRepository) BySession(ctx context.Context, sessionID stri
}
return ret, nil
}
+
+func (repo *SessionTestRepository) BySessionRaw(ctx context.Context, sessionID string) ([]*SessionTest, error) {
+ stmt := spanner.Statement{
+ SQL: "SELECT * FROM `SessionTests` WHERE `SessionID` = @session" +
+ " ORDER BY `UpdatedAt`",
+ Params: map[string]interface{}{"session": sessionID},
+ }
+ iter := repo.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+ return readEntities[SessionTest](iter)
+}