aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/asset/storage.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2022-07-15 09:35:48 +0000
committerAleksandr Nogikh <wp32pw@gmail.com>2022-08-24 12:05:06 +0200
commit924a4fd1cb7a5ad3b5720380eb6fc742ea7602d2 (patch)
tree00f090d775e22d1c32340909d66cccdea6e32011 /pkg/asset/storage.go
parent6db8af716fb0995966f00e2d52d2f3baa43ea868 (diff)
syz-ci: upload build assets
This commit introduces the syz-ci side of the asset storage functionality. * Intercept assets at various stages of syz-ci operation. * Compress and upload assets to GCS. * Report assets to the dashboard. * Remove no longer needed assets.
Diffstat (limited to 'pkg/asset/storage.go')
-rw-r--r--pkg/asset/storage.go368
1 files changed, 368 insertions, 0 deletions
diff --git a/pkg/asset/storage.go b/pkg/asset/storage.go
new file mode 100644
index 000000000..95e7a236b
--- /dev/null
+++ b/pkg/asset/storage.go
@@ -0,0 +1,368 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "compress/gzip"
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/debugtracer"
+ "github.com/google/syzkaller/pkg/osutil"
+)
+
// Storage coordinates the handling of syz-ci assets: it compresses the
// incoming byte streams, saves them via the configured StorageBackend and
// talks to the dashboard about which assets exist and which are still needed.
type Storage struct {
	cfg     *Config                 // asset storage configuration (enabled types, upload target, etc.)
	backend StorageBackend          // where objects are physically stored (GCS or dummy)
	dash    *dashapi.Dashboard      // used to report new assets and query the still-needed ones
	tracer  debugtracer.DebugTracer // debug log sink; a NullTracer unless cfg.Debug is set
}
+
+func StorageFromConfig(cfg *Config, dash *dashapi.Dashboard) (*Storage, error) {
+ if dash == nil {
+ return nil, fmt.Errorf("dashboard api instance is necessary")
+ }
+ tracer := debugtracer.DebugTracer(&debugtracer.NullTracer{})
+ if cfg.Debug {
+ tracer = &debugtracer.GenericTracer{
+ WithTime: true,
+ TraceWriter: os.Stdout,
+ }
+ }
+ var backend StorageBackend
+ if strings.HasPrefix(cfg.UploadTo, "gs://") {
+ var err error
+ backend, err = makeCloudStorageBackend(strings.TrimPrefix(cfg.UploadTo, "gs://"), tracer)
+ if err != nil {
+ return nil, fmt.Errorf("the call to MakeCloudStorageBackend failed: %w", err)
+ }
+ } else if strings.HasPrefix(cfg.UploadTo, "dummy://") {
+ backend = makeDummyStorageBackend()
+ } else {
+ return nil, fmt.Errorf("unknown UploadTo during StorageFromConfig(): %#v", cfg.UploadTo)
+ }
+ return &Storage{
+ cfg: cfg,
+ backend: backend,
+ dash: dash,
+ tracer: tracer,
+ }, nil
+}
+
// AssetTypeEnabled reports whether the configuration allows uploading
// assets of the given type.
func (storage *Storage) AssetTypeEnabled(assetType dashapi.AssetType) bool {
	return storage.cfg.IsEnabled(assetType)
}
+
+func (storage *Storage) getDefaultCompressor() Compressor {
+ if xzAvailable() {
+ return xzCompressor
+ }
+ return gzipCompressor
+}
+
+var ErrAssetTypeDisabled = errors.New("uploading assets of this type is disabled")
+
+func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.AssetType,
+ name string) (string, error) {
+ if name == "" {
+ return "", fmt.Errorf("file name is not specified")
+ }
+ typeDescr := GetTypeDescription(assetType)
+ if typeDescr == nil {
+ return "", fmt.Errorf("asset type %s is unknown", assetType)
+ }
+ if !storage.AssetTypeEnabled(assetType) {
+ return "", fmt.Errorf("not allowed to upload an asset of type %s: %w",
+ assetType, ErrAssetTypeDisabled)
+ }
+ // The idea is to make a file name useful and yet unique.
+ // So we put a file to a pseudo-unique "folder".
+ const folderPrefix = 6
+ folderName := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
+ path := fmt.Sprintf("%x/%s", folderName[0:folderPrefix], name)
+ // TODO: allow an optional argument that would replace this "random" prefix.
+ // This should prevent the duplication of a number of assets (if the caller
+ // passes e.g. a file hash).
+ req := &uploadRequest{
+ savePath: path,
+ contentType: typeDescr.ContentType,
+ preserveExtension: typeDescr.preserveExtension,
+ }
+ if req.contentType == "" {
+ req.contentType = "application/octet-stream"
+ }
+ compressor := storage.getDefaultCompressor()
+ if typeDescr.customCompressor != nil {
+ compressor = typeDescr.customCompressor
+ }
+ res, err := compressor(req, storage.backend.upload)
+ if err != nil {
+ return "", fmt.Errorf("failed to query writer: %w", err)
+ }
+ written, err := io.Copy(res.writer, reader)
+ if err != nil {
+ more := ""
+ closeErr := res.writer.Close()
+ if exiterr, ok := closeErr.(*exec.ExitError); ok {
+ more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
+ }
+ return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
+ written, err, more)
+ }
+ err = res.writer.Close()
+ if err != nil {
+ return "", fmt.Errorf("failed to close writer: %w", err)
+ }
+ return storage.backend.downloadURL(res.path, storage.cfg.PublicAccess)
+}
+
+func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType,
+ build *dashapi.Build) (dashapi.NewAsset, error) {
+ const commitPrefix = 8
+ commit := build.KernelCommit
+ if len(commit) > commitPrefix {
+ commit = commit[:commitPrefix]
+ }
+ baseName := filepath.Base(fileName)
+ fileExt := filepath.Ext(baseName)
+ name := fmt.Sprintf("%s-%s%s",
+ strings.TrimSuffix(baseName, fileExt),
+ commit,
+ fileExt)
+ url, err := storage.uploadFileStream(reader, assetType, name)
+ if err != nil {
+ return dashapi.NewAsset{}, err
+ }
+ return dashapi.NewAsset{
+ Type: assetType,
+ DownloadURL: url,
+ }, nil
+}
// ReportBuildAssets notifies the dashboard about assets that have been
// uploaded for the given build.
func (storage *Storage) ReportBuildAssets(build *dashapi.Build, assets ...dashapi.NewAsset) error {
	// If the server denies the request, we'll delete the orphaned file during deprecated files
	// deletion later.
	return storage.dash.AddBuildAssets(&dashapi.AddBuildAssetsReq{
		BuildID: build.ID,
		Assets:  assets,
	})
}
+
// ErrAssetDoesNotExist is returned (possibly wrapped) by StorageBackend.remove
// when the object is already gone, e.g. deleted by a concurrent syz-ci.
var ErrAssetDoesNotExist = errors.New("the asset did not exist")

// deletionEmbargo is the minimum age an object must reach before it may be
// garbage-collected; it protects freshly uploaded files from upload/delete races.
const deletionEmbargo = time.Hour * 24 * 7
+
+// Best way: convert download URLs to paths.
+// We don't want to risk killing all assets after a slight domain change.
+func (storage *Storage) DeprecateAssets() error {
+ resp, err := storage.dash.NeededAssetsList()
+ if err != nil {
+ return fmt.Errorf("failed to query needed assets: %w", err)
+ }
+ needed := map[string]bool{}
+ for _, url := range resp.DownloadURLs {
+ path, err := storage.backend.getPath(url)
+ if err != nil {
+ // If we failed to parse just one URL, let's stop the entire process.
+ // Otherwise we'll start deleting still needed files we couldn't recognize.
+ return fmt.Errorf("failed to parse '%s': %w", url, err)
+ }
+ needed[path] = true
+ }
+ storage.tracer.Log("queried needed assets: %#v", needed)
+ existing, err := storage.backend.list()
+ if err != nil {
+ return fmt.Errorf("failed to query object list: %w", err)
+ }
+ toDelete := []string{}
+ intersection := 0
+ for _, obj := range existing {
+ keep := false
+ if time.Since(obj.createdAt) < deletionEmbargo {
+ // To avoid races between object upload and object deletion, we don't delete
+ // newly uploaded files for a while after they're uploaded.
+ keep = true
+ }
+ if val, ok := needed[obj.path]; ok && val {
+ keep = true
+ intersection++
+ }
+ storage.tracer.Log("-- object %v, %v: keep %t", obj.path, obj.createdAt, keep)
+ if !keep {
+ toDelete = append(toDelete, obj.path)
+ }
+ }
+ const intersectionCheckCutOff = 4
+ if len(existing) > intersectionCheckCutOff && intersection == 0 {
+ // This is a last-resort protection against possible dashboard bugs.
+ // If the needed assets have no intersection with the existing assets,
+ // don't delete anything. Otherwise, if it was a bug, we will lose all files.
+ return fmt.Errorf("needed assets have almost no intersection with the existing ones")
+ }
+ for _, path := range toDelete {
+ err := storage.backend.remove(path)
+ storage.tracer.Log("-- deleted %v: %v", path, err)
+ // Several syz-ci's might be sharing the same storage. So let's tolerate
+ // races during file deletion.
+ if err != nil && err != ErrAssetDoesNotExist {
+ return fmt.Errorf("asset deletion failure: %w", err)
+ }
+ }
+ return nil
+}
+
// uploadRequest describes a single object to be saved by a StorageBackend.
type uploadRequest struct {
	savePath          string // object path inside the backend
	contentEncoding   string // e.g. "gzip"; set by compressors where applicable
	contentType       string // MIME type; defaults to application/octet-stream upstream
	preserveExtension bool   // if true, compressors must not append ".xz"/".gz"
}
+
// uploadResponse is what a StorageBackend returns for an upload request:
// the final object path and a writer that streams data into the object.
type uploadResponse struct {
	path   string
	writer io.WriteCloser
}
+
// storedObject describes an object already present in the backend,
// as reported by StorageBackend.list().
type storedObject struct {
	path      string
	createdAt time.Time
}
+
// StorageBackend abstracts the physical storage of assets (e.g. GCS).
type StorageBackend interface {
	// upload returns a writer into which the object's contents must be streamed.
	upload(req *uploadRequest) (*uploadResponse, error)
	// list enumerates all stored objects.
	list() ([]storedObject, error)
	// remove deletes the object; it is expected to return (a wrapped)
	// ErrAssetDoesNotExist if the object is already gone.
	remove(path string) error
	// downloadURL converts an object path to a (optionally public) download URL.
	downloadURL(path string, publicURL bool) (string, error)
	// getPath is the inverse of downloadURL: it maps a URL back to the object path.
	getPath(url string) (string, error)
}
+
// Compressor wraps a backend upload so that data written to the returned
// writer is compressed on the fly; implementations adjust the request
// (path suffix, content type/encoding) before delegating to next.
type Compressor func(req *uploadRequest,
	next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error)
+
var xzPresent bool    // cached result of the xz availability probe
var xzCheck sync.Once // ensures the probe in xzAvailable() runs only once

// xz compression preset (0 is the fastest/lightest preset) and the number of
// worker threads passed to xz via -T.
const xzCompressionRatio = 0
const xzThreadsCount = 6

// TODO: switch to an xz library, so that we don't have to run commands.
// Then, it'd probably be easier to wrap "writer" instead of "reader".
+
+func xzAvailable() bool {
+ xzCheck.Do(func() {
+ _, err := osutil.RunCmd(time.Minute, "", "xz --version")
+ xzPresent = err != nil
+ })
+ return xzPresent
+}
+
+func xzCompressor(req *uploadRequest,
+ next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) {
+ newReq := *req
+ if !req.preserveExtension {
+ newReq.savePath = fmt.Sprintf("%s.xz", newReq.savePath)
+ }
+ // "gz" contentEncoding is not really supported so far, so let's just set contentType.
+ if newReq.contentType == "" {
+ newReq.contentType = "application/x-xz"
+ }
+ resp, err := next(&newReq)
+ if err != nil {
+ return nil, err
+ }
+ // Take source data from stdin, write compressed data to stdout.
+ cmd := osutil.Command("xz", fmt.Sprintf("-%d", xzCompressionRatio),
+ "-T", fmt.Sprintf("%d", xzThreadsCount), "-F", "xz", "-c")
+ stdinWriter, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
+ }
+ cmd.Stdout = resp.writer
+ err = cmd.Start()
+ if err != nil {
+ return nil, fmt.Errorf("xz preprocess: command start failed: %w", err)
+ }
+ return &uploadResponse{
+ path: resp.path,
+ writer: &wrappedWriteCloser{
+ writer: stdinWriter,
+ closeCallback: func() error {
+ // Once the writer which we return is closed, we want to
+ // also close the writer we're proxying.
+ err := cmd.Wait()
+ err2 := resp.writer.Close()
+ if err != nil {
+ return err
+ }
+ if err2 != nil {
+ return err2
+ }
+ return nil
+ },
+ },
+ }, nil
+}
+
+const gzipCompressionRatio = 4
+
+// This struct allows to attach a callback on the Close() method invocation of
+// an existing io.WriteCloser. Also, it can convert an io.Writer to an io.WriteCloser.
+type wrappedWriteCloser struct {
+ writer io.Writer
+ closeCallback func() error
+}
+
+func (wwc *wrappedWriteCloser) Write(p []byte) (int, error) {
+ return wwc.writer.Write(p)
+}
+
+func (wwc *wrappedWriteCloser) Close() error {
+ var err error
+ closer, ok := wwc.writer.(io.Closer)
+ if ok {
+ err = closer.Close()
+ }
+ err2 := wwc.closeCallback()
+ if err != nil {
+ return err
+ } else if err2 != nil {
+ return err2
+ }
+ return nil
+}
+
+func gzipCompressor(req *uploadRequest,
+ next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) {
+ newReq := *req
+ if !req.preserveExtension {
+ newReq.savePath = fmt.Sprintf("%s.gz", newReq.savePath)
+ }
+ newReq.contentEncoding = "gzip"
+ resp, err := next(&newReq)
+ if err != nil {
+ return nil, err
+ }
+ gzip, err := gzip.NewWriterLevel(resp.writer, gzipCompressionRatio)
+ if err != nil {
+ resp.writer.Close()
+ return nil, err
+ }
+ return &uploadResponse{
+ path: resp.path,
+ writer: &wrappedWriteCloser{
+ writer: gzip,
+ closeCallback: func() error {
+ return resp.writer.Close()
+ },
+ },
+ }, nil
+}