aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2022-08-18 14:32:50 +0000
committerAleksandr Nogikh <wp32pw@gmail.com>2022-08-24 12:05:06 +0200
commit514514f61b1dec32386fca74dfe3875a277d1791 (patch)
treed8f6ef06ab971a28f447b6fbe4a9161ac61d5fe4
parent43795179c6b57e7f671f7471e53a0bf2fdcfad0d (diff)
all: hash and ignore duplicate assets
Calculate sha256 values of assets-to-upload and don't upload them if they already exist.
-rw-r--r--pkg/asset/backend_dummy.go2
-rw-r--r--pkg/asset/backend_gcs.go10
-rw-r--r--pkg/asset/storage.go88
-rw-r--r--pkg/asset/storage_test.go33
-rw-r--r--pkg/gcs/gcs.go14
-rw-r--r--syz-ci/manager.go20
6 files changed, 135 insertions, 32 deletions
diff --git a/pkg/asset/backend_dummy.go b/pkg/asset/backend_dummy.go
index 5e811d48e..6df86809d 100644
--- a/pkg/asset/backend_dummy.go
+++ b/pkg/asset/backend_dummy.go
@@ -52,7 +52,7 @@ func (be *dummyStorageBackend) upload(req *uploadRequest) (*uploadResponse, erro
if be.objectUpload != nil {
return be.objectUpload(req)
}
- return &uploadResponse{writer: &dummyWriteCloser{}}, nil
+ return &uploadResponse{writer: &dummyWriteCloser{}, path: req.savePath}, nil
}
func (be *dummyStorageBackend) downloadURL(path string, publicURL bool) (string, error) {
diff --git a/pkg/asset/backend_gcs.go b/pkg/asset/backend_gcs.go
index bf53c9f65..3cbe445ee 100644
--- a/pkg/asset/backend_gcs.go
+++ b/pkg/asset/backend_gcs.go
@@ -57,6 +57,16 @@ func (wel *writeErrorLogger) Close() error {
func (csb *cloudStorageBackend) upload(req *uploadRequest) (*uploadResponse, error) {
path := fmt.Sprintf("%s/%s", csb.bucket, req.savePath)
+ // Best-effort check only. In the worst case we'll just overwrite the file.
+ // The alternative would be to add an If-precondition, but it'd require
+ // complicated error-during-write handling.
exists, err := csb.client.FileExists(path)
if err != nil {
return nil, fmt.Errorf("failed to check whether the file exists: %w", err)
}
if exists {
return nil, &FileExistsError{req.savePath}
}
w, err := csb.client.FileWriterExt(path, req.contentType, req.contentEncoding)
csb.tracer.Log("gcs upload: obtained a writer for %s, error %s", path, err)
if err != nil {
diff --git a/pkg/asset/storage.go b/pkg/asset/storage.go
index 563035743..5c7134fd0 100644
--- a/pkg/asset/storage.go
+++ b/pkg/asset/storage.go
@@ -67,10 +67,36 @@ func (storage *Storage) getDefaultCompressor() Compressor {
return xzCompressor
}
+type ExtraUploadArg struct {
+ // It is assumed that paths constructed with same UniqueTag values
+ // always correspond to an asset having the same content.
+ UniqueTag string
+ // If the asset being uploaded already exists (see above), don't return
+ // an error, abort uploading and return the download URL.
+ SkipIfExists bool
+}
+
var ErrAssetTypeDisabled = errors.New("uploading assets of this type is disabled")
+func (storage *Storage) assetPath(name string, extra *ExtraUploadArg) string {
+ folderName := ""
+ if extra != nil && extra.UniqueTag != "" {
+ folderName = extra.UniqueTag
+ } else {
+ // The idea is to make a file name useful and yet unique.
+ // So we put a file to a pseudo-unique "folder".
+ folderNameBytes := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
+ folderName = fmt.Sprintf("%x", folderNameBytes)
+ }
+ const folderPrefix = 12
+ if len(folderName) > folderPrefix {
+ folderName = folderName[0:folderPrefix]
+ }
+ return fmt.Sprintf("%s/%s", folderName, name)
+}
+
func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.AssetType,
- name string) (string, error) {
+ name string, extra *ExtraUploadArg) (string, error) {
if name == "" {
return "", fmt.Errorf("file name is not specified")
}
@@ -82,14 +108,7 @@ func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.Ass
return "", fmt.Errorf("not allowed to upload an asset of type %s: %w",
assetType, ErrAssetTypeDisabled)
}
- // The idea is to make a file name useful and yet unique.
- // So we put a file to a pseudo-unique "folder".
- const folderPrefix = 6
- folderName := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
- path := fmt.Sprintf("%x/%s", folderName[0:folderPrefix], name)
- // TODO: allow an optional argument that would replace this "random" prefix.
- // This should prevent the duplication of a number of assets (if the caller
- // passes e.g. a file hash).
+ path := storage.assetPath(name, extra)
req := &uploadRequest{
savePath: path,
contentType: typeDescr.ContentType,
@@ -103,28 +122,36 @@ func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.Ass
compressor = typeDescr.customCompressor
}
res, err := compressor(req, storage.backend.upload)
- if err != nil {
+ if existsErr, ok := err.(*FileExistsError); ok {
+ storage.tracer.Log("asset %s already exists", path)
+ if extra == nil || !extra.SkipIfExists {
+ return "", err
+ }
+ // Let's just return the download URL.
+ return storage.backend.downloadURL(existsErr.Path, storage.cfg.PublicAccess)
+ } else if err != nil {
return "", fmt.Errorf("failed to query writer: %w", err)
- }
- written, err := io.Copy(res.writer, reader)
- if err != nil {
- more := ""
- closeErr := res.writer.Close()
- if exiterr, ok := closeErr.(*exec.ExitError); ok {
- more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
+ } else {
+ written, err := io.Copy(res.writer, reader)
+ if err != nil {
+ more := ""
+ closeErr := res.writer.Close()
+ if exiterr, ok := closeErr.(*exec.ExitError); ok {
+ more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
+ }
+ return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
+ written, err, more)
+ }
+ err = res.writer.Close()
+ if err != nil {
+ return "", fmt.Errorf("failed to close writer: %w", err)
}
- return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
- written, err, more)
- }
- err = res.writer.Close()
- if err != nil {
- return "", fmt.Errorf("failed to close writer: %w", err)
}
return storage.backend.downloadURL(res.path, storage.cfg.PublicAccess)
}
func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType,
- build *dashapi.Build) (dashapi.NewAsset, error) {
+ build *dashapi.Build, extra *ExtraUploadArg) (dashapi.NewAsset, error) {
const commitPrefix = 8
commit := build.KernelCommit
if len(commit) > commitPrefix {
@@ -136,7 +163,7 @@ func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, asse
strings.TrimSuffix(baseName, fileExt),
commit,
fileExt)
- url, err := storage.uploadFileStream(reader, assetType, name)
+ url, err := storage.uploadFileStream(reader, assetType, name, extra)
if err != nil {
return dashapi.NewAsset{}, err
}
@@ -156,6 +183,17 @@ func (storage *Storage) ReportBuildAssets(build *dashapi.Build, assets ...dashap
var ErrAssetDoesNotExist = errors.New("the asset did not exist")
+type FileExistsError struct {
+ // The path gets changed by wrappers, so we need to return it back.
+ Path string
+}
+
+func (e *FileExistsError) Error() string {
+ return fmt.Sprintf("asset exists: %s", e.Path)
+}
+
+var ErrAssetExists = errors.New("the asset already exists")
+
const deletionEmbargo = time.Hour * 24 * 7
// Best way: convert download URLs to paths.
diff --git a/pkg/asset/storage_test.go b/pkg/asset/storage_test.go
index f1c057806..9652c9d2a 100644
--- a/pkg/asset/storage_test.go
+++ b/pkg/asset/storage_test.go
@@ -114,7 +114,7 @@ func validateXz(res *uploadedFile, expected []byte) error {
func (storage *Storage) sendBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType,
build *dashapi.Build) error {
- asset, err := storage.UploadBuildAsset(reader, fileName, assetType, build)
+ asset, err := storage.UploadBuildAsset(reader, fileName, assetType, build, nil)
if err != nil {
return err
}
@@ -321,3 +321,34 @@ func TestAssetStorageConfiguration(t *testing.T) {
t.Fatalf("UploadBuildAssetStream of BootableDisk expected to succeed, got %v", err)
}
}
+
+func TestUploadSameContent(t *testing.T) {
+ dashMock := newDashMock()
+ storage, be := makeStorage(t, dashMock.getDashapi())
+ be.currentTime = time.Now().Add(-2 * deletionEmbargo)
+
+ build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}
+ extra := &ExtraUploadArg{UniqueTag: "uniquetag", SkipIfExists: true}
+ testContent := []byte{0x1, 0x2, 0x3, 0x4}
+ asset, err := storage.UploadBuildAsset(bytes.NewReader(testContent), "disk.raw",
+ dashapi.BootableDisk, build, extra)
+ if err != nil {
+ t.Fatalf("UploadBuildAsset expected to succeed, got %v", err)
+ }
+ if !strings.Contains(asset.DownloadURL, extra.UniqueTag) {
+ t.Fatalf("%#v was expected to contain %#v", asset.DownloadURL, extra.UniqueTag)
+ }
+ // Upload the same asset again.
+ be.objectUpload = func(req *uploadRequest) (*uploadResponse, error) {
+ return nil, &FileExistsError{req.savePath}
+ }
+ assetTwo, err := storage.UploadBuildAsset(bytes.NewReader(testContent), "disk.raw",
+ dashapi.BootableDisk, build, extra)
+ if err != nil {
+ t.Fatalf("UploadBuildAsset expected to succeed, got %v", err)
+ }
+ if asset.DownloadURL != assetTwo.DownloadURL {
+ t.Fatalf("assets were expected to have same download URL, got %#v %#v",
+ asset.DownloadURL, assetTwo.DownloadURL)
+ }
+}
diff --git a/pkg/gcs/gcs.go b/pkg/gcs/gcs.go
index 2776927d4..e0ccd39c5 100644
--- a/pkg/gcs/gcs.go
+++ b/pkg/gcs/gcs.go
@@ -143,6 +143,20 @@ func (client *Client) DeleteFile(gcsFile string) error {
return err
}
+func (client *Client) FileExists(gcsFile string) (bool, error) {
+ bucket, filename, err := split(gcsFile)
+ if err != nil {
+ return false, err
+ }
+ _, err = client.client.Bucket(bucket).Object(filename).Attrs(client.ctx)
+ if err == storage.ErrObjectNotExist {
+ return false, nil
+ } else if err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
// Where things get published.
const (
PublicPrefix = "https://storage.googleapis.com/"
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index 182bfec1e..bb2ad2698 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -4,6 +4,7 @@
package main
import (
+ "crypto/sha256"
"fmt"
"io"
"io/ioutil"
@@ -723,11 +724,20 @@ func (mgr *Manager) uploadBuildAssets(build *dashapi.Build, assetFolder string)
log.Logf(0, "uploading an asset %s of type %s",
pendingAsset.path, pendingAsset.assetType)
}
- // TODO: ask the dashboard whether it actually needs this asset?
- // This should remove the unnecessary re-uploading of assets after
- // each manager restart, even if the kernel was not built anew.
+ extra := &asset.ExtraUploadArg{SkipIfExists: true}
+ hash := sha256.New()
+ if _, err := io.Copy(hash, file); err != nil {
+ log.Logf(0, "failed to calculate hash for the asset %s: %s", pendingAsset.path, err)
+ continue
+ }
+ extra.UniqueTag = fmt.Sprintf("%x", hash.Sum(nil))
+ // Now we need to go back to the beginning of the file again.
+ if _, err := file.Seek(0, io.SeekStart); err != nil {
+ log.Logf(0, "failed to wind back the opened file for %s: %s", pendingAsset.path, err)
+ continue
+ }
info, err := mgr.storage.UploadBuildAsset(file, pendingAsset.name,
- pendingAsset.assetType, build)
+ pendingAsset.assetType, build, extra)
if err != nil {
log.Logf(0, "failed to upload an asset: %s, %s",
pendingAsset.path, err)
@@ -775,7 +785,7 @@ func (mgr *Manager) uploadCoverReport() error {
}
// Upload via the asset storage.
newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
- dashapi.HTMLCoverageReport, mgr.lastBuild)
+ dashapi.HTMLCoverageReport, mgr.lastBuild, nil)
if err != nil {
return fmt.Errorf("failed to upload html coverage report: %w", err)
}