| -rw-r--r-- | pkg/asset/backend_dummy.go |  2 |
| -rw-r--r-- | pkg/asset/backend_gcs.go   | 10 |
| -rw-r--r-- | pkg/asset/storage.go       | 88 |
| -rw-r--r-- | pkg/asset/storage_test.go  | 33 |
| -rw-r--r-- | pkg/gcs/gcs.go             | 14 |
| -rw-r--r-- | syz-ci/manager.go          | 20 |

6 files changed, 135 insertions, 32 deletions
diff --git a/pkg/asset/backend_dummy.go b/pkg/asset/backend_dummy.go
index 5e811d48e..6df86809d 100644
--- a/pkg/asset/backend_dummy.go
+++ b/pkg/asset/backend_dummy.go
@@ -52,7 +52,7 @@ func (be *dummyStorageBackend) upload(req *uploadRequest) (*uploadResponse, erro
 	if be.objectUpload != nil {
 		return be.objectUpload(req)
 	}
-	return &uploadResponse{writer: &dummyWriteCloser{}}, nil
+	return &uploadResponse{writer: &dummyWriteCloser{}, path: req.savePath}, nil
 }
 
 func (be *dummyStorageBackend) downloadURL(path string, publicURL bool) (string, error) {
diff --git a/pkg/asset/backend_gcs.go b/pkg/asset/backend_gcs.go
index bf53c9f65..3cbe445ee 100644
--- a/pkg/asset/backend_gcs.go
+++ b/pkg/asset/backend_gcs.go
@@ -57,6 +57,16 @@ func (wel *writeErrorLogger) Close() error {
 
 func (csb *cloudStorageBackend) upload(req *uploadRequest) (*uploadResponse, error) {
 	path := fmt.Sprintf("%s/%s", csb.bucket, req.savePath)
+	// Best-effort check only. In the worst case we'll just overwrite the file.
+	// The alternative would be to add an If-precondition, but it'd require
+	// complicated error-during-write handling.
+	exists, err := csb.client.FileExists(path)
+	if err != nil {
+		return nil, err
+	}
+	if exists {
+		return nil, &FileExistsError{req.savePath}
+	}
 	w, err := csb.client.FileWriterExt(path, req.contentType, req.contentEncoding)
 	csb.tracer.Log("gcs upload: obtained a writer for %s, error %s", path, err)
 	if err != nil {
diff --git a/pkg/asset/storage.go b/pkg/asset/storage.go
index 563035743..5c7134fd0 100644
--- a/pkg/asset/storage.go
+++ b/pkg/asset/storage.go
@@ -67,10 +67,36 @@ func (storage *Storage) getDefaultCompressor() Compressor {
 	return xzCompressor
 }
 
+type ExtraUploadArg struct {
+	// It is assumed that paths constructed with same UniqueTag values
+	// always correspond to an asset having the same content.
+	UniqueTag string
+	// If the asset being uploaded already exists (see above), don't return
+	// an error, abort uploading and return the download URL.
+	SkipIfExists bool
+}
+
 var ErrAssetTypeDisabled = errors.New("uploading assets of this type is disabled")
 
+func (storage *Storage) assetPath(name string, extra *ExtraUploadArg) string {
+	folderName := ""
+	if extra != nil && extra.UniqueTag != "" {
+		folderName = extra.UniqueTag
+	} else {
+		// The idea is to make a file name useful and yet unique.
+		// So we put a file to a pseudo-unique "folder".
+		folderNameBytes := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
+		folderName = fmt.Sprintf("%x", folderNameBytes)
+	}
+	const folderPrefix = 12
+	if len(folderName) > folderPrefix {
+		folderName = folderName[0:folderPrefix]
+	}
+	return fmt.Sprintf("%s/%s", folderName, name)
+}
+
 func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.AssetType,
-	name string) (string, error) {
+	name string, extra *ExtraUploadArg) (string, error) {
 	if name == "" {
 		return "", fmt.Errorf("file name is not specified")
 	}
@@ -82,14 +108,7 @@ func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.Ass
 		return "", fmt.Errorf("not allowed to upload an asset of type %s: %w",
 			assetType, ErrAssetTypeDisabled)
 	}
-	// The idea is to make a file name useful and yet unique.
-	// So we put a file to a pseudo-unique "folder".
-	const folderPrefix = 6
-	folderName := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
-	path := fmt.Sprintf("%x/%s", folderName[0:folderPrefix], name)
-	// TODO: allow an optional argument that would replace this "random" prefix.
-	// This should prevent the duplication of a number of assets (if the caller
-	// passes e.g. a file hash).
+	path := storage.assetPath(name, extra)
 	req := &uploadRequest{
 		savePath:        path,
 		contentType:     typeDescr.ContentType,
@@ -103,28 +122,36 @@ func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.Ass
 		compressor = typeDescr.customCompressor
 	}
 	res, err := compressor(req, storage.backend.upload)
-	if err != nil {
+	if existsErr, ok := err.(*FileExistsError); ok {
+		storage.tracer.Log("asset %s already exists", path)
+		if extra == nil || !extra.SkipIfExists {
+			return "", err
+		}
+		// Let's just return the download URL.
+		return storage.backend.downloadURL(existsErr.Path, storage.cfg.PublicAccess)
+	} else if err != nil {
 		return "", fmt.Errorf("failed to query writer: %w", err)
-	}
-	written, err := io.Copy(res.writer, reader)
-	if err != nil {
-		more := ""
-		closeErr := res.writer.Close()
-		if exiterr, ok := closeErr.(*exec.ExitError); ok {
-			more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
+	} else {
+		written, err := io.Copy(res.writer, reader)
+		if err != nil {
+			more := ""
+			closeErr := res.writer.Close()
+			if exiterr, ok := closeErr.(*exec.ExitError); ok {
+				more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
+			}
+			return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
+				written, err, more)
+		}
+		err = res.writer.Close()
+		if err != nil {
+			return "", fmt.Errorf("failed to close writer: %w", err)
 		}
-		return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
-			written, err, more)
-	}
-	err = res.writer.Close()
-	if err != nil {
-		return "", fmt.Errorf("failed to close writer: %w", err)
 	}
 	return storage.backend.downloadURL(res.path, storage.cfg.PublicAccess)
 }
 
 func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string,
-	assetType dashapi.AssetType, build *dashapi.Build) (dashapi.NewAsset, error) {
+	assetType dashapi.AssetType, build *dashapi.Build, extra *ExtraUploadArg) (dashapi.NewAsset, error) {
 	const commitPrefix = 8
 	commit := build.KernelCommit
 	if len(commit) > commitPrefix {
@@ -136,7 +163,7 @@ func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, asse
 		strings.TrimSuffix(baseName, fileExt),
 		commit,
 		fileExt)
-	url, err := storage.uploadFileStream(reader, assetType, name)
+	url, err := storage.uploadFileStream(reader, assetType, name, extra)
 	if err != nil {
 		return dashapi.NewAsset{}, err
 	}
@@ -156,6 +183,17 @@ func (storage *Storage) ReportBuildAssets(build *dashapi.Build, assets ...dashap
 
 var ErrAssetDoesNotExist = errors.New("the asset did not exist")
 
+type FileExistsError struct {
+	// The path gets changed by wrappers, so we need to return it back.
+	Path string
+}
+
+func (e *FileExistsError) Error() string {
+	return fmt.Sprintf("asset exists: %s", e.Path)
+}
+
+var ErrAssetExists = errors.New("the asset already exists")
+
 const deletionEmbargo = time.Hour * 24 * 7
 
 // Best way: convert download URLs to paths.
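Editor's note: to illustrate the intended use of the new ExtraUploadArg, here is a minimal caller-side sketch (not part of the diff; the helper name uploadDeduped and the file name disk.raw are hypothetical, and the imports assume the syzkaller tree):

	// Sketch only: derive UniqueTag from the content itself, so re-uploading
	// identical data resolves to the already-stored object.
	package example

	import (
		"bytes"
		"crypto/sha256"
		"fmt"

		"github.com/google/syzkaller/dashboard/dashapi"
		"github.com/google/syzkaller/pkg/asset"
	)

	func uploadDeduped(storage *asset.Storage, build *dashapi.Build, data []byte) (dashapi.NewAsset, error) {
		extra := &asset.ExtraUploadArg{
			// Identical content always yields the same tag, hence the same path.
			UniqueTag: fmt.Sprintf("%x", sha256.Sum256(data)),
			// If the object is already there, reuse its URL instead of failing.
			SkipIfExists: true,
		}
		return storage.UploadBuildAsset(bytes.NewReader(data), "disk.raw",
			dashapi.BootableDisk, build, extra)
	}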
diff --git a/pkg/asset/storage_test.go b/pkg/asset/storage_test.go
index f1c057806..9652c9d2a 100644
--- a/pkg/asset/storage_test.go
+++ b/pkg/asset/storage_test.go
@@ -114,7 +114,7 @@ func validateXz(res *uploadedFile, expected []byte) error {
 
 func (storage *Storage) sendBuildAsset(reader io.Reader, fileName string,
 	assetType dashapi.AssetType, build *dashapi.Build) error {
-	asset, err := storage.UploadBuildAsset(reader, fileName, assetType, build)
+	asset, err := storage.UploadBuildAsset(reader, fileName, assetType, build, nil)
 	if err != nil {
 		return err
 	}
@@ -321,3 +321,34 @@ func TestAssetStorageConfiguration(t *testing.T) {
 		t.Fatalf("UploadBuildAssetStream of BootableDisk expected to succeed, got %v", err)
 	}
 }
+
+func TestUploadSameContent(t *testing.T) {
+	dashMock := newDashMock()
+	storage, be := makeStorage(t, dashMock.getDashapi())
+	be.currentTime = time.Now().Add(-2 * deletionEmbargo)
+
+	build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}
+	extra := &ExtraUploadArg{UniqueTag: "uniquetag", SkipIfExists: true}
+	testContent := []byte{0x1, 0x2, 0x3, 0x4}
+	asset, err := storage.UploadBuildAsset(bytes.NewReader(testContent), "disk.raw",
+		dashapi.BootableDisk, build, extra)
+	if err != nil {
+		t.Fatalf("UploadBuildAsset expected to succeed, got %v", err)
+	}
+	if !strings.Contains(asset.DownloadURL, extra.UniqueTag) {
+		t.Fatalf("%#v was expected to contain %#v", asset.DownloadURL, extra.UniqueTag)
+	}
+	// Upload the same asset again.
+	be.objectUpload = func(req *uploadRequest) (*uploadResponse, error) {
+		return nil, &FileExistsError{req.savePath}
+	}
+	assetTwo, err := storage.UploadBuildAsset(bytes.NewReader(testContent), "disk.raw",
+		dashapi.BootableDisk, build, extra)
+	if err != nil {
+		t.Fatalf("UploadBuildAsset expected to succeed, got %v", err)
+	}
+	if asset.DownloadURL != assetTwo.DownloadURL {
+		t.Fatalf("assets were expected to have same download URL, got %#v %#v",
+			asset.DownloadURL, assetTwo.DownloadURL)
+	}
+}
diff --git a/pkg/gcs/gcs.go b/pkg/gcs/gcs.go
index 2776927d4..e0ccd39c5 100644
--- a/pkg/gcs/gcs.go
+++ b/pkg/gcs/gcs.go
@@ -143,6 +143,20 @@ func (client *Client) DeleteFile(gcsFile string) error {
 	return err
 }
 
+func (client *Client) FileExists(gcsFile string) (bool, error) {
+	bucket, filename, err := split(gcsFile)
+	if err != nil {
+		return false, err
+	}
+	_, err = client.client.Bucket(bucket).Object(filename).Attrs(client.ctx)
+	if err == storage.ErrObjectNotExist {
+		return false, nil
+	} else if err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 // Where things get published.
 const (
 	PublicPrefix = "https://storage.googleapis.com/"
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index 182bfec1e..bb2ad2698 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -4,6 +4,7 @@
 package main
 
 import (
+	"crypto/sha256"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -723,11 +724,20 @@ func (mgr *Manager) uploadBuildAssets(build *dashapi.Build, assetFolder string)
 		log.Logf(0, "uploading an asset %s of type %s",
 			pendingAsset.path, pendingAsset.assetType)
 	}
-	// TODO: ask the dashboard whether it actually needs this asset?
-	// This should remove the unnecessary re-uploading of assets after
-	// each manager restart, even if the kernel was not built anew.
+	extra := &asset.ExtraUploadArg{SkipIfExists: true}
+	hash := sha256.New()
+	if _, err := io.Copy(hash, file); err != nil {
+		log.Logf(0, "failed to calculate hash for the asset %s: %s", pendingAsset.path, err)
+		continue
+	}
+	extra.UniqueTag = fmt.Sprintf("%x", hash.Sum(nil))
+	// Now we need to go back to the beginning of the file again.
+	if _, err := file.Seek(0, io.SeekStart); err != nil {
+		log.Logf(0, "failed to rewind the opened file for %s: %s", pendingAsset.path, err)
+		continue
+	}
 	info, err := mgr.storage.UploadBuildAsset(file, pendingAsset.name,
-		pendingAsset.assetType, build)
+		pendingAsset.assetType, build, extra)
 	if err != nil {
 		log.Logf(0, "failed to upload an asset: %s, %s",
 			pendingAsset.path, err)
@@ -775,7 +785,7 @@ func (mgr *Manager) uploadCoverReport() error {
 	}
 	// Upload via the asset storage.
 	newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
-		dashapi.HTMLCoverageReport, mgr.lastBuild)
+		dashapi.HTMLCoverageReport, mgr.lastBuild, nil)
 	if err != nil {
 		return fmt.Errorf("failed to upload html coverage report: %w", err)
 	}
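Editor's note: the hashing step in uploadBuildAssets above relies on rewinding the file after io.Copy consumed it. A self-contained sketch of the same compute-hash-then-rewind pattern (assumed example code, not from the tree; the path disk.raw is a placeholder):

	// Hash a file's content for use as a UniqueTag, then rewind the file so a
	// subsequent upload reads it from the start again.
	package main

	import (
		"crypto/sha256"
		"fmt"
		"io"
		"log"
		"os"
	)

	func contentTag(file *os.File) (string, error) {
		hash := sha256.New()
		if _, err := io.Copy(hash, file); err != nil {
			return "", err
		}
		// io.Copy left the offset at EOF; seek back to the beginning.
		if _, err := file.Seek(0, io.SeekStart); err != nil {
			return "", err
		}
		return fmt.Sprintf("%x", hash.Sum(nil)), nil
	}

	func main() {
		file, err := os.Open("disk.raw") // placeholder path
		if err != nil {
			log.Fatal(err)
		}
		defer file.Close()
		tag, err := contentTag(file)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("unique tag:", tag)
	}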

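Editor's note: the FileExists helper added to pkg/gcs boils down to an object-attributes probe. For reference, a standalone sketch against the cloud.google.com/go/storage client (bucket and object names are placeholders):

	package main

	import (
		"context"
		"errors"
		"fmt"
		"log"

		"cloud.google.com/go/storage"
	)

	func main() {
		ctx := context.Background()
		client, err := storage.NewClient(ctx)
		if err != nil {
			log.Fatal(err)
		}
		defer client.Close()
		// Attrs returns storage.ErrObjectNotExist for a missing object.
		_, err = client.Bucket("my-bucket").Object("folder/disk.raw").Attrs(ctx)
		switch {
		case errors.Is(err, storage.ErrObjectNotExist):
			fmt.Println("object does not exist")
		case err != nil:
			log.Fatal(err) // network/auth errors etc.
		default:
			fmt.Println("object exists")
		}
	}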