aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pkg/covermerger/bq_csv_reader.go124
-rwxr-xr-xtools/syz-bq.sh29
-rw-r--r--tools/syz-covermerger/syz_covermerger.go31
3 files changed, 148 insertions, 36 deletions
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go
new file mode 100644
index 000000000..d1bf77881
--- /dev/null
+++ b/pkg/covermerger/bq_csv_reader.go
@@ -0,0 +1,124 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package covermerger
+
+import (
+ "compress/gzip"
+ "context"
+ "fmt"
+ "io"
+ "regexp"
+
+ "cloud.google.com/go/bigquery"
+ "cloud.google.com/go/civil"
+ "github.com/google/syzkaller/pkg/gcs"
+ "github.com/google/uuid"
+)
+
// allowedFilePath matches strings that are safe to splice into the
// BigQuery export statement: only dots, slashes, underscores and
// alphanumerics — in particular no quotes or backticks. The empty
// string is allowed (it means "no file-path filter").
var allowedFilePath = regexp.MustCompile(`^[./_a-zA-Z0-9]*$`)

// isAllowedFilePath reports whether s may be embedded in a query literal.
func isAllowedFilePath(s string) bool {
	ok := allowedFilePath.MatchString(s)
	return ok
}
+
// bqCSVReader streams the results of a BigQuery CSV export from GCS.
// The zero value is usable; populate it via InitNsRecords.
type bqCSVReader struct {
	// closers tracks every reader opened on behalf of this object
	// (GCS object readers and gzip readers) so Close can release them.
	closers []io.Closer
	// gcsFiles holds the raw, still-compressed per-object GCS readers,
	// in the order the objects were enumerated.
	gcsFiles []io.Reader
}
+
+func (r *bqCSVReader) Close() {
+ for _, c := range r.closers {
+ c.Close()
+ }
+}
+
+func MakeBQCSVReader() *bqCSVReader {
+ return &bqCSVReader{}
+}
+
+func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath string, from, to civil.Date) error {
+ if !isAllowedFilePath(filePath) {
+ return fmt.Errorf("wrong file path '%s'", filePath)
+ }
+ sessionUUID := uuid.New().String()
+ gsBucket := "syzbot-temp"
+ gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
+ gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
+ client, err := bigquery.NewClient(ctx, "syzkaller")
+ client.EnableStorageReadClient(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to initialize bigquery client: %w", err)
+ }
+ q := client.Query(fmt.Sprintf(`
+ EXPORT DATA
+ OPTIONS (
+ uri = "%s",
+ format = "CSV",
+ overwrite = true,
+ header = true,
+ compression = "GZIP")
+ AS (
+ SELECT
+ kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
+ FROM syzkaller.syzbot_coverage.%s
+ WHERE
+ TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND
+ TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND
+ version = 1 AND
+ starts_with(file_path, "%s")
+ GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
+ ORDER BY file_path
+ );
+ `, gsURI, ns, from.String(), to.String(), filePath))
+ job, err := q.Run(ctx)
+ if err != nil {
+ return fmt.Errorf("err during bigquery.Run: %w", err)
+ }
+ status, err := job.Wait(ctx)
+ if err != nil {
+ return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
+ }
+ if status.Err() != nil {
+ return fmt.Errorf("bigquery job failed with status %w", status.Err())
+ }
+ return r.initGCSFileReaders(ctx, gsBucket, gsPath)
+}
+
+func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
+ var gcsClient *gcs.Client
+ var err error
+ if gcsClient, err = gcs.NewClient(ctx); err != nil {
+ return fmt.Errorf("err creating gcs client: %w", err)
+ }
+ var gcsFiles []*gcs.Object
+ if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
+ return fmt.Errorf("err enumerating gcs files: %w", err)
+ }
+ for _, obj := range gcsFiles {
+ var file *gcs.File
+ if file, err = gcsClient.Read(bucket + "/" + obj.Path); err != nil {
+ return fmt.Errorf("failed to start reading %s: %w", obj.Path, err)
+ }
+ var readCloser io.ReadCloser
+ if readCloser, err = file.Reader(); err != nil {
+ return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
+ }
+ r.closers = append(r.closers, readCloser)
+ r.gcsFiles = append(r.gcsFiles, readCloser)
+ }
+ return nil
+}
+
+func (r *bqCSVReader) Reader() (io.Reader, error) {
+ var readers []io.Reader
+ for _, file := range r.gcsFiles {
+ gzipReaderCloser, err := gzip.NewReader(file)
+ if err != nil {
+ return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
+ }
+ r.closers = append(r.closers, gzipReaderCloser)
+ readers = append(readers, gzipReaderCloser)
+ }
+ return io.MultiReader(readers...), nil
+}
diff --git a/tools/syz-bq.sh b/tools/syz-bq.sh
index f15de82cf..7d070a3b7 100755
--- a/tools/syz-bq.sh
+++ b/tools/syz-bq.sh
@@ -95,35 +95,6 @@ else
echo $total_rows rows are available for processing
fi
-sessionID=$(cat /proc/sys/kernel/random/uuid)
-gsURI=$(echo gs://syzbot-temp/bq-exports/${sessionID}/*.csv.gz)
-echo fetching data from bigquery
-query=$( echo -n '
-EXPORT DATA
- OPTIONS (
- uri = "'$gsURI'",
- format = "CSV",
- overwrite = true,
- header = true,
- compression = "GZIP")
-AS (
- SELECT
- kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
- FROM syzkaller.syzbot_coverage.`'$namespace'`
- WHERE
- TIMESTAMP_TRUNC(timestamp, DAY) >= "'$from_date'" AND
- TIMESTAMP_TRUNC(timestamp, DAY) <= "'$to_date'" AND
- version = 1
- GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
- ORDER BY file_path
-);
-')
-
-bq query --format=csv --use_legacy_sql=false "$query"
-sessionDir="$workdir/sessions/$sessionID"
-mkdir -p $sessionDir
-gsutil -m cp $gsURI $sessionDir
-cat $sessionDir/*.csv.gz | gunzip | \
go run ./tools/syz-covermerger/ -workdir $workdir \
-repo $repo \
-branch $branch \
diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go
index d3b9de2ae..05bd9f4d2 100644
--- a/tools/syz-covermerger/syz_covermerger.go
+++ b/tools/syz-covermerger/syz_covermerger.go
@@ -4,10 +4,10 @@
package main
import (
+ "context"
"flag"
"fmt"
"log"
- "os"
"runtime"
"sort"
@@ -57,15 +57,32 @@ func main() {
},
FileVersProvider: makeProvider(),
}
- mergeResult, err := covermerger.MergeCSVData(config, os.Stdin)
- if err != nil {
- panic(err)
- }
- printMergeResult(mergeResult)
- var dateTo civil.Date
+ var dateFrom, dateTo civil.Date
+ var err error
if dateTo, err = civil.ParseDate(*flagDateTo); err != nil {
panic(fmt.Sprintf("failed to parse time_to: %s", err.Error()))
}
+ dateFrom = dateTo.AddDays(-int(*flagDuration))
+ dbReader := covermerger.MakeBQCSVReader()
+ if err = dbReader.InitNsRecords(context.Background(),
+ *flagNamespace,
+ "",
+ dateFrom,
+ dateTo,
+ ); err != nil {
+ panic(fmt.Sprintf("failed to dbReader.InitNsRecords: %v", err.Error()))
+ }
+ defer dbReader.Close()
+ csvReader, errReader := dbReader.Reader()
+ if errReader != nil {
+ panic(fmt.Sprintf("failed to dbReader.Reader: %v", errReader.Error()))
+ }
+ mergeResult, errMerge := covermerger.MergeCSVData(config, csvReader)
+ if errMerge != nil {
+ panic(errMerge)
+ }
+ printMergeResult(mergeResult)
+
coverage, _, _ := mergeResultsToCoverage(mergeResult)
if *flagToDashAPI != "" {
if err := saveCoverage(*flagToDashAPI, *flagDashboardClientName, &dashapi.MergedCoverage{