aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--syz-cluster/pkg/controller/api_test.go41
-rw-r--r--syz-cluster/pkg/db/finding_repo.go79
-rw-r--r--syz-cluster/pkg/db/finding_repo_test.go6
-rw-r--r--syz-cluster/pkg/db/report_repo_test.go2
-rw-r--r--syz-cluster/pkg/db/util_test.go2
-rw-r--r--syz-cluster/pkg/service/finding.go40
6 files changed, 132 insertions, 38 deletions
diff --git a/syz-cluster/pkg/controller/api_test.go b/syz-cluster/pkg/controller/api_test.go
index 1c1c8de9f..e64f3bc78 100644
--- a/syz-cluster/pkg/controller/api_test.go
+++ b/syz-cluster/pkg/controller/api_test.go
@@ -10,7 +10,9 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/api"
"github.com/google/syzkaller/syz-cluster/pkg/app"
+ "github.com/google/syzkaller/syz-cluster/pkg/db"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestAPIGetSeries(t *testing.T) {
@@ -70,17 +72,52 @@ func TestAPISaveFinding(t *testing.T) {
finding := &api.NewFinding{
SessionID: ids.SessionID,
TestName: "test",
+ Title: "title",
Report: []byte("report"),
Log: []byte("log"),
SyzRepro: []byte("syz repro"),
SyzReproOpts: []byte("syz_repro_opts"),
- CRepro: []byte("C repro"),
}
err = client.UploadFinding(ctx, finding)
assert.NoError(t, err)
- // Even if the finding is reported the second time, it must still not fail.
+ // Even if the same finding is reported the second time, it must still not fail.
+ err = client.UploadFinding(ctx, finding)
+ assert.NoError(t, err)
+ })
+
+ t.Run("add C repro", func(t *testing.T) {
+ finding := &api.NewFinding{
+ SessionID: ids.SessionID,
+ TestName: "test",
+ Title: "title",
+ Report: []byte("report"),
+ Log: []byte("log"),
+ SyzRepro: []byte("syz repro"),
+ SyzReproOpts: []byte("syz_repro_opts"),
+ CRepro: []byte("C repro"),
+ }
err = client.UploadFinding(ctx, finding)
assert.NoError(t, err)
+ // Verify that C repro has appeared indeed.
+ findingRepo := db.NewFindingRepository(env.Spanner)
+ findings, err := findingRepo.ListForSession(ctx, ids.SessionID, db.NoLimit)
+ require.NoError(t, err)
+ require.Len(t, findings, 1)
+ assert.NotEmpty(t, findings[0].CReproURI)
+ })
+
+ t.Run("session stopped", func(t *testing.T) {
+ MarkSessionFinished(t, env, ids.SessionID)
+ finding := &api.NewFinding{
+ SessionID: ids.SessionID,
+ TestName: "test",
+ Title: "new title",
+ Report: []byte("report"),
+ Log: []byte("log"),
+ SyzRepro: []byte("syz repro"),
+ }
+ err = client.UploadFinding(ctx, finding)
+ assert.ErrorContains(t, err, "session is already finished")
})
}
diff --git a/syz-cluster/pkg/db/finding_repo.go b/syz-cluster/pkg/db/finding_repo.go
index b5ce94534..3472a72bf 100644
--- a/syz-cluster/pkg/db/finding_repo.go
+++ b/syz-cluster/pkg/db/finding_repo.go
@@ -9,7 +9,6 @@ import (
"cloud.google.com/go/spanner"
"github.com/google/uuid"
- "google.golang.org/api/iterator"
)
type FindingRepository struct {
@@ -28,42 +27,86 @@ func NewFindingRepository(client *spanner.Client) *FindingRepository {
}
}
-var ErrFindingExists = errors.New("the finding already exists")
+type FindingID struct {
+ SessionID string
+ TestName string
+ Title string
+}
-// Save either adds the finding to the database or returns ErrFindingExists.
-func (repo *FindingRepository) Save(ctx context.Context, finding *Finding) error {
- if finding.ID == "" {
- finding.ID = uuid.NewString()
- }
+// Store queries the information about the session and the existing finding and then
+// requests a new Finding object to replace the old one.
+// If the callback returns nil, nothing it updated.
+func (repo *FindingRepository) Store(ctx context.Context, id *FindingID,
+ cb func(session *Session, old *Finding) (*Finding, error)) error {
_, err := repo.client.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
- // Check if there is still no such finding.
+ // Query the existing finding, if it exists.
stmt := spanner.Statement{
SQL: "SELECT * from `Findings` WHERE `SessionID`=@sessionID " +
"AND `TestName` = @testName AND `Title`=@title",
Params: map[string]interface{}{
- "sessionID": finding.SessionID,
- "testName": finding.TestName,
- "title": finding.Title,
+ "sessionID": id.SessionID,
+ "testName": id.TestName,
+ "title": id.Title,
},
}
iter := txn.Query(ctx, stmt)
- defer iter.Stop()
- _, iterErr := iter.Next()
- if iterErr == nil {
- return ErrFindingExists
- } else if iterErr != iterator.Done {
- return iterErr
+ oldFinding, err := readOne[Finding](iter)
+ iter.Stop()
+ if err != nil {
+ return err
+ }
+ // Query the Session object.
+ stmt = spanner.Statement{
+ SQL: "SELECT * FROM `Sessions` WHERE `ID`=@id",
+ Params: map[string]interface{}{"id": id.SessionID},
+ }
+ iter = txn.Query(ctx, stmt)
+ session, err := readOne[Session](iter)
+ iter.Stop()
+ if err != nil {
+ return err
}
+ // Query the callback.
+ finding, err := cb(session, oldFinding)
+ if err != nil {
+ return err
+ } else if finding == nil {
+ return nil // Just abort.
+ } else if finding.ID == "" {
+ finding.ID = uuid.NewString()
+ }
+ // Insert the finding.
m, err := spanner.InsertStruct("Findings", finding)
if err != nil {
return err
}
- return txn.BufferWrite([]*spanner.Mutation{m})
+ var mutations []*spanner.Mutation
+ if oldFinding != nil {
+ mutations = append(mutations, spanner.Delete("Findings", spanner.Key{oldFinding.ID}))
+ }
+ mutations = append(mutations, m)
+ return txn.BufferWrite(mutations)
})
return err
}
+var errFindingExists = errors.New("the finding already exists")
+
+// A helper for tests.
+func (repo *FindingRepository) mustStore(ctx context.Context, finding *Finding) error {
+ return repo.Store(ctx, &FindingID{
+ SessionID: finding.SessionID,
+ TestName: finding.TestName,
+ Title: finding.Title,
+ }, func(_ *Session, old *Finding) (*Finding, error) {
+ if old != nil {
+ return nil, errFindingExists
+ }
+ return finding, nil
+ })
+}
+
// nolint: dupl
func (repo *FindingRepository) ListForSession(ctx context.Context, sessionID string, limit int) ([]*Finding, error) {
stmt := spanner.Statement{
diff --git a/syz-cluster/pkg/db/finding_repo_test.go b/syz-cluster/pkg/db/finding_repo_test.go
index 291bb0129..4d352270b 100644
--- a/syz-cluster/pkg/db/finding_repo_test.go
+++ b/syz-cluster/pkg/db/finding_repo_test.go
@@ -44,13 +44,13 @@ func TestFindingRepo(t *testing.T) {
}
// Insert them all.
for _, finding := range toInsert {
- err := findingRepo.Save(ctx, finding)
+ err := findingRepo.mustStore(ctx, finding)
assert.NoError(t, err, "finding=%q", finding)
}
// Now it should report a duplicate each time.
for _, finding := range toInsert {
- err := findingRepo.Save(ctx, finding)
- assert.ErrorIs(t, err, ErrFindingExists)
+ err := findingRepo.mustStore(ctx, finding)
+ assert.ErrorIs(t, err, errFindingExists)
}
list, err := findingRepo.ListForSession(ctx, session.ID, NoLimit)
diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go
index a9e23fd75..dfcce7e5b 100644
--- a/syz-cluster/pkg/db/report_repo_test.go
+++ b/syz-cluster/pkg/db/report_repo_test.go
@@ -79,7 +79,7 @@ func TestSessionsWithoutReports(t *testing.T) {
Result: api.TestPassed,
}, nil)
assert.NoError(t, err)
- err = findingRepo.Save(ctx, &Finding{
+ err = findingRepo.mustStore(ctx, &Finding{
SessionID: session.ID,
TestName: "test",
Title: "A",
diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go
index cd9e1fff7..6b3d7a3df 100644
--- a/syz-cluster/pkg/db/util_test.go
+++ b/syz-cluster/pkg/db/util_test.go
@@ -67,7 +67,7 @@ func (d *dummyTestData) finishSession(session *Session) {
func (d *dummyTestData) addFinding(session *Session, title, test string) {
findingRepo := NewFindingRepository(d.client)
- assert.NoError(d.t, findingRepo.Save(d.ctx, &Finding{
+ assert.NoError(d.t, findingRepo.mustStore(d.ctx, &Finding{
SessionID: session.ID,
Title: title,
TestName: test,
diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go
index 001860e1c..df5f7c2d9 100644
--- a/syz-cluster/pkg/service/finding.go
+++ b/syz-cluster/pkg/service/finding.go
@@ -34,20 +34,40 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
}
func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
- finding := &db.Finding{
- ID: uuid.NewString(),
+ return s.findingRepo.Store(ctx, &db.FindingID{
SessionID: req.SessionID,
TestName: req.TestName,
Title: req.Title,
- }
- var err error
+ }, func(session *db.Session, old *db.Finding) (*db.Finding, error) {
+ if !session.FinishedAt.IsNull() {
+ // We may have already sent a report, so the findings must stay as they are.
+ return nil, fmt.Errorf("session is already finished")
+ }
+ if old != nil && (old.CReproURI != "" || len(req.CRepro) == 0) {
+ // The existing finding already has a C reproducer, no reason to update.
+ return nil, nil
+ }
+ finding := &db.Finding{
+ ID: uuid.NewString(),
+ SessionID: req.SessionID,
+ TestName: req.TestName,
+ Title: req.Title,
+ }
+ // TODO: if it's not actually addded, these blobs will be orphaned.
+ err := s.saveAssets(finding, req)
+ if err != nil {
+ return nil, err
+ }
+ return finding, nil
+ })
+}
+func (s *FindingService) saveAssets(finding *db.Finding, req *api.NewFinding) error {
type saveAsset struct {
saveTo *string
value []byte
name string
}
-
for _, asset := range []saveAsset{
{&finding.LogURI, req.Log, "log"},
{&finding.ReportURI, req.Report, "report"},
@@ -58,19 +78,13 @@ func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
if len(asset.value) == 0 {
continue
}
+ var err error
*asset.saveTo, err = s.blobStorage.Write(bytes.NewReader(asset.value), "Finding", finding.ID, asset.name)
if err != nil {
return fmt.Errorf("failed to save %s: %w", asset.name, err)
}
}
-
- // TODO: if it's not actually addded, the blobs above will be orphaned.
- err = s.findingRepo.Save(ctx, finding)
- if err == db.ErrFindingExists {
- // It's ok, just ignore.
- return nil
- }
- return err
+ return nil
}
func (s *FindingService) List(ctx context.Context, sessionID string, limit int) ([]*api.Finding, error) {