aboutsummaryrefslogtreecommitdiffstats
path: root/syz-ci
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-03-20 15:36:37 +0100
committerTaras Madan <tarasmadan@google.com>2024-03-21 16:38:22 +0000
commit0a84219ec0829db2fb2d804eb0a9cbc0c5e6b4b1 (patch)
tree586e697f7e99058baf4bc7f3b25d5ae208a94817 /syz-ci
parentc2b3b7068cafe4a798f6cf1d9e65bf105b7e33fc (diff)
syz-ci, pkg/cover: stream coverage from manager to gcs
Diffstat (limited to 'syz-ci')
-rw-r--r--syz-ci/manager.go113
-rw-r--r--syz-ci/syz-ci.go3
2 files changed, 92 insertions, 24 deletions
diff --git a/syz-ci/manager.go b/syz-ci/manager.go
index d4021eac5..2f86325e6 100644
--- a/syz-ci/manager.go
+++ b/syz-ci/manager.go
@@ -5,6 +5,7 @@ package main
import (
"crypto/sha256"
+ "encoding/json"
"errors"
"fmt"
"io"
@@ -22,6 +23,7 @@ import (
"github.com/google/syzkaller/pkg/asset"
"github.com/google/syzkaller/pkg/build"
"github.com/google/syzkaller/pkg/config"
+ "github.com/google/syzkaller/pkg/cover"
"github.com/google/syzkaller/pkg/gcs"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/instance"
@@ -171,6 +173,8 @@ var buildSem = instance.NewSemaphore(1)
// simultaneous env.Test calls.
var testSem = instance.NewSemaphore(1)
+const fuzzingMinutesBeforeCover = 360
+
func (mgr *Manager) loop() {
lastCommit := ""
nextBuildTime := time.Now()
@@ -205,10 +209,11 @@ loop:
if err := mgr.uploadCoverReport(); err != nil {
mgr.Errorf("failed to upload cover report: %v", err)
}
- if mgr.cfg.CorpusUploadPath != "" {
- if err := mgr.uploadCorpus(); err != nil {
- mgr.Errorf("failed to upload corpus: %v", err)
- }
+ if err := mgr.uploadCoverStat(fuzzingMinutesBeforeCover); err != nil {
+ mgr.Errorf("failed to upload coverage stat: %v", err)
+ }
+ if err := mgr.uploadCorpus(); err != nil {
+ mgr.Errorf("failed to upload corpus: %v", err)
}
}
@@ -222,7 +227,7 @@ loop:
managerRestartTime = latestInfo.Time
mgr.restartManager()
if mgr.cmd != nil {
- artifactUploadTime = time.Now().Add(6 * time.Hour)
+ artifactUploadTime = time.Now().Add(fuzzingMinutesBeforeCover * time.Minute)
}
}
@@ -809,6 +814,17 @@ func (mgr *Manager) uploadBuildAssets(buildInfo *dashapi.Build, assetFolder stri
return ret, nil
}
+func (mgr *Manager) httpGET(path string) (resp *http.Response, err error) {
+ addr := mgr.managercfg.HTTP
+ if addr != "" && addr[0] == ':' {
+ addr = "127.0.0.1" + addr // in case addr is ":port"
+ }
+ client := &http.Client{
+ Timeout: time.Hour,
+ }
+ return client.Get(fmt.Sprintf("http://%s%s", addr, path))
+}
+
func (mgr *Manager) uploadCoverReport() error {
directUpload := mgr.managercfg.Cover && mgr.cfg.CoverUploadPath != ""
if mgr.storage == nil && !directUpload {
@@ -826,21 +842,13 @@ func (mgr *Manager) uploadCoverReport() error {
}
defer buildSem.Signal()
- // Get coverage report from manager.
- addr := mgr.managercfg.HTTP
- if addr != "" && addr[0] == ':' {
- addr = "127.0.0.1" + addr // in case addr is ":port"
- }
- client := http.Client{
- Timeout: time.Hour,
- }
- resp, err := client.Get(fmt.Sprintf("http://%v/cover", addr))
+ resp, err := mgr.httpGET("/cover")
if err != nil {
return fmt.Errorf("failed to get report: %w", err)
}
defer resp.Body.Close()
if directUpload {
- return mgr.uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body)
+ return mgr.uploadFile(mgr.cfg.CoverUploadPath, mgr.name+".html", resp.Body, true)
}
// Upload via the asset storage.
newAsset, err := mgr.storage.UploadBuildAsset(resp.Body, mgr.name+".html",
@@ -855,16 +863,77 @@ func (mgr *Manager) uploadCoverReport() error {
return nil
}
+func (mgr *Manager) uploadCoverStat(fuzzingMinutes int) error {
+ if !mgr.managercfg.Cover || mgr.cfg.CoverPipelinePath == "" {
+ return nil
+ }
+
+ // Report generation consumes 40G RAM. Generate one at a time.
+ // TODO: remove it once #4585 (symbolization tuning) is closed
+ select {
+ case <-buildSem.WaitC():
+ case <-mgr.stop:
+ return nil
+ }
+ defer buildSem.Signal()
+
+ resp, err := mgr.httpGET("/cover?jsonl=1")
+ if err != nil {
+ return fmt.Errorf("failed to get /cover?json=1 report: %w", err)
+ }
+ defer resp.Body.Close()
+
+ curTime := time.Now()
+ pr, pw := io.Pipe()
+ defer pr.Close()
+ go func() {
+ decoder := json.NewDecoder(resp.Body)
+ for decoder.More() {
+ var covInfo cover.CoverageInfo
+ if err := decoder.Decode(&covInfo); err != nil {
+ pw.CloseWithError(fmt.Errorf("failed to decode CoverageInfo: %w", err))
+ return
+ }
+ if err := cover.WriteCIJSONLine(pw, covInfo, cover.CIDetails{
+ Version: 1,
+ Timestamp: curTime.Format(time.RFC3339Nano),
+ FuzzingMinutes: fuzzingMinutes,
+ Arch: mgr.lastBuild.Arch,
+ BuildID: mgr.lastBuild.ID,
+ Manager: mgr.name,
+ KernelRepo: mgr.lastBuild.KernelRepo,
+ KernelBranch: mgr.lastBuild.KernelBranch,
+ KernelCommit: mgr.lastBuild.KernelCommit,
+ }); err != nil {
+ pw.CloseWithError(fmt.Errorf("failed to write CIJSONLine: %w", err))
+ return
+ }
+ }
+ pw.Close()
+ }()
+ fileName := fmt.Sprintf("%s-%s-%d-%d.jsonl",
+ mgr.name, curTime.Format(time.DateOnly),
+ curTime.Hour(), curTime.Minute())
+ err = mgr.uploadFile(mgr.cfg.CoverPipelinePath, fileName, pr, false)
+ if err != nil {
+ return fmt.Errorf("failed to uploadFileGCS(): %w", err)
+ }
+ return nil
+}
+
func (mgr *Manager) uploadCorpus() error {
+ if mgr.cfg.CorpusUploadPath == "" {
+ return nil
+ }
f, err := os.Open(filepath.Join(mgr.workDir, "corpus.db"))
if err != nil {
return err
}
defer f.Close()
- return mgr.uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f)
+ return mgr.uploadFile(mgr.cfg.CorpusUploadPath, mgr.name+"-corpus.db", f, true)
}
-func (mgr *Manager) uploadFile(dstPath, name string, file io.Reader) error {
+func (mgr *Manager) uploadFile(dstPath, name string, file io.Reader, allowPublishing bool) error {
URL, err := url.Parse(dstPath)
if err != nil {
return fmt.Errorf("failed to parse upload path: %w", err)
@@ -872,18 +941,14 @@ func (mgr *Manager) uploadFile(dstPath, name string, file io.Reader) error {
URL.Path = path.Join(URL.Path, name)
URLStr := URL.String()
log.Logf(0, "uploading %v to %v", name, URLStr)
- if strings.HasPrefix(URLStr, "gs://") {
- return uploadFileGCS(strings.TrimPrefix(URLStr, "gs://"), file, mgr.cfg.PublishGCS)
- }
- if strings.HasPrefix(URLStr, "http://") ||
- strings.HasPrefix(URLStr, "https://") {
+ if strings.HasPrefix(URLStr, "http") {
return uploadFileHTTPPut(URLStr, file)
}
- // Use GCS as default to maintain backwards compatibility.
- return uploadFileGCS(URLStr, file, mgr.cfg.PublishGCS)
+ return uploadFileGCS(URLStr, file, allowPublishing && mgr.cfg.PublishGCS)
}
func uploadFileGCS(URL string, file io.Reader, publish bool) error {
+ URL = strings.TrimPrefix(URL, "gs://")
GCS, err := gcs.NewClient()
if err != nil {
return fmt.Errorf("failed to create GCS client: %w", err)
diff --git a/syz-ci/syz-ci.go b/syz-ci/syz-ci.go
index 7ec88c53e..9c59870ca 100644
--- a/syz-ci/syz-ci.go
+++ b/syz-ci/syz-ci.go
@@ -105,6 +105,9 @@ type Config struct {
// Path to upload coverage reports from managers (optional).
// Supported protocols: GCS (gs://) and HTTP PUT (http:// or https://).
CoverUploadPath string `json:"cover_upload_path"`
+ // Path to upload json coverage reports from managers (optional).
+ // Supported protocol: GCS (gs://)
+ CoverPipelinePath string `json:"cover_pipeline_path"`
// Path to upload corpus.db from managers (optional).
// Supported protocols: GCS (gs://) and HTTP PUT (http:// or https://).
CorpusUploadPath string `json:"corpus_upload_path"`