From 5b14b31251d4cc2e33eb313e29b037262c597679 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Wed, 16 Apr 2025 17:18:35 +0200 Subject: syz-cluster: clean up running steps of finished workflows If the workflow step crashed or timed out, we used to have Running status for such steps even though the session itself may be long finished. In order to prevent this inconsistency, on finishing each session go through all remaining running steps and update their status to Error. --- syz-cluster/controller/processor_test.go | 45 ++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) (limited to 'syz-cluster/controller/processor_test.go') diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go index f6f6abbbe..9f5e04d9b 100644 --- a/syz-cluster/controller/processor_test.go +++ b/syz-cluster/controller/processor_test.go @@ -80,6 +80,50 @@ func TestProcessor(t *testing.T) { cancel() } +func TestFinishRunningSteps(t *testing.T) { + workflows := newMockedWorkflows() + processor, client, ctx := prepareProcessorTest(t, workflows) + + // Start the loop. + var wg sync.WaitGroup + ctx2, cancel := context.WithCancel(ctx) + wg.Add(1) + go func() { + processor.Loop(ctx2) + wg.Done() + }() + + series := &api.Series{ + ExtID: "ext-id", + Title: "title", + } + _, sessionID := controller.UploadTestSeries(t, ctx, client, series) + buildResp := controller.UploadTestBuild(t, ctx, client, &api.Build{ + Arch: "amd64", + TreeName: "mainline", + ConfigName: "config", + CommitHash: "abcd", + }) + err := client.UploadTestResult(ctx, &api.TestResult{ + SessionID: sessionID, + BaseBuildID: buildResp.ID, + TestName: "test", + Result: api.TestRunning, + }) + assert.NoError(t, err) + + // Let the workflow finish. + workflows.finish <- struct{}{} + awaitFinishedSessions(t, processor.seriesRepo, 1) + cancel() + + // Verify that the session test is finished. + // A bit hacky, but it works. + list, err := processor.sessionTestRepo.BySessionRaw(ctx, sessionID) + assert.NoError(t, err) + assert.Equal(t, api.TestError, list[0].Result) +} + func awaitFinishedSessions(t *testing.T, seriesRepo *db.SeriesRepository, wantFinished int) { t.Logf("awaiting %d finished sessions", wantFinished) deadline := time.Second * 2 @@ -149,6 +193,7 @@ func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProc return &SeriesProcessor{ seriesRepo: db.NewSeriesRepository(env.Spanner), sessionRepo: db.NewSessionRepository(env.Spanner), + sessionTestRepo: db.NewSessionTestRepository(env.Spanner), workflows: workflows, dbPollInterval: time.Second / 10, parallelWorkers: 2, -- cgit mrf-deployment