aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/covermerger/bq_csv_reader.go
blob: d1bf778810363bf963a2cbd729d6df6977dce7b1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.

package covermerger

import (
	"compress/gzip"
	"context"
	"fmt"
	"io"
	"regexp"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/civil"
	"github.com/google/syzkaller/pkg/gcs"
	"github.com/google/uuid"
)

// allowedFilePath matches strings that consist solely of ASCII letters, digits,
// '.', '/' and '_'. The empty string matches as well.
var allowedFilePath = regexp.MustCompile(`^[./_a-zA-Z0-9]*$`)

// isAllowedFilePath reports whether every character of s is permitted by
// allowedFilePath. It is used to vet file paths before they are embedded
// into a query string.
func isAllowedFilePath(s string) bool {
	matched := allowedFilePath.MatchString(s)
	return matched
}

// bqCSVReader gives access to gzip-compressed CSV files stored in GCS.
type bqCSVReader struct {
	// closers collects every resource that Close has to release.
	closers []io.Closer
	// gcsFiles holds the raw (still compressed) file streams, in the order
	// they were opened.
	gcsFiles []io.Reader
}

// MakeBQCSVReader returns an empty reader with no files attached.
func MakeBQCSVReader() *bqCSVReader {
	return new(bqCSVReader)
}

// Close closes every registered resource in registration order.
// Errors returned by the individual closers are discarded.
func (r *bqCSVReader) Close() {
	for i := 0; i < len(r.closers); i++ {
		r.closers[i].Close()
	}
}

// InitNsRecords runs a BigQuery EXPORT DATA job that writes the aggregated
// coverage rows of table syzkaller.syzbot_coverage.<ns> as gzip-compressed CSV
// files to a fresh, UUID-named folder in the "syzbot-temp" GCS bucket, and then
// opens those files via initGCSFileReaders.
//
// Only rows whose file_path starts with filePath and whose timestamp, truncated
// to a day, lies within [from, to] are exported. filePath must satisfy
// isAllowedFilePath because it is interpolated into the query text.
//
// On success the exported data can be consumed through Reader; the opened files
// are released by Close. An error is returned if the input is rejected, the
// BigQuery client cannot be set up, the job fails, or the exported files cannot
// be opened.
func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath string, from, to civil.Date) error {
	if !isAllowedFilePath(filePath) {
		return fmt.Errorf("wrong file path '%s'", filePath)
	}
	sessionUUID := uuid.New().String()
	gsBucket := "syzbot-temp"
	gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
	gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
	client, err := bigquery.NewClient(ctx, "syzkaller")
	// The client must not be touched before this check: it is unusable when
	// NewClient reports an error.
	if err != nil {
		return fmt.Errorf("failed to initialize bigquery client: %w", err)
	}
	// The client is needed only until the export job finishes; the exported
	// files are read through a separate GCS client.
	defer client.Close()
	if err := client.EnableStorageReadClient(ctx); err != nil {
		return fmt.Errorf("failed to enable bigquery storage read client: %w", err)
	}
	// NOTE(review): ns is interpolated into the table name without validation,
	// unlike filePath. Callers must pass a trusted namespace — confirm.
	q := client.Query(fmt.Sprintf(`
		EXPORT DATA
			OPTIONS (
				uri = "%s",
				format = "CSV",
				overwrite = true,
				header = true,
				compression = "GZIP")
			AS (
				SELECT
					kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
				FROM syzkaller.syzbot_coverage.%s
				WHERE
					TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND
					TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND
					version = 1 AND
					starts_with(file_path, "%s")
				GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
				ORDER BY file_path
			);
	`, gsURI, ns, from.String(), to.String(), filePath))
	job, err := q.Run(ctx)
	if err != nil {
		return fmt.Errorf("err during bigquery.Run: %w", err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
	}
	// Wait succeeding only means the job reached a final state; the job itself
	// may still have failed.
	if status.Err() != nil {
		return fmt.Errorf("bigquery job failed with status %w", status.Err())
	}
	return r.initGCSFileReaders(ctx, gsBucket, gsPath)
}

// initGCSFileReaders lists the objects found under bucket/path and opens a
// reader for each of them. Every opened reader is stored twice: in r.gcsFiles
// for later consumption and in r.closers so that Close releases it.
//
// The first failure aborts the loop and is returned; readers opened before the
// failure stay registered in r.closers.
func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
	client, err := gcs.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("err creating gcs client: %w", err)
	}
	objects, err := client.ListObjects(bucket + "/" + path)
	if err != nil {
		return fmt.Errorf("err enumerating gcs files: %w", err)
	}
	for _, object := range objects {
		file, err := client.Read(bucket + "/" + object.Path)
		if err != nil {
			return fmt.Errorf("failed to start reading %s: %w", object.Path, err)
		}
		var body io.ReadCloser
		body, err = file.Reader()
		if err != nil {
			return fmt.Errorf("failed to get %s reader: %w", object.Path, err)
		}
		r.closers = append(r.closers, body)
		r.gcsFiles = append(r.gcsFiles, body)
	}
	return nil
}

// Reader wraps every file in r.gcsFiles into a gzip reader and returns one
// stream that yields their decompressed contents back to back, in the order
// the files were opened.
//
// Each gzip reader is registered in r.closers as soon as it is created, so
// Close releases it even when a later file fails to open. On failure the
// returned reader is nil.
func (r *bqCSVReader) Reader() (io.Reader, error) {
	streams := make([]io.Reader, 0, len(r.gcsFiles))
	for i := range r.gcsFiles {
		gz, err := gzip.NewReader(r.gcsFiles[i])
		if err != nil {
			return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
		}
		r.closers = append(r.closers, gz)
		streams = append(streams, gz)
	}
	return io.MultiReader(streams...), nil
}