aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/controller/processor_test.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-16 17:18:35 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-17 07:57:04 +0000
commit5b14b31251d4cc2e33eb313e29b037262c597679 (patch)
tree82ecbf770db13bfefffa65bf6b6a5c63021a4f98 /syz-cluster/controller/processor_test.go
parent94b0f8be5c512cccc2d1dbb405dab561b4d23940 (diff)
syz-cluster: clean up running steps of finished workflows
If the workflow step crashed or timed out, we used to have Running status for such steps even though the session itself may be long finished. In order to prevent this inconsistency, on finishing each session go through all remaining running steps and update their status to Error.
Diffstat (limited to 'syz-cluster/controller/processor_test.go')
-rw-r--r--syz-cluster/controller/processor_test.go45
1 files changed, 45 insertions, 0 deletions
diff --git a/syz-cluster/controller/processor_test.go b/syz-cluster/controller/processor_test.go
index f6f6abbbe..9f5e04d9b 100644
--- a/syz-cluster/controller/processor_test.go
+++ b/syz-cluster/controller/processor_test.go
@@ -80,6 +80,50 @@ func TestProcessor(t *testing.T) {
cancel()
}
+func TestFinishRunningSteps(t *testing.T) {
+ workflows := newMockedWorkflows()
+ processor, client, ctx := prepareProcessorTest(t, workflows)
+
+ // Start the loop.
+ var wg sync.WaitGroup
+ ctx2, cancel := context.WithCancel(ctx)
+ wg.Add(1)
+ go func() {
+ processor.Loop(ctx2)
+ wg.Done()
+ }()
+
+ series := &api.Series{
+ ExtID: "ext-id",
+ Title: "title",
+ }
+ _, sessionID := controller.UploadTestSeries(t, ctx, client, series)
+ buildResp := controller.UploadTestBuild(t, ctx, client, &api.Build{
+ Arch: "amd64",
+ TreeName: "mainline",
+ ConfigName: "config",
+ CommitHash: "abcd",
+ })
+ err := client.UploadTestResult(ctx, &api.TestResult{
+ SessionID: sessionID,
+ BaseBuildID: buildResp.ID,
+ TestName: "test",
+ Result: api.TestRunning,
+ })
+ assert.NoError(t, err)
+
+ // Let the workflow finish.
+ workflows.finish <- struct{}{}
+ awaitFinishedSessions(t, processor.seriesRepo, 1)
+ cancel()
+
+ // Verify that the session test is finished.
+ // A bit hacky, but it works.
+ list, err := processor.sessionTestRepo.BySessionRaw(ctx, sessionID)
+ assert.NoError(t, err)
+ assert.Equal(t, api.TestError, list[0].Result)
+}
+
func awaitFinishedSessions(t *testing.T, seriesRepo *db.SeriesRepository, wantFinished int) {
t.Logf("awaiting %d finished sessions", wantFinished)
deadline := time.Second * 2
@@ -149,6 +193,7 @@ func prepareProcessorTest(t *testing.T, workflows workflow.Service) (*SeriesProc
return &SeriesProcessor{
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
+ sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
workflows: workflows,
dbPollInterval: time.Second / 10,
parallelWorkers: 2,