diff options
| author | Taras Madan <tarasmadan@google.com> | 2024-08-12 16:15:13 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2024-08-13 16:01:47 +0000 |
| commit | bde81f6f54f0b2fa4440884eeffa7db36b70f837 (patch) | |
| tree | 25ef99c49caea6bf937781168b3ceb7a817e4d97 /pkg | |
| parent | 068ad4fdb4cc546e45708cc40115e2baf3c49830 (diff) | |
tools/syz-bq.sh: rewrite logic in golang
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/covermerger/bq_csv_reader.go | 124 |
1 file changed, 124 insertions, 0 deletions
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go new file mode 100644 index 000000000..d1bf77881 --- /dev/null +++ b/pkg/covermerger/bq_csv_reader.go @@ -0,0 +1,124 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package covermerger + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "regexp" + + "cloud.google.com/go/bigquery" + "cloud.google.com/go/civil" + "github.com/google/syzkaller/pkg/gcs" + "github.com/google/uuid" +) + +var allowedFilePath = regexp.MustCompile(`^[./_a-zA-Z0-9]*$`) + +func isAllowedFilePath(s string) bool { + return allowedFilePath.MatchString(s) +} + +type bqCSVReader struct { + closers []io.Closer + gcsFiles []io.Reader +} + +func (r *bqCSVReader) Close() { + for _, c := range r.closers { + c.Close() + } +} + +func MakeBQCSVReader() *bqCSVReader { + return &bqCSVReader{} +} + +func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath string, from, to civil.Date) error { + if !isAllowedFilePath(filePath) { + return fmt.Errorf("wrong file path '%s'", filePath) + } + sessionUUID := uuid.New().String() + gsBucket := "syzbot-temp" + gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID) + gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz" + client, err := bigquery.NewClient(ctx, "syzkaller") + client.EnableStorageReadClient(ctx) + if err != nil { + return fmt.Errorf("failed to initialize bigquery client: %w", err) + } + q := client.Query(fmt.Sprintf(` + EXPORT DATA + OPTIONS ( + uri = "%s", + format = "CSV", + overwrite = true, + header = true, + compression = "GZIP") + AS ( + SELECT + kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count + FROM syzkaller.syzbot_coverage.%s + WHERE + TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND + TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND + version = 1 AND + starts_with(file_path, "%s") + GROUP BY 
file_path, kernel_commit, kernel_repo, kernel_branch, sl + ORDER BY file_path + ); + `, gsURI, ns, from.String(), to.String(), filePath)) + job, err := q.Run(ctx) + if err != nil { + return fmt.Errorf("err during bigquery.Run: %w", err) + } + status, err := job.Wait(ctx) + if err != nil { + return fmt.Errorf("err waiting for the bigquery.Job: %w", err) + } + if status.Err() != nil { + return fmt.Errorf("bigquery job failed with status %w", status.Err()) + } + return r.initGCSFileReaders(ctx, gsBucket, gsPath) +} + +func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error { + var gcsClient *gcs.Client + var err error + if gcsClient, err = gcs.NewClient(ctx); err != nil { + return fmt.Errorf("err creating gcs client: %w", err) + } + var gcsFiles []*gcs.Object + if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil { + return fmt.Errorf("err enumerating gcs files: %w", err) + } + for _, obj := range gcsFiles { + var file *gcs.File + if file, err = gcsClient.Read(bucket + "/" + obj.Path); err != nil { + return fmt.Errorf("failed to start reading %s: %w", obj.Path, err) + } + var readCloser io.ReadCloser + if readCloser, err = file.Reader(); err != nil { + return fmt.Errorf("failed to get %s reader: %w", obj.Path, err) + } + r.closers = append(r.closers, readCloser) + r.gcsFiles = append(r.gcsFiles, readCloser) + } + return nil +} + +func (r *bqCSVReader) Reader() (io.Reader, error) { + var readers []io.Reader + for _, file := range r.gcsFiles { + gzipReaderCloser, err := gzip.NewReader(file) + if err != nil { + return nil, fmt.Errorf("err calling gzip.NewReader: %w", err) + } + r.closers = append(r.closers, gzipReaderCloser) + readers = append(readers, gzipReaderCloser) + } + return io.MultiReader(readers...), nil +} |
