aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pkg/gcs/gcs.go8
-rw-r--r--pkg/gcs/mocks/Client.go36
-rw-r--r--syz-ci/manager.go86
-rw-r--r--syz-ci/manager_test.go137
4 files changed, 214 insertions, 53 deletions
diff --git a/pkg/gcs/gcs.go b/pkg/gcs/gcs.go
index 9d2e68339..b97e9c77a 100644
--- a/pkg/gcs/gcs.go
+++ b/pkg/gcs/gcs.go
@@ -32,7 +32,7 @@ type Client interface {
FileExists(path string) (bool, error)
ListObjects(path string) ([]*Object, error)
- publish(path string) error
+ Publish(path string) error
}
type UploadOptions struct {
@@ -63,7 +63,7 @@ func UploadFile(ctx context.Context, srcFile io.Reader, destURL string, opts Upl
return fmt.Errorf("gcsWriter.Close: %w", err)
}
if opts.Publish {
- return gcsClient.publish(destURL)
+ return gcsClient.Publish(destURL)
}
return nil
}
@@ -143,8 +143,8 @@ func (c *client) FileWriter(gcsFile, contentType, contentEncoding string) (io.Wr
return w, nil
}
-// publish lets any user read gcsFile.
-func (c *client) publish(gcsFile string) error {
+// Publish lets any user read gcsFile.
+func (c *client) Publish(gcsFile string) error {
bucket, filename, err := split(gcsFile)
if err != nil {
return err
diff --git a/pkg/gcs/mocks/Client.go b/pkg/gcs/mocks/Client.go
index 80e72c73e..a6c1351ad 100644
--- a/pkg/gcs/mocks/Client.go
+++ b/pkg/gcs/mocks/Client.go
@@ -139,6 +139,24 @@ func (_m *Client) ListObjects(path string) ([]*gcs.Object, error) {
return r0, r1
}
+// Publish provides a mock function with given fields: path
+func (_m *Client) Publish(path string) error {
+ ret := _m.Called(path)
+
+ if len(ret) == 0 {
+ panic("no return value specified for Publish")
+ }
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(string) error); ok {
+ r0 = rf(path)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
+
// Read provides a mock function with given fields: path
func (_m *Client) Read(path string) (*gcs.File, error) {
ret := _m.Called(path)
@@ -169,24 +187,6 @@ func (_m *Client) Read(path string) (*gcs.File, error) {
return r0, r1
}
-// publish provides a mock function with given fields: path
-func (_m *Client) publish(path string) error {
- ret := _m.Called(path)
-
- if len(ret) == 0 {
- panic("no return value specified for publish")
- }
-
- var r0 error
- if rf, ok := ret.Get(0).(func(string) error); ok {
- r0 = rf(path)
- } else {
- r0 = ret.Error(0)
- }
-
- return r0
-}
-
// NewClient creates a new instance of Client. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewClient(t interface {
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index ff646b5ff..0fe796b47 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -4,6 +4,7 @@
package main
import (
+ "compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
@@ -36,6 +37,7 @@ import (
"github.com/google/syzkaller/prog"
_ "github.com/google/syzkaller/sys"
"github.com/google/syzkaller/sys/targets"
+ "golang.org/x/sync/errgroup"
)
// This is especially slightly longer than syzkaller rebuild period.
@@ -831,7 +833,7 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri
return ret, nil
}
-func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) {
+func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Response, err error) {
addr := mgr.managercfg.HTTP
if addr != "" && addr[0] == ':' {
addr = "127.0.0.1" + addr // in case addr is ":port"
@@ -839,7 +841,11 @@ func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) {
client := &http.Client{
Timeout: time.Hour,
}
- return client.Get(fmt.Sprintf("http://%s%s", addr, path))
+ req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", addr, path), nil)
+ if err != nil {
+ return nil, err
+ }
+ return client.Do(req)
}
func (mgr *Manager) uploadCoverReport() error {
@@ -859,13 +865,13 @@ func (mgr *Manager) uploadCoverReport() error {
}
defer buildSem.Signal()
- resp, err := mgr.httpGET("/cover")
+ resp, err := mgr.httpGET(context.Background(), "/cover")
if err != nil {
return fmt.Errorf("failed to get report: %w", err)
}
defer resp.Body.Close()
if directUpload {
- return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
+ return uploadFile(context.Background(), nil, mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS)
}
// Upload via the asset storage.
newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
@@ -880,8 +886,8 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}
-func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.Time, publish bool,
- f func(io.Writer, *json.Decoder) error) error {
+func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest string, curTime time.Time,
+ publish, compress bool, f func(io.Writer, *json.Decoder) error) error {
if !mgr.managercfg.Cover || gcsDest == "" {
return nil
}
@@ -895,7 +901,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
}
defer buildSem.Signal()
- resp, err := mgr.httpGET(mgrSrc)
+ eg, egCtx := errgroup.WithContext(context.Background())
+ resp, err := mgr.httpGET(egCtx, mgrSrc)
if err != nil {
return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err)
}
@@ -908,25 +915,35 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T
}
pr, pw := io.Pipe()
- defer pr.Close()
- go func() {
+ eg.Go(func() error {
+ defer pw.Close()
+ var w io.Writer
+ w = pw
+ if compress {
+ gzw := gzip.NewWriter(pw)
+ defer gzw.Close()
+ w = gzw
+ }
decoder := json.NewDecoder(resp.Body)
for decoder.More() {
- if err := f(pw, decoder); err != nil {
- pw.CloseWithError(fmt.Errorf("callback: %w", err))
- return
+ if err := f(w, decoder); err != nil {
+ return fmt.Errorf("callback: %w", err)
}
}
- pw.Close()
- }()
- fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
- mgr.mgrcfg.DashboardClient,
- mgr.name, curTime.Format(time.DateOnly),
- curTime.Hour(), curTime.Minute())
- if err := uploadFile(gcsDest, fileName, pr, publish); err != nil {
- return fmt.Errorf("failed to uploadFileGCS(): %w", err)
- }
- return nil
+ return nil
+ })
+ eg.Go(func() error {
+ defer pr.Close()
+ fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl",
+ mgr.mgrcfg.DashboardClient,
+ mgr.name, curTime.Format(time.DateOnly),
+ curTime.Hour(), curTime.Minute())
+ if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil {
+ return fmt.Errorf("uploadFile: %w", err)
+ }
+ return nil
+ })
+ return eg.Wait()
}
func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
@@ -934,10 +951,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
// In the syz-ci context report generation won't be used after this point,
// so tell manager to flush report generator.
curTime := time.Now()
- if err := mgr.uploadCoverJSONLToGCS("/cover?jsonl=1&flush=1",
+ if err := mgr.uploadCoverJSONLToGCS(nil,
+ "/cover?jsonl=1&flush=1",
mgr.cfg.CoverPipelinePath,
curTime,
false,
+ false,
func(w io.Writer, dec *json.Decoder) error {
var covInfo cover.CoverageInfo
if err := dec.Decode(&covInfo); err != nil {
@@ -964,10 +983,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
}
func (mgr *Manager) uploadProgramsWithCoverage() error {
- if err := mgr.uploadCoverJSONLToGCS("/coverprogs?jsonl=1",
+ if err := mgr.uploadCoverJSONLToGCS(nil,
+ "/coverprogs?jsonl=1",
mgr.cfg.CoverProgramsPath,
time.Now(),
mgr.cfg.PublishGCS,
+ true,
func(w io.Writer, dec *json.Decoder) error {
var programCoverage cover.ProgramCoverage
if err := dec.Decode(&programCoverage); err != nil {
@@ -994,7 +1015,7 @@ func (mgr *Manager) uploadCorpus() error {
return err
}
defer f.Close()
- return uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
+ return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS)
}
func (mgr *Manager) uploadBenchData() error {
@@ -1012,7 +1033,7 @@ func (mgr *Manager) uploadBenchData() error {
return fmt.Errorf("failed to open bench file: %w", err)
}
defer f.Close()
- err = uploadFile(mgr.cfg.BenchUploadPath+"/"+mgr.name,
+ err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name,
mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false)
if err != nil {
return fmt.Errorf("failed to upload the bench file: %w", err)
@@ -1020,7 +1041,7 @@ func (mgr *Manager) uploadBenchData() error {
return nil
}
-func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
+func uploadFile(ctx context.Context, gcsClient gcs.Client, dstPath, name string, file io.Reader, publish bool) error {
URL, err := url.Parse(dstPath)
if err != nil {
return fmt.Errorf("failed to parse upload path: %w", err)
@@ -1030,13 +1051,16 @@ func uploadFile(dstPath, name string, file io.Reader, publish bool) error {
log.Logf(0, "uploading %v to %v", name, URLStr)
if strings.HasPrefix(URLStr, "http://") ||
strings.HasPrefix(URLStr, "https://") {
- return uploadFileHTTPPut(URLStr, file)
+ if gcsClient != nil {
+ return fmt.Errorf("gcsClient is expected to be nil for the http* requests")
+ }
+ return uploadFileHTTPPut(ctx, URLStr, file)
}
- return gcs.UploadFile(context.Background(), file, URLStr, gcs.UploadOptions{Publish: publish})
+ return gcs.UploadFile(ctx, file, URLStr, gcs.UploadOptions{Publish: publish, GCSClientMock: gcsClient})
}
-func uploadFileHTTPPut(URL string, file io.Reader) error {
- req, err := http.NewRequest(http.MethodPut, URL, file)
+func uploadFileHTTPPut(ctx context.Context, URL string, file io.Reader) error {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, file)
if err != nil {
return fmt.Errorf("failed to create HTTP PUT request: %w", err)
}
diff --git a/syz-ci/manager_test.go b/syz-ci/manager_test.go
index d8366102b..42282711d 100644
--- a/syz-ci/manager_test.go
+++ b/syz-ci/manager_test.go
@@ -4,10 +4,21 @@
package main
import (
+ "bytes"
+ "compress/gzip"
+ "encoding/json"
"fmt"
+ "io"
+ "net/http"
+ "net/http/httptest"
+ "strings"
"testing"
+ "time"
"github.com/google/syzkaller/dashboard/dashapi"
+ "github.com/google/syzkaller/pkg/cover"
+ gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks"
+ "github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/vcs"
"github.com/google/syzkaller/sys/targets"
"github.com/stretchr/testify/assert"
@@ -106,3 +117,129 @@ Reported-by: foo+abcd000@bar.com`,
assert.Equal(t, commit.Title, "title with fix")
assert.ElementsMatch(t, commit.BugIDs, []string{"abcd000"})
}
+
+func TestUploadCoverJSONLToGCS(t *testing.T) {
+ tests := []struct {
+ name string
+
+ inputJSONL string
+ inputTime time.Time
+
+ inputCompress bool
+ inputPublish bool
+
+ wantGCSFileName string
+ wantGCSFileContent string
+ wantCompressed bool
+ wantPublish bool
+ wantError string
+ }{
+ {
+ name: "upload single object",
+ inputJSONL: "{}",
+ inputTime: time.Time{},
+ wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
+ wantGCSFileContent: "{}\n",
+ },
+ {
+ name: "upload single object, compress",
+ inputJSONL: "{}",
+ inputTime: time.Time{},
+ inputCompress: true,
+ wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
+ wantGCSFileContent: "{}\n",
+ wantCompressed: true,
+ },
+ {
+ name: "upload single object, publish",
+ inputJSONL: "{}",
+ inputTime: time.Time{},
+ inputPublish: true,
+ wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
+ wantGCSFileContent: "{}\n",
+ wantPublish: true,
+ },
+ {
+ name: "upload single object, error",
+ inputJSONL: "{",
+ inputTime: time.Time{},
+ wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl",
+ wantError: "callback: cover.ProgramCoverage: unexpected EOF",
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.Write([]byte(test.inputJSONL))
+ }))
+ defer httpServer.Close()
+
+ testSetverAddrPort, _ := strings.CutPrefix(httpServer.URL, "http://")
+ mgr := Manager{
+ name: "mgr-name",
+ managercfg: &mgrconfig.Config{
+ HTTP: testSetverAddrPort,
+ Cover: true,
+ },
+ mgrcfg: &ManagerConfig{
+ DashboardClient: "test-namespace",
+ },
+ }
+
+ gcsMock := gcsmocks.NewClient(t)
+ gotBytes := mockWriteCloser{}
+
+ gcsMock.On("FileWriter", test.wantGCSFileName, "", "").
+ Return(&gotBytes, nil).Once()
+ gcsMock.On("Close").Return(nil).Once()
+ if test.wantPublish {
+ gcsMock.On("Publish", test.wantGCSFileName).
+ Return(nil).Once()
+ }
+ err := mgr.uploadCoverJSONLToGCS(gcsMock,
+ "/teststream&jsonl=1",
+ "gs://test-bucket",
+ time.Time{}, test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error {
+ var v any
+ if err := dec.Decode(&v); err != nil {
+ return fmt.Errorf("cover.ProgramCoverage: %w", err)
+ }
+ if err := cover.WriteJSLine(w, &v); err != nil {
+ return fmt.Errorf("cover.WriteJSLine: %w", err)
+ }
+ return nil
+ })
+ if test.wantError != "" {
+ assert.Equal(t, test.wantError, err.Error())
+ } else {
+ assert.NoError(t, err)
+ }
+ assert.Equal(t, 1, gotBytes.closedTimes)
+ if test.wantCompressed {
+ gzReader, err := gzip.NewReader(&gotBytes.buf)
+ assert.NoError(t, err)
+ defer gzReader.Close()
+ plainBytes := mockWriteCloser{}
+ _, err = io.Copy(&plainBytes, gzReader)
+ assert.NoError(t, err)
+ gotBytes = plainBytes
+ }
+ assert.Equal(t, test.wantGCSFileContent, gotBytes.buf.String())
+ })
+ }
+}
+
+type mockWriteCloser struct {
+ buf bytes.Buffer
+ closedTimes int
+}
+
+func (m *mockWriteCloser) Write(p []byte) (n int, err error) {
+ return m.buf.Write(p)
+}
+
+func (m *mockWriteCloser) Close() error {
+ m.closedTimes++
+ return nil
+}