diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2022-07-15 09:35:48 +0000 |
|---|---|---|
| committer | Aleksandr Nogikh <wp32pw@gmail.com> | 2022-08-24 12:05:06 +0200 |
| commit | 924a4fd1cb7a5ad3b5720380eb6fc742ea7602d2 (patch) | |
| tree | 00f090d775e22d1c32340909d66cccdea6e32011 /pkg/asset/storage.go | |
| parent | 6db8af716fb0995966f00e2d52d2f3baa43ea868 (diff) | |
syz-ci: upload build assets
This commit introduces the syz-ci side of the asset storage
functionality.
* Intercept assets at various stages of syz-ci operation.
* Compress and upload assets to GCS.
* Report assets to the dashboard.
* Remove no longer needed assets.
Diffstat (limited to 'pkg/asset/storage.go')
| -rw-r--r-- | pkg/asset/storage.go | 368 |
1 file changed, 368 insertions, 0 deletions
diff --git a/pkg/asset/storage.go b/pkg/asset/storage.go new file mode 100644 index 000000000..95e7a236b --- /dev/null +++ b/pkg/asset/storage.go @@ -0,0 +1,368 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package asset + +import ( + "compress/gzip" + "crypto/sha256" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/google/syzkaller/dashboard/dashapi" + "github.com/google/syzkaller/pkg/debugtracer" + "github.com/google/syzkaller/pkg/osutil" +) + +type Storage struct { + cfg *Config + backend StorageBackend + dash *dashapi.Dashboard + tracer debugtracer.DebugTracer +} + +func StorageFromConfig(cfg *Config, dash *dashapi.Dashboard) (*Storage, error) { + if dash == nil { + return nil, fmt.Errorf("dashboard api instance is necessary") + } + tracer := debugtracer.DebugTracer(&debugtracer.NullTracer{}) + if cfg.Debug { + tracer = &debugtracer.GenericTracer{ + WithTime: true, + TraceWriter: os.Stdout, + } + } + var backend StorageBackend + if strings.HasPrefix(cfg.UploadTo, "gs://") { + var err error + backend, err = makeCloudStorageBackend(strings.TrimPrefix(cfg.UploadTo, "gs://"), tracer) + if err != nil { + return nil, fmt.Errorf("the call to MakeCloudStorageBackend failed: %w", err) + } + } else if strings.HasPrefix(cfg.UploadTo, "dummy://") { + backend = makeDummyStorageBackend() + } else { + return nil, fmt.Errorf("unknown UploadTo during StorageFromConfig(): %#v", cfg.UploadTo) + } + return &Storage{ + cfg: cfg, + backend: backend, + dash: dash, + tracer: tracer, + }, nil +} + +func (storage *Storage) AssetTypeEnabled(assetType dashapi.AssetType) bool { + return storage.cfg.IsEnabled(assetType) +} + +func (storage *Storage) getDefaultCompressor() Compressor { + if xzAvailable() { + return xzCompressor + } + return gzipCompressor +} + +var ErrAssetTypeDisabled = errors.New("uploading 
assets of this type is disabled") + +func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.AssetType, + name string) (string, error) { + if name == "" { + return "", fmt.Errorf("file name is not specified") + } + typeDescr := GetTypeDescription(assetType) + if typeDescr == nil { + return "", fmt.Errorf("asset type %s is unknown", assetType) + } + if !storage.AssetTypeEnabled(assetType) { + return "", fmt.Errorf("not allowed to upload an asset of type %s: %w", + assetType, ErrAssetTypeDisabled) + } + // The idea is to make a file name useful and yet unique. + // So we put a file to a pseudo-unique "folder". + const folderPrefix = 6 + folderName := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano()))) + path := fmt.Sprintf("%x/%s", folderName[0:folderPrefix], name) + // TODO: allow an optional argument that would replace this "random" prefix. + // This should prevent the duplication of a number of assets (if the caller + // passes e.g. a file hash). + req := &uploadRequest{ + savePath: path, + contentType: typeDescr.ContentType, + preserveExtension: typeDescr.preserveExtension, + } + if req.contentType == "" { + req.contentType = "application/octet-stream" + } + compressor := storage.getDefaultCompressor() + if typeDescr.customCompressor != nil { + compressor = typeDescr.customCompressor + } + res, err := compressor(req, storage.backend.upload) + if err != nil { + return "", fmt.Errorf("failed to query writer: %w", err) + } + written, err := io.Copy(res.writer, reader) + if err != nil { + more := "" + closeErr := res.writer.Close() + if exiterr, ok := closeErr.(*exec.ExitError); ok { + more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState) + } + return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s", + written, err, more) + } + err = res.writer.Close() + if err != nil { + return "", fmt.Errorf("failed to close writer: %w", err) + } + return storage.backend.downloadURL(res.path, 
storage.cfg.PublicAccess) +} + +func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType, + build *dashapi.Build) (dashapi.NewAsset, error) { + const commitPrefix = 8 + commit := build.KernelCommit + if len(commit) > commitPrefix { + commit = commit[:commitPrefix] + } + baseName := filepath.Base(fileName) + fileExt := filepath.Ext(baseName) + name := fmt.Sprintf("%s-%s%s", + strings.TrimSuffix(baseName, fileExt), + commit, + fileExt) + url, err := storage.uploadFileStream(reader, assetType, name) + if err != nil { + return dashapi.NewAsset{}, err + } + return dashapi.NewAsset{ + Type: assetType, + DownloadURL: url, + }, nil +} +func (storage *Storage) ReportBuildAssets(build *dashapi.Build, assets ...dashapi.NewAsset) error { + // If the server denies the reques, we'll delete the orphaned file during deprecated files + // deletion later. + return storage.dash.AddBuildAssets(&dashapi.AddBuildAssetsReq{ + BuildID: build.ID, + Assets: assets, + }) +} + +var ErrAssetDoesNotExist = errors.New("the asset did not exist") + +const deletionEmbargo = time.Hour * 24 * 7 + +// Best way: convert download URLs to paths. +// We don't want to risk killing all assets after a slight domain change. +func (storage *Storage) DeprecateAssets() error { + resp, err := storage.dash.NeededAssetsList() + if err != nil { + return fmt.Errorf("failed to query needed assets: %w", err) + } + needed := map[string]bool{} + for _, url := range resp.DownloadURLs { + path, err := storage.backend.getPath(url) + if err != nil { + // If we failed to parse just one URL, let's stop the entire process. + // Otherwise we'll start deleting still needed files we couldn't recognize. 
+ return fmt.Errorf("failed to parse '%s': %w", url, err) + } + needed[path] = true + } + storage.tracer.Log("queried needed assets: %#v", needed) + existing, err := storage.backend.list() + if err != nil { + return fmt.Errorf("failed to query object list: %w", err) + } + toDelete := []string{} + intersection := 0 + for _, obj := range existing { + keep := false + if time.Since(obj.createdAt) < deletionEmbargo { + // To avoid races between object upload and object deletion, we don't delete + // newly uploaded files for a while after they're uploaded. + keep = true + } + if val, ok := needed[obj.path]; ok && val { + keep = true + intersection++ + } + storage.tracer.Log("-- object %v, %v: keep %t", obj.path, obj.createdAt, keep) + if !keep { + toDelete = append(toDelete, obj.path) + } + } + const intersectionCheckCutOff = 4 + if len(existing) > intersectionCheckCutOff && intersection == 0 { + // This is a last-resort protection against possible dashboard bugs. + // If the needed assets have no intersection with the existing assets, + // don't delete anything. Otherwise, if it was a bug, we will lose all files. + return fmt.Errorf("needed assets have almost no intersection with the existing ones") + } + for _, path := range toDelete { + err := storage.backend.remove(path) + storage.tracer.Log("-- deleted %v: %v", path, err) + // Several syz-ci's might be sharing the same storage. So let's tolerate + // races during file deletion. 
+ if err != nil && err != ErrAssetDoesNotExist { + return fmt.Errorf("asset deletion failure: %w", err) + } + } + return nil +} + +type uploadRequest struct { + savePath string + contentEncoding string + contentType string + preserveExtension bool +} + +type uploadResponse struct { + path string + writer io.WriteCloser +} + +type storedObject struct { + path string + createdAt time.Time +} + +type StorageBackend interface { + upload(req *uploadRequest) (*uploadResponse, error) + list() ([]storedObject, error) + remove(path string) error + downloadURL(path string, publicURL bool) (string, error) + getPath(url string) (string, error) +} + +type Compressor func(req *uploadRequest, + next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) + +var xzPresent bool +var xzCheck sync.Once + +const xzCompressionRatio = 0 +const xzThreadsCount = 6 + +// TODO: switch to an xz library, so that we don't have to run commands. +// Then, it'd probably be easier to wrap "writer" instead of "reader". + +func xzAvailable() bool { + xzCheck.Do(func() { + _, err := osutil.RunCmd(time.Minute, "", "xz --version") + xzPresent = err != nil + }) + return xzPresent +} + +func xzCompressor(req *uploadRequest, + next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) { + newReq := *req + if !req.preserveExtension { + newReq.savePath = fmt.Sprintf("%s.xz", newReq.savePath) + } + // "gz" contentEncoding is not really supported so far, so let's just set contentType. + if newReq.contentType == "" { + newReq.contentType = "application/x-xz" + } + resp, err := next(&newReq) + if err != nil { + return nil, err + } + // Take source data from stdin, write compressed data to stdout. 
+ cmd := osutil.Command("xz", fmt.Sprintf("-%d", xzCompressionRatio), + "-T", fmt.Sprintf("%d", xzThreadsCount), "-F", "xz", "-c") + stdinWriter, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("failed to create stdout pipe: %w", err) + } + cmd.Stdout = resp.writer + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("xz preprocess: command start failed: %w", err) + } + return &uploadResponse{ + path: resp.path, + writer: &wrappedWriteCloser{ + writer: stdinWriter, + closeCallback: func() error { + // Once the writer which we return is closed, we want to + // also close the writer we're proxying. + err := cmd.Wait() + err2 := resp.writer.Close() + if err != nil { + return err + } + if err2 != nil { + return err2 + } + return nil + }, + }, + }, nil +} + +const gzipCompressionRatio = 4 + +// This struct allows to attach a callback on the Close() method invocation of +// an existing io.WriteCloser. Also, it can convert an io.Writer to an io.WriteCloser. +type wrappedWriteCloser struct { + writer io.Writer + closeCallback func() error +} + +func (wwc *wrappedWriteCloser) Write(p []byte) (int, error) { + return wwc.writer.Write(p) +} + +func (wwc *wrappedWriteCloser) Close() error { + var err error + closer, ok := wwc.writer.(io.Closer) + if ok { + err = closer.Close() + } + err2 := wwc.closeCallback() + if err != nil { + return err + } else if err2 != nil { + return err2 + } + return nil +} + +func gzipCompressor(req *uploadRequest, + next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) { + newReq := *req + if !req.preserveExtension { + newReq.savePath = fmt.Sprintf("%s.gz", newReq.savePath) + } + newReq.contentEncoding = "gzip" + resp, err := next(&newReq) + if err != nil { + return nil, err + } + gzip, err := gzip.NewWriterLevel(resp.writer, gzipCompressionRatio) + if err != nil { + resp.writer.Close() + return nil, err + } + return &uploadResponse{ + path: resp.path, + writer: &wrappedWriteCloser{ + writer: 
gzip, + closeCallback: func() error { + return resp.writer.Close() + }, + }, + }, nil +} |
