1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
|
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
package main
import (
"bytes"
"context"
"fmt"
"log"
"sync"
"time"
"github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
"github.com/google/syzkaller/syz-cluster/pkg/workflow"
"golang.org/x/sync/errgroup"
)
type SeriesProcessor struct {
blobStorage blob.Storage
seriesRepo *db.SeriesRepository
sessionRepo *db.SessionRepository
sessionTestRepo *db.SessionTestRepository
workflows workflow.Service
dbPollInterval time.Duration
parallelWorkflows int
}
func NewSeriesProcessor(env *app.AppEnvironment, cfg *app.AppConfig) *SeriesProcessor {
workflows, err := workflow.NewArgoService()
if err != nil {
app.Fatalf("failed to initialize workflows: %v", err)
}
return &SeriesProcessor{
blobStorage: env.BlobStorage,
seriesRepo: db.NewSeriesRepository(env.Spanner),
sessionRepo: db.NewSessionRepository(env.Spanner),
sessionTestRepo: db.NewSessionTestRepository(env.Spanner),
dbPollInterval: time.Minute,
workflows: workflows,
parallelWorkflows: cfg.ParallelWorkflows,
}
}
func (sp *SeriesProcessor) Loop(ctx context.Context) error {
var wg sync.WaitGroup
defer wg.Wait()
ch := make(chan *db.Session, 1)
wg.Add(1)
go func() {
defer wg.Done()
sp.seriesRunner(ctx, ch)
}()
// First pick up the previously running sessions.
activeSessions, err := sp.sessionRepo.ListRunning(ctx)
if err != nil {
return err
}
log.Printf("queried %d unfinished sessions", len(activeSessions))
for _, session := range activeSessions {
ch <- session
}
// Then, monitor the DB for the new series.
wg.Add(1)
go func() {
defer wg.Done()
sp.streamSeries(ctx, ch)
close(ch)
}()
return nil
}
func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Session) {
var next *db.NextSession
for {
select {
case <-ctx.Done():
return
case <-time.After(sp.dbPollInterval):
}
if len(ch) > 0 {
// There are still series to be picked, no need to query the DB.
continue
}
var err error
var list []*db.Session
list, next, err = sp.sessionRepo.ListWaiting(ctx, next, cap(ch))
if err != nil {
app.Errorf("failed to query series: %v", err)
continue
}
for _, session := range list {
ch <- session
}
}
}
func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Session) {
var eg errgroup.Group
defer eg.Wait()
eg.SetLimit(sp.parallelWorkflows)
for {
var session *db.Session
select {
case session = <-ch:
if session == nil {
return
}
case <-ctx.Done():
return
}
log.Printf("scheduled session %q for series %q", session.ID, session.SeriesID)
eg.Go(func() error {
log.Printf("started processing session %q", session.ID)
sp.handleSession(ctx, session)
log.Printf("finished processing session %q", session.ID)
return nil
})
}
}
func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Session) {
// TODO: set some sane deadline or just track indefinitely?
pollPeriod := sp.workflows.PollPeriod()
for {
select {
case <-time.After(pollPeriod):
case <-ctx.Done():
return
}
status, workflowLog, err := sp.workflows.Status(session.ID)
if err != nil {
app.Errorf("failed to query workflow %q status: %v", session.ID, err)
continue
}
if len(workflowLog) > 0 {
err := sp.updateSessionLog(ctx, session, workflowLog)
if err != nil {
app.Errorf("failed to update session log: %v", err)
}
}
switch status {
case workflow.StatusNotFound:
log.Printf("scheduling a workflow for %q", session.ID)
err := sp.sessionRepo.Start(ctx, session.ID)
if err == db.ErrSessionAlreadyStarted {
// It may happen if the service was restarted right between the moment we updated the DB
// and actually started the workflow.
log.Printf("session %q was already marked as started, but there's no actual workflow", session.ID)
} else if err != nil {
app.Errorf("failed to mark session started: %v", err)
break
}
err = sp.workflows.Start(session.ID)
if err != nil {
app.Errorf("failed to start a workflow: %v", err)
}
case workflow.StatusFinished, workflow.StatusFailed:
log.Printf("workflow for %q completed (status=%q), mark the session finished", session.ID, status)
err := sp.stopRunningTests(ctx, session.ID)
if err != nil {
app.Errorf("failed to check running tests for %s: %v", session.ID, err)
}
// TODO: StatusFailed needs a different handling.
err = sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
session.SetFinishedAt(time.Now())
return nil
})
if err == nil {
// Nothing to do here anymore.
return
}
// Let's hope the error was transient.
app.Errorf("failed to update session %q: %v", session.ID, err)
case workflow.StatusRunning:
// Let's keep on tracking.
continue
default:
panic("unexpected workflow status: " + status)
}
}
}
// The session steps are expected to report that they are finished themselves.
// If the workflow was aborted for some external reason (or the session step crashed/timed out),
// the step may remain forever in the "Running" state.
// Go through such steps and mark them as finished (with an error).
func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID string) error {
tests, err := sp.sessionTestRepo.BySessionRaw(ctx, sessionID)
if err != nil {
return fmt.Errorf("failed to query session steps: %w", err)
}
for _, test := range tests {
if test.Result != api.TestRunning {
continue
}
log.Printf("session %q is finished, but the test %q is running: marking it stopped",
sessionID, test.TestName)
err = sp.sessionTestRepo.InsertOrUpdate(ctx, test, func(entity *db.SessionTest) {
if entity.Result == api.TestRunning {
entity.Result = api.TestError
}
})
if err != nil {
return fmt.Errorf("failed to update the step %q: %w", test.TestName, err)
}
}
return nil
}
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
session.LogURI = logURI
return nil
})
}
|