diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-05-12 14:34:19 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-05-13 10:05:33 +0000 |
| commit | 50f6de2f6c8b05193d0ca3f3881bd662b4babe60 (patch) | |
| tree | 109620befa31a9696888a69a44a3cfaf06fa7cce /syz-cluster/pkg/db | |
| parent | a11966dc8beffc94c9f60f6ad1188b3a02f770f0 (diff) | |
syz-cluster: add reply tracking functionality
Add API to record replies under the reports that allows to determine the
original report only by having the MessageID.
Diffstat (limited to 'syz-cluster/pkg/db')
| -rw-r--r-- | syz-cluster/pkg/db/entities.go | 6 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/migrations/1_initialize.up.sql | 13 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/report_reply_repo.go | 92 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/report_reply_repo_test.go | 61 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/report_repo.go | 11 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/report_repo_test.go | 22 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/spanner.go | 7 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/util_test.go | 8 |
8 files changed, 214 insertions, 6 deletions
diff --git a/syz-cluster/pkg/db/entities.go b/syz-cluster/pkg/db/entities.go index a7f08f9ab..c6c469131 100644 --- a/syz-cluster/pkg/db/entities.go +++ b/syz-cluster/pkg/db/entities.go @@ -141,3 +141,9 @@ type SessionReport struct { func (s *SessionReport) SetReportedAt(t time.Time) { s.ReportedAt = spanner.NullTime{Time: t, Valid: true} } + +type ReportReply struct { + MessageID string `spanner:"MessageID"` + ReportID string `spanner:"ReportID"` + Time time.Time `spanner:"Time"` +} diff --git a/syz-cluster/pkg/db/migrations/1_initialize.up.sql b/syz-cluster/pkg/db/migrations/1_initialize.up.sql index ca91b343a..3100c9bb2 100644 --- a/syz-cluster/pkg/db/migrations/1_initialize.up.sql +++ b/syz-cluster/pkg/db/migrations/1_initialize.up.sql @@ -100,14 +100,23 @@ CREATE UNIQUE INDEX NoDupFindings ON Findings(SessionID, TestName, Title); -- Session's bug reports. CREATE TABLE SessionReports ( - ID STRING(36) NOT NULL, -- UUID?? + ID STRING(36) NOT NULL, -- UUID SessionID STRING(36) NOT NULL, -- UUID ReportedAt TIMESTAMP, Moderation BOOL, - MessageID STRING(256), + MessageID STRING(512), Reporter STRING(256), CONSTRAINT FK_SessionReports FOREIGN KEY (SessionID) REFERENCES Sessions (ID), ) PRIMARY KEY(ID); CREATE UNIQUE INDEX NoDupSessionReports ON SessionReports(SessionID, Moderation); CREATE INDEX SessionReportsByStatus ON SessionReports (Reporter, ReportedAt); +CREATE INDEX SessionReportsByMessageID ON SessionReports(Reporter, MessageID); + +-- Replies on a session report. +CREATE TABLE ReportReplies ( + MessageID STRING(512) NOT NULL, -- Gmail sets a limit of 500 characters for Message-ID + ReportID STRING(36) NOT NULL, -- UUID + Time TIMESTAMP, + CONSTRAINT FK_ReplyReportID FOREIGN KEY (ReportID) REFERENCES SessionReports (ID), +) PRIMARY KEY(MessageID, ReportID); diff --git a/syz-cluster/pkg/db/report_reply_repo.go b/syz-cluster/pkg/db/report_reply_repo.go new file mode 100644 index 000000000..7e21403c2 --- /dev/null +++ b/syz-cluster/pkg/db/report_reply_repo.go @@ -0,0 +1,92 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package db + +import ( + "context" + "errors" + + "cloud.google.com/go/spanner" +) + +type ReportReplyRepository struct { + client *spanner.Client +} + +func NewReportReplyRepository(client *spanner.Client) *ReportReplyRepository { + return &ReportReplyRepository{ + client: client, + } +} + +func (repo *ReportReplyRepository) FindParentReportID(ctx context.Context, reporter, messageID string) (string, error) { + stmt := spanner.Statement{ + SQL: "SELECT `ReportReplies`.ReportID FROM `ReportReplies` " + + "JOIN `SessionReports` ON `SessionReports`.ID = `ReportReplies`.ReportID " + + "WHERE `ReportReplies`.MessageID = @messageID " + + "AND `SessionReports`.Reporter = @reporter LIMIT 1", + Params: map[string]interface{}{ + "reporter": reporter, + "messageID": messageID, + }, + } + iter := repo.client.Single().Query(ctx, stmt) + defer iter.Stop() + + type result struct { + ReportID string `spanner:"ReportID"` + } + ret, err := readOne[result](iter) + if err != nil { + return "", err + } else if ret != nil { + return ret.ReportID, nil + } + return "", nil +} + +var ErrReportReplyExists = errors.New("the reply has already been recorded") + +func (repo *ReportReplyRepository) Insert(ctx context.Context, reply *ReportReply) error { + _, err := repo.client.ReadWriteTransaction(ctx, + func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + stmt := spanner.Statement{ + SQL: "SELECT * from `ReportReplies` " + + "WHERE `ReportID`=@reportID AND `MessageID`=@messageID", + Params: map[string]interface{}{ + "reportID": reply.ReportID, + "messageID": reply.MessageID, + }, + } + iter := txn.Query(ctx, stmt) + entity, err := readOne[ReportReply](iter) + iter.Stop() + if err != nil { + return err + } else if entity != nil { + return ErrReportReplyExists + } + insert, err := spanner.InsertStruct("ReportReplies", reply) + if err != nil { + return err + } + return txn.BufferWrite([]*spanner.Mutation{insert}) + }) + return err +} + +func (repo *ReportReplyRepository) LastForReporter(ctx context.Context, reporter string) (*ReportReply, error) { + stmt := spanner.Statement{ + SQL: "SELECT `ReportReplies`.* FROM `ReportReplies` " + + "JOIN `SessionReports` ON `SessionReports`.ID=`ReportReplies`.ReportID " + + "WHERE `SessionReports`.Reporter=@reporter " + + "ORDER BY `Time` DESC LIMIT 1", + Params: map[string]interface{}{ + "reporter": reporter, + }, + } + iter := repo.client.Single().Query(ctx, stmt) + defer iter.Stop() + return readOne[ReportReply](iter) +} diff --git a/syz-cluster/pkg/db/report_reply_repo_test.go b/syz-cluster/pkg/db/report_reply_repo_test.go new file mode 100644 index 000000000..27c2849cd --- /dev/null +++ b/syz-cluster/pkg/db/report_reply_repo_test.go @@ -0,0 +1,61 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package db + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestReportReplyRepository(t *testing.T) { + client, ctx := NewTransientDB(t) + dtd := &dummyTestData{t, ctx, client} + session := dtd.dummySession(dtd.dummySeries()) + + reportRepo := NewReportRepository(client) + report := &SessionReport{SessionID: session.ID, Reporter: dummyReporter} + err := reportRepo.Insert(ctx, report) + assert.NoError(t, err) + + replyRepo := NewReportReplyRepository(client) + baseTime := time.Now() + for i := 0; i < 2; i++ { + err = replyRepo.Insert(ctx, &ReportReply{ + MessageID: fmt.Sprintf("message-id-%d", i), + ReportID: report.ID, + Time: baseTime.Add(time.Duration(i) * time.Second), + }) + assert.NoError(t, err) + } + + t.Run("insert-dup-reply", func(t *testing.T) { + err := replyRepo.Insert(ctx, &ReportReply{ + MessageID: "message-id-0", + ReportID: report.ID, + Time: time.Now(), + }) + assert.Error(t, ErrReportReplyExists, err) + }) + + t.Run("last-report", func(t *testing.T) { + reply, err := replyRepo.LastForReporter(ctx, dummyReporter) + assert.NoError(t, err) + assert.Equal(t, "message-id-1", reply.MessageID) + }) + + t.Run("last-report-unknown", func(t *testing.T) { + reply, err := replyRepo.LastForReporter(ctx, "unknown-reporter") + assert.NoError(t, err) + assert.Nil(t, reply) + }) + + t.Run("find-by-parent", func(t *testing.T) { + reportID, err := replyRepo.FindParentReportID(ctx, dummyReporter, "message-id-0") + assert.NoError(t, err) + assert.Equal(t, report.ID, reportID) + }) +} diff --git a/syz-cluster/pkg/db/report_repo.go b/syz-cluster/pkg/db/report_repo.go index 4e039c4a7..37a09a746 100644 --- a/syz-cluster/pkg/db/report_repo.go +++ b/syz-cluster/pkg/db/report_repo.go @@ -44,3 +44,14 @@ func (repo *ReportRepository) ListNotReported(ctx context.Context, reporter stri addLimit(&stmt, limit) return repo.readEntities(ctx, stmt) } + +func (repo *ReportRepository) FindByMessageID(ctx context.Context, reporter, messageID string) (*SessionReport, error) { + stmt := spanner.Statement{ + SQL: "SELECT * FROM `SessionReports` WHERE `Reporter` = @reporter AND `MessageID` = @messageID", + Params: map[string]interface{}{ + "reporter": reporter, + "messageID": messageID, + }, + } + return repo.readEntity(ctx, stmt) +} diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go index bc81c37bd..a0cee2a11 100644 --- a/syz-cluster/pkg/db/report_repo_test.go +++ b/syz-cluster/pkg/db/report_repo_test.go @@ -40,16 +40,30 @@ func TestReportRepository(t *testing.T) { assert.NoError(t, err) assert.Len(t, list, 3) + const messageID = "message-id" err = reportRepo.Update(ctx, keys[0], func(rep *SessionReport) error { rep.SetReportedAt(time.Now()) + rep.MessageID = messageID return nil }) assert.NoError(t, err) - // Now one less. - list, err = reportRepo.ListNotReported(ctx, dummyReporter, 10) - assert.NoError(t, err) - assert.Len(t, list, 2) + t.Run("not-reported-count", func(t *testing.T) { + // Now one less. + list, err := reportRepo.ListNotReported(ctx, dummyReporter, 10) + assert.NoError(t, err) + assert.Len(t, list, 2) + }) + t.Run("find-by-id-found", func(t *testing.T) { + report, err := reportRepo.FindByMessageID(ctx, dummyReporter, messageID) + assert.NoError(t, err) + assert.NotNil(t, report) + }) + t.Run("find-by-id-empty", func(t *testing.T) { + report, err := reportRepo.FindByMessageID(ctx, dummyReporter, "non-existing-id") + assert.NoError(t, err) + assert.Nil(t, report) + }) } func TestSessionsWithoutReports(t *testing.T) { diff --git a/syz-cluster/pkg/db/spanner.go b/syz-cluster/pkg/db/spanner.go index d114727b8..8db081c14 100644 --- a/syz-cluster/pkg/db/spanner.go +++ b/syz-cluster/pkg/db/spanner.go @@ -333,3 +333,10 @@ func (g *genericEntityOps[EntityType, KeyType]) readEntities(ctx context.Context defer iter.Stop() return readEntities[EntityType](iter) } + +func (g *genericEntityOps[EntityType, KeyType]) readEntity(ctx context.Context, + stmt spanner.Statement) (*EntityType, error) { + iter := g.client.Single().Query(ctx, stmt) + defer iter.Stop() + return readOne[EntityType](iter) +} diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go index 64fedeab2..cd9e1fff7 100644 --- a/syz-cluster/pkg/db/util_test.go +++ b/syz-cluster/pkg/db/util_test.go @@ -31,6 +31,14 @@ func (d *dummyTestData) addSessionTest(session *Session, names ...string) { } } +func (d *dummyTestData) dummySeries() *Series { + seriesRepo := NewSeriesRepository(d.client) + series := &Series{ExtID: "series-ext-id"} + err := seriesRepo.Insert(d.ctx, series, nil) + assert.NoError(d.t, err) + return series +} + func (d *dummyTestData) dummySession(series *Series) *Session { sessionRepo := NewSessionRepository(d.client) session := &Session{ |
