diff options
| -rw-r--r-- | syz-cluster/controller/processor.go | 19 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/gcs.go | 51 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage.go | 42 | ||||
| -rw-r--r-- | syz-cluster/pkg/blob/storage_test.go | 2 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/finding.go | 20 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/series.go | 5 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/session.go | 4 | ||||
| -rw-r--r-- | syz-cluster/pkg/service/sessiontest.go | 23 |
8 files changed, 59 insertions, 107 deletions
diff --git a/syz-cluster/controller/processor.go b/syz-cluster/controller/processor.go index 07851a9fa..e634ad5e5 100644 --- a/syz-cluster/controller/processor.go +++ b/syz-cluster/controller/processor.go @@ -138,7 +138,7 @@ func (sp *SeriesProcessor) handleSession(ctx context.Context, session *db.Sessio app.Errorf("failed to query workflow %q status: %v", session.ID, err) continue } - if workflowLog != nil { + if len(workflowLog) > 0 { err := sp.updateSessionLog(ctx, session, workflowLog) if err != nil { app.Errorf("failed to update session log: %v", err) @@ -214,19 +214,12 @@ func (sp *SeriesProcessor) stopRunningTests(ctx context.Context, sessionID strin } func (sp *SeriesProcessor) updateSessionLog(ctx context.Context, session *db.Session, log []byte) error { + logURI, err := sp.blobStorage.Write(bytes.NewReader(log), "Session", session.ID, "log") + if err != nil { + return fmt.Errorf("failed to save the log: %w", err) + } return sp.sessionRepo.Update(ctx, session.ID, func(session *db.Session) error { - if session.LogURI == "" { - path, err := sp.blobStorage.Store(bytes.NewReader(log)) - if err != nil { - return fmt.Errorf("failed to save the log: %w", err) - } - session.LogURI = path - } else { - err := sp.blobStorage.Update(session.LogURI, bytes.NewReader(log)) - if err != nil { - return fmt.Errorf("failed to update the log %q: %w", session.LogURI, err) - } - } + session.LogURI = logURI return nil }) } diff --git a/syz-cluster/pkg/blob/gcs.go b/syz-cluster/pkg/blob/gcs.go index f6f5797d5..7d22ccd57 100644 --- a/syz-cluster/pkg/blob/gcs.go +++ b/syz-cluster/pkg/blob/gcs.go @@ -7,10 +7,10 @@ import ( "context" "fmt" "io" + "path" "regexp" "github.com/google/syzkaller/pkg/gcs" - "github.com/google/uuid" ) type gcsDriver struct { @@ -29,54 +29,39 @@ func NewGCSClient(ctx context.Context, bucket string) (Storage, error) { }, nil } -func (gcs *gcsDriver) Store(source io.Reader) (string, error) { - object := uuid.NewString() - err := gcs.writeObject(object, source) +func (gcs *gcsDriver) Write(source io.Reader, parts ...string) (string, error) { + if len(parts) == 0 { + return "", fmt.Errorf("no identifiers for the object were passed to Write") + } + object := path.Join(gcs.bucket, path.Join(parts...)) + w, err := gcs.client.FileWriter(object, "", "") if err != nil { return "", err } - return gcs.objectURI(object), nil -} - -func (gcs *gcsDriver) Update(uri string, source io.Reader) error { - object, err := gcs.objectName(uri) + defer w.Close() + _, err = io.Copy(w, source) if err != nil { - return err + return "", err } - return gcs.writeObject(object, source) + return "gcs://" + object, nil } func (gcs *gcsDriver) Read(uri string) (io.ReadCloser, error) { - object, err := gcs.objectName(uri) + bucket, object, err := gcs.parseURI(uri) if err != nil { return nil, err } - return gcs.client.FileReader(fmt.Sprintf("%s/%s", gcs.bucket, object)) + return gcs.client.FileReader(path.Join(bucket, object)) } -var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/([\w-]+)$`) +var gcsObjectRe = regexp.MustCompile(`^gcs://([\w-]+)/(.+)$`) -func (gcs *gcsDriver) objectName(uri string) (string, error) { +func (gcs *gcsDriver) parseURI(uri string) (string, string, error) { match := gcsObjectRe.FindStringSubmatch(uri) if len(match) == 0 { - return "", fmt.Errorf("invalid GCS URI") + return "", "", fmt.Errorf("invalid GCS URI") } else if match[1] != gcs.bucket { - return "", fmt.Errorf("unexpected GCS bucket") + return "", "", fmt.Errorf("unexpected GCS bucket") } - return match[2], nil -} - -func (gcs *gcsDriver) objectURI(object string) string { - return fmt.Sprintf("gcs://%s/%s", gcs.bucket, object) -} - -func (gcs *gcsDriver) writeObject(object string, source io.Reader) error { - w, err := gcs.client.FileWriter(fmt.Sprintf("%s/%s", gcs.bucket, object), "", "") - if err != nil { - return err - } - defer w.Close() - - _, err = io.Copy(w, source) - return err + return gcs.bucket, match[2], nil } diff --git a/syz-cluster/pkg/blob/storage.go b/syz-cluster/pkg/blob/storage.go index be1d8f493..e3446e7fa 100644 --- a/syz-cluster/pkg/blob/storage.go +++ b/syz-cluster/pkg/blob/storage.go @@ -4,21 +4,21 @@ package blob import ( + "encoding/base64" "fmt" "io" "os" "path/filepath" "strings" - - "github.com/google/uuid" ) // Storage is not assumed to be used for partciularly large objects (e.g. GB of size), // but rather for blobs that risk overwhelming Spanner column size limits. type Storage interface { - // Store returns a URI to use later. - Store(source io.Reader) (string, error) - Update(key string, source io.Reader) error + // Write stores the object uniquely identified by a set of IDs (parts). + // If it already exists, it will be overwritten. + // The first argument is the URI which can be used to later retrieve it with Read. + Write(source io.Reader, parts ...string) (string, error) Read(uri string) (io.ReadCloser, error) } @@ -36,20 +36,19 @@ func NewLocalStorage(baseFolder string) *LocalStorage { const localStoragePrefix = "local://" -func (ls *LocalStorage) Store(source io.Reader) (string, error) { - name := uuid.NewString() - err := ls.writeFile(name, source) +func (ls *LocalStorage) Write(source io.Reader, parts ...string) (string, error) { + // A whatever approach that can handle arbitrary inputs. + name := base64.StdEncoding.EncodeToString([]byte(filepath.Join(parts...))) + file, err := os.Create(filepath.Join(ls.baseFolder, name)) if err != nil { return "", err } - return localStoragePrefix + name, nil -} - -func (ls *LocalStorage) Update(uri string, source io.Reader) error { - if !strings.HasPrefix(uri, localStoragePrefix) { - return fmt.Errorf("unsupported URI type") + defer file.Close() + _, err = io.Copy(file, source) + if err != nil { + return "", fmt.Errorf("failed to save data: %w", err) } - return ls.writeFile(strings.TrimPrefix(uri, localStoragePrefix), source) + return localStoragePrefix + name, nil } func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { @@ -61,19 +60,6 @@ func (ls *LocalStorage) Read(uri string) (io.ReadCloser, error) { return os.Open(path) } -func (ls *LocalStorage) writeFile(name string, source io.Reader) error { - file, err := os.Create(filepath.Join(ls.baseFolder, name)) - if err != nil { - return err - } - defer file.Close() - _, err = io.Copy(file, source) - if err != nil { - return fmt.Errorf("failed to save data: %w", err) - } - return nil -} - func ReadAllBytes(storage Storage, uri string) ([]byte, error) { if uri == "" { return nil, nil diff --git a/syz-cluster/pkg/blob/storage_test.go b/syz-cluster/pkg/blob/storage_test.go index 9d2d830ce..51e7887bf 100644 --- a/syz-cluster/pkg/blob/storage_test.go +++ b/syz-cluster/pkg/blob/storage_test.go @@ -16,7 +16,7 @@ func TestLocalStorage(t *testing.T) { var uris []string for i := 0; i < 2; i++ { content := fmt.Sprintf("object #%d", i) - uri, err := storage.Store(bytes.NewReader([]byte(content))) + uri, err := storage.Write(bytes.NewReader([]byte(content)), fmt.Sprint(i)) assert.NoError(t, err) uris = append(uris, uri) } diff --git a/syz-cluster/pkg/service/finding.go b/syz-cluster/pkg/service/finding.go index bb605ba73..b668dc798 100644 --- a/syz-cluster/pkg/service/finding.go +++ b/syz-cluster/pkg/service/finding.go @@ -12,6 +12,7 @@ import ( "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" + "github.com/google/uuid" ) type FindingService struct { @@ -27,28 +28,27 @@ func NewFindingService(env *app.AppEnvironment) *FindingService { } func (s *FindingService) Save(ctx context.Context, req *api.NewFinding) error { - var reportURI, logURI string + finding := &db.Finding{ + ID: uuid.NewString(), + SessionID: req.SessionID, + TestName: req.TestName, + Title: req.Title, + } var err error if len(req.Log) > 0 { - logURI, err = s.blobStorage.Store(bytes.NewReader(req.Log)) + finding.LogURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), "Finding", finding.ID, "log") if err != nil { return fmt.Errorf("failed to save the log: %w", err) } } if len(req.Report) > 0 { - reportURI, err = s.blobStorage.Store(bytes.NewReader(req.Report)) + finding.ReportURI, err = s.blobStorage.Write(bytes.NewReader(req.Report), "Finding", finding.ID, "report") if err != nil { return fmt.Errorf("failed to save the report: %w", err) } } // TODO: if it's not actually addded, the blob records will be orphaned. - err = s.findingRepo.Save(ctx, &db.Finding{ - SessionID: req.SessionID, - TestName: req.TestName, - Title: req.Title, - ReportURI: reportURI, - LogURI: logURI, - }) + err = s.findingRepo.Save(ctx, finding) if err == db.ErrFindingExists { // It's ok, just ignore. return nil diff --git a/syz-cluster/pkg/service/series.go b/syz-cluster/pkg/service/series.go index 8ab8b5cef..7628ec48b 100644 --- a/syz-cluster/pkg/service/series.go +++ b/syz-cluster/pkg/service/series.go @@ -13,6 +13,7 @@ import ( "github.com/google/syzkaller/syz-cluster/pkg/app" "github.com/google/syzkaller/syz-cluster/pkg/blob" "github.com/google/syzkaller/syz-cluster/pkg/db" + "github.com/google/uuid" ) // SeriesService is tested in controller/. @@ -53,6 +54,7 @@ func (s *SeriesService) getSessionSeries(ctx context.Context, sessionID string, func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (*api.UploadSeriesResp, error) { seriesObj := &db.Series{ + ID: uuid.NewString(), ExtID: series.ExtID, AuthorEmail: series.AuthorEmail, Title: series.Title, @@ -66,7 +68,8 @@ func (s *SeriesService) UploadSeries(ctx context.Context, series *api.Series) (* for _, patch := range series.Patches { // In case of errors, we will waste some space, but let's ignore it for simplicity. // Patches are not super big. - uri, err := s.blobStorage.Store(bytes.NewReader(patch.Body)) + uri, err := s.blobStorage.Write(bytes.NewReader(patch.Body), + "Series", seriesObj.ID, "Patches", fmt.Sprint(patch.Seq)) if err != nil { return nil, fmt.Errorf("failed to upload patch body: %w", err) } diff --git a/syz-cluster/pkg/service/session.go b/syz-cluster/pkg/service/session.go index 9ad0688fd..446b431f4 100644 --- a/syz-cluster/pkg/service/session.go +++ b/syz-cluster/pkg/service/session.go @@ -36,9 +36,9 @@ func (s *SessionService) SkipSession(ctx context.Context, sessionID string, skip var triageLogURI string if len(skip.TriageLog) > 0 { var err error - triageLogURI, err = s.blobStorage.Store(bytes.NewReader(skip.TriageLog)) + triageLogURI, err = s.blobStorage.Write(bytes.NewReader(skip.TriageLog), "Session", sessionID, "triage_log") if err != nil { - return fmt.Errorf("failed to save the log: %w", err) + return fmt.Errorf("failed to save the triage log: %w", err) } } err := s.sessionRepo.Update(ctx, sessionID, func(session *db.Session) error { diff --git a/syz-cluster/pkg/service/sessiontest.go b/syz-cluster/pkg/service/sessiontest.go index be410fdb7..2fa4d8701 100644 --- a/syz-cluster/pkg/service/sessiontest.go +++ b/syz-cluster/pkg/service/sessiontest.go @@ -41,7 +41,8 @@ func (s *SessionTestService) Save(ctx context.Context, req *api.TestResult) erro } logURI := entity.LogURI if len(req.Log) > 0 { - logURI, err = s.uploadOrUpdate(ctx, logURI, bytes.NewReader(req.Log)) + logURI, err = s.blobStorage.Write(bytes.NewReader(req.Log), + "Session", req.SessionID, "Test", req.TestName, "log") if err != nil { return fmt.Errorf("failed to save the log: %w", err) } @@ -66,27 +67,11 @@ func (s *SessionTestService) SaveArtifacts(ctx context.Context, sessionID, testN } else if entity == nil { return fmt.Errorf("the test has not been submitted yet") } - newArchiveURI, err := s.uploadOrUpdate(ctx, entity.ArtifactsArchiveURI, reader) + archiveURI, err := s.blobStorage.Write(reader, "Session", sessionID, "Test", testName, "artifacts") if err != nil { return fmt.Errorf("failed to save the artifacts archive: %w", err) } return s.testRepo.InsertOrUpdate(ctx, entity, func(test *db.SessionTest) { - test.ArtifactsArchiveURI = newArchiveURI + test.ArtifactsArchiveURI = archiveURI }) } - -func (s *SessionTestService) uploadOrUpdate(ctx context.Context, uri string, reader io.Reader) (string, error) { - if uri != "" { - err := s.blobStorage.Update(uri, reader) - if err != nil { - return "", fmt.Errorf("failed to update: %w", err) - } - return uri, nil - } - // TODO: it will leak if we fail to save the entity. - uri, err := s.blobStorage.Store(reader) - if err != nil { - return "", fmt.Errorf("failed to save: %w", err) - } - return uri, nil -} |
