aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/controller/processor.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-02-06 19:35:29 +0100
committerAleksandr Nogikh <nogikh@google.com>2025-02-14 13:40:12 +0000
commit5b082ecabf223d8f52f6bb9e4acd8ace0bffc902 (patch)
treefec57f8b12bd6a583c68c87da9ef66d5c8e4de04 /syz-cluster/controller/processor.go
parent1022af749615bfc4e3a305b891449d95d5fc050a (diff)
syz-cluster: report series/sessions via API
In the previous version of the code, series-tracker was directly pushing patch series into the DB and the controller auto-created fuzzing sessions. Mediate these via the controller API instead. Instead of creating Session objects on the fly, pre-create them and let processor take them one by one. The approach has multiple benefits: 1) The same API might be used for the patch series sources other than LKML. 2) If the existence of Session objects is not a sign that we have started working on it, it allows for a more precise status display (not created/waiting/running/finished). 3) We could manually push older patch series and manually trigger fuzzing sessions to experimentally measure the bug detection rates. 4) The controller tests could be organized only by relying on the API offered by the component.
Diffstat (limited to 'syz-cluster/controller/processor.go')
-rw-r--r--syz-cluster/controller/processor.go30
1 files changed, 9 insertions, 21 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index 1493b5852..532bfca3a 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -82,6 +82,7 @@ func (sp *SeriesProcessor) Loop(ctx context.Context) error {
}
func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Session) {
+ var next *db.NextSession
for {
select {
case <-ctx.Done():
@@ -92,20 +93,14 @@ func (sp *SeriesProcessor) streamSeries(ctx context.Context, ch chan<- *db.Sessi
// There are still series to be picked, no need to query the DB.
continue
}
- list, err := sp.seriesRepo.ListWithoutSession(ctx, cap(ch))
+ var err error
+ var list []*db.Session
+ list, next, err = sp.sessionRepo.ListWaiting(ctx, next, cap(ch))
if err != nil {
app.Errorf("failed to query series: %v", err)
continue
}
- // Note: it seems that we here actively rely on Spanner's external consistency.
- // E.g. once we add new Session, we expect to no longer see the series in
- // the returned list.
- for _, series := range list {
- session, err := sp.createSession(ctx, series)
- if err != nil {
- app.Errorf("failed to create session for %q: %v", series.ID, err)
- continue
- }
+ for _, session := range list {
ch <- session
}
}
@@ -136,17 +131,6 @@ func (sp *SeriesProcessor) seriesRunner(ctx context.Context, ch <-chan *db.Sessi
}
}
-func (sp *SeriesProcessor) createSession(ctx context.Context, series *db.Series) (*db.Session, error) {
- session := &db.Session{
- CreatedAt: time.Now(),
- }
- err := sp.sessionRepo.Insert(ctx, series, session)
- if err != nil {
- return nil, err
- }
- return session, err
-}
-
func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Session) {
// TODO: set some sane deadline or just track indefinitely?
pollPeriod := sp.workflows.PollPeriod()
@@ -169,6 +153,10 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
}
switch status {
case workflow.StatusNotFound:
+ if err := sp.sessionRepo.Start(ctx, session.ID); err != nil {
+ app.Errorf("failed to mark session started: %v", err)
+ break
+ }
err := sp.workflows.Start(session.ID)
if err != nil {
app.Errorf("failed to start a workflow: %v", err)