about summary refs log tree commit diff stats
path: root/syz-cluster/pkg/db
diff options
context:
space:
mode:
author Aleksandr Nogikh <nogikh@google.com> 2025-02-06 19:35:29 +0100
committer Aleksandr Nogikh <nogikh@google.com> 2025-02-14 13:40:12 +0000
commit 5b082ecabf223d8f52f6bb9e4acd8ace0bffc902 (patch)
tree fec57f8b12bd6a583c68c87da9ef66d5c8e4de04 /syz-cluster/pkg/db
parent 1022af749615bfc4e3a305b891449d95d5fc050a (diff)
syz-cluster: report series/sessions via API
In the previous version of the code, series-tracker was directly pushing patch series into the DB and the controller auto-created fuzzing sessions. Mediate these via the controller API instead. Instead of creating Session objects on the fly, pre-create them and let processor take them one by one. The approach has multiple benefits: 1) The same API might be used for the patch series sources other than LKML. 2) If the existence of Session objects is not a sign that we have started working on it, it allows for a more precise status display (not created/waiting/running/finished). 3) We could manually push older patch series and manually trigger fuzzing sessions to experimentally measure the bug detection rates. 4) The controller tests could be organized only by relying on the API offered by the component.
Diffstat (limited to 'syz-cluster/pkg/db')
-rw-r--r-- syz-cluster/pkg/db/build_repo.go 10
-rw-r--r-- syz-cluster/pkg/db/entities.go 25
-rw-r--r-- syz-cluster/pkg/db/finding_repo_test.go 5
-rw-r--r-- syz-cluster/pkg/db/migrations/1_initialize.up.sql 3
-rw-r--r-- syz-cluster/pkg/db/series_repo.go 26
-rw-r--r-- syz-cluster/pkg/db/session_repo.go 89
-rw-r--r-- syz-cluster/pkg/db/session_repo_test.go 62
-rw-r--r-- syz-cluster/pkg/db/session_test_repo_test.go 5
-rw-r--r-- syz-cluster/pkg/db/spanner.go 12
9 files changed, 170 insertions, 67 deletions
diff --git a/syz-cluster/pkg/db/build_repo.go b/syz-cluster/pkg/db/build_repo.go
index e87a18f63..08ef8253c 100644
--- a/syz-cluster/pkg/db/build_repo.go
+++ b/syz-cluster/pkg/db/build_repo.go
@@ -30,15 +30,7 @@ func (repo *BuildRepository) Insert(ctx context.Context, build *Build) error {
if build.ID == "" {
build.ID = uuid.New().String()
}
- _, err := repo.client.ReadWriteTransaction(ctx,
- func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
- m, err := spanner.InsertStruct("Builds", build)
- if err != nil {
- return err
- }
- return txn.BufferWrite([]*spanner.Mutation{m})
- })
- return err
+ return repo.genericEntityOps.Insert(ctx, build)
}
func (repo *BuildRepository) LastBuiltTree(ctx context.Context, arch, tree, config string) (*Build, error) {
diff --git a/syz-cluster/pkg/db/entities.go b/syz-cluster/pkg/db/entities.go
index 784a96a2e..db714e554 100644
--- a/syz-cluster/pkg/db/entities.go
+++ b/syz-cluster/pkg/db/entities.go
@@ -10,14 +10,15 @@ import (
)
type Series struct {
- ID string `spanner:"ID"`
- ExtID string `spanner:"ExtID"`
- AuthorName string `spanner:"AuthorName"`
- AuthorEmail string `spanner:"AuthorEmail"`
- Title string `spanner:"Title"`
- Link string `spanner:"Link"`
- Version int64 `spanner:"Version"`
- PublishedAt time.Time `spanner:"PublishedAt"`
+ ID string `spanner:"ID"`
+ ExtID string `spanner:"ExtID"`
+ AuthorName string `spanner:"AuthorName"`
+ AuthorEmail string `spanner:"AuthorEmail"`
+ Title string `spanner:"Title"`
+ Link string `spanner:"Link"`
+ Version int64 `spanner:"Version"`
+ PublishedAt time.Time `spanner:"PublishedAt"`
+ // TODO: we could get rid of the field by using slightly more complicated SQL queries.
LatestSessionID spanner.NullString `spanner:"LatestSessionID"`
Cc []string `spanner:"Cc"`
}
@@ -62,9 +63,17 @@ type Session struct {
ID string `spanner:"ID"`
SeriesID string `spanner:"SeriesID"`
CreatedAt time.Time `spanner:"CreatedAt"`
+ StartedAt spanner.NullTime `spanner:"StartedAt"`
FinishedAt spanner.NullTime `spanner:"FinishedAt"`
SkipReason spanner.NullString `spanner:"SkipReason"`
LogURI string `spanner:"LogURI"`
+ Tags []string `spanner:"Tags"`
+ // TODO: to accept more specific fuzzing assignment,
+ // add Triager, BaseRepo, BaseCommit, Config fields.
+}
+
+func (s *Session) SetStartedAt(t time.Time) {
+ s.StartedAt = spanner.NullTime{Time: t, Valid: true}
}
func (s *Session) SetFinishedAt(t time.Time) {
diff --git a/syz-cluster/pkg/db/finding_repo_test.go b/syz-cluster/pkg/db/finding_repo_test.go
index 9afa22ede..ac18c1f5d 100644
--- a/syz-cluster/pkg/db/finding_repo_test.go
+++ b/syz-cluster/pkg/db/finding_repo_test.go
@@ -5,7 +5,6 @@ package db
import (
"testing"
- "time"
"github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/stretchr/testify/assert"
@@ -22,8 +21,8 @@ func TestFindingRepo(t *testing.T) {
err := seriesRepo.Insert(ctx, series, nil)
assert.NoError(t, err)
- session := &Session{CreatedAt: time.Now()}
- err = sessionRepo.Insert(ctx, series, session)
+ session := &Session{SeriesID: series.ID}
+ err = sessionRepo.Insert(ctx, session)
assert.NoError(t, err)
// Add test steps.
diff --git a/syz-cluster/pkg/db/migrations/1_initialize.up.sql b/syz-cluster/pkg/db/migrations/1_initialize.up.sql
index 4612956e7..0be0c7cef 100644
--- a/syz-cluster/pkg/db/migrations/1_initialize.up.sql
+++ b/syz-cluster/pkg/db/migrations/1_initialize.up.sql
@@ -52,10 +52,11 @@ CREATE TABLE Sessions (
ID STRING(36) NOT NULL, -- UUID
SeriesID STRING(36) NOT NULL,
CreatedAt TIMESTAMP NOT NULL,
+ StartedAt TIMESTAMP,
FinishedAt TIMESTAMP,
SkipReason STRING(1024),
LogURI STRING(512) NOT NULL,
- -- TODO: moderation/reporting.
+ Tags ARRAY<STRING(256)>,
CONSTRAINT FK_SeriesSessions FOREIGN KEY (SeriesID) REFERENCES Series (ID),
) PRIMARY KEY(ID);
diff --git a/syz-cluster/pkg/db/series_repo.go b/syz-cluster/pkg/db/series_repo.go
index 2c449f531..dbf271b42 100644
--- a/syz-cluster/pkg/db/series_repo.go
+++ b/syz-cluster/pkg/db/series_repo.go
@@ -33,6 +33,7 @@ func NewSeriesRepository(client *spanner.Client) *SeriesRepository {
}
// TODO: move to SeriesPatchesRepository?
+// nolint:dupl
func (repo *SeriesRepository) PatchByID(ctx context.Context, id string) (*Patch, error) {
stmt := spanner.Statement{
SQL: "SELECT * FROM Patches WHERE ID=@id",
@@ -43,6 +44,17 @@ func (repo *SeriesRepository) PatchByID(ctx context.Context, id string) (*Patch,
return readOne[Patch](iter)
}
+// nolint:dupl
+func (repo *SeriesRepository) GetByExtID(ctx context.Context, extID string) (*Series, error) {
+ stmt := spanner.Statement{
+ SQL: "SELECT * FROM Series WHERE ExtID=@extID",
+ Params: map[string]interface{}{"extID": extID},
+ }
+ iter := repo.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+ return readOne[Series](iter)
+}
+
var ErrSeriesExists = errors.New("the series already exists")
// Insert() checks whether there already exists a series with the same ExtID.
@@ -179,20 +191,6 @@ func (repo *SeriesRepository) ListLatest(ctx context.Context,
return ret, nil
}
-func (repo *SeriesRepository) ListWithoutSession(ctx context.Context, limit int) ([]*Series, error) {
- stmt := spanner.Statement{
- SQL: "SELECT * FROM Series WHERE `LatestSessionID` IS NULL ORDER BY `PublishedAt`",
- Params: map[string]interface{}{},
- }
- if limit > 0 {
- stmt.SQL += " LIMIT @limit"
- stmt.Params["limit"] = limit
- }
- iter := repo.client.Single().Query(ctx, stmt)
- defer iter.Stop()
- return readEntities[Series](iter)
-}
-
// golint sees too much similarity with SessionRepository's ListForSeries, but in reality there's not.
// nolint:dupl
func (repo *SeriesRepository) ListPatches(ctx context.Context, series *Series) ([]*Patch, error) {
diff --git a/syz-cluster/pkg/db/session_repo.go b/syz-cluster/pkg/db/session_repo.go
index 551f7e488..2ed5b8deb 100644
--- a/syz-cluster/pkg/db/session_repo.go
+++ b/syz-cluster/pkg/db/session_repo.go
@@ -5,6 +5,8 @@ package db
import (
"context"
+ "errors"
+ "time"
"cloud.google.com/go/spanner"
"github.com/google/uuid"
@@ -26,19 +28,32 @@ func NewSessionRepository(client *spanner.Client) *SessionRepository {
}
}
-func (repo *SessionRepository) Insert(ctx context.Context, series *Series, session *Session) error {
- if session.ID == "" {
- session.ID = uuid.New().String()
- }
+var ErrSessionAlreadyStarted = errors.New("the session already started")
+
+func (repo *SessionRepository) Start(ctx context.Context, sessionID string) error {
_, err := repo.client.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
- // TODO: we need to update LastSessionID only for sessions whose results might
- // be reported to the author.
- stmt := spanner.Statement{
- SQL: "SELECT * from `Series` WHERE `ID`=@id",
- Params: map[string]interface{}{"id": series.ID},
+ iter := txn.Query(ctx, spanner.Statement{
+ SQL: "SELECT * from `Sessions` WHERE `ID`=@id",
+ Params: map[string]interface{}{"id": sessionID},
+ })
+ session, err := readOne[Session](iter)
+ iter.Stop()
+ if err != nil {
+ return err
+ }
+ if !session.StartedAt.IsNull() {
+ return ErrSessionAlreadyStarted
}
- iter := txn.Query(ctx, stmt)
+ session.SetStartedAt(time.Now())
+ updateSession, err := spanner.UpdateStruct("Sessions", session)
+ if err != nil {
+ return err
+ }
+ iter = txn.Query(ctx, spanner.Statement{
+ SQL: "SELECT * from `Series` WHERE `ID`=@id",
+ Params: map[string]interface{}{"id": session.SeriesID},
+ })
series, err := readOne[Series](iter)
iter.Stop()
if err != nil {
@@ -49,23 +64,63 @@ func (repo *SessionRepository) Insert(ctx context.Context, series *Series, sessi
if err != nil {
return err
}
- session.SeriesID = series.ID
- insertSession, err := spanner.InsertStruct("Sessions", session)
- if err != nil {
- return err
- }
- return txn.BufferWrite([]*spanner.Mutation{updateSeries, insertSession})
+ return txn.BufferWrite([]*spanner.Mutation{updateSeries, updateSession})
})
return err
}
+func (repo *SessionRepository) Insert(ctx context.Context, session *Session) error {
+ if session.ID == "" {
+ session.ID = uuid.New().String()
+ }
+ return repo.genericEntityOps.Insert(ctx, session)
+}
+
func (repo *SessionRepository) ListRunning(ctx context.Context) ([]*Session, error) {
- stmt := spanner.Statement{SQL: "SELECT * FROM `Sessions` WHERE `FinishedAt` IS NULL"}
+ stmt := spanner.Statement{SQL: "SELECT * FROM `Sessions` WHERE `StartedAt` IS NOT NULL " +
+ "AND `FinishedAt` IS NULL"}
iter := repo.client.Single().Query(ctx, stmt)
defer iter.Stop()
return readEntities[Session](iter)
}
+type NextSession struct {
+ id string
+ createdAt time.Time
+}
+
+func (repo *SessionRepository) ListWaiting(ctx context.Context, from *NextSession,
+ limit int) ([]*Session, *NextSession, error) {
+ // Here we assume that once the session is started, it never appears again.
+ stmt := spanner.Statement{
+ SQL: "SELECT * FROM `Sessions` WHERE `StartedAt` IS NULL",
+ Params: map[string]interface{}{},
+ }
+ if from != nil {
+ stmt.SQL += " AND ((`CreatedAt` > @from) OR (`CreatedAt` = @from AND `ID` > @id))"
+ stmt.Params["from"] = from.createdAt
+ stmt.Params["id"] = from.id
+ }
+ stmt.SQL += " ORDER BY `CreatedAt`, `ID`"
+ if limit > 0 {
+ stmt.SQL += " LIMIT @limit"
+ stmt.Params["limit"] = limit
+ }
+ iter := repo.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+ list, err := readEntities[Session](iter)
+
+ var next *NextSession
+ if err == nil && len(list) > 0 {
+ last := list[len(list)-1]
+ next = &NextSession{
+ id: last.ID,
+ createdAt: last.CreatedAt,
+ }
+ }
+ return list, next, err
+}
+
// golint sees too much similarity with SeriesRepository's ListPatches, but in reality there's not.
// nolint:dupl
func (repo *SessionRepository) ListForSeries(ctx context.Context, series *Series) ([]*Session, error) {
diff --git a/syz-cluster/pkg/db/session_repo_test.go b/syz-cluster/pkg/db/session_repo_test.go
index 55cee2f31..2a3b9bd69 100644
--- a/syz-cluster/pkg/db/session_repo_test.go
+++ b/syz-cluster/pkg/db/session_repo_test.go
@@ -19,25 +19,63 @@ func TestSeriesInsertSession(t *testing.T) {
err := seriesRepo.Insert(ctx, series, nil)
assert.NoError(t, err)
+ withSession := func(need int) {
+ list, err := seriesRepo.ListLatest(ctx, time.Time{}, 10)
+ assert.NoError(t, err)
+ var cnt int
+ for _, item := range list {
+ if item.Session != nil {
+ cnt++
+ }
+ }
+ assert.Equal(t, cnt, need)
+ }
+
// This series is indeed without a session.
- list, err := seriesRepo.ListWithoutSession(ctx, 10)
- assert.NoError(t, err)
- assert.Len(t, list, 1)
+ withSession(0)
// Add a new session.
- session := &Session{CreatedAt: time.Now()}
- err = sessionRepo.Insert(ctx, series, session)
+ session := &Session{SeriesID: series.ID}
+ err = sessionRepo.Insert(ctx, session)
assert.NoError(t, err)
- // All sessions are with sessions now.
- list, err = seriesRepo.ListWithoutSession(ctx, 10)
+ // The session is not started yet.
+ withSession(0)
+
+ // Now start it.
+ err = sessionRepo.Start(ctx, session.ID)
assert.NoError(t, err)
- assert.Len(t, list, 0)
+ withSession(1)
+}
- // We can also query the information together.
+func TestQueryWaitingSessions(t *testing.T) {
+ client, ctx := NewTransientDB(t)
+ sessionRepo := NewSessionRepository(client)
+ seriesRepo := NewSeriesRepository(client)
- list2, err := seriesRepo.ListLatest(ctx, time.Time{}, 0)
+ series := &Series{ExtID: "some-series"}
+ err := seriesRepo.Insert(ctx, series, nil)
assert.NoError(t, err)
- assert.Len(t, list2, 1)
- assert.NotNil(t, list2[0].Session)
+
+ nthTime := func(i int) time.Time {
+ return time.Date(2009, time.January, 1, 1, i, 0, 0, time.UTC)
+ }
+
+ for i := 0; i < 5; i++ {
+ session := &Session{
+ SeriesID: series.ID,
+ CreatedAt: nthTime(i),
+ }
+ err = sessionRepo.Insert(ctx, session)
+ assert.NoError(t, err)
+ }
+
+ var next *NextSession
+ for i := 0; i < 5; i++ {
+ var list []*Session
+ list, next, err = sessionRepo.ListWaiting(ctx, next, 1)
+ assert.NoError(t, err)
+ assert.Len(t, list, 1)
+ assert.Equal(t, nthTime(i), list[0].CreatedAt)
+ }
}
diff --git a/syz-cluster/pkg/db/session_test_repo_test.go b/syz-cluster/pkg/db/session_test_repo_test.go
index 2ecd696d3..f18349339 100644
--- a/syz-cluster/pkg/db/session_test_repo_test.go
+++ b/syz-cluster/pkg/db/session_test_repo_test.go
@@ -6,7 +6,6 @@ package db
import (
"fmt"
"testing"
- "time"
"cloud.google.com/go/spanner"
"github.com/google/syzkaller/syz-cluster/pkg/api"
@@ -24,8 +23,8 @@ func TestSessionTestRepository(t *testing.T) {
err := seriesRepo.Insert(ctx, series, nil)
assert.NoError(t, err)
- session := &Session{CreatedAt: time.Now()}
- err = sessionRepo.Insert(ctx, series, session)
+ session := &Session{SeriesID: series.ID}
+ err = sessionRepo.Insert(ctx, session)
assert.NoError(t, err)
build1 := &Build{TreeName: "mainline", Arch: "amd64", CommitHash: "abcd", Status: "success"}
diff --git a/syz-cluster/pkg/db/spanner.go b/syz-cluster/pkg/db/spanner.go
index a7d61f920..35020f3cd 100644
--- a/syz-cluster/pkg/db/spanner.go
+++ b/syz-cluster/pkg/db/spanner.go
@@ -304,3 +304,15 @@ func (g *genericEntityOps[EntityType, KeyType]) Update(ctx context.Context, key
})
return err
}
+
+func (g *genericEntityOps[EntityType, KeyType]) Insert(ctx context.Context, obj *EntityType) error {
+ _, err := g.client.ReadWriteTransaction(ctx,
+ func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+ insert, err := spanner.InsertStruct(g.table, obj)
+ if err != nil {
+ return err
+ }
+ return txn.BufferWrite([]*spanner.Mutation{insert})
+ })
+ return err
+}