diff options
Diffstat (limited to 'syz-ci/manager.go')
| -rw-r--r-- | syz-ci/manager.go | 86 |
1 files changed, 55 insertions, 31 deletions
diff --git a/syz-ci/manager.go b/syz-ci/manager.go index ff646b5ff..0fe796b47 100644 --- a/syz-ci/manager.go +++ b/syz-ci/manager.go @@ -4,6 +4,7 @@ package main import ( + "compress/gzip" "context" "crypto/sha256" "encoding/json" @@ -36,6 +37,7 @@ import ( "github.com/google/syzkaller/prog" _ "github.com/google/syzkaller/sys" "github.com/google/syzkaller/sys/targets" + "golang.org/x/sync/errgroup" ) // This is especially slightly longer than syzkaller rebuild period. @@ -831,7 +833,7 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri return ret, nil } -func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) { +func (mgr *Manager) httpGET(ctx context.Context, path string) (resp *http.Response, err error) { addr := mgr.managercfg.HTTP if addr != "" && addr[0] == ':' { addr = "127.0.0.1" + addr // in case addr is ":port" @@ -839,7 +841,11 @@ func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) { client := &http.Client{ Timeout: time.Hour, } - return client.Get(fmt.Sprintf("http://%s%s", addr, path)) + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", addr, path), nil) + if err != nil { + return nil, err + } + return client.Do(req) } func (mgr *Manager) uploadCoverReport() error { @@ -859,13 +865,13 @@ func (mgr *Manager) uploadCoverReport() error { } defer buildSem.Signal() - resp, err := mgr.httpGET("/cover") + resp, err := mgr.httpGET(context.Background(), "/cover") if err != nil { return fmt.Errorf("failed to get report: %w", err) } defer resp.Body.Close() if directUpload { - return uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS) + return uploadFile(context.Background(), nil, mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, mgr.cfg.PublishGCS) } // Upload via the asset storage. newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html", @@ -880,8 +886,8 @@ func (mgr *Manager) uploadCoverReport() error { return nil } -func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.Time, publish bool, - f func(io.Writer, *json.Decoder) error) error { +func (mgr *Manager) uploadCoverJSONLToGCS(gcsClient gcs.Client, mgrSrc, gcsDest string, curTime time.Time, + publish, compress bool, f func(io.Writer, *json.Decoder) error) error { if !mgr.managercfg.Cover || gcsDest == "" { return nil } @@ -895,7 +901,8 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T } defer buildSem.Signal() - resp, err := mgr.httpGET(mgrSrc) + eg, egCtx := errgroup.WithContext(context.Background()) + resp, err := mgr.httpGET(egCtx, mgrSrc) if err != nil { return fmt.Errorf("failed to httpGet %s: %w", mgrSrc, err) } @@ -908,25 +915,35 @@ func (mgr *Manager) uploadCoverJSONLToGCS(mgrSrc, gcsDest string, curTime time.T } pr, pw := io.Pipe() - defer pr.Close() - go func() { + eg.Go(func() error { + defer pw.Close() + var w io.Writer + w = pw + if compress { + gzw := gzip.NewWriter(pw) + defer gzw.Close() + w = gzw + } decoder := json.NewDecoder(resp.Body) for decoder.More() { - if err := f(pw, decoder); err != nil { - pw.CloseWithError(fmt.Errorf("callback: %w", err)) - return + if err := f(w, decoder); err != nil { + return fmt.Errorf("callback: %w", err) } } - pw.Close() - }() - fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl", - mgr.mgrcfg.DashboardClient, - mgr.name, curTime.Format(time.DateOnly), - curTime.Hour(), curTime.Minute()) - if err := uploadFile(gcsDest, fileName, pr, publish); err != nil { - return fmt.Errorf("failed to uploadFileGCS(): %w", err) - } - return nil + return nil + }) + eg.Go(func() error { + defer pr.Close() + fileName := fmt.Sprintf("%s/%s-%s-%d-%d.jsonl", + mgr.mgrcfg.DashboardClient, + mgr.name, curTime.Format(time.DateOnly), + curTime.Hour(), curTime.Minute()) + if err := uploadFile(egCtx, gcsClient, gcsDest, fileName, pr, publish); err != nil { + return fmt.Errorf("uploadFile: %w", err) + } + return nil + }) + return eg.Wait() } func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { @@ -934,10 +951,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { // In the syz-ci context report generation won't be used after this point, // so tell manager to flush report generator. curTime := time.Now() - if err := mgr.uploadCoverJSONLToGCS("/cover?jsonl=1&flush=1", + if err := mgr.uploadCoverJSONLToGCS(nil, + "/cover?jsonl=1&flush=1", mgr.cfg.CoverPipelinePath, curTime, false, + false, func(w io.Writer, dec *json.Decoder) error { var covInfo cover.CoverageInfo if err := dec.Decode(&covInfo); err != nil { @@ -964,10 +983,12 @@ func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error { } func (mgr *Manager) uploadProgramsWithCoverage() error { - if err := mgr.uploadCoverJSONLToGCS("/coverprogs?jsonl=1", + if err := mgr.uploadCoverJSONLToGCS(nil, + "/coverprogs?jsonl=1", mgr.cfg.CoverProgramsPath, time.Now(), mgr.cfg.PublishGCS, + true, func(w io.Writer, dec *json.Decoder) error { var programCoverage cover.ProgramCoverage if err := dec.Decode(&programCoverage); err != nil { @@ -994,7 +1015,7 @@ func (mgr *Manager) uploadCorpus() error { return err } defer f.Close() - return uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS) + return uploadFile(context.Background(), nil, mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, mgr.cfg.PublishGCS) } func (mgr *Manager) uploadBenchData() error { @@ -1012,7 +1033,7 @@ func (mgr *Manager) uploadBenchData() error { return fmt.Errorf("failed to open bench file: %w", err) } defer f.Close() - err = uploadFile(mgr.cfg.BenchUploadPath+"/"+mgr.name, + err = uploadFile(context.Background(), nil, mgr.cfg.BenchUploadPath+"/"+mgr.name, mgr.lastRestarted.Format("2006-01-02_15h.json"), f, false) if err != nil { return fmt.Errorf("failed to upload the bench file: %w", err) @@ -1020,7 +1041,7 @@ func (mgr *Manager) uploadBenchData() error { return nil } -func uploadFile(dstPath, name string, file io.Reader, publish bool) error { +func uploadFile(ctx context.Context, gcsClient gcs.Client, dstPath, name string, file io.Reader, publish bool) error { URL, err := url.Parse(dstPath) if err != nil { return fmt.Errorf("failed to parse upload path: %w", err) @@ -1030,13 +1051,16 @@ func uploadFile(dstPath, name string, file io.Reader, publish bool) error { log.Logf(0, "uploading %v to %v", name, URLStr) if strings.HasPrefix(URLStr, "http://") || strings.HasPrefix(URLStr, "https://") { - return uploadFileHTTPPut(URLStr, file) + if gcsClient != nil { + return fmt.Errorf("gcsClient is expected to be nil for the http* requests") + } + return uploadFileHTTPPut(ctx, URLStr, file) } - return gcs.UploadFile(context.Background(), file, URLStr, gcs.UploadOptions{Publish: publish}) + return gcs.UploadFile(ctx, file, URLStr, gcs.UploadOptions{Publish: publish, GCSClientMock: gcsClient}) } -func uploadFileHTTPPut(URL string, file io.Reader) error { - req, err := http.NewRequest(http.MethodPut, URL, file) +func uploadFileHTTPPut(ctx context.Context, URL string, file io.Reader) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPut, URL, file) if err != nil { return fmt.Errorf("failed to create HTTP PUT request: %w", err) } |
