diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-06-12 10:34:46 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-06-17 08:59:03 +0000 |
| commit | 417d98fa2e5e60a0ed623a4e406918c5cacfd500 (patch) | |
| tree | 88c5ccf815967870da4c81c2afd28ca9c635b65f /syz-cluster/pkg/blob | |
| parent | 74c9d2523829e2f377ef7ee2e819a04ead202264 (diff) | |
syz-cluster: avoid UUIDs in blob store
Make blob store URIs dependent on the IDs explicitly passed into the
Write() function. In many cases this removes the need to distinguish
between the case where the object has already been saved and must be
overwritten and the case where it is being saved for the first time.
Continue to store the object in the blob storage first and only then
submit the entities to Spanner. This will lead to some wasted space,
but we'll add garbage collection at some point.
Diffstat (limited to 'syz-cluster/pkg/blob')
| -rw-r--r-- | syz-cluster/pkg/blob/gcs.go | 51 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage.go | 42 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage_test.go | 2 |
3 files changed, 33 insertions, 62 deletions
diff --git a/syz-cluster/pkg/blob/gcs.go b/syz-cluster/pkg/blob/gcs.go index f6f5797d5..7d22ccd57 100644 --- a/syz-cluster/pkg/blob/gcs.go +++ b/syz-cluster/pkg/blob/gcs.go @@ -7,10 +7,10 @@ import ( "context" "fmt" "io" + "path" "regexp" "github.com/google/syzkaller/pkg/gcs" - "github.com/google/uuid" ) type gcsDriver struct { @@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) { }, nil } -func (gcs *gcsDriver) Store(source io.Reader) (string, error) { - object := uuid.NewString() - err := gcs.writeObject(object, source) +func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) { + if len(parts) == 0 { + return "", fmt.Errorf("no identifiers for the object were passed to Write") + } + object := path.Join(gcs.bucket, path.Join(parts...)) + w, err := gcs.client.FileWriter(object, "", "") if err != nil { return "", err } - return gcs.objectURI(object), nil -} - -func (gcs *gcsDriver) Update(uri string, source io.Reader) error { - object, err := gcs.objectName(uri) + defer w.Close() + _, err = io.Copy(w, source) if err != nil { - return err + return "", err } - return gcs.writeObject(object, source) + return "gcs://" + object, nil } func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) { - object, err := gcs.objectName(uri) + bucket, object, err := gcs.parseURI(uri) if err != nil { return nil, err } - return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object)) + return gcs.client.FileReader(path.Join(bucket, object)) } -var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`) +var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`) -func (gcs *gcsDriver) objectName(uri string) (string, error) { +func (gcs *gcsDriver) parseURI(uri string) (string, string, error) { match := gcsObjectRe.FindStringSubmatch(uri) if len(match) == 0 { - return "", fmt.Errorf("invalid GCS URI") + return "", "", fmt.Errorf("invalid GCS URI") } else if match[1] != gcs.bucket { - return "", 
fmt.Errorf("unexpected GCS bucket") + return "", "", fmt.Errorf("unexpected GCS bucket") } - return match[2], nil -} - -func (gcs *gcsDriver) objectURI(object string) string { - return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object) -} - -func (gcs *gcsDriver) writeObject(object string, source io.Reader) error { - w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "") - if err != nil { - return err - } - defer w.Close() - - _, err = io.Copy(w, source) - return err + return gcs.bucket, match[2], nil } diff --git a/syz-cluster/pkg/blob/storage.go b/syz-cluster/pkg/blob/storage.go index be1d8f493..e3446e7fa 100644 --- a/syz-cluster/pkg/blob/storage.go +++ b/syz-cluster/pkg/blob/storage.go @@ -4,21 +4,21 @@ package blob import ( + "encoding/base64" "fmt" "io" "os" "path/filepath" "strings" - - "github.com/google/uuid" ) // Storage is not assumed to be used for partciularly large objects (e.g. GB of size), // but rather for blobs that risk overwhelming Spanner column size limits. type Storage interface { - // Store returns a URI to use later. - Store(source io.Reader) (string, error) - Update(key string, source io.Reader) error + // Write stores the object uniquely identified by a set of IDs (parts). + // If it already exists, it will be overwritten. + // The first argument is the URI which can be used to later retrieve it with Read. + Write(source io.Reader, parts ...string) (string, error) Read(uri string) (io.ReadCloser, error) } @@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage { const localStoragePrefix = "local://" -func (ls *LocalStorage) Store(source io.Reader) (string, error) { - name := uuid.NewString() - err := ls.writeFile(name, source) +func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) { + // A whatever approach that can handle arbitrary inputs. 
+ name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...))) + file, err := os.Create(filepath.Join(ls.baseFolder, name)) if err != nil { return "", err } - return localStoragePrefix + name, nil -} - -func (ls *LocalStorage) Update(uri string, source io.Reader) error { - if !strings.HasPrefix(uri, localStoragePrefix) { - return fmt.Errorf("unsupported URI type") + defer file.Close() + _, err = io.Copy(file, source) + if err != nil { + return "", fmt.Errorf("failed to save data: %w", err) } - return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source) + return localStoragePrefix + name, nil } func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { @@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { return os.Open(path) } -func (ls *LocalStorage) writeFile(name string, source io.Reader) error { - file, err := os.Create(filepath.Join(ls.baseFolder, name)) - if err != nil { - return err - } - defer file.Close() - _, err = io.Copy(file, source) - if err != nil { - return fmt.Errorf("failed to save data: %w", err) - } - return nil -} - func ReadAllBytes(storage Storage, uri string) ([]byte, error) { if uri == "" { return nil, nil diff --git a/syz-cluster/pkg/blob/storage_test.go b/syz-cluster/pkg/blob/storage_test.go index 9d2d830ce..51e7887bf 100644 --- a/syz-cluster/pkg/blob/storage_test.go +++ b/syz-cluster/pkg/blob/storage_test.go @@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) { var uris []string for i := 0; i < 2; i++ { content := fmt.Sprintf("object #%d", i) - uri, err := storage.Store(bytes.NewReader([]byte(content))) + uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i)) assert.NoError(t, err) uris = append(uris, uri) } |
