aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2022-07-15 09:35:48 +0000
committerAleksandr Nogikh <wp32pw@gmail.com>2022-08-24 12:05:06 +0200
commit924a4fd1cb7a5ad3b5720380eb6fc742ea7602d2 (patch)
tree00f090d775e22d1c32340909d66cccdea6e32011
parent6db8af716fb0995966f00e2d52d2f3baa43ea868 (diff)
syz-ci: upload build assets
This commit introduces the syz-ci side of the asset storage functionality. * Intercept assets at various stages of syz-ci operation. * Compress and upload assets to GCS. * Report assets to the dashboard. * Remove no longer needed assets.
-rw-r--r--go.mod1
-rw-r--r--pkg/asset/backend_dummy.go103
-rw-r--r--pkg/asset/backend_gcs.go112
-rw-r--r--pkg/asset/config.go75
-rw-r--r--pkg/asset/storage.go368
-rw-r--r--pkg/asset/storage_test.go325
-rw-r--r--pkg/asset/type.go80
-rw-r--r--syz-ci/manager.go206
-rw-r--r--syz-ci/syz-ci.go46
-rw-r--r--vendor/modules.txt1
10 files changed, 1249 insertions, 68 deletions
diff --git a/go.mod b/go.mod
index bec19e68c..9e9dd1ed0 100644
--- a/go.mod
+++ b/go.mod
@@ -31,6 +31,7 @@ require (
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c
golang.org/x/perf v0.0.0-20211012211434-03971e389cd3
+ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324
golang.org/x/tools v0.1.0
google.golang.org/api v0.46.0
diff --git a/pkg/asset/backend_dummy.go b/pkg/asset/backend_dummy.go
new file mode 100644
index 000000000..5e811d48e
--- /dev/null
+++ b/pkg/asset/backend_dummy.go
@@ -0,0 +1,103 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "fmt"
+ "strings"
+ "time"
+)
+
// objectUploadCallback lets tests intercept and inspect upload requests.
type objectUploadCallback func(req *uploadRequest) (*uploadResponse, error)

// objectRemoveCallback lets tests intercept (and possibly veto) object removals.
type objectRemoveCallback func(url string) error

// dummyObject records the metadata of a single "uploaded" object.
type dummyObject struct {
	createdAt       time.Time
	contentType     string
	contentEncoding string
}

// dummyStorageBackend is an in-memory StorageBackend implementation used for
// tests and for the "dummy://" upload target. It keeps only per-object
// metadata; the uploaded bytes themselves are discarded.
type dummyStorageBackend struct {
	currentTime  time.Time // timestamp assigned to newly uploaded objects
	objects      map[string]*dummyObject
	objectUpload objectUploadCallback // optional test hook, may be nil
	objectRemove objectRemoveCallback // optional test hook, may be nil
}
+
+func makeDummyStorageBackend() *dummyStorageBackend {
+ return &dummyStorageBackend{
+ currentTime: time.Now(),
+ objects: make(map[string]*dummyObject),
+ }
+}
+
// dummyWriteCloser discards everything written to it and always succeeds,
// both on Write and on Close.
type dummyWriteCloser struct{}

// Write pretends that all bytes were consumed.
func (dwc *dummyWriteCloser) Write(data []byte) (int, error) {
	return len(data), nil
}

// Close is a no-op.
func (dwc *dummyWriteCloser) Close() error {
	return nil
}
+
+func (be *dummyStorageBackend) upload(req *uploadRequest) (*uploadResponse, error) {
+ be.objects[req.savePath] = &dummyObject{
+ createdAt: be.currentTime,
+ contentType: req.contentType,
+ contentEncoding: req.contentEncoding,
+ }
+ if be.objectUpload != nil {
+ return be.objectUpload(req)
+ }
+ return &uploadResponse{writer: &dummyWriteCloser{}}, nil
+}
+
// downloadURL maps an object path to a fake, but stable, download URL.
// The publicURL flag is ignored by the dummy backend.
func (be *dummyStorageBackend) downloadURL(path string, publicURL bool) (string, error) {
	return "http://download/" + path, nil
}
+
+func (be *dummyStorageBackend) getPath(url string) (string, error) {
+ return strings.TrimPrefix(url, "http://download/"), nil
+}
+
+func (be *dummyStorageBackend) list() ([]storedObject, error) {
+ ret := []storedObject{}
+ for path, obj := range be.objects {
+ ret = append(ret, storedObject{
+ path: path,
+ createdAt: obj.createdAt,
+ })
+ }
+ return ret, nil
+}
+
+func (be *dummyStorageBackend) remove(path string) error {
+ if be.objectRemove != nil {
+ if err := be.objectRemove(path); err != nil {
+ return err
+ }
+ }
+ if _, ok := be.objects[path]; !ok {
+ return ErrAssetDoesNotExist
+ }
+ delete(be.objects, path)
+ return nil
+}
+
+func (be *dummyStorageBackend) hasOnly(paths []string) error {
+ makeError := func() error {
+ return fmt.Errorf("object sets are not equal; needed: %#v; uploaded: %#v", paths, be.objects)
+ }
+ if len(paths) != len(be.objects) {
+ return makeError()
+ }
+ for _, path := range paths {
+ if be.objects[path] == nil {
+ return makeError()
+ }
+ }
+ return nil
+}
diff --git a/pkg/asset/backend_gcs.go b/pkg/asset/backend_gcs.go
new file mode 100644
index 000000000..bf53c9f65
--- /dev/null
+++ b/pkg/asset/backend_gcs.go
@@ -0,0 +1,112 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "fmt"
+ "io"
+ "net/url"
+ "strings"
+
+ "github.com/google/syzkaller/pkg/debugtracer"
+ "github.com/google/syzkaller/pkg/gcs"
+)
+
// cloudStorageBackend implements StorageBackend on top of a single
// Google Cloud Storage bucket.
type cloudStorageBackend struct {
	client *gcs.Client
	bucket string
	tracer debugtracer.DebugTracer
}
+
+func makeCloudStorageBackend(bucket string, tracer debugtracer.DebugTracer) (*cloudStorageBackend, error) {
+ tracer.Log("created gcs backend for bucket '%s'", bucket)
+ client, err := gcs.NewClient()
+ if err != nil {
+ return nil, fmt.Errorf("the call to NewClient failed: %w", err)
+ }
+ return &cloudStorageBackend{
+ client: client,
+ bucket: bucket,
+ tracer: tracer,
+ }, nil
+}
+
+// Actual write errors might be hidden, so we wrap the writer here
+// to ensure that they get logged.
+type writeErrorLogger struct {
+ writeCloser io.WriteCloser
+ tracer debugtracer.DebugTracer
+}
+
+func (wel *writeErrorLogger) Write(p []byte) (n int, err error) {
+ n, err = wel.writeCloser.Write(p)
+ if err != nil {
+ wel.tracer.Log("cloud storage write error: %s", err)
+ }
+ return
+}
+
+func (wel *writeErrorLogger) Close() error {
+ err := wel.writeCloser.Close()
+ if err != nil {
+ wel.tracer.Log("cloud storage writer close error: %s", err)
+ }
+ return err
+}
+
+func (csb *cloudStorageBackend) upload(req *uploadRequest) (*uploadResponse, error) {
+ path := fmt.Sprintf("%s/%s", csb.bucket, req.savePath)
+ w, err := csb.client.FileWriterExt(path, req.contentType, req.contentEncoding)
+ csb.tracer.Log("gcs upload: obtained a writer for %s, error %s", path, err)
+ if err != nil {
+ return nil, err
+ }
+ return &uploadResponse{
+ writer: &writeErrorLogger{
+ writeCloser: w,
+ tracer: csb.tracer,
+ },
+ path: path,
+ }, nil
+}
+
// downloadURL returns the URL from which the object can be downloaded;
// publicURL selects between the public and the authenticated URL form.
func (csb *cloudStorageBackend) downloadURL(path string, publicURL bool) (string, error) {
	return csb.client.GetDownloadURL(path, publicURL), nil
}
+
+func (csb *cloudStorageBackend) getPath(downloadURL string) (string, error) {
+ u, err := url.Parse(downloadURL)
+ if err != nil {
+ return "", fmt.Errorf("failed to parse the URL: %w", err)
+ }
+ parts := strings.SplitN(u.Path, csb.bucket+"/", 2)
+ if len(parts) != 2 {
+ return "", fmt.Errorf("%s/ is not present in the path %s", csb.bucket, u.Path)
+ }
+ return parts[1], nil
+}
+
+func (csb *cloudStorageBackend) list() ([]storedObject, error) {
+ list, err := csb.client.ListObjects(csb.bucket)
+ if err != nil {
+ return nil, err
+ }
+ ret := []storedObject{}
+ for _, obj := range list {
+ ret = append(ret, storedObject{
+ path: obj.Path,
+ createdAt: obj.CreatedAt,
+ })
+ }
+ return ret, nil
+}
+
+func (csb *cloudStorageBackend) remove(path string) error {
+ path = fmt.Sprintf("%s/%s", csb.bucket, path)
+ err := csb.client.DeleteFile(path)
+ if err == gcs.ErrFileNotFound {
+ return ErrAssetDoesNotExist
+ }
+ return err
+}
diff --git a/pkg/asset/config.go b/pkg/asset/config.go
new file mode 100644
index 000000000..ea6013df1
--- /dev/null
+++ b/pkg/asset/config.go
@@ -0,0 +1,75 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/google/syzkaller/dashboard/dashapi"
+)
+
// Config describes which assets syz-ci uploads and where they are stored.
type Config struct {
	// Debug mode forces syz-ci to upload artifacts on each syz-manager restart and also
	// forces it to produce more logs.
	Debug bool `json:"debug"`
	// Where to upload artifacts.
	// If "gs://bucket/" is specified, assets will be stored in the corresponding GCS bucket.
	// If "dummy://" is specified, assets will not be actually stored anywhere. May be helpful
	// for debugging.
	UploadTo string `json:"upload_to"`
	// Perform asset deprecation from this instance. If several syz-ci's share a common storage,
	// it makes sense to enable deprecation only on one of them.
	DoDeprecation bool `json:"do_deprecation"`
	// Make assets publicly available (note that it also might require special configuration
	// on the storage backend's side).
	PublicAccess bool `json:"public_access"`
	// Which assets need to be uploaded.
	Assets map[dashapi.AssetType]TypeConfig `json:"assets"`
}
+
// TypeConfig holds the per-asset-type upload settings.
type TypeConfig struct {
	// Never disables uploading assets of this type entirely.
	Never bool `json:"never"`
	// TODO: in future there'll also be `OnlyOn` and `NeverOn`, but so far we don't really need that.
	// TODO: here will also go compression settings, should we ever want to make it configurable.
}

// Validate checks the per-type configuration; there is nothing to verify
// yet, so it always succeeds.
func (tc *TypeConfig) Validate() error {
	return nil
}
+
// IsEnabled reports whether assets of the given type may be uploaded.
// Types absent from the map default to enabled (zero-value TypeConfig).
func (c *Config) IsEnabled(assetType dashapi.AssetType) bool {
	return !c.Assets[assetType].Never
}

// IsEmpty reports whether asset uploading is configured at all.
// It is safe to call on a nil receiver.
func (c *Config) IsEmpty() bool {
	return c == nil
}
+
+func (c *Config) Validate() error {
+ for assetType, cfg := range c.Assets {
+ if GetTypeDescription(assetType) == nil {
+ return fmt.Errorf("invalid asset type: %s", assetType)
+ }
+ if err := cfg.Validate(); err != nil {
+ return fmt.Errorf("invalid config for %s: %w", assetType, err)
+ }
+ }
+ if c.UploadTo == "" && len(c.Assets) != 0 {
+ return fmt.Errorf("assets are specified, but upload_to is empty")
+ }
+ allowedFormats := []string{"gs://", "dummy://"}
+ if c.UploadTo != "" {
+ any := false
+ for _, prefix := range allowedFormats {
+ if strings.HasPrefix(c.UploadTo, prefix) {
+ any = true
+ }
+ }
+ if !any {
+ return fmt.Errorf("the currently supported upload destinations are: %v", allowedFormats)
+ }
+ }
+ return nil
+}
diff --git a/pkg/asset/storage.go b/pkg/asset/storage.go
new file mode 100644
index 000000000..95e7a236b
--- /dev/null
+++ b/pkg/asset/storage.go
@@ -0,0 +1,368 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "compress/gzip"
+ "crypto/sha256"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/debugtracer"
+ "github.com/google/syzkaller/pkg/osutil"
+)
+
// Storage coordinates asset upload, reporting and deprecation between a
// storage backend (GCS or the in-memory dummy) and the dashboard.
type Storage struct {
	cfg     *Config
	backend StorageBackend
	dash    *dashapi.Dashboard
	tracer  debugtracer.DebugTracer
}
+
+func StorageFromConfig(cfg *Config, dash *dashapi.Dashboard) (*Storage, error) {
+ if dash == nil {
+ return nil, fmt.Errorf("dashboard api instance is necessary")
+ }
+ tracer := debugtracer.DebugTracer(&debugtracer.NullTracer{})
+ if cfg.Debug {
+ tracer = &debugtracer.GenericTracer{
+ WithTime: true,
+ TraceWriter: os.Stdout,
+ }
+ }
+ var backend StorageBackend
+ if strings.HasPrefix(cfg.UploadTo, "gs://") {
+ var err error
+ backend, err = makeCloudStorageBackend(strings.TrimPrefix(cfg.UploadTo, "gs://"), tracer)
+ if err != nil {
+ return nil, fmt.Errorf("the call to MakeCloudStorageBackend failed: %w", err)
+ }
+ } else if strings.HasPrefix(cfg.UploadTo, "dummy://") {
+ backend = makeDummyStorageBackend()
+ } else {
+ return nil, fmt.Errorf("unknown UploadTo during StorageFromConfig(): %#v", cfg.UploadTo)
+ }
+ return &Storage{
+ cfg: cfg,
+ backend: backend,
+ dash: dash,
+ tracer: tracer,
+ }, nil
+}
+
// AssetTypeEnabled reports whether the configuration allows uploading
// assets of the given type.
func (storage *Storage) AssetTypeEnabled(assetType dashapi.AssetType) bool {
	return storage.cfg.IsEnabled(assetType)
}
+
+func (storage *Storage) getDefaultCompressor() Compressor {
+ if xzAvailable() {
+ return xzCompressor
+ }
+ return gzipCompressor
+}
+
// ErrAssetTypeDisabled is returned (wrapped) when the configuration
// forbids uploading assets of the requested type.
var ErrAssetTypeDisabled = errors.New("uploading assets of this type is disabled")
+
// uploadFileStream compresses the contents of reader and uploads them to
// the storage backend under a pseudo-unique path ending in `name`.
// It returns the download URL of the uploaded object.
func (storage *Storage) uploadFileStream(reader io.Reader, assetType dashapi.AssetType,
	name string) (string, error) {
	if name == "" {
		return "", fmt.Errorf("file name is not specified")
	}
	typeDescr := GetTypeDescription(assetType)
	if typeDescr == nil {
		return "", fmt.Errorf("asset type %s is unknown", assetType)
	}
	if !storage.AssetTypeEnabled(assetType) {
		// Wrap ErrAssetTypeDisabled so callers can detect it via errors.Is().
		return "", fmt.Errorf("not allowed to upload an asset of type %s: %w",
			assetType, ErrAssetTypeDisabled)
	}
	// The idea is to make a file name useful and yet unique.
	// So we put a file to a pseudo-unique "folder".
	const folderPrefix = 6
	folderName := sha256.Sum256([]byte(fmt.Sprintf("%v", time.Now().UnixNano())))
	path := fmt.Sprintf("%x/%s", folderName[0:folderPrefix], name)
	// TODO: allow an optional argument that would replace this "random" prefix.
	// This should prevent the duplication of a number of assets (if the caller
	// passes e.g. a file hash).
	req := &uploadRequest{
		savePath:          path,
		contentType:       typeDescr.ContentType,
		preserveExtension: typeDescr.preserveExtension,
	}
	if req.contentType == "" {
		// Fall back to a generic content type if the asset type has none.
		req.contentType = "application/octet-stream"
	}
	compressor := storage.getDefaultCompressor()
	if typeDescr.customCompressor != nil {
		compressor = typeDescr.customCompressor
	}
	// The compressor wraps the backend's writer, so the bytes copied below
	// are compressed on the fly.
	res, err := compressor(req, storage.backend.upload)
	if err != nil {
		return "", fmt.Errorf("failed to query writer: %w", err)
	}
	written, err := io.Copy(res.writer, reader)
	if err != nil {
		more := ""
		// Close() may surface the compressor subprocess exit status, which
		// helps diagnose the copy failure.
		closeErr := res.writer.Close()
		if exiterr, ok := closeErr.(*exec.ExitError); ok {
			more = fmt.Sprintf(", process state '%s'", exiterr.ProcessState)
		}
		return "", fmt.Errorf("failed to redirect byte stream: copied %d bytes, error %w%s",
			written, err, more)
	}
	err = res.writer.Close()
	if err != nil {
		return "", fmt.Errorf("failed to close writer: %w", err)
	}
	return storage.backend.downloadURL(res.path, storage.cfg.PublicAccess)
}
+
+func (storage *Storage) UploadBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType,
+ build *dashapi.Build) (dashapi.NewAsset, error) {
+ const commitPrefix = 8
+ commit := build.KernelCommit
+ if len(commit) > commitPrefix {
+ commit = commit[:commitPrefix]
+ }
+ baseName := filepath.Base(fileName)
+ fileExt := filepath.Ext(baseName)
+ name := fmt.Sprintf("%s-%s%s",
+ strings.TrimSuffix(baseName, fileExt),
+ commit,
+ fileExt)
+ url, err := storage.uploadFileStream(reader, assetType, name)
+ if err != nil {
+ return dashapi.NewAsset{}, err
+ }
+ return dashapi.NewAsset{
+ Type: assetType,
+ DownloadURL: url,
+ }, nil
+}
// ReportBuildAssets notifies the dashboard about newly uploaded build assets.
func (storage *Storage) ReportBuildAssets(build *dashapi.Build, assets ...dashapi.NewAsset) error {
	// If the server denies the request, we'll delete the orphaned file during
	// deprecated files deletion later.
	return storage.dash.AddBuildAssets(&dashapi.AddBuildAssetsReq{
		BuildID: build.ID,
		Assets:  assets,
	})
}
+
// ErrAssetDoesNotExist is returned by backends when the object to be
// removed is not present (e.g. another syz-ci already deleted it).
var ErrAssetDoesNotExist = errors.New("the asset did not exist")

// deletionEmbargo protects recently uploaded objects from deprecation,
// which avoids races between concurrent upload and cleanup.
const deletionEmbargo = time.Hour * 24 * 7
+
// Best way: convert download URLs to paths.
// We don't want to risk killing all assets after a slight domain change.

// DeprecateAssets queries the dashboard for the list of still-needed
// assets and removes every stored object that is neither needed nor
// protected by the deletion embargo. As a safety net, it refuses to
// delete anything if the needed and existing sets barely intersect.
func (storage *Storage) DeprecateAssets() error {
	resp, err := storage.dash.NeededAssetsList()
	if err != nil {
		return fmt.Errorf("failed to query needed assets: %w", err)
	}
	needed := map[string]bool{}
	for _, url := range resp.DownloadURLs {
		path, err := storage.backend.getPath(url)
		if err != nil {
			// If we failed to parse just one URL, let's stop the entire process.
			// Otherwise we'll start deleting still needed files we couldn't recognize.
			return fmt.Errorf("failed to parse '%s': %w", url, err)
		}
		needed[path] = true
	}
	storage.tracer.Log("queried needed assets: %#v", needed)
	existing, err := storage.backend.list()
	if err != nil {
		return fmt.Errorf("failed to query object list: %w", err)
	}
	toDelete := []string{}
	intersection := 0
	for _, obj := range existing {
		keep := false
		if time.Since(obj.createdAt) < deletionEmbargo {
			// To avoid races between object upload and object deletion, we don't delete
			// newly uploaded files for a while after they're uploaded.
			keep = true
		}
		if val, ok := needed[obj.path]; ok && val {
			keep = true
			intersection++
		}
		storage.tracer.Log("-- object %v, %v: keep %t", obj.path, obj.createdAt, keep)
		if !keep {
			toDelete = append(toDelete, obj.path)
		}
	}
	const intersectionCheckCutOff = 4
	if len(existing) > intersectionCheckCutOff && intersection == 0 {
		// This is a last-resort protection against possible dashboard bugs.
		// If the needed assets have no intersection with the existing assets,
		// don't delete anything. Otherwise, if it was a bug, we will lose all files.
		return fmt.Errorf("needed assets have almost no intersection with the existing ones")
	}
	for _, path := range toDelete {
		err := storage.backend.remove(path)
		storage.tracer.Log("-- deleted %v: %v", path, err)
		// Several syz-ci's might be sharing the same storage. So let's tolerate
		// races during file deletion.
		if err != nil && err != ErrAssetDoesNotExist {
			return fmt.Errorf("asset deletion failure: %w", err)
		}
	}
	return nil
}
+
// uploadRequest describes a single object to be stored by a backend.
type uploadRequest struct {
	savePath        string
	contentEncoding string
	contentType     string
	// preserveExtension prevents compressors from appending their own
	// extension (.xz/.gz) to savePath.
	preserveExtension bool
}

// uploadResponse carries the writer into which the object's contents
// must be streamed, plus the final storage path.
type uploadResponse struct {
	path   string
	writer io.WriteCloser
}

// storedObject is a backend-agnostic object listing entry.
type storedObject struct {
	path      string
	createdAt time.Time
}

// StorageBackend abstracts the actual object storage (GCS or the
// in-memory dummy used in tests).
type StorageBackend interface {
	upload(req *uploadRequest) (*uploadResponse, error)
	list() ([]storedObject, error)
	remove(path string) error
	downloadURL(path string, publicURL bool) (string, error)
	getPath(url string) (string, error)
}

// Compressor wraps a backend upload so that the returned writer
// transparently compresses the data streamed into it.
type Compressor func(req *uploadRequest,
	next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error)

// xzPresent caches whether the xz binary is runnable; set once by xzAvailable.
var xzPresent bool
var xzCheck sync.Once

// xz invocation settings; level 0 favors speed over compression ratio.
const xzCompressionRatio = 0
const xzThreadsCount = 6
+
+// TODO: switch to an xz library, so that we don't have to run commands.
+// Then, it'd probably be easier to wrap "writer" instead of "reader".
+
+func xzAvailable() bool {
+ xzCheck.Do(func() {
+ _, err := osutil.RunCmd(time.Minute, "", "xz --version")
+ xzPresent = err != nil
+ })
+ return xzPresent
+}
+
+func xzCompressor(req *uploadRequest,
+ next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) {
+ newReq := *req
+ if !req.preserveExtension {
+ newReq.savePath = fmt.Sprintf("%s.xz", newReq.savePath)
+ }
+ // "gz" contentEncoding is not really supported so far, so let's just set contentType.
+ if newReq.contentType == "" {
+ newReq.contentType = "application/x-xz"
+ }
+ resp, err := next(&newReq)
+ if err != nil {
+ return nil, err
+ }
+ // Take source data from stdin, write compressed data to stdout.
+ cmd := osutil.Command("xz", fmt.Sprintf("-%d", xzCompressionRatio),
+ "-T", fmt.Sprintf("%d", xzThreadsCount), "-F", "xz", "-c")
+ stdinWriter, err := cmd.StdinPipe()
+ if err != nil {
+ return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
+ }
+ cmd.Stdout = resp.writer
+ err = cmd.Start()
+ if err != nil {
+ return nil, fmt.Errorf("xz preprocess: command start failed: %w", err)
+ }
+ return &uploadResponse{
+ path: resp.path,
+ writer: &wrappedWriteCloser{
+ writer: stdinWriter,
+ closeCallback: func() error {
+ // Once the writer which we return is closed, we want to
+ // also close the writer we're proxying.
+ err := cmd.Wait()
+ err2 := resp.writer.Close()
+ if err != nil {
+ return err
+ }
+ if err2 != nil {
+ return err2
+ }
+ return nil
+ },
+ },
+ }, nil
+}
+
const gzipCompressionRatio = 4

// This struct allows to attach a callback on the Close() method invocation of
// an existing io.WriteCloser. Also, it can convert an io.Writer to an io.WriteCloser.
type wrappedWriteCloser struct {
	writer        io.Writer
	closeCallback func() error
}

// Write just forwards the data to the wrapped writer.
func (wwc *wrappedWriteCloser) Write(data []byte) (int, error) {
	return wwc.writer.Write(data)
}

// Close first closes the underlying writer (if it supports closing) and
// then runs the callback unconditionally. The writer's error, if any,
// takes precedence over the callback's.
func (wwc *wrappedWriteCloser) Close() error {
	var closeErr error
	if closer, ok := wwc.writer.(io.Closer); ok {
		closeErr = closer.Close()
	}
	callbackErr := wwc.closeCallback()
	if closeErr != nil {
		return closeErr
	}
	return callbackErr
}
+
+func gzipCompressor(req *uploadRequest,
+ next func(req *uploadRequest) (*uploadResponse, error)) (*uploadResponse, error) {
+ newReq := *req
+ if !req.preserveExtension {
+ newReq.savePath = fmt.Sprintf("%s.gz", newReq.savePath)
+ }
+ newReq.contentEncoding = "gzip"
+ resp, err := next(&newReq)
+ if err != nil {
+ return nil, err
+ }
+ gzip, err := gzip.NewWriterLevel(resp.writer, gzipCompressionRatio)
+ if err != nil {
+ resp.writer.Close()
+ return nil, err
+ }
+ return &uploadResponse{
+ path: resp.path,
+ writer: &wrappedWriteCloser{
+ writer: gzip,
+ closeCallback: func() error {
+ return resp.writer.Close()
+ },
+ },
+ }, nil
+}
diff --git a/pkg/asset/storage_test.go b/pkg/asset/storage_test.go
new file mode 100644
index 000000000..4f5ddd68d
--- /dev/null
+++ b/pkg/asset/storage_test.go
@@ -0,0 +1,325 @@
+// Copyright 2022 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package asset
+
+import (
+ "bytes"
+ "compress/gzip"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "reflect"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/debugtracer"
+ "github.com/google/syzkaller/pkg/osutil"
+)
+
// addBuildAssetCallback lets a test validate each asset reported to the
// mocked dashboard.
type addBuildAssetCallback func(obj dashapi.NewAsset) error

// dashMock emulates the dashboard: it records reported download URLs and
// plays them back as the set of still-needed assets.
type dashMock struct {
	downloadURLs  map[string]bool
	addBuildAsset addBuildAssetCallback // optional per-test hook
}

// newDashMock returns an empty dashboard mock.
func newDashMock() *dashMock {
	return &dashMock{downloadURLs: map[string]bool{}}
}
+
+func (dm *dashMock) do(method string, req, mockReply interface{}) error {
+ switch method {
+ case "add_build_assets":
+ addBuildAssets := req.(*dashapi.AddBuildAssetsReq)
+ for _, obj := range addBuildAssets.Assets {
+ if dm.addBuildAsset != nil {
+ if err := dm.addBuildAsset(obj); err != nil {
+ return err
+ }
+ }
+ dm.downloadURLs[obj.DownloadURL] = true
+ }
+ return nil
+ case "needed_assets":
+ resp := mockReply.(*dashapi.NeededAssetsResp)
+ for url := range dm.downloadURLs {
+ resp.DownloadURLs = append(resp.DownloadURLs, url)
+ }
+ return nil
+ }
+ return nil
+}
+
// getDashapi wraps the mock into a real dashapi.Dashboard instance.
func (dm *dashMock) getDashapi() *dashapi.Dashboard {
	return dashapi.NewMock(dm.do)
}
+
+func makeStorage(t *testing.T, dash *dashapi.Dashboard) (*Storage, *dummyStorageBackend) {
+ be := makeDummyStorageBackend()
+ cfg := &Config{
+ UploadTo: "dummy://test",
+ }
+ return &Storage{
+ dash: dash,
+ cfg: cfg,
+ backend: be,
+ tracer: &debugtracer.TestTracer{T: t},
+ }, be
+}
+
+func validateGzip(res *uploadedFile, expected []byte) error {
+ if res == nil {
+ return fmt.Errorf("no file was uploaded")
+ }
+ reader, err := gzip.NewReader(bytes.NewReader(res.bytes))
+ if err != nil {
+ return fmt.Errorf("gzip.NewReader failed: %w", err)
+ }
+ defer reader.Close()
+ body, err := ioutil.ReadAll(reader)
+ if err != nil {
+ return fmt.Errorf("read of ungzipped content failed: %w", err)
+ }
+ if !reflect.DeepEqual(body, expected) {
+ return fmt.Errorf("decompressed: %#v, expected: %#v", body, expected)
+ }
+ return nil
+}
+
// validateXz decompresses the uploaded bytes with the external xz binary
// and checks them against the expected content. If xz was not available
// at upload time, the file must have been gzipped instead.
func validateXz(res *uploadedFile, expected []byte) error {
	if res == nil {
		return fmt.Errorf("no file was uploaded")
	}
	xzAvailable := xzAvailable()
	xzUsed := strings.HasSuffix(res.req.savePath, ".xz")
	if xzAvailable && !xzUsed {
		return fmt.Errorf("xz was available, but didn't get used")
	}
	if !xzUsed {
		// The uploader must have fallen back to gzip.
		return validateGzip(res, expected)
	}
	cmd := osutil.Command("xz", "--decompress", "--to-stdout")
	cmd.Stdin = bytes.NewReader(res.bytes)
	out, err := osutil.Run(time.Minute, cmd)
	if err != nil {
		return fmt.Errorf("xz invocation failed: %v", err)
	}
	if !reflect.DeepEqual(out, expected) {
		return fmt.Errorf("decompressed: %#v, expected: %#v", out, expected)
	}
	return nil
}
+
// sendBuildAsset uploads a build asset and immediately reports it to the
// dashboard, mimicking the typical syz-ci flow.
func (storage *Storage) sendBuildAsset(reader io.Reader, fileName string, assetType dashapi.AssetType,
	build *dashapi.Build) error {
	asset, err := storage.UploadBuildAsset(reader, fileName, assetType, build)
	if err != nil {
		return err
	}
	return storage.ReportBuildAssets(build, asset)
}
+
// TestUploadBuildAsset walks through the full asset lifecycle against the
// dummy backend: upload, duplicate upload, reporting, and deprecation.
func TestUploadBuildAsset(t *testing.T) {
	dashMock := newDashMock()
	storage, be := makeStorage(t, dashMock.getDashapi())
	// Pre-date the uploads so that the deletion embargo does not apply.
	be.currentTime = time.Now().Add(-2 * deletionEmbargo)
	build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}

	// Upload two assets using different means.
	vmLinuxContent := []byte{0xDE, 0xAD, 0xBE, 0xEF}
	dashMock.addBuildAsset = func(newAsset dashapi.NewAsset) error {
		if newAsset.Type != dashapi.KernelObject {
			t.Fatalf("expected KernelObject, got %v", newAsset.Type)
		}
		if !strings.Contains(newAsset.DownloadURL, "vmlinux") {
			t.Fatalf("%#v was expected to mention vmlinux", newAsset.DownloadURL)
		}
		return nil
	}
	var file *uploadedFile
	be.objectUpload = collectBytes(&file)
	err := storage.sendBuildAsset(bytes.NewReader(vmLinuxContent), "vmlinux",
		dashapi.KernelObject, build)
	if err != nil {
		t.Fatalf("file upload failed: %s", err)
	}
	if err := validateXz(file, vmLinuxContent); err != nil {
		t.Fatalf("vmlinux validation failed: %s", err)
	}
	// Upload the same file the second time.
	// NOTE(review): the returned error is deliberately(?) ignored here —
	// consider checking it like the first upload does.
	storage.sendBuildAsset(bytes.NewReader(vmLinuxContent), "vmlinux", dashapi.KernelObject, build)
	// The currently expected behavior is that it will be uploaded twice and will have
	// different names.
	if len(dashMock.downloadURLs) < 2 {
		t.Fatalf("same-file upload was expected to succeed, but it didn't; %#v", dashMock.downloadURLs)
	}

	diskImageContent := []byte{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8}
	dashMock.addBuildAsset = func(newAsset dashapi.NewAsset) error {
		if newAsset.Type != dashapi.KernelImage {
			t.Fatalf("expected KernelImage, got %v", newAsset.Type)
		}
		if !strings.Contains(newAsset.DownloadURL, "disk") ||
			!strings.Contains(newAsset.DownloadURL, ".img") {
			t.Fatalf("%#v was expected to mention disk.img", newAsset.DownloadURL)
		}
		if !strings.Contains(newAsset.DownloadURL, build.KernelCommit[:6]) {
			t.Fatalf("%#v was expected to mention build commit", newAsset.DownloadURL)
		}
		return nil
	}
	file = nil
	be.objectUpload = collectBytes(&file)
	// NOTE(review): the returned error is ignored here as well.
	storage.sendBuildAsset(bytes.NewReader(diskImageContent), "disk.img", dashapi.KernelImage, build)
	if err := validateXz(file, diskImageContent); err != nil {
		t.Fatalf("disk.img validation failed: %s", err)
	}

	allUrls := []string{}
	for url := range dashMock.downloadURLs {
		allUrls = append(allUrls, url)
	}
	if len(allUrls) != 3 {
		t.Fatalf("invalid dashMock state: expected 3 assets, got %d", len(allUrls))
	}
	// First try to remove two assets.
	dashMock.downloadURLs = map[string]bool{allUrls[2]: true, "http://non-related-asset.com/abcd": true}

	// Pretend there's an asset deletion error.
	be.objectRemove = func(string) error { return fmt.Errorf("not now") }
	err = storage.DeprecateAssets()
	if err == nil {
		t.Fatalf("DeprecateAssets() should have failed")
	}

	// Let the deletion be successful.
	be.objectRemove = nil
	err = storage.DeprecateAssets()
	if err != nil {
		t.Fatalf("DeprecateAssets() was expected to be successful, got %s", err)
	}
	path, err := be.getPath(allUrls[2])
	if err != nil {
		t.Fatalf("getPath failed: %s", err)
	}
	err = be.hasOnly([]string{path})
	if err != nil {
		t.Fatalf("after first DeprecateAssets(): %s", err)
	}

	// Delete the rest.
	dashMock.downloadURLs = map[string]bool{}
	err = storage.DeprecateAssets()
	if err != nil || len(be.objects) != 0 {
		t.Fatalf("second DeprecateAssets() failed: %s, len %d",
			err, len(be.objects))
	}
}
+
// uploadedFile captures a single upload intercepted by collectBytes:
// the original request plus the raw (compressed) bytes written.
type uploadedFile struct {
	req   uploadRequest
	bytes []byte
}
+
+func collectBytes(saveTo **uploadedFile) objectUploadCallback {
+ return func(req *uploadRequest) (*uploadResponse, error) {
+ buf := &bytes.Buffer{}
+ wwc := &wrappedWriteCloser{
+ writer: buf,
+ closeCallback: func() error {
+ *saveTo = &uploadedFile{req: *req, bytes: buf.Bytes()}
+ return nil
+ },
+ }
+ return &uploadResponse{path: req.savePath, writer: wwc}, nil
+ }
+}
+
+func TestUploadHtmlAsset(t *testing.T) {
+ dashMock := newDashMock()
+ storage, be := makeStorage(t, dashMock.getDashapi())
+ build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}
+ htmlContent := []byte("<html><head><title>Hi!</title></head></html>")
+ dashMock.addBuildAsset = func(newAsset dashapi.NewAsset) error {
+ if newAsset.Type != dashapi.HTMLCoverageReport {
+ t.Fatalf("expected HtmlCoverageReport, got %v", newAsset.Type)
+ }
+ if !strings.Contains(newAsset.DownloadURL, "cover_report") {
+ t.Fatalf("%#v was expected to mention cover_report", newAsset.DownloadURL)
+ }
+ if !strings.HasSuffix(newAsset.DownloadURL, ".html") {
+ t.Fatalf("%#v was expected to have .html extension", newAsset.DownloadURL)
+ }
+ return nil
+ }
+ var file *uploadedFile
+ be.objectUpload = collectBytes(&file)
+ storage.sendBuildAsset(bytes.NewReader(htmlContent), "cover_report.html",
+ dashapi.HTMLCoverageReport, build)
+ if err := validateGzip(file, htmlContent); err != nil {
+ t.Fatalf("cover_report.html validation failed: %s", err)
+ }
+}
+
+func TestRecentAssetDeletionProtection(t *testing.T) {
+ dashMock := newDashMock()
+ storage, be := makeStorage(t, dashMock.getDashapi())
+ build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}
+ htmlContent := []byte("<html><head><title>Hi!</title></head></html>")
+ be.currentTime = time.Now().Add(-time.Hour * 24 * 6)
+ err := storage.sendBuildAsset(bytes.NewReader(htmlContent), "cover_report.html",
+ dashapi.HTMLCoverageReport, build)
+ if err != nil {
+ t.Fatalf("failed to upload a file: %v", err)
+ }
+
+ // Try to delete a recent file.
+ dashMock.downloadURLs = map[string]bool{}
+ err = storage.DeprecateAssets()
+ if err != nil {
+ t.Fatalf("DeprecateAssets failed: %v", err)
+ } else if len(be.objects) == 0 {
+ t.Fatalf("a recent object was deleted: %v", err)
+ }
+}
+
+func TestAssetStorageConfiguration(t *testing.T) {
+ dashMock := newDashMock()
+ cfg := &Config{
+ UploadTo: "dummy://",
+ Assets: map[dashapi.AssetType]TypeConfig{
+ dashapi.HTMLCoverageReport: {Never: true},
+ dashapi.KernelObject: {},
+ },
+ }
+ storage, err := StorageFromConfig(cfg, dashMock.getDashapi())
+ if err != nil {
+ t.Fatalf("unexpected error from StorageFromConfig: %s", err)
+ }
+ build := &dashapi.Build{ID: "1234", KernelCommit: "abcdef2134"}
+
+ // Uploading a file of a disabled asset type.
+ htmlContent := []byte("<html><head><title>Hi!</title></head></html>")
+ err = storage.sendBuildAsset(bytes.NewReader(htmlContent), "cover_report.html",
+ dashapi.HTMLCoverageReport, build)
+ if !errors.Is(err, ErrAssetTypeDisabled) {
+ t.Fatalf("UploadBuildAssetStream expected to fail with ErrAssetTypeDisabled, but got %v", err)
+ }
+
+ // Uploading a file of an unspecified asset type.
+ testContent := []byte{0x1, 0x2, 0x3, 0x4}
+ err = storage.sendBuildAsset(bytes.NewReader(testContent), "disk.raw", dashapi.BootableDisk, build)
+ if err != nil {
+ t.Fatalf("UploadBuildAssetStream of BootableDisk expected to succeed, got %v", err)
+ }
+
+ // Uploading a file of a specified asset type.
+ err = storage.sendBuildAsset(bytes.NewReader(testContent), "vmlinux", dashapi.KernelObject, build)
+ if err != nil {
+ t.Fatalf("UploadBuildAssetStream of BootableDisk expected to succeed, got %v", err)
+ }
+}
diff --git a/pkg/asset/type.go b/pkg/asset/type.go
index 1f3f2f220..1fd218da8 100644
--- a/pkg/asset/type.go
+++ b/pkg/asset/type.go
@@ -3,35 +3,61 @@
package asset
-import "github.com/google/syzkaller/sys/targets"
+import (
+ "github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/sys/targets"
+)
-type Type string
+// TypeDescription carries the per-asset-type policy used when uploading and
+// reporting build assets.
+type TypeDescription struct {
+	// AllowMultiple — presumably more than one asset of this type may exist
+	// per build (it is only set for HTML coverage reports below) — TODO
+	// confirm against pkg/asset/storage.go.
+	AllowMultiple bool
+	// GetTitle produces the human-readable asset title, optionally
+	// specialized for the given build target (see the KernelObject entry).
+	GetTitle QueryTypeTitle
+	// ContentType is the MIME type the stored object should be served with;
+	// empty presumably means the backend's default — TODO confirm.
+	ContentType string
+	ReportingPrio int // the smaller, the higher the asset is on the list during reporting
+	// NoReporting — looks like assets of this type are stored but excluded
+	// from reporting — verify against the reporting code path.
+	NoReporting bool
+	// customCompressor, when set, overrides the default compression for this
+	// type (gzipCompressor for HTML coverage reports).
+	customCompressor Compressor
+	// preserveExtension — presumably keeps the file extension in the stored
+	// object name (set together with ContentType for .html) — TODO confirm.
+	preserveExtension bool
+}
-// Asset types used throughout the system.
-const (
- BootableDisk Type = "bootable_disk"
- NonBootableDisk Type = "non_bootable_disk"
- KernelObject Type = "kernel_object"
- KernelImage Type = "kernel_image"
- HTMLCoverageReport Type = "html_coverage_report"
-)
+// assetTypes maps every supported dashapi asset type to its upload/reporting
+// policy. Replaces the switch formerly found in GetHumanReadableName().
+var assetTypes = map[dashapi.AssetType]*TypeDescription{
+	dashapi.BootableDisk: {
+		GetTitle:      constTitle("disk image"),
+		ReportingPrio: 1,
+	},
+	dashapi.NonBootableDisk: {
+		GetTitle:      constTitle("disk image (non-bootable)"),
+		ReportingPrio: 2,
+	},
+	dashapi.KernelObject: {
+		// Target-specific title: prefer the target's own kernel object name
+		// (e.g. "vmlinux") when it is known.
+		GetTitle: func(target *targets.Target) string {
+			if target != nil && target.KernelObject != "" {
+				return target.KernelObject
+			}
+			return "kernel object"
+		},
+		ReportingPrio: 3,
+	},
+	dashapi.KernelImage: {
+		GetTitle:      constTitle("kernel image"),
+		ReportingPrio: 4,
+	},
+	dashapi.HTMLCoverageReport: {
+		// Keep the space in "(html)" — matches the title previously produced
+		// by GetHumanReadableName ("coverage report (html)").
+		GetTitle:          constTitle("coverage report (html)"),
+		AllowMultiple:     true,
+		ContentType:       "text/html",
+		NoReporting:       true,
+		customCompressor:  gzipCompressor,
+		preserveExtension: true,
+	},
+}
-func GetHumanReadableName(assetType Type, target *targets.Target) string {
- switch assetType {
- case BootableDisk:
- return "disk image"
- case NonBootableDisk:
- return "disk image (non-bootable)"
- case KernelImage:
- return "kernel image"
- case KernelObject:
- if target != nil && target.KernelObject != "" {
- return target.KernelObject
- }
- return "kernel object"
- case HTMLCoverageReport:
- return "coverage report (html)"
- default:
- panic("invalid asset type: " + assetType)
+type QueryTypeTitle func(*targets.Target) string
+
+func constTitle(title string) QueryTypeTitle {
+ return func(*targets.Target) string {
+ return title
}
}
+
+// GetTypeDescription returns the policy descriptor for the given asset type,
+// or nil if the type is not present in assetTypes.
+func GetTypeDescription(assetType dashapi.AssetType) *TypeDescription {
+	return assetTypes[assetType]
+}
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index c2ebd2bb3..182bfec1e 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -16,6 +16,7 @@ import (
"time"
"github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/asset"
"github.com/google/syzkaller/pkg/build"
"github.com/google/syzkaller/pkg/config"
"github.com/google/syzkaller/pkg/gcs"
@@ -62,27 +63,32 @@ func init() {
// Manager represents a single syz-manager instance.
// Handles kernel polling, image rebuild and manager process management.
// As syzkaller builder, it maintains 2 builds:
-// - latest: latest known good kernel build
-// - current: currently used kernel build
+// - latest: latest known good kernel build
+// - current: currently used kernel build
type Manager struct {
- name string
- workDir string
- kernelDir string
- currentDir string
- latestDir string
- configTag string
- configData []byte
- cfg *Config
- repo vcs.Repo
- mgrcfg *ManagerConfig
- managercfg *mgrconfig.Config
- cmd *ManagerCmd
- dash *dashapi.Dashboard
- stop chan struct{}
- debug bool
+ name string
+ workDir string
+ kernelDir string
+ currentDir string
+ latestDir string
+ configTag string
+ configData []byte
+ cfg *Config
+ repo vcs.Repo
+ mgrcfg *ManagerConfig
+ managercfg *mgrconfig.Config
+ cmd *ManagerCmd
+ dash *dashapi.Dashboard
+ debugStorage bool
+ storage *asset.Storage
+ stop chan struct{}
+ debug bool
+ lastBuild *dashapi.Build
+ buildUpdated bool
}
-func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{}, debug bool) (*Manager, error) {
+func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{},
+ debug bool) (*Manager, error) {
dir := osutil.Abs(filepath.Join("managers", mgrcfg.Name))
err := osutil.MkdirAll(dir)
if err != nil {
@@ -99,7 +105,13 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{}, debug
return nil, err
}
}
-
+ var assetStorage *asset.Storage
+ if !cfg.AssetStorage.IsEmpty() {
+ assetStorage, err = asset.StorageFromConfig(cfg.AssetStorage, dash)
+ if err != nil {
+ log.Fatalf("failed to create asset storage: %v", err)
+ }
+ }
var configData []byte
if mgrcfg.KernelConfig != "" {
if configData, err = ioutil.ReadFile(mgrcfg.KernelConfig); err != nil {
@@ -113,20 +125,22 @@ func createManager(cfg *Config, mgrcfg *ManagerConfig, stop chan struct{}, debug
}
mgr := &Manager{
- name: mgrcfg.managercfg.Name,
- workDir: filepath.Join(dir, "workdir"),
- kernelDir: kernelDir,
- currentDir: filepath.Join(dir, "current"),
- latestDir: filepath.Join(dir, "latest"),
- configTag: hash.String(configData),
- configData: configData,
- cfg: cfg,
- repo: repo,
- mgrcfg: mgrcfg,
- managercfg: mgrcfg.managercfg,
- dash: dash,
- stop: stop,
- debug: debug,
+ name: mgrcfg.managercfg.Name,
+ workDir: filepath.Join(dir, "workdir"),
+ kernelDir: kernelDir,
+ currentDir: filepath.Join(dir, "current"),
+ latestDir: filepath.Join(dir, "latest"),
+ configTag: hash.String(configData),
+ configData: configData,
+ cfg: cfg,
+ repo: repo,
+ mgrcfg: mgrcfg,
+ managercfg: mgrcfg.managercfg,
+ dash: dash,
+ storage: assetStorage,
+ debugStorage: !cfg.AssetStorage.IsEmpty() && cfg.AssetStorage.Debug,
+ stop: stop,
+ debug: debug,
}
os.RemoveAll(mgr.currentDir)
@@ -169,10 +183,8 @@ loop:
}
if !artifactUploadTime.IsZero() && time.Now().After(artifactUploadTime) {
artifactUploadTime = time.Time{}
- if mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != "" {
- if err := mgr.uploadCoverReport(); err != nil {
- mgr.Errorf("failed to upload cover report: %v", err)
- }
+ if err := mgr.uploadCoverReport(); err != nil {
+ mgr.Errorf("failed to upload cover report: %v", err)
}
if mgr.cfg.CorpusUploadPath != "" {
if err := mgr.uploadCorpus(); err != nil {
@@ -234,6 +246,7 @@ func (mgr *Manager) pollAndBuild(lastCommit string, latestInfo *BuildInfo) (
if latestInfo == nil {
mgr.Errorf("failed to read build info after build")
}
+ mgr.buildUpdated = true
}
<-kernelBuildSem
case <-mgr.stop:
@@ -461,6 +474,16 @@ func (mgr *Manager) reportBuildError(rep *report.Report, info *BuildInfo, imageD
if err != nil {
return err
}
+ if mgr.storage != nil {
+ // We have to send assets together with the other info because the report
+ // might be generated immediately.
+ uploadedAssets, err := mgr.uploadBuildAssets(build, imageDir)
+ if err == nil {
+ build.Assets = uploadedAssets
+ } else {
+ log.Logf(0, "%v: failed to upload build assets: %s", mgr.name, err)
+ }
+ }
req := &dashapi.BuildErrorReq{
Build: *build,
Crash: dashapi.Crash{
@@ -472,7 +495,10 @@ func (mgr *Manager) reportBuildError(rep *report.Report, info *BuildInfo, imageD
Report: rep.Report,
},
}
- return mgr.dash.ReportBuildError(req)
+ if err := mgr.dash.ReportBuildError(req); err != nil {
+ return err
+ }
+ return nil
}
func (mgr *Manager) createTestConfig(imageDir string, info *BuildInfo) (*mgrconfig.Config, error) {
@@ -545,6 +571,7 @@ func (mgr *Manager) uploadBuild(info *BuildInfo, imageDir string) (string, error
if err != nil {
return "", err
}
+ mgr.lastBuild = build
commitTitles, fixCommits, err := mgr.pollCommits(info.KernelCommit)
if err != nil {
// This is not critical for operation.
@@ -552,6 +579,19 @@ func (mgr *Manager) uploadBuild(info *BuildInfo, imageDir string) (string, error
}
build.Commits = commitTitles
build.FixCommits = fixCommits
+ // Normally we have no reason to upload build artifacts every restart - we don't rebuild
+ // kernels that often. So we only do that after we have built them. There's only one small
+ // exception - for debugging purposes it's better to upload more than to rebuild more.
+ assetUploadNeeded := mgr.debugStorage || mgr.buildUpdated
+ if mgr.storage != nil && assetUploadNeeded {
+ mgr.buildUpdated = false
+ assets, err := mgr.uploadBuildAssets(build, imageDir)
+ if err != nil {
+ mgr.Errorf("failed to upload build assets: %v", err)
+ return "", err
+ }
+ build.Assets = assets
+ }
if err := mgr.dash.UploadBuild(build); err != nil {
return "", err
}
@@ -634,7 +674,81 @@ func (mgr *Manager) pollCommits(buildCommit string) ([]string, []dashapi.Commit,
return present, fixCommits, nil
}
+// uploadBuildAssets scans assetFolder for well-known build artifacts
+// ("kernel", "image", "obj/<kernel object>"), uploads each enabled one to the
+// asset storage and returns the resulting descriptors for inclusion in the
+// dashboard build record. Individual upload failures are logged and skipped,
+// not returned as errors.
+func (mgr *Manager) uploadBuildAssets(build *dashapi.Build, assetFolder string) ([]dashapi.NewAsset, error) {
+	if mgr.storage == nil {
+		// No reason to continue anyway.
+		return nil, fmt.Errorf("asset storage is not configured")
+	}
+	type pendingAsset struct {
+		path      string
+		assetType dashapi.AssetType
+		name      string
+	}
+	pending := []pendingAsset{}
+	// NOTE(review): bootableDisk is initialized to true and the assignment
+	// below also sets it to true, so the NonBootableDisk branch further down
+	// is unreachable. One of the two was probably meant to be false (i.e. a
+	// separate kernel file implies the disk image alone is not bootable) —
+	// TODO confirm the intended semantics and fix.
+	bootableDisk := true
+	kernelFile := filepath.Join(assetFolder, "kernel")
+	if osutil.IsExist(kernelFile) {
+		bootableDisk = true
+		pending = append(pending, pendingAsset{kernelFile, dashapi.KernelImage, "kernel"})
+	}
+	imageFile := filepath.Join(assetFolder, "image")
+	if osutil.IsExist(imageFile) {
+		if bootableDisk {
+			pending = append(pending, pendingAsset{imageFile, dashapi.BootableDisk,
+				"disk.raw"})
+		} else {
+			// Currently dead code — see the NOTE(review) above.
+			pending = append(pending, pendingAsset{imageFile, dashapi.NonBootableDisk,
+				"non_bootable_disk.raw"})
+		}
+	}
+	target := mgr.managercfg.SysTarget
+	kernelObjFile := filepath.Join(assetFolder, "obj", target.KernelObject)
+	if osutil.IsExist(kernelObjFile) {
+		pending = append(pending,
+			pendingAsset{kernelObjFile, dashapi.KernelObject, target.KernelObject})
+	}
+	// TODO: add initrd?
+	ret := []dashapi.NewAsset{}
+	for _, pendingAsset := range pending {
+		// Skip types disabled via the asset storage configuration.
+		if !mgr.storage.AssetTypeEnabled(pendingAsset.assetType) {
+			continue
+		}
+		file, err := os.Open(pendingAsset.path)
+		if err != nil {
+			// Best-effort: log and move on to the next asset.
+			log.Logf(0, "failed to open an asset for uploading: %s, %s",
+				pendingAsset.path, err)
+			continue
+		}
+		if mgr.debugStorage {
+			log.Logf(0, "uploading an asset %s of type %s",
+				pendingAsset.path, pendingAsset.assetType)
+		}
+		// TODO: ask the dashboard whether it actually needs this asset?
+		// This should remove the unnecessary re-uploading of assets after
+		// each manager restart, even if the kernel was not built anew.
+		info, err := mgr.storage.UploadBuildAsset(file, pendingAsset.name,
+			pendingAsset.assetType, build)
+		if err != nil {
+			log.Logf(0, "failed to upload an asset: %s, %s",
+				pendingAsset.path, err)
+			continue
+		} else if mgr.debugStorage {
+			log.Logf(0, "uploaded an asset: %#v", info)
+		}
+		ret = append(ret, info)
+	}
+	return ret, nil
+}
+
func (mgr *Manager) uploadCoverReport() error {
+ directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
+ if mgr.storage == nil && !directUpload {
+ // Cover report uploading is disabled.
+ return nil
+ }
+ if mgr.storage != nil && directUpload {
+ return fmt.Errorf("cover report must be either uploaded directly or via asset storage")
+ }
// Report generation can consume lots of memory. Generate one at a time.
select {
case kernelBuildSem <- struct{}{}:
@@ -656,8 +770,20 @@ func (mgr *Manager) uploadCoverReport() error {
return fmt.Errorf("failed to get report: %v", err)
}
defer resp.Body.Close()
- // Upload coverage report.
- return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body)
+ if directUpload {
+ return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body)
+ }
+ // Upload via the asset storage.
+ newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
+ dashapi.HTMLCoverageReport, mgr.lastBuild)
+ if err != nil {
+ return fmt.Errorf("failed to upload html coverage report: %w", err)
+ }
+ err = mgr.storage.ReportBuildAssets(mgr.lastBuild, newAsset)
+ if err != nil {
+ return fmt.Errorf("failed to report the html coverage report asset: %w", err)
+ }
+ return nil
}
func (mgr *Manager) uploadCorpus() error {
diff --git a/syz-ci/syz-ci.go b/syz-ci/syz-ci.go
index be7c5b470..2f29fec5e 100644
--- a/syz-ci/syz-ci.go
+++ b/syz-ci/syz-ci.go
@@ -62,7 +62,10 @@ import (
"path/filepath"
"regexp"
"sync"
+ "time"
+ "github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/asset"
"github.com/google/syzkaller/pkg/config"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
@@ -107,6 +110,8 @@ type Config struct {
JobPollPeriod int `json:"job_poll_period"`
// Poll period for commits in seconds (optional, defaults to 3600 seconds)
CommitPollPeriod int `json:"commit_poll_period"`
+ // Asset Storage config.
+ AssetStorage *asset.Config `json:"asset_storage"`
}
type ManagerConfig struct {
@@ -234,7 +239,6 @@ func main() {
}()
}
}
-
jp, err := newJobProcessor(cfg, managers, stop, shutdownPending)
if err != nil {
log.Fatalf("failed to create dashapi connection %v", err)
@@ -256,6 +260,8 @@ func main() {
}
})
+ wg.Add(1)
+ go deprecateAssets(cfg, shutdownPending, &wg)
wg.Wait()
select {
@@ -265,6 +271,39 @@ func main() {
}
}
+// deprecateAssets is a long-running goroutine that, every 6 hours, asks the
+// asset storage to delete obsolete assets. It exits when shutdownPending is
+// closed. It is a no-op unless a dashboard is configured and the asset
+// storage config enables deprecation.
+func deprecateAssets(cfg *Config, shutdownPending chan struct{}, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if cfg.DashboardAddr == "" || cfg.AssetStorage.IsEmpty() ||
+		!cfg.AssetStorage.DoDeprecation {
+		return
+	}
+	dash, err := dashapi.New(cfg.DashboardClient, cfg.DashboardAddr, cfg.DashboardKey)
+	if err != nil {
+		// NOTE(review): log.Fatalf terminates the process, so the return
+		// below is unreachable (and wg.Done will not run).
+		log.Fatalf("failed to create dashapi during asset deprecation: %v", err)
+		return
+	}
+	storage, err := asset.StorageFromConfig(cfg.AssetStorage, dash)
+	if err != nil {
+		// Non-fatal: report to the dashboard and disable deprecation for
+		// this process lifetime.
+		dash.LogError("syz-ci",
+			"failed to create asset storage during asset deprecation: %v", err)
+		return
+	}
+loop:
+	for {
+		const sleepDuration = 6 * time.Hour
+		select {
+		case <-shutdownPending:
+			break loop
+		case <-time.After(sleepDuration):
+		}
+		log.Logf(0, "deprecating assets")
+		err := storage.DeprecateAssets()
+		if err != nil {
+			// Deprecation failures are reported but the loop keeps running.
+			dash.LogError("syz-ci", "asset deprecation failed: %v", err)
+		}
+	}
+}
+
func serveHTTP(cfg *Config) {
ln, err := net.Listen("tcp4", cfg.HTTP)
if err != nil {
@@ -312,6 +351,11 @@ func loadConfig(filename string) (*Config, error) {
if len(cfg.Managers) == 0 {
return nil, fmt.Errorf("no managers specified")
}
+ if cfg.AssetStorage != nil {
+ if err := cfg.AssetStorage.Validate(); err != nil {
+ return nil, fmt.Errorf("asset storage config error: %w", err)
+ }
+ }
return cfg, nil
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 37cf61e15..0ff65a341 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -434,6 +434,7 @@ golang.org/x/perf/benchstat
golang.org/x/perf/internal/stats
golang.org/x/perf/storage/benchfmt
# golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
+## explicit
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
# golang.org/x/sys v0.0.0-20210503080704-8803ae5d1324