| -rw-r--r-- | pkg/covermerger/bq_csv_reader.go         | 124 |
| -rwxr-xr-x | tools/syz-bq.sh                          |  29 |
| -rw-r--r-- | tools/syz-covermerger/syz_covermerger.go |  31 |
3 files changed, 148 insertions, 36 deletions
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go
new file mode 100644
index 000000000..d1bf77881
--- /dev/null
+++ b/pkg/covermerger/bq_csv_reader.go
@@ -0,0 +1,124 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package covermerger
+
+import (
+	"compress/gzip"
+	"context"
+	"fmt"
+	"io"
+	"regexp"
+
+	"cloud.google.com/go/bigquery"
+	"cloud.google.com/go/civil"
+	"github.com/google/syzkaller/pkg/gcs"
+	"github.com/google/uuid"
+)
+
+var allowedFilePath = regexp.MustCompile(`^[./_a-zA-Z0-9]*$`)
+
+func isAllowedFilePath(s string) bool {
+	return allowedFilePath.MatchString(s)
+}
+
+type bqCSVReader struct {
+	closers  []io.Closer
+	gcsFiles []io.Reader
+}
+
+func (r *bqCSVReader) Close() {
+	for _, c := range r.closers {
+		c.Close()
+	}
+}
+
+func MakeBQCSVReader() *bqCSVReader {
+	return &bqCSVReader{}
+}
+
+func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath string, from, to civil.Date) error {
+	if !isAllowedFilePath(filePath) {
+		return fmt.Errorf("wrong file path '%s'", filePath)
+	}
+	sessionUUID := uuid.New().String()
+	gsBucket := "syzbot-temp"
+	gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
+	gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
+	client, err := bigquery.NewClient(ctx, "syzkaller")
+	client.EnableStorageReadClient(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to initialize bigquery client: %w", err)
+	}
+	q := client.Query(fmt.Sprintf(`
+		EXPORT DATA
+			OPTIONS (
+				uri = "%s",
+				format = "CSV",
+				overwrite = true,
+				header = true,
+				compression = "GZIP")
+		AS (
+			SELECT
+				kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
+			FROM syzkaller.syzbot_coverage.%s
+			WHERE
+				TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND
+				TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND
+				version = 1 AND
+				starts_with(file_path, "%s")
+			GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
+			ORDER BY file_path
+		);
+	`, gsURI, ns, from.String(), to.String(), filePath))
+	job, err := q.Run(ctx)
+	if err != nil {
+		return fmt.Errorf("err during bigquery.Run: %w", err)
+	}
+	status, err := job.Wait(ctx)
+	if err != nil {
+		return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
+	}
+	if status.Err() != nil {
+		return fmt.Errorf("bigquery job failed with status %w", status.Err())
+	}
+	return r.initGCSFileReaders(ctx, gsBucket, gsPath)
+}
+
+func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
+	var gcsClient *gcs.Client
+	var err error
+	if gcsClient, err = gcs.NewClient(ctx); err != nil {
+		return fmt.Errorf("err creating gcs client: %w", err)
+	}
+	var gcsFiles []*gcs.Object
+	if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
+		return fmt.Errorf("err enumerating gcs files: %w", err)
+	}
+	for _, obj := range gcsFiles {
+		var file *gcs.File
+		if file, err = gcsClient.Read(bucket + "/" + obj.Path); err != nil {
+			return fmt.Errorf("failed to start reading %s: %w", obj.Path, err)
+		}
+		var readCloser io.ReadCloser
+		if readCloser, err = file.Reader(); err != nil {
+			return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
+		}
+		r.closers = append(r.closers, readCloser)
+		r.gcsFiles = append(r.gcsFiles, readCloser)
+	}
+	return nil
+}
+
+func (r *bqCSVReader) Reader() (io.Reader, error) {
+	var readers []io.Reader
+	for _, file := range r.gcsFiles {
+		gzipReaderCloser, err := gzip.NewReader(file)
+		if err != nil {
+			return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
+		}
+		r.closers = append(r.closers, gzipReaderCloser)
+		readers = append(readers, gzipReaderCloser)
+	}
+	return io.MultiReader(readers...), nil
+}
diff --git a/tools/syz-bq.sh b/tools/syz-bq.sh
index f15de82cf..7d070a3b7 100755
--- a/tools/syz-bq.sh
+++ b/tools/syz-bq.sh
@@ -95,35 +95,6 @@ else
   echo $total_rows rows are available for processing
 fi
 
-sessionID=$(cat /proc/sys/kernel/random/uuid)
-gsURI=$(echo gs://syzbot-temp/bq-exports/${sessionID}/*.csv.gz)
-echo fetching data from bigquery
-query=$( echo -n '
-EXPORT DATA
-  OPTIONS (
-    uri = "'$gsURI'",
-    format = "CSV",
-    overwrite = true,
-    header = true,
-    compression = "GZIP")
-AS (
-  SELECT
-    kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
-  FROM syzkaller.syzbot_coverage.`'$namespace'`
-  WHERE
-    TIMESTAMP_TRUNC(timestamp, DAY) >= "'$from_date'" AND
-    TIMESTAMP_TRUNC(timestamp, DAY) <= "'$to_date'" AND
-    version = 1
-  GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
-  ORDER BY file_path
-);
-')
-
-bq query --format=csv --use_legacy_sql=false "$query"
-sessionDir="$workdir/sessions/$sessionID"
-mkdir -p $sessionDir
-gsutil -m cp $gsURI $sessionDir
-cat $sessionDir/*.csv.gz | gunzip | \
 go run ./tools/syz-covermerger/ -workdir $workdir \
   -repo $repo \
   -branch $branch \
diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go
index d3b9de2ae..05bd9f4d2 100644
--- a/tools/syz-covermerger/syz_covermerger.go
+++ b/tools/syz-covermerger/syz_covermerger.go
@@ -4,10 +4,10 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"log"
-	"os"
 	"runtime"
 	"sort"
 
@@ -57,15 +57,32 @@ func main() {
 		},
 		FileVersProvider: makeProvider(),
 	}
-	mergeResult, err := covermerger.MergeCSVData(config, os.Stdin)
-	if err != nil {
-		panic(err)
-	}
-	printMergeResult(mergeResult)
-	var dateTo civil.Date
+	var dateFrom, dateTo civil.Date
+	var err error
 	if dateTo, err = civil.ParseDate(*flagDateTo); err != nil {
 		panic(fmt.Sprintf("failed to parse time_to: %s", err.Error()))
 	}
+	dateFrom = dateTo.AddDays(-int(*flagDuration))
+	dbReader := covermerger.MakeBQCSVReader()
+	if err = dbReader.InitNsRecords(context.Background(),
+		*flagNamespace,
+		"",
+		dateFrom,
+		dateTo,
+	); err != nil {
+		panic(fmt.Sprintf("failed to dbReader.InitNsRecords: %v", err.Error()))
+	}
+	defer dbReader.Close()
+	csvReader, errReader := dbReader.Reader()
+	if errReader != nil {
+		panic(fmt.Sprintf("failed to dbReader.Reader: %v", errReader.Error()))
+	}
+	mergeResult, errMerge := covermerger.MergeCSVData(config, csvReader)
+	if errMerge != nil {
+		panic(errMerge)
+	}
+	printMergeResult(mergeResult)
+
 	coverage, _, _ := mergeResultsToCoverage(mergeResult)
 	if *flagToDashAPI != "" {
 		if err := saveCoverage(*flagToDashAPI, *flagDashboardClientName, &dashapi.MergedCoverage{
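
For orientation, below is a minimal standalone sketch (not part of the commit) of how the new reader is meant to be driven, mirroring the call sequence that syz_covermerger.go now uses: InitNsRecords runs the BigQuery export, Reader stitches the exported .csv.gz shards into one decompressed stream, and Close releases the underlying readers. The namespace "upstream", the hard-coded date window, and printing the CSV to stdout are illustrative assumptions; running it would also require default GCP credentials with access to the syzkaller BigQuery project and the syzbot-temp bucket.

// Hypothetical usage example; namespace and dates are placeholders.
package main

import (
	"context"
	"io"
	"log"
	"os"

	"cloud.google.com/go/civil"
	"github.com/google/syzkaller/pkg/covermerger"
)

func main() {
	ctx := context.Background()
	// Placeholder window; syz_covermerger.go derives it from its date and duration flags.
	to, err := civil.ParseDate("2024-07-31")
	if err != nil {
		log.Fatal(err)
	}
	from := to.AddDays(-7)

	reader := covermerger.MakeBQCSVReader()
	// Export the namespace's coverage rows to gzipped CSV shards in GCS.
	// An empty file path means no file_path prefix filter.
	if err := reader.InitNsRecords(ctx, "upstream", "", from, to); err != nil {
		log.Fatalf("InitNsRecords: %v", err)
	}
	defer reader.Close()

	// Reader returns the concatenated, already-decompressed CSV stream.
	csv, err := reader.Reader()
	if err != nil {
		log.Fatalf("Reader: %v", err)
	}
	if _, err := io.Copy(os.Stdout, csv); err != nil {
		log.Fatal(err)
	}
}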
