diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-02-06 19:35:29 +0100 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-02-14 13:40:12 +0000 |
| commit | 5b082ecabf223d8f52f6bb9e4acd8ace0bffc902 (patch) | |
| tree | fec57f8b12bd6a583c68c87da9ef66d5c8e4de04 /syz-cluster/controller/processor.go | |
| parent | 1022af749615bfc4e3a305b891449d95d5fc050a (diff) | |
syz-cluster: report series/sessions via API
In the previous version of the code, series-tracker was directly pushing
patch series into the DB and the controller auto-created fuzzing
sessions.
Mediate these via the controller API instead.
Instead of creating Session objects on the fly, pre-create them and
let processor take them one by one.
The approach has multiple
benefits:
1) The same API might be used for the patch series sources other than
LKML.
2) If the existence of Session objects is not a sign that we have
started working on it, it allows for a more precise status display
(not created/waiting/running/finished).
3) We could manually push older patch series and manually trigger
fuzzing sessions to experimentally measure the bug detection rates.
4) The controller tests could be organized only by relying on the API
offered by the component.
Diffstat (limited to 'syz-cluster/controller/processor.go')
| -rw-r--r-- | syz-cluster/controller/processor.go | 30 |
1 files changed, 9 insertions, 21 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 1493b5852..532bfca3a 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -82,6 +82,7 @@ func (sp *SeriesProcessor) Loop(ctx context.Context) error { } func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Session) { + var next *db.NextSession for { select { case <-ctx.Done(): @@ -92,20 +93,14 @@ func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Sessi // There are still series to be picked, no need to query the DB. continue } - list, err := sp.seriesRepo.ListWithoutSession(ctx, cap(ch)) + var err error + var list []*db.Session + list, next, err = sp.sessionRepo.ListWaiting(ctx, next, cap(ch)) if err != nil { app.Errorf("failed to query series: %v", err) continue } - // Note: it seems that we here actively rely on Spanner's external consistency. - // E.g. once we add new Session, we expect to no longer see the series in - // the returned list. - for _, series := range list { - session, err := sp.createSession(ctx, series) - if err != nil { - app.Errorf("failed to create session for %q: %v", series.ID, err) - continue - } + for _, session := range list { ch <- session } } @@ -136,17 +131,6 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi } } -func (sp *SeriesProcessor) createSession(ctx context.Context, series *db.Series) (*db.Session, error) { - session := &db.Session{ - CreatedAt: time.Now(), - } - err := sp.sessionRepo.Insert(ctx, series, session) - if err != nil { - return nil, err - } - return session, err -} - func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Session) { // TODO: set some sane deadline or just track indefinitely? pollPeriod := sp.workflows.PollPeriod() @@ -169,6 +153,10 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } switch status { case workflow.StatusNotFound: + if err := sp.sessionRepo.Start(ctx, session.ID); err != nil { + app.Errorf("failed to mark session started: %v", err) + break + } err := sp.workflows.Start(session.ID) if err != nil { app.Errorf("failed to start a workflow: %v", err) |
