aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-06-12 10:34:46 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-06-17 08:59:03 +0000
commit417d98fa2e5e60a0ed623a4e406918c5cacfd500 (patch)
tree88c5ccf815967870da4c81c2afd28ca9c635b65f
parent74c9d2523829e2f377ef7ee2e819a04ead202264 (diff)
syz-cluster: avoid UUIDs in blob store
Make blob store URIs dependent on the IDs explicitly passed into the Write() function. In many cases this removes the need to distinguish between the case when the object has already been saved and we must overwrite it and when it's saved the first time. We keep storing the object to the blob storage first and only then submitting the entities to Spanner. This will lead to some wasted space, but we'll add garbage collection at some point.
-rw-r--r--syz-cluster/controller/processor.go19
-rw-r--r--syz-cluster/pkg/blob/gcs.go51
-rw-r--r--syz-cluster/pkg/blob/storage.go42
-rw-r--r--syz-cluster/pkg/blob/storage_test.go2
-rw-r--r--syz-cluster/pkg/service/finding.go20
-rw-r--r--syz-cluster/pkg/service/series.go5
-rw-r--r--syz-cluster/pkg/service/session.go4
-rw-r--r--syz-cluster/pkg/service/sessiontest.go23
8 files changed, 59 insertions, 107 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index 07851a9fa..e634ad5e5 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -138,7 +138,7 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
app.Errorf("failed to query workflow %q status: %v", session.ID, err)
continue
}
- if workflowLog != nil {
+ if len(workflowLog) > 0 {
err := sp.updateSessionLog(ctx, session, workflowLog)
if err != nil {
app.Errorf("failed to update session log: %v", err)
@@ -214,19 +214,12 @@ func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID strin
}
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
+ logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log")
+ if err != nil {
+ return fmt.Errorf("failed to save the log: %w", err)
+ }
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
- if session.LogURI == "" {
- path, err := sp.blobStorage.Store(bytes.NewReader(log))
- if err != nil {
- return fmt.Errorf("failed to save the log: %w", err)
- }
- session.LogURI = path
- } else {
- err := sp.blobStorage.Update(session.LogURI, bytes.NewReader(log))
- if err != nil {
- return fmt.Errorf("failed to update the log %q: %w", session.LogURI, err)
- }
- }
+ session.LogURI = logURI
return nil
})
}
diff --git a/syz-cluster/pkg/blob/gcs.go b/syz-cluster/pkg/blob/gcs.go
index f6f5797d5..7d22ccd57 100644
--- a/syz-cluster/pkg/blob/gcs.go
+++ b/syz-cluster/pkg/blob/gcs.go
@@ -7,10 +7,10 @@ import (
"context"
"fmt"
"io"
+ "path"
"regexp"
"github.com/google/syzkaller/pkg/gcs"
- "github.com/google/uuid"
)
type gcsDriver struct {
@@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) {
}, nil
}
-func (gcs *gcsDriver) Store(source io.Reader) (string, error) {
- object := uuid.NewString()
- err := gcs.writeObject(object, source)
+func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) {
+ if len(parts) == 0 {
+ return "", fmt.Errorf("no identifiers for the object were passed to Write")
+ }
+ object := path.Join(gcs.bucket, path.Join(parts...))
+ w, err := gcs.client.FileWriter(object, "", "")
if err != nil {
return "", err
}
- return gcs.objectURI(object), nil
-}
-
-func (gcs *gcsDriver) Update(uri string, source io.Reader) error {
- object, err := gcs.objectName(uri)
+ defer w.Close()
+ _, err = io.Copy(w, source)
if err != nil {
- return err
+ return "", err
}
- return gcs.writeObject(object, source)
+ return "gcs://" + object, nil
}
func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) {
- object, err := gcs.objectName(uri)
+ bucket, object, err := gcs.parseURI(uri)
if err != nil {
return nil, err
}
- return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object))
+ return gcs.client.FileReader(path.Join(bucket, object))
}
-var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`)
+var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`)
-func (gcs *gcsDriver) objectName(uri string) (string, error) {
+func (gcs *gcsDriver) parseURI(uri string) (string, string, error) {
match := gcsObjectRe.FindStringSubmatch(uri)
if len(match) == 0 {
- return "", fmt.Errorf("invalid GCS URI")
+ return "", "", fmt.Errorf("invalid GCS URI")
} else if match[1] != gcs.bucket {
- return "", fmt.Errorf("unexpected GCS bucket")
+ return "", "", fmt.Errorf("unexpected GCS bucket")
}
- return match[2], nil
-}
-
-func (gcs *gcsDriver) objectURI(object string) string {
- return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object)
-}
-
-func (gcs *gcsDriver) writeObject(object string, source io.Reader) error {
- w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "")
- if err != nil {
- return err
- }
- defer w.Close()
-
- _, err = io.Copy(w, source)
- return err
+ return gcs.bucket, match[2], nil
}
diff --git a/syz-cluster/pkg/blob/storage.go b/syz-cluster/pkg/blob/storage.go
index be1d8f493..e3446e7fa 100644
--- a/syz-cluster/pkg/blob/storage.go
+++ b/syz-cluster/pkg/blob/storage.go
@@ -4,21 +4,21 @@
package blob
import (
+ "encoding/base64"
"fmt"
"io"
"os"
"path/filepath"
"strings"
-
- "github.com/google/uuid"
)
// Storage is not assumed to be used for particularly large objects (e.g. GB of size),
// but rather for blobs that risk overwhelming Spanner column size limits.
type Storage interface {
- // Store returns a URI to use later.
- Store(source io.Reader) (string, error)
- Update(key string, source io.Reader) error
+ // Write stores the object uniquely identified by a set of IDs (parts).
+ // If it already exists, it will be overwritten.
+ // The first return value is the URI which can be used to later retrieve it with Read.
+ Write(source io.Reader, parts ...string) (string, error)
Read(uri string) (io.ReadCloser, error)
}
@@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage {
const localStoragePrefix = "local://"
-func (ls *LocalStorage) Store(source io.Reader) (string, error) {
- name := uuid.NewString()
- err := ls.writeFile(name, source)
+func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) {
+ // A simple approach that can handle arbitrary inputs.
+ name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...)))
+ file, err := os.Create(filepath.Join(ls.baseFolder, name))
if err != nil {
return "", err
}
- return localStoragePrefix + name, nil
-}
-
-func (ls *LocalStorage) Update(uri string, source io.Reader) error {
- if !strings.HasPrefix(uri, localStoragePrefix) {
- return fmt.Errorf("unsupported URI type")
+ defer file.Close()
+ _, err = io.Copy(file, source)
+ if err != nil {
+ return "", fmt.Errorf("failed to save data: %w", err)
}
- return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source)
+ return localStoragePrefix + name, nil
}
func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
@@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
return os.Open(path)
}
-func (ls *LocalStorage) writeFile(name string, source io.Reader) error {
- file, err := os.Create(filepath.Join(ls.baseFolder, name))
- if err != nil {
- return err
- }
- defer file.Close()
- _, err = io.Copy(file, source)
- if err != nil {
- return fmt.Errorf("failed to save data: %w", err)
- }
- return nil
-}
-
func ReadAllBytes(storage Storage, uri string) ([]byte, error) {
if uri == "" {
return nil, nil
diff --git a/syz-cluster/pkg/blob/storage_test.go b/syz-cluster/pkg/blob/storage_test.go
index 9d2d830ce..51e7887bf 100644
--- a/syz-cluster/pkg/blob/storage_test.go
+++ b/syz-cluster/pkg/blob/storage_test.go
@@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) {
var uris []string
for i := 0; i < 2; i++ {
content := fmt.Sprintf("object #%d", i)
- uri, err := storage.Store(bytes.NewReader([]byte(content)))
+ uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i))
assert.NoError(t, err)
uris = append(uris, uri)
}
diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go
index bb605ba73..b668dc798 100644
--- a/syz-cluster/pkg/service/finding.go
+++ b/syz-cluster/pkg/service/finding.go
@@ -12,6 +12,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
+ "github.com/google/uuid"
)
type FindingService struct {
@@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
}
func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
- var reportURI, logURI string
+ finding := &db.Finding{
+ ID: uuid.NewString(),
+ SessionID: req.SessionID,
+ TestName: req.TestName,
+ Title: req.Title,
+ }
var err error
if len(req.Log) > 0 {
- logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log))
+ finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
}
if len(req.Report) > 0 {
- reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report))
+ finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report")
if err != nil {
return fmt.Errorf("failed to save the report: %w", err)
}
}
// TODO: if it's not actually added, the blob records will be orphaned.
- err = s.findingRepo.Save(ctx, &db.Finding{
- SessionID: req.SessionID,
- TestName: req.TestName,
- Title: req.Title,
- ReportURI: reportURI,
- LogURI: logURI,
- })
+ err = s.findingRepo.Save(ctx, finding)
if err == db.ErrFindingExists {
// It's ok, just ignore.
return nil
diff --git a/syz-cluster/pkg/service/series.go b/syz-cluster/pkg/service/series.go
index 8ab8b5cef..7628ec48b 100644
--- a/syz-cluster/pkg/service/series.go
+++ b/syz-cluster/pkg/service/series.go
@@ -13,6 +13,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
+ "github.com/google/uuid"
)
// SeriesService is tested in controller/.
@@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string,
func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) {
seriesObj := &db.Series{
+ ID: uuid.NewString(),
ExtID: series.ExtID,
AuthorEmail: series.AuthorEmail,
Title: series.Title,
@@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*
for _, patch := range series.Patches {
// In case of errors, we will waste some space, but let's ignore it for simplicity.
// Patches are not super big.
- uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body))
+ uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body),
+ "Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq))
if err != nil {
return nil, fmt.Errorf("failed to upload patch body: %w", err)
}
diff --git a/syz-cluster/pkg/service/session.go b/syz-cluster/pkg/service/session.go
index 9ad0688fd..446b431f4 100644
--- a/syz-cluster/pkg/service/session.go
+++ b/syz-cluster/pkg/service/session.go
@@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip
var triageLogURI string
if len(skip.TriageLog) > 0 {
var err error
- triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog))
+ triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log")
if err != nil {
- return fmt.Errorf("failed to save the log: %w", err)
+ return fmt.Errorf("failed to save the triage log: %w", err)
}
}
err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error {
diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go
index be410fdb7..2fa4d8701 100644
--- a/syz-cluster/pkg/service/sessiontest.go
+++ b/syz-cluster/pkg/service/sessiontest.go
@@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro
}
logURI := entity.LogURI
if len(req.Log) > 0 {
- logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log))
+ logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log),
+ "Session", req.SessionID, "Test", req.TestName, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
@@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN
} else if entity == nil {
return fmt.Errorf("the test has not been submitted yet")
}
- newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader)
+ archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts")
if err != nil {
return fmt.Errorf("failed to save the artifacts archive: %w", err)
}
return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
- test.ArtifactsArchiveURI = newArchiveURI
+ test.ArtifactsArchiveURI = archiveURI
})
}
-
-func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) {
- if uri != "" {
- err := s.blobStorage.Update(uri, reader)
- if err != nil {
- return "", fmt.Errorf("failed to update: %w", err)
- }
- return uri, nil
- }
- // TODO: it will leak if we fail to save the entity.
- uri, err := s.blobStorage.Store(reader)
- if err != nil {
- return "", fmt.Errorf("failed to save: %w", err)
- }
- return uri, nil
-}