diff options
| -rw-r--r-- | syz-cluster/pkg/controller/api_test.go | 41 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/finding_repo.go | 79 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/finding_repo_test.go | 6 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/report_repo_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/util_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/finding.go | 40 |
6 files changed, 132 insertions, 38 deletions
diff --git a/syz-cluster/pkg/controller/api_test.go b/syz-cluster/pkg/controller/api_test.go index 1c1c8de9f..e64f3bc78 100644 --- a/syz-cluster/pkg/controller/api_test.go +++ b/syz-cluster/pkg/controller/api_test.go @@ -10,7 +10,9 @@ import ( "github.com/google/syzkaller/syz-cluster/pkg/api" "github.com/google/syzkaller/syz-cluster/pkg/app" + "github.com/google/syzkaller/syz-cluster/pkg/db" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestAPIGetSeries(t *testing.T) { @@ -70,17 +72,52 @@ func TestAPISaveFinding(t *testing.T) { finding := &api.NewFinding{ SessionID: ids.SessionID, TestName: "test", + Title: "title", Report: []byte("report"), Log: []byte("log"), SyzRepro: []byte("syz repro"), SyzReproOpts: []byte("syz_repro_opts"), - CRepro: []byte("C repro"), } err = client.UploadFinding(ctx, finding) assert.NoError(t, err) - // Even if the finding is reported the second time, it must still not fail. + // Even if the same finding is reported the second time, it must still not fail. + err = client.UploadFinding(ctx, finding) + assert.NoError(t, err) + }) + + t.Run("add C repro", func(t *testing.T) { + finding := &api.NewFinding{ + SessionID: ids.SessionID, + TestName: "test", + Title: "title", + Report: []byte("report"), + Log: []byte("log"), + SyzRepro: []byte("syz repro"), + SyzReproOpts: []byte("syz_repro_opts"), + CRepro: []byte("C repro"), + } err = client.UploadFinding(ctx, finding) assert.NoError(t, err) + // Verify that C repro has appeared indeed. + findingRepo := db.NewFindingRepository(env.Spanner) + findings, err := findingRepo.ListForSession(ctx, ids.SessionID, db.NoLimit) + require.NoError(t, err) + require.Len(t, findings, 1) + assert.NotEmpty(t, findings[0].CReproURI) + }) + + t.Run("session stopped", func(t *testing.T) { + MarkSessionFinished(t, env, ids.SessionID) + finding := &api.NewFinding{ + SessionID: ids.SessionID, + TestName: "test", + Title: "new title", + Report: []byte("report"), + Log: []byte("log"), + SyzRepro: []byte("syz repro"), + } + err = client.UploadFinding(ctx, finding) + assert.ErrorContains(t, err, "session is already finished") }) } diff --git a/syz-cluster/pkg/db/finding_repo.go b/syz-cluster/pkg/db/finding_repo.go index b5ce94534..3472a72bf 100644 --- a/syz-cluster/pkg/db/finding_repo.go +++ b/syz-cluster/pkg/db/finding_repo.go @@ -9,7 +9,6 @@ import ( "cloud.google.com/go/spanner" "github.com/google/uuid" - "google.golang.org/api/iterator" ) type FindingRepository struct { @@ -28,42 +27,86 @@ func NewFindingRepository(client *spanner.Client) *FindingRepository { } } -var ErrFindingExists = errors.New("the finding already exists") +type FindingID struct { + SessionID string + TestName string + Title string +} -// Save either adds the finding to the database or returns ErrFindingExists. -func (repo *FindingRepository) Save(ctx context.Context, finding *Finding) error { - if finding.ID == "" { - finding.ID = uuid.NewString() - } +// Store queries the information about the session and the existing finding and then +// requests a new Finding object to replace the old one. +// If the callback returns nil, nothing it updated. +func (repo *FindingRepository) Store(ctx context.Context, id *FindingID, + cb func(session *Session, old *Finding) (*Finding, error)) error { _, err := repo.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { - // Check if there is still no such finding. + // Query the existing finding, if it exists. stmt := spanner.Statement{ SQL: "SELECT * from `Findings` WHERE `SessionID`=@sessionID " + "AND `TestName` = @testName AND `Title`=@title", Params: map[string]interface{}{ - "sessionID": finding.SessionID, - "testName": finding.TestName, - "title": finding.Title, + "sessionID": id.SessionID, + "testName": id.TestName, + "title": id.Title, }, } iter := txn.Query(ctx, stmt) - defer iter.Stop() - _, iterErr := iter.Next() - if iterErr == nil { - return ErrFindingExists - } else if iterErr != iterator.Done { - return iterErr + oldFinding, err := readOne[Finding](iter) + iter.Stop() + if err != nil { + return err + } + // Query the Session object. + stmt = spanner.Statement{ + SQL: "SELECT * FROM `Sessions` WHERE `ID`=@id", + Params: map[string]interface{}{"id": id.SessionID}, + } + iter = txn.Query(ctx, stmt) + session, err := readOne[Session](iter) + iter.Stop() + if err != nil { + return err } + // Query the callback. + finding, err := cb(session, oldFinding) + if err != nil { + return err + } else if finding == nil { + return nil // Just abort. + } else if finding.ID == "" { + finding.ID = uuid.NewString() + } + // Insert the finding. m, err := spanner.InsertStruct("Findings", finding) if err != nil { return err } - return txn.BufferWrite([]*spanner.Mutation{m}) + var mutations []*spanner.Mutation + if oldFinding != nil { + mutations = append(mutations, spanner.Delete("Findings", spanner.Key{oldFinding.ID})) + } + mutations = append(mutations, m) + return txn.BufferWrite(mutations) }) return err } +var errFindingExists = errors.New("the finding already exists") + +// A helper for tests. +func (repo *FindingRepository) mustStore(ctx context.Context, finding *Finding) error { + return repo.Store(ctx, &FindingID{ + SessionID: finding.SessionID, + TestName: finding.TestName, + Title: finding.Title, + }, func(_ *Session, old *Finding) (*Finding, error) { + if old != nil { + return nil, errFindingExists + } + return finding, nil + }) +} + // nolint: dupl func (repo *FindingRepository) ListForSession(ctx context.Context, sessionID string, limit int) ([]*Finding, error) { stmt := spanner.Statement{ diff --git a/syz-cluster/pkg/db/finding_repo_test.go b/syz-cluster/pkg/db/finding_repo_test.go index 291bb0129..4d352270b 100644 --- a/syz-cluster/pkg/db/finding_repo_test.go +++ b/syz-cluster/pkg/db/finding_repo_test.go @@ -44,13 +44,13 @@ func TestFindingRepo(t *testing.T) { } // Insert them all. for _, finding := range toInsert { - err := findingRepo.Save(ctx, finding) + err := findingRepo.mustStore(ctx, finding) assert.NoError(t, err, "finding=%q", finding) } // Now it should report a duplicate each time. for _, finding := range toInsert { - err := findingRepo.Save(ctx, finding) - assert.ErrorIs(t, err, ErrFindingExists) + err := findingRepo.mustStore(ctx, finding) + assert.ErrorIs(t, err, errFindingExists) } list, err := findingRepo.ListForSession(ctx, session.ID, NoLimit) diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go index a9e23fd75..dfcce7e5b 100644 --- a/syz-cluster/pkg/db/report_repo_test.go +++ b/syz-cluster/pkg/db/report_repo_test.go @@ -79,7 +79,7 @@ func TestSessionsWithoutReports(t *testing.T) { Result: api.TestPassed, }, nil) assert.NoError(t, err) - err = findingRepo.Save(ctx, &Finding{ + err = findingRepo.mustStore(ctx, &Finding{ SessionID: session.ID, TestName: "test", Title: "A", diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go index cd9e1fff7..6b3d7a3df 100644 --- a/syz-cluster/pkg/db/util_test.go +++ b/syz-cluster/pkg/db/util_test.go @@ -67,7 +67,7 @@ func (d *dummyTestData) finishSession(session *Session) { func (d *dummyTestData) addFinding(session *Session, title, test string) { findingRepo := NewFindingRepository(d.client) - assert.NoError(d.t, findingRepo.Save(d.ctx, &Finding{ + assert.NoError(d.t, findingRepo.mustStore(d.ctx, &Finding{ SessionID: session.ID, Title: title, TestName: test, diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go index 001860e1c..df5f7c2d9 100644 --- a/syz-cluster/pkg/service/finding.go +++ b/syz-cluster/pkg/service/finding.go @@ -34,20 +34,40 @@ func NewFindingService(env *app.AppEnvironment) *FindingService { } func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error { - finding := &db.Finding{ - ID: uuid.NewString(), + return s.findingRepo.Store(ctx, &db.FindingID{ SessionID: req.SessionID, TestName: req.TestName, Title: req.Title, - } - var err error + }, func(session *db.Session, old *db.Finding) (*db.Finding, error) { + if !session.FinishedAt.IsNull() { + // We may have already sent a report, so the findings must stay as they are. + return nil, fmt.Errorf("session is already finished") + } + if old != nil && (old.CReproURI != "" || len(req.CRepro) == 0) { + // The existing finding already has a C reproducer, no reason to update. + return nil, nil + } + finding := &db.Finding{ + ID: uuid.NewString(), + SessionID: req.SessionID, + TestName: req.TestName, + Title: req.Title, + } + // TODO: if it's not actually addded, these blobs will be orphaned. + err := s.saveAssets(finding, req) + if err != nil { + return nil, err + } + return finding, nil + }) +} +func (s *FindingService) saveAssets(finding *db.Finding, req *api.NewFinding) error { type saveAsset struct { saveTo *string value []byte name string } - for _, asset := range []saveAsset{ {&finding.LogURI, req.Log, "log"}, {&finding.ReportURI, req.Report, "report"}, @@ -58,19 +78,13 @@ func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error { if len(asset.value) == 0 { continue } + var err error *asset.saveTo, err = s.blobStorage.Write(bytes.NewReader(asset.value), "Finding", finding.ID, asset.name) if err != nil { return fmt.Errorf("failed to save %s: %w", asset.name, err) } } - - // TODO: if it's not actually addded, the blobs above will be orphaned. - err = s.findingRepo.Save(ctx, finding) - if err == db.ErrFindingExists { - // It's ok, just ignore. - return nil - } - return err + return nil } func (s *FindingService) List(ctx context.Context, sessionID string, limit int) ([]*api.Finding, error) { |
