From 5b082ecabf223d8f52f6bb9e4acd8ace0bffc902 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Thu, 6 Feb 2025 19:35:29 +0100 Subject: syz-cluster: report series/sessions via API In the previous version of the code, series-tracker was directly pushing patch series into the DB and the controller auto-created fuzzing sessions. Mediate these via the controller API instead. Instead of creating Session objects on the fly, pre-create them and let processor take them one by one. The approach has multiple benefits: 1) The same API might be used for the patch series sources other than LKML. 2) If the existence of Session objects is not a sign that we have started working on it, it allows for a more precise status display (not created/waiting/running/finished). 3) We could manually push older patch series and manually trigger fuzzing sessions to experimentally measure the bug detection rates. 4) The controller tests could be organized only by relying on the API offered by the component. --- syz-cluster/controller/processor.go | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) (limited to 'syz-cluster/controller/processor.go') diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 1493b5852..532bfca3a 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -82,6 +82,7 @@ func (sp *SeriesProcessor) Loop(ctx context.Context) error { } func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Session) { + var next *db.NextSession for { select { case <-ctx.Done(): @@ -92,20 +93,14 @@ func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Sessi // There are still series to be picked, no need to query the DB. continue } - list, err := sp.seriesRepo.ListWithoutSession(ctx, cap(ch)) + var err error + var list []*db.Session + list, next, err = sp.sessionRepo.ListWaiting(ctx, next, cap(ch)) if err != nil { app.Errorf("failed to query series: %v", err) continue } - // Note: it seems that we here actively rely on Spanner's external consistency. - // E.g. once we add new Session, we expect to no longer see the series in - // the returned list. - for _, series := range list { - session, err := sp.createSession(ctx, series) - if err != nil { - app.Errorf("failed to create session for %q: %v", series.ID, err) - continue - } + for _, session := range list { ch <- session } } @@ -136,17 +131,6 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi } } -func (sp *SeriesProcessor) createSession(ctx context.Context, series *db.Series) (*db.Session, error) { - session := &db.Session{ - CreatedAt: time.Now(), - } - err := sp.sessionRepo.Insert(ctx, series, session) - if err != nil { - return nil, err - } - return session, err -} - func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Session) { // TODO: set some sane deadline or just track indefinitely? pollPeriod := sp.workflows.PollPeriod() @@ -169,6 +153,10 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio } switch status { case workflow.StatusNotFound: + if err := sp.sessionRepo.Start(ctx, session.ID); err != nil { + app.Errorf("failed to mark session started: %v", err) + break + } err := sp.workflows.Start(session.ID) if err != nil { app.Errorf("failed to start a workflow: %v", err) -- cgit mrf-deployment