about summary refs log tree commit diff stats
path: root/pkg/covermerger/bq_csv_reader.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/covermerger/bq_csv_reader.go')
-rw-r--r-- pkg/covermerger/bq_csv_reader.go 112
1 files changed, 70 insertions, 42 deletions
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go
index 483af792e..5933b4ab4 100644
--- a/pkg/covermerger/bq_csv_reader.go
+++ b/pkg/covermerger/bq_csv_reader.go
@@ -16,28 +16,13 @@ import (
"github.com/google/uuid"
)
-type bqCSVReader struct {
- closers []io.Closer
- gcsFiles []io.Reader
-}
-
-func (r *bqCSVReader) Close() {
- for _, c := range r.closers {
- c.Close()
- }
-}
-
-func MakeBQCSVReader() *bqCSVReader {
- return &bqCSVReader{}
-}
-
-func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) error {
+func InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) (io.ReadCloser, error) {
if err := validator.AnyError("input validation failed",
validator.NamespaceName(ns),
validator.AnyOk(validator.EmptyStr(filePath), validator.KernelFilePath(filePath)),
validator.AnyOk(validator.EmptyStr(commit), validator.CommitHash(commit)),
); err != nil {
- return err
+ return nil, err
}
sessionUUID := uuid.New().String()
gsBucket := "syzbot-temp"
@@ -45,10 +30,10 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
client, err := bigquery.NewClient(ctx, "syzkaller")
if err != nil {
- return fmt.Errorf("failed to initialize bigquery client: %w", err)
+ return nil, fmt.Errorf("failed to initialize bigquery client: %w", err)
}
if err := client.EnableStorageReadClient(ctx); err != nil {
- return fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
+ return nil, fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
}
selectCommit := ""
if commit != "" {
@@ -77,48 +62,91 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
`, gsURI, ns, from.String(), to.String(), filePath, selectCommit))
job, err := q.Run(ctx)
if err != nil {
- return fmt.Errorf("err during bigquery.Run: %w", err)
+ return nil, fmt.Errorf("err during bigquery.Run: %w", err)
}
status, err := job.Wait(ctx)
if err != nil {
- return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
+ return nil, fmt.Errorf("err waiting for the bigquery.Job: %w", err)
}
if status.Err() != nil {
- return fmt.Errorf("bigquery job failed with status %w", status.Err())
+ return nil, fmt.Errorf("bigquery job failed with status %w", status.Err())
}
- return r.initGCSFileReaders(ctx, gsBucket, gsPath)
+ return initGCSMultiReader(ctx, gsBucket, gsPath)
}
-func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
+// initGCSMultiReader lists the objects stored under bucket/path and returns a
+// reader over them. It creates a GCS client, enumerates the objects and hands
+// the client together with the bucket-prefixed object paths to a
+// gcsGZIPMultiReader.
+//
+// An error is returned if the client cannot be created or the listing fails.
+//
+// NOTE(review): nothing visible in this chunk closes gcsClient, neither on the
+// listing error path nor in gcsGZIPMultiReader.Close. Confirm whether the
+// client needs an explicit shutdown.
+func initGCSMultiReader(ctx context.Context, bucket, path string) (io.ReadCloser, error) {
var gcsClient gcs.Client
var err error
if gcsClient, err = gcs.NewClient(ctx); err != nil {
- return fmt.Errorf("err creating gcs client: %w", err)
+ return nil, fmt.Errorf("err creating gcs client: %w", err)
}
var gcsFiles []*gcs.Object
if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
- return fmt.Errorf("err enumerating gcs files: %w", err)
+ return nil, fmt.Errorf("err enumerating gcs files: %w", err)
}
+ // Only the object paths are collected here. No file is opened in this
+ // function; gcsGZIPMultiReader.Read opens them one at a time.
+ paths := []string{}
for _, obj := range gcsFiles {
- var readCloser io.ReadCloser
- if readCloser, err = gcsClient.FileReader(bucket + "/" + obj.Path); err != nil {
- return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
- }
- r.closers = append(r.closers, readCloser)
- r.gcsFiles = append(r.gcsFiles, readCloser)
+ paths = append(paths, bucket+"/"+obj.Path)
}
- return nil
+ return &gcsGZIPMultiReader{
+ gcsClient: gcsClient,
+ gcsFiles: paths,
+ }, nil
}
-func (r *bqCSVReader) Reader() (io.Reader, error) {
- var readers []io.Reader
- for _, file := range r.gcsFiles {
- gzipReaderCloser, err := gzip.NewReader(file)
- if err != nil {
- return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
+// gcsGZIPMultiReader reads a list of gzip-compressed GCS files as a single
+// decompressed stream. At most one file is open at a time: Read opens the
+// file at the head of gcsFiles on demand and drops it from the list once it
+// has been read to io.EOF.
+type gcsGZIPMultiReader struct {
+ // gcsClient is used by Read to open each file.
+ gcsClient gcs.Client
+ // gcsFiles holds the paths of the files that are not fully read yet, in
+ // reading order. The head entry is the file currently being read.
+ gcsFiles []string
+
+ // curFileReader is the reader obtained from gcsClient for the head file.
+ // It is nil when no file is open.
+ curFileReader io.ReadCloser
+ // curGZReadCloser is the gzip reader wrapping curFileReader.
+ // It is nil when no file is open.
+ curGZReadCloser io.ReadCloser
+}
+
+// Read implements io.Reader. It returns the decompressed contents of every
+// file in mr.gcsFiles, in order, as one continuous stream. A file is opened
+// and wrapped in a gzip reader only when it is about to be read, and it is
+// closed and removed from mr.gcsFiles as soon as it reports io.EOF.
+//
+// io.EOF is returned only after the last file is exhausted. Any other error
+// is returned together with the number of bytes already stored in p.
+func (mr *gcsGZIPMultiReader) Read(p []byte) (int, error) {
+	for len(mr.gcsFiles) > 0 {
+		if mr.curFileReader == nil {
+			// Build both readers in locals and publish them only once both
+			// exist. Leaving an open file reader next to a nil gzip reader
+			// would make the next Read skip this branch and call Read on a
+			// nil interface.
+			fileReader, err := mr.gcsClient.FileReader(mr.gcsFiles[0])
+			if err != nil {
+				return 0, fmt.Errorf("failed to get %s reader: %w", mr.gcsFiles[0], err)
+			}
+			gzReader, err := gzip.NewReader(fileReader)
+			if err != nil {
+				// The gzip error is the one worth reporting; closing the
+				// file reader here is best effort.
+				_ = fileReader.Close()
+				return 0, fmt.Errorf("err calling gzip.NewReader: %w", err)
+			}
+			mr.curFileReader, mr.curGZReadCloser = fileReader, gzReader
+		}
+		n, err := mr.curGZReadCloser.Read(p)
+		if err == io.EOF {
+			mr.gcsFiles = mr.gcsFiles[1:]
+			if closeErr := mr.Close(); closeErr != nil {
+				// Report n as well: p may already hold valid bytes.
+				return n, fmt.Errorf("mr.Close: %w", closeErr)
+			}
+		}
+		if n > 0 || err != io.EOF {
+			if err == io.EOF && len(mr.gcsFiles) > 0 {
+				// Don't return EOF yet. More readers remain.
+				err = nil
+			}
+			return n, err
 		}
-		r.closers = append(r.closers, gzipReaderCloser)
-		readers = append(readers, gzipReaderCloser)
 	}
-	return io.MultiReader(readers...), nil
+	return 0, io.EOF
+}
+
+// Close closes the gzip reader and the file reader of the file that is
+// currently open, if any, and forgets both so that a later Read opens the
+// head of mr.gcsFiles afresh. It does not modify mr.gcsFiles.
+//
+// Both readers are always closed and reset. If both fail to close, only the
+// gzip reader's error is reported. Calling Close with no open file returns nil.
+func (mr *gcsGZIPMultiReader) Close() error {
+	var gzErr, fileErr error
+	if gz := mr.curGZReadCloser; gz != nil {
+		gzErr = gz.Close()
+	}
+	if file := mr.curFileReader; file != nil {
+		fileErr = file.Close()
+	}
+	// Drop the references even when closing failed.
+	mr.curFileReader, mr.curGZReadCloser = nil, nil
+	switch {
+	case gzErr != nil:
+		return fmt.Errorf("mr.curGZReadCloser.Close: %w", gzErr)
+	case fileErr != nil:
+		return fmt.Errorf("mr.curFileReader.Close: %w", fileErr)
+	default:
+		return nil
+	}
 }