aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--syz-cluster/controller/processor.go19
-rw-r--r--syz-cluster/pkg/blob/gcs.go51
-rw-r--r--syz-cluster/pkg/blob/storage.go42
-rw-r--r--syz-cluster/pkg/blob/storage_test.go2
-rw-r--r--syz-cluster/pkg/service/finding.go20
-rw-r--r--syz-cluster/pkg/service/series.go5
-rw-r--r--syz-cluster/pkg/service/session.go4
-rw-r--r--syz-cluster/pkg/service/sessiontest.go23
8 files changed, 59 insertions, 107 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go
index 07851a9fa..e634ad5e5 100644
--- a/syz-cluster/controller/processor.go
+++ b/syz-cluster/controller/processor.go
@@ -138,7 +138,7 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio
app.Errorf("failed to query workflow %q status: %v", session.ID, err)
continue
}
- if workflowLog != nil {
+ if len(workflowLog) > 0 {
err := sp.updateSessionLog(ctx, session, workflowLog)
if err != nil {
app.Errorf("failed to update session log: %v", err)
@@ -214,19 +214,12 @@ func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID strin
}
func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error {
+ logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log")
+ if err != nil {
+ return fmt.Errorf("failed to save the log: %w", err)
+ }
return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error {
- if session.LogURI == "" {
- path, err := sp.blobStorage.Store(bytes.NewReader(log))
- if err != nil {
- return fmt.Errorf("failed to save the log: %w", err)
- }
- session.LogURI = path
- } else {
- err := sp.blobStorage.Update(session.LogURI, bytes.NewReader(log))
- if err != nil {
- return fmt.Errorf("failed to update the log %q: %w", session.LogURI, err)
- }
- }
+ session.LogURI = logURI
return nil
})
}
diff --git a/syz-cluster/pkg/blob/gcs.go b/syz-cluster/pkg/blob/gcs.go
index f6f5797d5..7d22ccd57 100644
--- a/syz-cluster/pkg/blob/gcs.go
+++ b/syz-cluster/pkg/blob/gcs.go
@@ -7,10 +7,10 @@ import (
"context"
"fmt"
"io"
+ "path"
"regexp"
"github.com/google/syzkaller/pkg/gcs"
- "github.com/google/uuid"
)
type gcsDriver struct {
@@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) {
}, nil
}
-func (gcs *gcsDriver) Store(source io.Reader) (string, error) {
- object := uuid.NewString()
- err := gcs.writeObject(object, source)
+func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) {
+ if len(parts) == 0 {
+ return "", fmt.Errorf("no identifiers for the object were passed to Write")
+ }
+ object := path.Join(gcs.bucket, path.Join(parts...))
+ w, err := gcs.client.FileWriter(object, "", "")
if err != nil {
return "", err
}
- return gcs.objectURI(object), nil
-}
-
-func (gcs *gcsDriver) Update(uri string, source io.Reader) error {
- object, err := gcs.objectName(uri)
+ defer w.Close()
+ _, err = io.Copy(w, source)
if err != nil {
- return err
+ return "", err
}
- return gcs.writeObject(object, source)
+ return "gcs://" + object, nil
}
func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) {
- object, err := gcs.objectName(uri)
+ bucket, object, err := gcs.parseURI(uri)
if err != nil {
return nil, err
}
- return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object))
+ return gcs.client.FileReader(path.Join(bucket, object))
}
-var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`)
+var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`)
-func (gcs *gcsDriver) objectName(uri string) (string, error) {
+func (gcs *gcsDriver) parseURI(uri string) (string, string, error) {
match := gcsObjectRe.FindStringSubmatch(uri)
if len(match) == 0 {
- return "", fmt.Errorf("invalid GCS URI")
+ return "", "", fmt.Errorf("invalid GCS URI")
} else if match[1] != gcs.bucket {
- return "", fmt.Errorf("unexpected GCS bucket")
+ return "", "", fmt.Errorf("unexpected GCS bucket")
}
- return match[2], nil
-}
-
-func (gcs *gcsDriver) objectURI(object string) string {
- return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object)
-}
-
-func (gcs *gcsDriver) writeObject(object string, source io.Reader) error {
- w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "")
- if err != nil {
- return err
- }
- defer w.Close()
-
- _, err = io.Copy(w, source)
- return err
+ return gcs.bucket, match[2], nil
}
diff --git a/syz-cluster/pkg/blob/storage.go b/syz-cluster/pkg/blob/storage.go
index be1d8f493..e3446e7fa 100644
--- a/syz-cluster/pkg/blob/storage.go
+++ b/syz-cluster/pkg/blob/storage.go
@@ -4,21 +4,21 @@
package blob
import (
+ "encoding/base64"
"fmt"
"io"
"os"
"path/filepath"
"strings"
-
- "github.com/google/uuid"
)
// Storage is not assumed to be used for partciularly large objects (e.g. GB of size),
// but rather for blobs that risk overwhelming Spanner column size limits.
type Storage interface {
- // Store returns a URI to use later.
- Store(source io.Reader) (string, error)
- Update(key string, source io.Reader) error
+ // Write stores the object uniquely identified by a set of IDs (parts).
+ // If it already exists, it will be overwritten.
+ // The first argument is the URI which can be used to later retrieve it with Read.
+ Write(source io.Reader, parts ...string) (string, error)
Read(uri string) (io.ReadCloser, error)
}
@@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage {
const localStoragePrefix = "local://"
-func (ls *LocalStorage) Store(source io.Reader) (string, error) {
- name := uuid.NewString()
- err := ls.writeFile(name, source)
+func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) {
+ // A whatever approach that can handle arbitrary inputs.
+ name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...)))
+ file, err := os.Create(filepath.Join(ls.baseFolder, name))
if err != nil {
return "", err
}
- return localStoragePrefix + name, nil
-}
-
-func (ls *LocalStorage) Update(uri string, source io.Reader) error {
- if !strings.HasPrefix(uri, localStoragePrefix) {
- return fmt.Errorf("unsupported URI type")
+ defer file.Close()
+ _, err = io.Copy(file, source)
+ if err != nil {
+ return "", fmt.Errorf("failed to save data: %w", err)
}
- return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source)
+ return localStoragePrefix + name, nil
}
func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
@@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) {
return os.Open(path)
}
-func (ls *LocalStorage) writeFile(name string, source io.Reader) error {
- file, err := os.Create(filepath.Join(ls.baseFolder, name))
- if err != nil {
- return err
- }
- defer file.Close()
- _, err = io.Copy(file, source)
- if err != nil {
- return fmt.Errorf("failed to save data: %w", err)
- }
- return nil
-}
-
func ReadAllBytes(storage Storage, uri string) ([]byte, error) {
if uri == "" {
return nil, nil
diff --git a/syz-cluster/pkg/blob/storage_test.go b/syz-cluster/pkg/blob/storage_test.go
index 9d2d830ce..51e7887bf 100644
--- a/syz-cluster/pkg/blob/storage_test.go
+++ b/syz-cluster/pkg/blob/storage_test.go
@@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) {
var uris []string
for i := 0; i < 2; i++ {
content := fmt.Sprintf("object #%d", i)
- uri, err := storage.Store(bytes.NewReader([]byte(content)))
+ uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i))
assert.NoError(t, err)
uris = append(uris, uri)
}
diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go
index bb605ba73..b668dc798 100644
--- a/syz-cluster/pkg/service/finding.go
+++ b/syz-cluster/pkg/service/finding.go
@@ -12,6 +12,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
+ "github.com/google/uuid"
)
type FindingService struct {
@@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService {
}
func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error {
- var reportURI, logURI string
+ finding := &db.Finding{
+ ID: uuid.NewString(),
+ SessionID: req.SessionID,
+ TestName: req.TestName,
+ Title: req.Title,
+ }
var err error
if len(req.Log) > 0 {
- logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log))
+ finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
}
if len(req.Report) > 0 {
- reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report))
+ finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report")
if err != nil {
return fmt.Errorf("failed to save the report: %w", err)
}
}
// TODO: if it's not actually addded, the blob records will be orphaned.
- err = s.findingRepo.Save(ctx, &db.Finding{
- SessionID: req.SessionID,
- TestName: req.TestName,
- Title: req.Title,
- ReportURI: reportURI,
- LogURI: logURI,
- })
+ err = s.findingRepo.Save(ctx, finding)
if err == db.ErrFindingExists {
// It's ok, just ignore.
return nil
diff --git a/syz-cluster/pkg/service/series.go b/syz-cluster/pkg/service/series.go
index 8ab8b5cef..7628ec48b 100644
--- a/syz-cluster/pkg/service/series.go
+++ b/syz-cluster/pkg/service/series.go
@@ -13,6 +13,7 @@ import (
"github.com/google/syzkaller/syz-cluster/pkg/app"
"github.com/google/syzkaller/syz-cluster/pkg/blob"
"github.com/google/syzkaller/syz-cluster/pkg/db"
+ "github.com/google/uuid"
)
// SeriesService is tested in controller/.
@@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string,
func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) {
seriesObj := &db.Series{
+ ID: uuid.NewString(),
ExtID: series.ExtID,
AuthorEmail: series.AuthorEmail,
Title: series.Title,
@@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*
for _, patch := range series.Patches {
// In case of errors, we will waste some space, but let's ignore it for simplicity.
// Patches are not super big.
- uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body))
+ uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body),
+ "Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq))
if err != nil {
return nil, fmt.Errorf("failed to upload patch body: %w", err)
}
diff --git a/syz-cluster/pkg/service/session.go b/syz-cluster/pkg/service/session.go
index 9ad0688fd..446b431f4 100644
--- a/syz-cluster/pkg/service/session.go
+++ b/syz-cluster/pkg/service/session.go
@@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip
var triageLogURI string
if len(skip.TriageLog) > 0 {
var err error
- triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog))
+ triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log")
if err != nil {
- return fmt.Errorf("failed to save the log: %w", err)
+ return fmt.Errorf("failed to save the triage log: %w", err)
}
}
err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error {
diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go
index be410fdb7..2fa4d8701 100644
--- a/syz-cluster/pkg/service/sessiontest.go
+++ b/syz-cluster/pkg/service/sessiontest.go
@@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro
}
logURI := entity.LogURI
if len(req.Log) > 0 {
- logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log))
+ logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log),
+ "Session", req.SessionID, "Test", req.TestName, "log")
if err != nil {
return fmt.Errorf("failed to save the log: %w", err)
}
@@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN
} else if entity == nil {
return fmt.Errorf("the test has not been submitted yet")
}
- newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader)
+ archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts")
if err != nil {
return fmt.Errorf("failed to save the artifacts archive: %w", err)
}
return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) {
- test.ArtifactsArchiveURI = newArchiveURI
+ test.ArtifactsArchiveURI = archiveURI
})
}
-
-func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) {
- if uri != "" {
- err := s.blobStorage.Update(uri, reader)
- if err != nil {
- return "", fmt.Errorf("failed to update: %w", err)
- }
- return uri, nil
- }
- // TODO: it will leak if we fail to save the entity.
- uri, err := s.blobStorage.Store(reader)
- if err != nil {
- return "", fmt.Errorf("failed to save: %w", err)
- }
- return uri, nil
-}