aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-16 17:16:44 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-17 07:57:04 +0000
commit94b0f8be5c512cccc2d1dbb405dab561b4d23940 (patch)
treeeaa8113ba775e3dbe7265dde77ba1653057a1285
parent552876f8f8f64b90e59c3ecc76806b04f4a30b1c (diff)
syz-cluster: enable atomic updates for SessionTestRepo
A raw InsertOrUpdate method is not very reliable in case of concurrent update requests. Add a callback inside which the modified fields would be set. Refactor the existing code that used to call the old method.
-rw-r--r--syz-cluster/pkg/db/report_repo_test.go2
-rw-r--r--syz-cluster/pkg/db/session_test_repo.go10
-rw-r--r--syz-cluster/pkg/db/session_test_repo_test.go2
-rw-r--r--syz-cluster/pkg/db/util_test.go2
-rw-r--r--syz-cluster/pkg/service/sessiontest.go30
5 files changed, 28 insertions, 18 deletions
diff --git a/syz-cluster/pkg/db/report_repo_test.go b/syz-cluster/pkg/db/report_repo_test.go
index 9648a7bc2..ee05a2e18 100644
--- a/syz-cluster/pkg/db/report_repo_test.go
+++ b/syz-cluster/pkg/db/report_repo_test.go
@@ -74,7 +74,7 @@ func TestSessionsWithoutReports(t *testing.T) {
SessionID: session.ID,
TestName: "test",
Result: api.TestPassed,
- })
+ }, nil)
assert.NoError(t, err)
err = findingRepo.Save(ctx, &Finding{
SessionID: session.ID,
diff --git a/syz-cluster/pkg/db/session_test_repo.go b/syz-cluster/pkg/db/session_test_repo.go
index 43ada137c..e5b923fd4 100644
--- a/syz-cluster/pkg/db/session_test_repo.go
+++ b/syz-cluster/pkg/db/session_test_repo.go
@@ -20,7 +20,9 @@ func NewSessionTestRepository(client *spanner.Client) *SessionTestRepository {
}
}
-func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *SessionTest) error {
+// If the beforeSave callback is specified, it will be called before saving the entity.
+func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *SessionTest,
+ beforeSave func(*SessionTest)) error {
_, err := repo.client.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// Check if the test already exists.
@@ -38,6 +40,9 @@ func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *Ses
_, iterErr := iter.Next()
if iterErr == nil {
+ if beforeSave != nil {
+ beforeSave(test)
+ }
m, err := spanner.UpdateStruct("SessionTests", test)
if err != nil {
return err
@@ -46,6 +51,9 @@ func (repo *SessionTestRepository) InsertOrUpdate(ctx context.Context, test *Ses
} else if iterErr != iterator.Done {
return iterErr
} else {
+ if beforeSave != nil {
+ beforeSave(test)
+ }
m, err := spanner.InsertStruct("SessionTests", test)
if err != nil {
return err
diff --git a/syz-cluster/pkg/db/session_test_repo_test.go b/syz-cluster/pkg/db/session_test_repo_test.go
index f18349339..49eb2f3b4 100644
--- a/syz-cluster/pkg/db/session_test_repo_test.go
+++ b/syz-cluster/pkg/db/session_test_repo_test.go
@@ -43,7 +43,7 @@ func TestSessionTestRepository(t *testing.T) {
PatchedBuildID: spanner.NullString{StringVal: build2.ID, Valid: true},
Result: api.TestPassed,
}
- err = testsRepo.InsertOrUpdate(ctx, test)
+ err = testsRepo.InsertOrUpdate(ctx, test, nil)
assert.NoError(t, err)
}
diff --git a/syz-cluster/pkg/db/util_test.go b/syz-cluster/pkg/db/util_test.go
index f33f1e221..64fedeab2 100644
--- a/syz-cluster/pkg/db/util_test.go
+++ b/syz-cluster/pkg/db/util_test.go
@@ -26,7 +26,7 @@ func (d *dummyTestData) addSessionTest(session *Session, names ...string) {
SessionID: session.ID,
TestName: name,
Result: api.TestPassed,
- })
+ }, nil)
assert.NoError(d.t, err)
}
}
diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go
index a59126860..be410fdb7 100644
--- a/syz-cluster/pkg/service/sessiontest.go
+++ b/syz-cluster/pkg/service/sessiontest.go
@@ -39,25 +39,26 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro
TestName: req.TestName,
}
}
- entity.Result = req.Result
- entity.UpdatedAt = time.Now()
- if req.BaseBuildID != "" {
- entity.BaseBuildID = spanner.NullString{StringVal: req.BaseBuildID, Valid: true}
- }
- if req.PatchedBuildID != "" {
- entity.PatchedBuildID = spanner.NullString{StringVal: req.PatchedBuildID, Valid: true}
- }
- // TODO: the code does not really handle simultaneous requests.
+ logURI := entity.LogURI
if len(req.Log) > 0 {
- entity.LogURI, err = s.uploadOrUpdate(ctx, entity.LogURI, bytes.NewReader(req.Log))
+ logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log))
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
}
- return s.testRepo.InsertOrUpdate(ctx, entity)
+ return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
+ test.Result = req.Result
+ test.UpdatedAt = time.Now()
+ test.LogURI = logURI
+ if req.BaseBuildID != "" {
+ test.BaseBuildID = spanner.NullString{StringVal: req.BaseBuildID, Valid: true}
+ }
+ if req.PatchedBuildID != "" {
+ test.PatchedBuildID = spanner.NullString{StringVal: req.PatchedBuildID, Valid: true}
+ }
+ })
}
-// TODO: this function has the same problems as Save().
func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testName string, reader io.Reader) error {
entity, err := s.testRepo.Get(ctx, sessionID, testName)
if err != nil {
@@ -69,8 +70,9 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN
if err != nil {
return fmt.Errorf("failed to save the artifacts archive: %w", err)
}
- entity.ArtifactsArchiveURI = newArchiveURI
- return s.testRepo.InsertOrUpdate(ctx, entity)
+ return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
+ test.ArtifactsArchiveURI = newArchiveURI
+ })
}
func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) {