diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-06-12 10:34:46 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-06-17 08:59:03 +0000 |
| commit | 417d98fa2e5e60a0ed623a4e406918c5cacfd500 (patch) | |
| tree | 88c5ccf815967870da4c81c2afd28ca9c635b65f /syz-cluster/pkg | |
| parent | 74c9d2523829e2f377ef7ee2e819a04ead202264 (diff) | |
syz-cluster: avoid UUIDs in blob store
Make blob store URIs dependent on the IDs explicitly passed into the
Write() function. In many cases this removes the need to distinguish
between the case when the object has already been saved and must be
overwritten, and the case when it is saved for the first time.
Continue to first store the object in the blob storage and only then
submit the entities to Spanner. This will lead to some wasted space,
but we'll add garbage collection at some point.
Diffstat (limited to 'syz-cluster/pkg')
| -rw-r--r-- | syz-cluster/pkg/blob/gcs.go | 51 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage.go | 42 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/finding.go | 20 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/series.go | 5 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/session.go | 4 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/sessiontest.go | 23 |
7 files changed, 53 insertions, 94 deletions
diff --git a/syz-cluster/pkg/blob/gcs.go b/syz-cluster/pkg/blob/gcs.go index f6f5797d5..7d22ccd57 100644 --- a/syz-cluster/pkg/blob/gcs.go +++ b/syz-cluster/pkg/blob/gcs.go @@ -7,10 +7,10 @@ import ( "context" "fmt" "io" + "path" "regexp" "github.com/google/syzkaller/pkg/gcs" - "github.com/google/uuid" ) type gcsDriver struct { @@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) { }, nil } -func (gcs *gcsDriver) Store(source io.Reader) (string, error) { - object := uuid.NewString() - err := gcs.writeObject(object, source) +func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) { + if len(parts) == 0 { + return "", fmt.Errorf("no identifiers for the object were passed to Write") + } + object := path.Join(gcs.bucket, path.Join(parts...)) + w, err := gcs.client.FileWriter(object, "", "") if err != nil { return "", err } - return gcs.objectURI(object), nil -} - -func (gcs *gcsDriver) Update(uri string, source io.Reader) error { - object, err := gcs.objectName(uri) + defer w.Close() + _, err = io.Copy(w, source) if err != nil { - return err + return "", err } - return gcs.writeObject(object, source) + return "gcs://" + object, nil } func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) { - object, err := gcs.objectName(uri) + bucket, object, err := gcs.parseURI(uri) if err != nil { return nil, err } - return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object)) + return gcs.client.FileReader(path.Join(bucket, object)) } -var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`) +var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`) -func (gcs *gcsDriver) objectName(uri string) (string, error) { +func (gcs *gcsDriver) parseURI(uri string) (string, string, error) { match := gcsObjectRe.FindStringSubmatch(uri) if len(match) == 0 { - return "", fmt.Errorf("invalid GCS URI") + return "", "", fmt.Errorf("invalid GCS URI") } else if match[1] != gcs.bucket { - return "", 
fmt.Errorf("unexpected GCS bucket") + return "", "", fmt.Errorf("unexpected GCS bucket") } - return match[2], nil -} - -func (gcs *gcsDriver) objectURI(object string) string { - return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object) -} - -func (gcs *gcsDriver) writeObject(object string, source io.Reader) error { - w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "") - if err != nil { - return err - } - defer w.Close() - - _, err = io.Copy(w, source) - return err + return gcs.bucket, match[2], nil } diff --git a/syz-cluster/pkg/blob/storage.go b/syz-cluster/pkg/blob/storage.go index be1d8f493..e3446e7fa 100644 --- a/syz-cluster/pkg/blob/storage.go +++ b/syz-cluster/pkg/blob/storage.go @@ -4,21 +4,21 @@ package blob import ( + "encoding/base64" "fmt" "io" "os" "path/filepath" "strings" - - "github.com/google/uuid" ) // Storage is not assumed to be used for partciularly large objects (e.g. GB of size), // but rather for blobs that risk overwhelming Spanner column size limits. type Storage interface { - // Store returns a URI to use later. - Store(source io.Reader) (string, error) - Update(key string, source io.Reader) error + // Write stores the object uniquely identified by a set of IDs (parts). + // If it already exists, it will be overwritten. + // The first argument is the URI which can be used to later retrieve it with Read. + Write(source io.Reader, parts ...string) (string, error) Read(uri string) (io.ReadCloser, error) } @@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage { const localStoragePrefix = "local://" -func (ls *LocalStorage) Store(source io.Reader) (string, error) { - name := uuid.NewString() - err := ls.writeFile(name, source) +func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) { + // A whatever approach that can handle arbitrary inputs. 
+ name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...))) + file, err := os.Create(filepath.Join(ls.baseFolder, name)) if err != nil { return "", err } - return localStoragePrefix + name, nil -} - -func (ls *LocalStorage) Update(uri string, source io.Reader) error { - if !strings.HasPrefix(uri, localStoragePrefix) { - return fmt.Errorf("unsupported URI type") + defer file.Close() + _, err = io.Copy(file, source) + if err != nil { + return "", fmt.Errorf("failed to save data: %w", err) } - return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source) + return localStoragePrefix + name, nil } func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { @@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { return os.Open(path) } -func (ls *LocalStorage) writeFile(name string, source io.Reader) error { - file, err := os.Create(filepath.Join(ls.baseFolder, name)) - if err != nil { - return err - } - defer file.Close() - _, err = io.Copy(file, source) - if err != nil { - return fmt.Errorf("failed to save data: %w", err) - } - return nil -} - func ReadAllBytes(storage Storage, uri string) ([]byte, error) { if uri == "" { return nil, nil diff --git a/syz-cluster/pkg/blob/storage_test.go b/syz-cluster/pkg/blob/storage_test.go index 9d2d830ce..51e7887bf 100644 --- a/syz-cluster/pkg/blob/storage_test.go +++ b/syz-cluster/pkg/blob/storage_test.go @@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) { var uris []string for i := 0; i < 2; i++ { content := fmt.Sprintf("object #%d", i) - uri, err := storage.Store(bytes.NewReader([]byte(content))) + uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i)) assert.NoError(t, err) uris = append(uris, uri) } diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go index bb605ba73..b668dc798 100644 --- a/syz-cluster/pkg/service/finding.go +++ b/syz-cluster/pkg/service/finding.go @@ -12,6 +12,7 @@ import ( 
"github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" + "github.com/google/uuid" ) type FindingService struct { @@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService { } func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error { - var reportURI, logURI string + finding := &db.Finding{ + ID: uuid.NewString(), + SessionID: req.SessionID, + TestName: req.TestName, + Title: req.Title, + } var err error if len(req.Log) > 0 { - logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log)) + finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log") if err != nil { return fmt.Errorf("failed to save the log: %w", err) } } if len(req.Report) > 0 { - reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report)) + finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report") if err != nil { return fmt.Errorf("failed to save the report: %w", err) } } // TODO: if it's not actually addded, the blob records will be orphaned. - err = s.findingRepo.Save(ctx, &db.Finding{ - SessionID: req.SessionID, - TestName: req.TestName, - Title: req.Title, - ReportURI: reportURI, - LogURI: logURI, - }) + err = s.findingRepo.Save(ctx, finding) if err == db.ErrFindingExists { // It's ok, just ignore. return nil diff --git a/syz-cluster/pkg/service/series.go b/syz-cluster/pkg/service/series.go index 8ab8b5cef..7628ec48b 100644 --- a/syz-cluster/pkg/service/series.go +++ b/syz-cluster/pkg/service/series.go @@ -13,6 +13,7 @@ import ( "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" + "github.com/google/uuid" ) // SeriesService is tested in controller/. 
@@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string, func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) { seriesObj := &db.Series{ + ID: uuid.NewString(), ExtID: series.ExtID, AuthorEmail: series.AuthorEmail, Title: series.Title, @@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (* for _, patch := range series.Patches { // In case of errors, we will waste some space, but let's ignore it for simplicity. // Patches are not super big. - uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body)) + uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body), + "Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq)) if err != nil { return nil, fmt.Errorf("failed to upload patch body: %w", err) } diff --git a/syz-cluster/pkg/service/session.go b/syz-cluster/pkg/service/session.go index 9ad0688fd..446b431f4 100644 --- a/syz-cluster/pkg/service/session.go +++ b/syz-cluster/pkg/service/session.go @@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip var triageLogURI string if len(skip.TriageLog) > 0 { var err error - triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog)) + triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log") if err != nil { - return fmt.Errorf("failed to save the log: %w", err) + return fmt.Errorf("failed to save the triage log: %w", err) } } err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error { diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go index be410fdb7..2fa4d8701 100644 --- a/syz-cluster/pkg/service/sessiontest.go +++ b/syz-cluster/pkg/service/sessiontest.go @@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro } logURI := entity.LogURI if len(req.Log) > 0 { - logURI, err = 
s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log)) + logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), + "Session", req.SessionID, "Test", req.TestName, "log") if err != nil { return fmt.Errorf("failed to save the log: %w", err) } @@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN } else if entity == nil { return fmt.Errorf("the test has not been submitted yet") } - newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader) + archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts") if err != nil { return fmt.Errorf("failed to save the artifacts archive: %w", err) } return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) { - test.ArtifactsArchiveURI = newArchiveURI + test.ArtifactsArchiveURI = archiveURI }) } - -func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) { - if uri != "" { - err := s.blobStorage.Update(uri, reader) - if err != nil { - return "", fmt.Errorf("failed to update: %w", err) - } - return uri, nil - } - // TODO: it will leak if we fail to save the entity. - uri, err := s.blobStorage.Store(reader) - if err != nil { - return "", fmt.Errorf("failed to save: %w", err) - } - return uri, nil -} |
