// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package covermerger

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"regexp"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/civil"
	"github.com/google/syzkaller/pkg/gcs"
	"github.com/google/uuid"
)

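// allowedFilePath restricts file paths to a conservative character set, which
// also keeps them safe to interpolate into the BigQuery query built below.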
var allowedFilePath = regexp.MustCompile(`^[./_a-zA-Z0-9]*$`)

func isAllowedFilePath(s string) bool {
	return allowedFilePath.MatchString(s)
}

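// bqCSVReader streams the coverage records exported from BigQuery to GCS as
// gzip-compressed CSV shards.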
type bqCSVReader struct {
	closers  []io.Closer
	gcsFiles []io.Reader
}

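// Close releases every GCS object and gzip reader opened by this bqCSVReader.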
func (r *bqCSVReader) Close() {
	for _, c := range r.closers {
		c.Close()
	}
}

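// MakeBQCSVReader returns an empty reader; call InitNsRecords to run the
// export and Reader to consume the merged CSV stream.
//
// A usage sketch (the namespace "upstream", the path prefix "fs/" and the
// dates are illustrative, not taken from real callers):
//
//	r := MakeBQCSVReader()
//	err := r.InitNsRecords(ctx, "upstream", "fs/",
//		civil.Date{Year: 2024, Month: time.January, Day: 1},
//		civil.Date{Year: 2024, Month: time.January, Day: 31})
//	...
//	defer r.Close()
//	stream, err := r.Reader()
//	...
//	records, err := csv.NewReader(stream).ReadAll()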
func MakeBQCSVReader() *bqCSVReader {
	return &bqCSVReader{}
}

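// InitNsRecords exports the aggregated coverage rows of the namespace ns for
// the [from, to] date range (filtered by the filePath prefix) to a temporary
// GCS location and opens readers over the resulting shards.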
func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath string, from, to civil.Date) error {
	if !isAllowedFilePath(filePath) {
		return fmt.Errorf("wrong file path %q", filePath)
	}
	sessionUUID := uuid.New().String()
	gsBucket := "syzbot-temp"
	gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
	gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
	client, err := bigquery.NewClient(ctx, "syzkaller")
	if err != nil {
		return fmt.Errorf("failed to initialize bigquery client: %w", err)
	}
	defer client.Close()
	// EnableStorageReadClient may only be called on a successfully created
	// client, so the error check above has to come first.
	if err := client.EnableStorageReadClient(ctx); err != nil {
		return fmt.Errorf("failed to enable the storage read client: %w", err)
	}
	q := client.Query(fmt.Sprintf(`
EXPORT DATA
	OPTIONS (
		uri = "%s",
		format = "CSV",
		overwrite = true,
		header = true,
		compression = "GZIP")
AS (
	SELECT
		kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) AS hit_count
	FROM syzkaller.syzbot_coverage.%s
	WHERE
		TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND
		TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND
		version = 1 AND
		starts_with(file_path, "%s")
	GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
	ORDER BY file_path
);`, gsURI, ns, from.String(), to.String(), filePath))
	job, err := q.Run(ctx)
	if err != nil {
		return fmt.Errorf("failed to run the bigquery job: %w", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("failed to wait for the bigquery job: %w", err)
	}
	if status.Err() != nil {
		return fmt.Errorf("the bigquery job failed: %w", status.Err())
	}
	return r.initGCSFileReaders(ctx, gsBucket, gsPath)
}

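// initGCSFileReaders opens every exported object under bucket/path and
// remembers both the readers and their closers, so that Reader can merge
// the shards and Close can release them.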
func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
	gcsClient, err := gcs.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("failed to create a gcs client: %w", err)
	}
	objects, err := gcsClient.ListObjects(bucket + "/" + path)
	if err != nil {
		return fmt.Errorf("failed to list the gcs files: %w", err)
	}
	for _, obj := range objects {
		file, err := gcsClient.Read(bucket + "/" + obj.Path)
		if err != nil {
			return fmt.Errorf("failed to start reading %s: %w", obj.Path, err)
		}
		readCloser, err := file.Reader()
		if err != nil {
			return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
		}
		r.closers = append(r.closers, readCloser)
		r.gcsFiles = append(r.gcsFiles, readCloser)
	}
	return nil
}

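// Reader returns a single stream that decompresses and concatenates all the
// exported shards. The export writes a header row into every shard, so the
// merged stream may contain repeated headers that the consumer has to skip.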
func (r *bqCSVReader) Reader() (io.Reader, error) {
	var readers []io.Reader
	for _, file := range r.gcsFiles {
		gzipReaderCloser, err := gzip.NewReader(file)
		if err != nil {
			return nil, fmt.Errorf("failed to create a gzip reader: %w", err)
		}
		r.closers = append(r.closers, gzipReaderCloser)
		readers = append(readers, gzipReaderCloser)
	}
	return io.MultiReader(readers...), nil
}