aboutsummaryrefslogtreecommitdiffstats
path: root/syz-cluster/pkg/db
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-05-12 14:34:19 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-05-13 10:05:33 +0000
commit50f6de2f6c8b05193d0ca3f3881bd662b4babe60 (patch)
tree109620befa31a9696888a69a44a3cfaf06fa7cce /syz-cluster/pkg/db
parenta11966dc8beffc94c9f60f6ad1188b3a02f770f0 (diff)
syz-cluster: add reply tracking functionality
Add API to record replies under the reports that allows to determine the original report only by having the MessageID.
Diffstat (limited to 'syz-cluster/pkg/db')
-rw-r--r--syz-cluster/pkg/db/entities.go6
-rw-r--r--syz-cluster/pkg/db/migrations/1_initialize.up.sql13
-rw-r--r--syz-cluster/pkg/db/report_reply_repo.go92
-rw-r--r--syz-cluster/pkg/db/report_reply_repo_test.go61
-rw-r--r--syz-cluster/pkg/db/report_repo.go11
-rw-r--r--syz-cluster/pkg/db/report_repo_test.go22
-rw-r--r--syz-cluster/pkg/db/spanner.go7
-rw-r--r--syz-cluster/pkg/db/util_test.go8
8 files changed, 214 insertions, 6 deletions
diff --git a/syz-cluster/pkg/db/entities.go b/syz-cluster/pkg/db/entities.go
index a7f08f9ab..c6c469131 100644
--- a/syz-cluster/pkg/db/entities.go
+++ b/syz-cluster/pkg/db/entities.go
@@ -141,3 +141,9 @@ type SessionReport struct {
func (s *SessionReport) SetReportedAt(t time.Time) {
s.ReportedAt = spanner.NullTime{Time: t, Valid: true}
}
+
+type ReportReply struct {
+ MessageID string `spanner:"MessageID"`
+ ReportID string `spanner:"ReportID"`
+ Time time.Time `spanner:"Time"`
+}
diff --git a/syz-cluster/pkg/db/migrations/1_initialize.up.sql b/syz-cluster/pkg/db/migrations/1_initialize.up.sql
index ca91b343a..3100c9bb2 100644
--- a/syz-cluster/pkg/db/migrations/1_initialize.up.sql
+++ b/syz-cluster/pkg/db/migrations/1_initialize.up.sql
@@ -100,14 +100,23 @@ CREATE UNIQUE INDEX NoDupFindings ON Findings(SessionID, TestName, Title);
-- Session's bug reports.
CREATE TABLE SessionReports (
- ID STRING(36) NOT NULL, -- UUID??
+ ID STRING(36) NOT NULL, -- UUID
SessionID STRING(36) NOT NULL, -- UUID
ReportedAt TIMESTAMP,
Moderation BOOL,
- MessageID STRING(256),
+ MessageID STRING(512),
Reporter STRING(256),
CONSTRAINT FK_SessionReports FOREIGN KEY (SessionID) REFERENCES Sessions (ID),
) PRIMARY KEY(ID);
CREATE UNIQUE INDEX NoDupSessionReports ON SessionReports(SessionID, Moderation);
CREATE INDEX SessionReportsByStatus ON SessionReports (Reporter, ReportedAt);
+CREATE INDEX SessionReportsByMessageID ON SessionReports(Reporter, MessageID);
+
+-- Replies on a session report.
+CREATE TABLE ReportReplies (
+ MessageID STRING(512) NOT NULL, -- Gmail sets a limit of 500 characters for Message-ID
+ ReportID STRING(36) NOT NULL, -- UUID
+ Time TIMESTAMP,
+ CONSTRAINT FK_ReplyReportID FOREIGN KEY (ReportID) REFERENCES SessionReports (ID),
+) PRIMARY KEY(MessageID, ReportID);
diff --git a/syz-cluster/pkg/db/report_reply_repo.go b/syz-cluster/pkg/db/report_reply_repo.go
new file mode 100644
index 000000000..7e21403c2
--- /dev/null
+++ b/syz-cluster/pkg/db/report_reply_repo.go
@@ -0,0 +1,92 @@
+// Copyright 2025 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package db
+
+import (
+ "context"
+ "errors"
+
+ "cloud.google.com/go/spanner"
+)
+
+type ReportReplyRepository struct {
+ client *spanner.Client
+}
+
+func NewReportReplyRepository(client *spanner.Client) *ReportReplyRepository {
+ return &ReportReplyRepository{
+ client: client,
+ }
+}
+
+func (repo *ReportReplyRepository) FindParentReportID(ctx context.Context, reporter, messageID string) (string, error) {
+ stmt := spanner.Statement{
+ SQL: "SELECT `ReportReplies`.ReportID FROM `ReportReplies` " +
+ "JOIN `SessionReports` ON `SessionReports`.ID = `ReportReplies`.ReportID " +
+ "WHERE `ReportReplies`.MessageID = @messageID " +
+ "AND `SessionReports`.Reporter = @reporter LIMIT 1",
+ Params: map[string]interface{}{
+ "reporter": reporter,
+ "messageID": messageID,
+ },
+ }
+ iter := repo.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+
+ type result struct {
+ ReportID string `spanner:"ReportID"`
+ }
+ ret, err := readOne[result](iter)
+ if err != nil {
+ return "", err
+ } else if ret != nil {
+ return ret.ReportID, nil
+ }
+ return "", nil
+}
+
+var ErrReportReplyExists = errors.New("the reply has already been recorded")
+
+func (repo *ReportReplyRepository) Insert(ctx context.Context, reply *ReportReply) error {
+ _, err := repo.client.ReadWriteTransaction(ctx,
+ func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
+ stmt := spanner.Statement{
+ SQL: "SELECT * from `ReportReplies` " +
+ "WHERE `ReportID`=@reportID AND `MessageID`=@messageID",
+ Params: map[string]interface{}{
+ "reportID": reply.ReportID,
+ "messageID": reply.MessageID,
+ },
+ }
+ iter := txn.Query(ctx, stmt)
+ entity, err := readOne[ReportReply](iter)
+ iter.Stop()
+ if err != nil {
+ return err
+ } else if entity != nil {
+ return ErrReportReplyExists
+ }
+ insert, err := spanner.InsertStruct("ReportReplies", reply)
+ if err != nil {
+ return err
+ }
+ return txn.BufferWrite([]*spanner.Mutation{insert})
+ })
+ return err
+}
+
+func (repo *ReportReplyRepository) LastForReporter(ctx context.Context, reporter string) (*ReportReply, error) {
+ stmt := spanner.Statement{
+ SQL: "SELECT `ReportReplies`.* FROM `ReportReplies` " +
+ "JOIN `SessionReports` ON `SessionReports`.ID=`ReportReplies`.ReportID " +
+ "WHERE `SessionReports`.Reporter=@reporter " +
+ "ORDER BY `Time` DESC LIMIT 1",
+ Params: map[string]interface{}{
+ "reporter": reporter,
+ },
+ }
+ iter := repo.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+ return readOne[ReportReply](iter)
+}
diff --git a/syz-cluster/pkg/db/report_reply_repo_test.go b/syz-cluster/pkg/db/report_reply_repo_test.go
new file mode 100644
index 000000000..27c2849cd
--- /dev/null
+++ b/syz-cluster/pkg/db/report_reply_repo_test.go
@@ -0,0 +1,61 @@
+// Copyright 2025 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package db
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestReportReplyRepository(t *testing.T) {
+ client, ctx := NewTransientDB(t)
+ dtd := &dummyTestData{t, ctx, client}
+ session := dtd.dummySession(dtd.dummySeries())
+
+ reportRepo := NewReportRepository(client)
+ report := &SessionReport{SessionID: session.ID, Reporter: dummyReporter}
+ err := reportRepo.Insert(ctx, report)
+ assert.NoError(t, err)
+
+ replyRepo := NewReportReplyRepository(client)
+ baseTime := time.Now()
+ for i := 0; i < 2; i++ {
+ err = replyRepo.Insert(ctx, &ReportReply{
+ MessageID: fmt.Sprintf("message-id-%d", i),
+ ReportID: report.ID,
+ Time: baseTime.Add(time.Duration(i) * time.Second),
+ })
+ assert.NoError(t, err)
+ }
+
+ t.Run("insert-dup-reply", func(t *testing.T) {
+ err := replyRepo.Insert(ctx, &ReportReply{
+ MessageID: "message-id-0",
+ ReportID: report.ID,
+ Time: time.Now(),
+ })
+ assert.Error(t, ErrReportReplyExists, err)
+ })
+
+ t.Run("last-report", func(t *testing.T) {
+ reply, err := replyRepo.LastForReporter(ctx, dummyReporter)
+ assert.NoError(t, err)
+ assert.Equal(t, "message-id-1", reply.MessageID)
+ })
+
+ t.Run("last-report-unknown", func(t *testing.T) {
+ reply, err := replyRepo.LastForReporter(ctx, "unknown-reporter")
+ assert.NoError(t, err)
+ assert.Nil(t, reply)
+ })
+
+ t.Run("find-by-parent", func(t *testing.T) {
+ reportID, err := replyRepo.FindParentReportID(ctx, dummyReporter, "message-id-0")
+ assert.NoError(t, err)
+ assert.Equal(t, report.ID, reportID)
+ })
+}
diff --git a/syz-cluster/pkg/db/report_repo.go b/syz-cluster/pkg/db/report_repo.go
index 4e039c4a7..37a09a746 100644
--- a/syz-cluster/pkg/db/report_repo.go
+++ b/syz-cluster/pkg/db/report_repo.go
@@ -44,3 +44,14 @@ func (repo *ReportRepository) ListNotReported(ctx context.Context, reporter stri
addLimit(&stmt, limit)
return repo.readEntities(ctx, stmt)
}
+
+func (repo *ReportRepository) FindByMessageID(ctx context.Context, reporter, messageID string) (*SessionReport, error) {
+ stmt := spanner.Statement{
+ SQL: "SELECT * FROM `SessionReports` WHERE `Reporter` = @reporter AND `MessageID` = @messageID",
+ Params: map[string]interface{}{
+ "reporter": reporter,
+ "messageID": messageID,
+ },
+ }
+ return repo.readEntity(ctx, stmt)
+}
diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go
index bc81c37bd..a0cee2a11 100644
--- a/syz-cluster/pkg/db/report_repo_test.go
+++ b/syz-cluster/pkg/db/report_repo_test.go
@@ -40,16 +40,30 @@ func TestReportRepository(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, list, 3)
+ const messageID = "message-id"
err = reportRepo.Update(ctx, keys[0], func(rep *SessionReport) error {
rep.SetReportedAt(time.Now())
+ rep.MessageID = messageID
return nil
})
assert.NoError(t, err)
- // Now one less.
- list, err = reportRepo.ListNotReported(ctx, dummyReporter, 10)
- assert.NoError(t, err)
- assert.Len(t, list, 2)
+ t.Run("not-reported-count", func(t *testing.T) {
+ // Now one less.
+ list, err := reportRepo.ListNotReported(ctx, dummyReporter, 10)
+ assert.NoError(t, err)
+ assert.Len(t, list, 2)
+ })
+ t.Run("find-by-id-found", func(t *testing.T) {
+ report, err := reportRepo.FindByMessageID(ctx, dummyReporter, messageID)
+ assert.NoError(t, err)
+ assert.NotNil(t, report)
+ })
+ t.Run("find-by-id-empty", func(t *testing.T) {
+ report, err := reportRepo.FindByMessageID(ctx, dummyReporter, "non-existing-id")
+ assert.NoError(t, err)
+ assert.Nil(t, report)
+ })
}
func TestSessionsWithoutReports(t *testing.T) {
diff --git a/syz-cluster/pkg/db/spanner.go b/syz-cluster/pkg/db/spanner.go
index d114727b8..8db081c14 100644
--- a/syz-cluster/pkg/db/spanner.go
+++ b/syz-cluster/pkg/db/spanner.go
@@ -333,3 +333,10 @@ func (g *genericEntityOps[EntityType, KeyType]) readEntities(ctx context.Context
defer iter.Stop()
return readEntities[EntityType](iter)
}
+
+func (g *genericEntityOps[EntityType, KeyType]) readEntity(ctx context.Context,
+ stmt spanner.Statement) (*EntityType, error) {
+ iter := g.client.Single().Query(ctx, stmt)
+ defer iter.Stop()
+ return readOne[EntityType](iter)
+}
diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go
index 64fedeab2..cd9e1fff7 100644
--- a/syz-cluster/pkg/db/util_test.go
+++ b/syz-cluster/pkg/db/util_test.go
@@ -31,6 +31,14 @@ func (d *dummyTestData) addSessionTest(session *Session, names ...string) {
}
}
+func (d *dummyTestData) dummySeries() *Series {
+ seriesRepo := NewSeriesRepository(d.client)
+ series := &Series{ExtID: "series-ext-id"}
+ err := seriesRepo.Insert(d.ctx, series, nil)
+ assert.NoError(d.t, err)
+ return series
+}
+
func (d *dummyTestData) dummySession(series *Series) *Session {
sessionRepo := NewSessionRepository(d.client)
session := &Session{