diff options
Diffstat (limited to 'syz-ci')
| -rw-r--r-- | syz-ci/manager.go | 86 | ||||
| -rw-r--r-- | syz-ci/manager_test.go | 137 |
2 files changed, 192 insertions, 31 deletions
diff --git a/syz-ci/manager.go b/syz-ci/manager.go index ff646b5ff..0fe796b47 100644 --- a/syz-ci/manager.go +++ b/syz-ci/manager.go @@ -4,6 +4,7 @@ package main import ( + "compress/gzip" "context" "crypto/sha256" "encoding/json" @@ -36,6 +37,7 @@ import ( "github.com/google/syzkaller/prog" _ "github.com/google/syzkaller/sys" "github.com/google/syzkaller/sys/targets" + "golang.org/x/sync/errgroup" ) // This is especially slightly longer than syzkaller rebuild period. @@ -831,7 +833,7 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri return ret, nil } -func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) { +func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Response, err error) { addr := mgr.managercfg.HTTP if addr != "" && addr[0] == ':' { addr = "127.0.0.1" + addr // in case addr is ":port" @@ -839,7 +841,11 @@ func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) { client := &http.Client{ Timeout: time.Hour, } - return client.Get(fmt.Sprintf("http://%s%s", addr, path)) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", addr, path), nil) + if err != nil { + return nil, err + } + return client.Do(req) } func (mgr *Manager) uploadCoverReport() error { @@ -859,13 +865,13 @@ func (mgr *Manager) uploadCoverReport() error { } defer buildSem.Signal() - resp, err := mgr.httpGET("/cover") + resp, err := mgr.httpGET(context.Background(), "/cover") if err != nil { return fmt.Errorf("failed to get report: %w", err) } defer resp.Body.Close() if directUpload { - return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS) + return uploadFile(context.Background(), nil, mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS) } // Upload via the asset storage. newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html", @@ -880,8 +886,8 @@ func (mgr *Manager) uploadCoverReport() error { return nil } -func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.Time, publish bool, - f func(io.Writer, *json.Decoder) error) error { +func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest string, curTime time.Time, + publish, compress bool, f func(io.Writer, *json.Decoder) error) error { if !mgr.managercfg.Cover || gcsDest == "" { return nil } @@ -895,7 +901,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T } defer buildSem.Signal() - resp, err := mgr.httpGET(mgrSrc) + eg, egCtx := errgroup.WithContext(context.Background()) + resp, err := mgr.httpGET(egCtx, mgrSrc) if err != nil { return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err) } @@ -908,25 +915,35 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T } pr, pw := io.Pipe() - defer pr.Close() - go func() { + eg.Go(func() error { + defer pw.Close() + var w io.Writer + w = pw + if compress { + gzw := gzip.NewWriter(pw) + defer gzw.Close() + w = gzw + } decoder := json.NewDecoder(resp.Body) for decoder.More() { - if err := f(pw, decoder); err != nil { - pw.CloseWithError(fmt.Errorf("callback: %w", err)) - return + if err := f(w, decoder); err != nil { + return fmt.Errorf("callback: %w", err) } } - pw.Close() - }() - fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl", - mgr.mgrcfg.DashboardClient, - mgr.name, curTime.Format(time.DateOnly), - curTime.Hour(), curTime.Minute()) - if err := uploadFile(gcsDest, fileName, pr, publish); err != nil { - return fmt.Errorf("failed to uploadFileGCS(): %w", err) - } - return nil + return nil + }) + eg.Go(func() error { + defer pr.Close() + fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl", + mgr.mgrcfg.DashboardClient, + mgr.name, curTime.Format(time.DateOnly), + curTime.Hour(), curTime.Minute()) + if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil { + return fmt.Errorf("uploadFile: %w", err) + } + return nil + }) + return eg.Wait() } func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { @@ -934,10 +951,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { // In the syz-ci context report generation won't be used after this point, // so tell manager to flush report generator. curTime := time.Now() - if err := mgr.uploadCoverJSONLToGCS("/cover?jsonl=1&flush=1", + if err := mgr.uploadCoverJSONLToGCS(nil, + "/cover?jsonl=1&flush=1", mgr.cfg.CoverPipelinePath, curTime, false, + false, func(w io.Writer, dec *json.Decoder) error { var covInfo cover.CoverageInfo if err := dec.Decode(&covInfo); err != nil { @@ -964,10 +983,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { } func (mgr *Manager) uploadProgramsWithCoverage() error { - if err := mgr.uploadCoverJSONLToGCS("/coverprogs?jsonl=1", + if err := mgr.uploadCoverJSONLToGCS(nil, + "/coverprogs?jsonl=1", mgr.cfg.CoverProgramsPath, time.Now(), mgr.cfg.PublishGCS, + true, func(w io.Writer, dec *json.Decoder) error { var programCoverage cover.ProgramCoverage if err := dec.Decode(&programCoverage); err != nil { @@ -994,7 +1015,7 @@ func (mgr *Manager) uploadCorpus() error { return err } defer f.Close() - return uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS) + return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS) } func (mgr *Manager) uploadBenchData() error { @@ -1012,7 +1033,7 @@ func (mgr *Manager) uploadBenchData() error { return fmt.Errorf("failed to open bench file: %w", err) } defer f.Close() - err = uploadFile(mgr.cfg.BenchUploadPath+"/"+mgr.name, + err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name, mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false) if err != nil { return fmt.Errorf("failed to upload the bench file: %w", err) @@ -1020,7 +1041,7 @@ func (mgr *Manager) uploadBenchData() error { return nil } -func uploadFile(dstPath, name string, file io.Reader, publish bool) error { +func uploadFile(ctx context.Context, gcsClient gcs.Client, dstPath, name string, file io.Reader, publish bool) error { URL, err := url.Parse(dstPath) if err != nil { return fmt.Errorf("failed to parse upload path: %w", err) @@ -1030,13 +1051,16 @@ func uploadFile(dstPath, name string, file io.Reader, publish bool) error { log.Logf(0, "uploading %v to %v", name, URLStr) if strings.HasPrefix(URLStr, "http://") || strings.HasPrefix(URLStr, "https://") { - return uploadFileHTTPPut(URLStr, file) + if gcsClient != nil { + return fmt.Errorf("gcsClient is expected to be nil for the http* requests") + } + return uploadFileHTTPPut(ctx, URLStr, file) } - return gcs.UploadFile(context.Background(), file, URLStr, gcs.UploadOptions{Publish: publish}) + return gcs.UploadFile(ctx, file, URLStr, gcs.UploadOptions{Publish: publish, GCSClientMock: gcsClient}) } -func uploadFileHTTPPut(URL string, file io.Reader) error { - req, err := http.NewRequest(http.MethodPut, URL, file) +func uploadFileHTTPPut(ctx context.Context, URL string, file io.Reader) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, file) if err != nil { return fmt.Errorf("failed to create HTTP PUT request: %w", err) } diff --git a/syz-ci/manager_test.go b/syz-ci/manager_test.go index d8366102b..42282711d 100644 --- a/syz-ci/manager_test.go +++ b/syz-ci/manager_test.go @@ -4,10 +4,21 @@ package main import ( + "bytes" + "compress/gzip" + "encoding/json" "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" "testing" + "time" "github.com/google/syzkaller/dashboard/dashapi" + "github.com/google/syzkaller/pkg/cover" + gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks" + "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/vcs" "github.com/google/syzkaller/sys/targets" "github.com/stretchr/testify/assert" @@ -106,3 +117,129 @@ Reported-by: foo+abcd000@bar.com`, assert.Equal(t, commit.Title, "title with fix") assert.ElementsMatch(t, commit.BugIDs, []string{"abcd000"}) } + +func TestUploadCoverJSONLToGCS(t *testing.T) { + tests := []struct { + name string + + inputJSONL string + inputTime time.Time + + inputCompress bool + inputPublish bool + + wantGCSFileName string + wantGCSFileContent string + wantCompressed bool + wantPublish bool + wantError string + }{ + { + name: "upload single object", + inputJSONL: "{}", + inputTime: time.Time{}, + wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl", + wantGCSFileContent: "{}\n", + }, + { + name: "upload single object, compress", + inputJSONL: "{}", + inputTime: time.Time{}, + inputCompress: true, + wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl", + wantGCSFileContent: "{}\n", + wantCompressed: true, + }, + { + name: "upload single object, publish", + inputJSONL: "{}", + inputTime: time.Time{}, + inputPublish: true, + wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl", + wantGCSFileContent: "{}\n", + wantPublish: true, + }, + { + name: "upload single object, error", + inputJSONL: "{", + inputTime: time.Time{}, + wantGCSFileName: "test-bucket/test-namespace/mgr-name-0001-01-01-0-0.jsonl", + wantError: "callback: cover.ProgramCoverage: unexpected EOF", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + httpServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte(test.inputJSONL)) + })) + defer httpServer.Close() + + testSetverAddrPort, _ := strings.CutPrefix(httpServer.URL, "http://") + mgr := Manager{ + name: "mgr-name", + managercfg: &mgrconfig.Config{ + HTTP: testSetverAddrPort, + Cover: true, + }, + mgrcfg: &ManagerConfig{ + DashboardClient: "test-namespace", + }, + } + + gcsMock := gcsmocks.NewClient(t) + gotBytes := mockWriteCloser{} + + gcsMock.On("FileWriter", test.wantGCSFileName, "", ""). + Return(&gotBytes, nil).Once() + gcsMock.On("Close").Return(nil).Once() + if test.wantPublish { + gcsMock.On("Publish", test.wantGCSFileName). + Return(nil).Once() + } + err := mgr.uploadCoverJSONLToGCS(gcsMock, + "/teststream&jsonl=1", + "gs://test-bucket", + time.Time{}, test.inputPublish, test.inputCompress, func(w io.Writer, dec *json.Decoder) error { + var v any + if err := dec.Decode(&v); err != nil { + return fmt.Errorf("cover.ProgramCoverage: %w", err) + } + if err := cover.WriteJSLine(w, &v); err != nil { + return fmt.Errorf("cover.WriteJSLine: %w", err) + } + return nil + }) + if test.wantError != "" { + assert.Equal(t, test.wantError, err.Error()) + } else { + assert.NoError(t, err) + } + assert.Equal(t, 1, gotBytes.closedTimes) + if test.wantCompressed { + gzReader, err := gzip.NewReader(&gotBytes.buf) + assert.NoError(t, err) + defer gzReader.Close() + plainBytes := mockWriteCloser{} + _, err = io.Copy(&plainBytes, gzReader) + assert.NoError(t, err) + gotBytes = plainBytes + } + assert.Equal(t, test.wantGCSFileContent, gotBytes.buf.String()) + }) + } +} + +type mockWriteCloser struct { + buf bytes.Buffer + closedTimes int +} + +func (m *mockWriteCloser) Write(p []byte) (n int, err error) { + return m.buf.Write(p) +} + +func (m *mockWriteCloser) Close() error { + m.closedTimes++ + return nil +} |
