diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-04-16 17:16:44 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-04-17 07:57:04 +0000 |
| commit | 94b0f8be5c512cccc2d1dbb405dab561b4d23940 (patch) | |
| tree | eaa8113ba775e3dbe7265dde77ba1653057a1285 | |
| parent | 552876f8f8f64b90e59c3ecc76806b04f4a30b1c (diff) | |
syz-cluster: enable atomic updates for SessionTestRepo
A raw InsertOrUpdate method is not very reliable in case of concurrent
update requests. Add a callback inside which the modified fields would
be set.
Refactor the existing code that used to call the old method.
| -rw-r--r-- | syz-cluster/pkg/db/report_repo_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/session_test_repo.go | 10 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/session_test_repo_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/db/util_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/sessiontest.go | 30 |
5 files changed, 28 insertions, 18 deletions
diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go index 9648a7bc2..ee05a2e18 100644 --- a/syz-cluster/pkg/db/report_repo_test.go +++ b/syz-cluster/pkg/db/report_repo_test.go @@ -74,7 +74,7 @@ func TestSessionsWithoutReports(t *testing.T) { SessionID: session.ID, TestName: "test", Result: api.TestPassed, - }) + }, nil) assert.NoError(t, err) err = findingRepo.Save(ctx, &Finding{ SessionID: session.ID, diff --git a/syz-cluster/pkg/db/session_test_repo.go b/syz-cluster/pkg/db/session_test_repo.go index 43ada137c..e5b923fd4 100644 --- a/syz-cluster/pkg/db/session_test_repo.go +++ b/syz-cluster/pkg/db/session_test_repo.go @@ -20,7 +20,9 @@ func NewSessionTestRepository(client *spanner.Client) *SessionTestRepository { } } -func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *SessionTest) error { +// If the beforeSave callback is specified, it will be called before saving the entity. +func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *SessionTest, + beforeSave func(*SessionTest)) error { _, err := repo.client.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { // Check if the test already exists. @@ -38,6 +40,9 @@ func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *Ses _, iterErr := iter.Next() if iterErr == nil { + if beforeSave != nil { + beforeSave(test) + } m, err := spanner.UpdateStruct("SessionTests", test) if err != nil { return err @@ -46,6 +51,9 @@ func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *Ses } else if iterErr != iterator.Done { return iterErr } else { + if beforeSave != nil { + beforeSave(test) + } m, err := spanner.InsertStruct("SessionTests", test) if err != nil { return err diff --git a/syz-cluster/pkg/db/session_test_repo_test.go b/syz-cluster/pkg/db/session_test_repo_test.go index f18349339..49eb2f3b4 100644 --- a/syz-cluster/pkg/db/session_test_repo_test.go +++ b/syz-cluster/pkg/db/session_test_repo_test.go @@ -43,7 +43,7 @@ func TestSessionTestRepository(t *testing.T) { PatchedBuildID: spanner.NullString{StringVal: build2.ID, Valid: true}, Result: api.TestPassed, } - err = testsRepo.InsertOrUpdate(ctx, test) + err = testsRepo.InsertOrUpdate(ctx, test, nil) assert.NoError(t, err) } diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go index f33f1e221..64fedeab2 100644 --- a/syz-cluster/pkg/db/util_test.go +++ b/syz-cluster/pkg/db/util_test.go @@ -26,7 +26,7 @@ func (d *dummyTestData) addSessionTest(session *Session, names ...string) { SessionID: session.ID, TestName: name, Result: api.TestPassed, - }) + }, nil) assert.NoError(d.t, err) } } diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go index a59126860..be410fdb7 100644 --- a/syz-cluster/pkg/service/sessiontest.go +++ b/syz-cluster/pkg/service/sessiontest.go @@ -39,25 +39,26 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro TestName: req.TestName, } } - entity.Result = req.Result - entity.UpdatedAt = time.Now() - if req.BaseBuildID != "" { - entity.BaseBuildID = spanner.NullString{StringVal: req.BaseBuildID, Valid: true} - } - if req.PatchedBuildID != "" { - entity.PatchedBuildID = spanner.NullString{StringVal: req.PatchedBuildID, Valid: true} - } - // TODO: the code does not really handle simultaneous requests. + logURI := entity.LogURI if len(req.Log) > 0 { - entity.LogURI, err = s.uploadOrUpdate(ctx, entity.LogURI, bytes.NewReader(req.Log)) + logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log)) if err != nil { return fmt.Errorf("failed to save the log: %w", err) } } - return s.testRepo.InsertOrUpdate(ctx, entity) + return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) { + test.Result = req.Result + test.UpdatedAt = time.Now() + test.LogURI = logURI + if req.BaseBuildID != "" { + test.BaseBuildID = spanner.NullString{StringVal: req.BaseBuildID, Valid: true} + } + if req.PatchedBuildID != "" { + test.PatchedBuildID = spanner.NullString{StringVal: req.PatchedBuildID, Valid: true} + } + }) } -// TODO: this function has the same problems as Save(). func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testName string, reader io.Reader) error { entity, err := s.testRepo.Get(ctx, sessionID, testName) if err != nil { @@ -69,8 +70,9 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN if err != nil { return fmt.Errorf("failed to save the artifacts archive: %w", err) } - entity.ArtifactsArchiveURI = newArchiveURI - return s.testRepo.InsertOrUpdate(ctx, entity) + return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) { + test.ArtifactsArchiveURI = newArchiveURI + }) } func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) { |
