aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/covermerger
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2025-04-02 20:28:15 +0200
committerTaras Madan <tarasmadan@google.com>2025-04-04 14:52:42 +0000
commitc53ea9c902c6745d750c5700631e1556716f4cd4 (patch)
tree855f9483db0aeedee6fe7dbd34e746356db13085 /pkg/covermerger
parent1c4febdbbe8f34403baef506b22cd689576927f6 (diff)
pkg/covermerger: read gcs files sequentially
Quarter-long aggregation means thousands of gzip files. Opening all the files in parallel, we suffer from: 1. Memory overhead. 2. GCS API errors — the API can't read Attrs for 1500+ files.
Diffstat (limited to 'pkg/covermerger')
-rw-r--r--pkg/covermerger/bq_csv_reader.go112
-rw-r--r--pkg/covermerger/bq_csv_reader_test.go117
2 files changed, 187 insertions, 42 deletions
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go
index 483af792e..5933b4ab4 100644
--- a/pkg/covermerger/bq_csv_reader.go
+++ b/pkg/covermerger/bq_csv_reader.go
@@ -16,28 +16,13 @@ import (
"github.com/google/uuid"
)
-type bqCSVReader struct {
- closers []io.Closer
- gcsFiles []io.Reader
-}
-
-func (r *bqCSVReader) Close() {
- for _, c := range r.closers {
- c.Close()
- }
-}
-
-func MakeBQCSVReader() *bqCSVReader {
- return &bqCSVReader{}
-}
-
-func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) error {
+func InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) (io.ReadCloser, error) {
if err := validator.AnyError("input validation failed",
validator.NamespaceName(ns),
validator.AnyOk(validator.EmptyStr(filePath), validator.KernelFilePath(filePath)),
validator.AnyOk(validator.EmptyStr(commit), validator.CommitHash(commit)),
); err != nil {
- return err
+ return nil, err
}
sessionUUID := uuid.New().String()
gsBucket := "syzbot-temp"
@@ -45,10 +30,10 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
client, err := bigquery.NewClient(ctx, "syzkaller")
if err != nil {
- return fmt.Errorf("failed to initialize bigquery client: %w", err)
+ return nil, fmt.Errorf("failed to initialize bigquery client: %w", err)
}
if err := client.EnableStorageReadClient(ctx); err != nil {
- return fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
+ return nil, fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
}
selectCommit := ""
if commit != "" {
@@ -77,48 +62,91 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
`, gsURI, ns, from.String(), to.String(), filePath, selectCommit))
job, err := q.Run(ctx)
if err != nil {
- return fmt.Errorf("err during bigquery.Run: %w", err)
+ return nil, fmt.Errorf("err during bigquery.Run: %w", err)
}
status, err := job.Wait(ctx)
if err != nil {
- return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
+ return nil, fmt.Errorf("err waiting for the bigquery.Job: %w", err)
}
if status.Err() != nil {
- return fmt.Errorf("bigquery job failed with status %w", status.Err())
+ return nil, fmt.Errorf("bigquery job failed with status %w", status.Err())
}
- return r.initGCSFileReaders(ctx, gsBucket, gsPath)
+ return initGCSMultiReader(ctx, gsBucket, gsPath)
}
-func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
+func initGCSMultiReader(ctx context.Context, bucket, path string) (io.ReadCloser, error) {
var gcsClient gcs.Client
var err error
if gcsClient, err = gcs.NewClient(ctx); err != nil {
- return fmt.Errorf("err creating gcs client: %w", err)
+ return nil, fmt.Errorf("err creating gcs client: %w", err)
}
var gcsFiles []*gcs.Object
if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
- return fmt.Errorf("err enumerating gcs files: %w", err)
+ return nil, fmt.Errorf("err enumerating gcs files: %w", err)
}
+ paths := []string{}
for _, obj := range gcsFiles {
- var readCloser io.ReadCloser
- if readCloser, err = gcsClient.FileReader(bucket + "/" + obj.Path); err != nil {
- return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
- }
- r.closers = append(r.closers, readCloser)
- r.gcsFiles = append(r.gcsFiles, readCloser)
+ paths = append(paths, bucket+"/"+obj.Path)
}
- return nil
+ return &gcsGZIPMultiReader{
+ gcsClient: gcsClient,
+ gcsFiles: paths,
+ }, nil
}
-func (r *bqCSVReader) Reader() (io.Reader, error) {
- var readers []io.Reader
- for _, file := range r.gcsFiles {
- gzipReaderCloser, err := gzip.NewReader(file)
- if err != nil {
- return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
+type gcsGZIPMultiReader struct {
+ gcsClient gcs.Client
+ gcsFiles []string
+
+ curFileReader io.ReadCloser
+ curGZReadCloser io.ReadCloser
+}
+
+func (mr *gcsGZIPMultiReader) Read(p []byte) (int, error) {
+ for len(mr.gcsFiles) > 0 {
+ if mr.curFileReader == nil {
+ var err error
+ if mr.curFileReader, err = mr.gcsClient.FileReader(mr.gcsFiles[0]); err != nil {
+ return 0, fmt.Errorf("failed to get %s reader: %w", mr.gcsFiles[0], err)
+ }
+ if mr.curGZReadCloser, err = gzip.NewReader(mr.curFileReader); err != nil {
+ mr.curGZReadCloser = nil // gzip.NewReader returns *Reader(nil) on corrupted header
+ return 0, fmt.Errorf("err calling gzip.NewReader: %w", err)
+ }
+ }
+ n, err := mr.curGZReadCloser.Read(p)
+ if err == io.EOF {
+ mr.gcsFiles = mr.gcsFiles[1:]
+ if err := mr.Close(); err != nil {
+ return 0, fmt.Errorf("mr.Close: %w", err)
+ }
+ }
+ if n > 0 || err != io.EOF {
+ if err == io.EOF && len(mr.gcsFiles) > 0 {
+ // Don't return EOF yet. More readers remain.
+ err = nil
+ }
+ return n, err
}
- r.closers = append(r.closers, gzipReaderCloser)
- readers = append(readers, gzipReaderCloser)
}
- return io.MultiReader(readers...), nil
+ return 0, io.EOF
+}
+
+func (mr *gcsGZIPMultiReader) Close() error {
+ var err1, err2 error
+ if mr.curGZReadCloser != nil {
+ err1 = mr.curGZReadCloser.Close()
+ }
+ if mr.curFileReader != nil {
+ err2 = mr.curFileReader.Close()
+ }
+ mr.curFileReader = nil
+ mr.curGZReadCloser = nil
+ if err1 != nil {
+ return fmt.Errorf("mr.curGZReadCloser.Close: %w", err1)
+ }
+ if err2 != nil {
+ return fmt.Errorf("mr.curFileReader.Close: %w", err2)
+ }
+ return nil
}
diff --git a/pkg/covermerger/bq_csv_reader_test.go b/pkg/covermerger/bq_csv_reader_test.go
new file mode 100644
index 000000000..6858acf30
--- /dev/null
+++ b/pkg/covermerger/bq_csv_reader_test.go
@@ -0,0 +1,117 @@
+// Copyright 2025 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package covermerger
+
+import (
+ "bytes"
+ "compress/gzip"
+ "errors"
+ "fmt"
+ "io"
+ "testing"
+
+ "github.com/google/syzkaller/pkg/gcs"
+ gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+func TestGCSGZIPMultiReader_Read(t *testing.T) {
+ tests := []struct {
+ name string
+ inputFiles []string
+ inputBytes [][]byte
+
+ wantBytes []byte
+ wantErr error
+ }{
+ {
+ name: "single file, single read",
+ inputFiles: []string{"file1"},
+ inputBytes: [][]byte{gzBytes("1")},
+ wantBytes: []byte("1"),
+ wantErr: nil,
+ },
+ {
+ name: "single file, multiple reads",
+ inputFiles: []string{"file1"},
+ inputBytes: [][]byte{gzBytes("123")},
+ wantBytes: []byte("123"),
+ wantErr: nil,
+ },
+ {
+ name: "multiple files, multiple reads",
+ inputFiles: []string{"file1", "file2", "file3"},
+ inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), gzBytes("789")},
+ wantBytes: []byte("123456789"),
+ wantErr: nil,
+ },
+ {
+ name: "multiple files, badbytes",
+ inputFiles: []string{"file1", "file2", "file3"},
+ inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), []byte("789")},
+ wantBytes: []byte("123456"),
+ wantErr: fmt.Errorf("err calling gzip.NewReader: %w", errors.New("unexpected EOF")),
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ mr := &gcsGZIPMultiReader{
+ gcsClient: makeGCSClientMock(t, test.inputFiles, test.inputBytes),
+ gcsFiles: test.inputFiles,
+ }
+ gotBytes, gotErr := io.ReadAll(mr)
+ assert.NoError(t, mr.Close())
+ assert.Equal(t, test.wantErr, gotErr)
+ assert.Equal(t, test.wantBytes, gotBytes)
+ })
+ }
+}
+
+func makeGCSClientMock(t *testing.T, files []string, bytes [][]byte) gcs.Client {
+ gcsClientMock := gcsmocks.NewClient(t)
+ for i, file := range files {
+ rcMock := &readCloserMock{}
+ for _, b := range bytes[i] {
+ rcMock.On("Read", mock.Anything).
+ Run(func(args mock.Arguments) {
+ arg := args.Get(0).([]byte)
+ arg[0] = b
+ }).
+ Return(1, nil).Once()
+ }
+ rcMock.On("Read", mock.Anything).
+ Return(0, io.EOF).
+ On("Close").
+ Return(nil).Once()
+
+ gcsClientMock.EXPECT().
+ FileReader(file).
+ Return(rcMock, nil)
+ }
+ return gcsClientMock
+}
+
+type readCloserMock struct {
+ mock.Mock
+}
+
+func (m *readCloserMock) Read(p []byte) (n int, err error) {
+ args := m.Called(p)
+ return args.Int(0), args.Error(1)
+}
+
+func (m *readCloserMock) Close() (err error) {
+ args := m.Called()
+ return args.Error(0)
+}
+
+func gzBytes(str string) []byte {
+ buf := &bytes.Buffer{}
+ gzw := gzip.NewWriter(buf)
+ gzw.Write([]byte(str))
+ gzw.Close()
+ return buf.Bytes()
+}