diff options
| author | Taras Madan <tarasmadan@google.com> | 2025-04-02 20:28:15 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2025-04-04 14:52:42 +0000 |
| commit | c53ea9c902c6745d750c5700631e1556716f4cd4 (patch) | |
| tree | 855f9483db0aeedee6fe7dbd34e746356db13085 /pkg | |
| parent | 1c4febdbbe8f34403baef506b22cd689576927f6 (diff) | |
pkg/covermerger: read gcs files sequentially
Quarter-long aggregation means thousands of gzip files.
When opening all the files in parallel, we suffer from:
1. Memory overhead.
2. GCS API errors. It can't read Attrs for 1500+ files.
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/cover/file.go | 16 | ||||
| -rw-r--r-- | pkg/covermerger/bq_csv_reader.go | 112 | ||||
| -rw-r--r-- | pkg/covermerger/bq_csv_reader_test.go | 117 |
3 files changed, 190 insertions, 55 deletions
diff --git a/pkg/cover/file.go b/pkg/cover/file.go index f96deb677..97b6be6b7 100644 --- a/pkg/cover/file.go +++ b/pkg/cover/file.go @@ -63,21 +63,11 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa } fromDate, toDate := tp.DatesFromTo() - dbReader := covermerger.MakeBQCSVReader() - if err := dbReader.InitNsRecords(c, - ns, - filePath, - sourceCommit, - fromDate, - toDate, - ); err != nil { - return nil, fmt.Errorf("failed to dbReader.InitNsRecords: %w", err) - } - defer dbReader.Close() - csvReader, err := dbReader.Reader() + csvReader, err := covermerger.InitNsRecords(c, ns, filePath, sourceCommit, fromDate, toDate) if err != nil { - return nil, fmt.Errorf("failed to dbReader.Reader: %w", err) + return nil, fmt.Errorf("failed to covermerger.InitNsRecords: %w", err) } + defer csvReader.Close() ch := make(chan *covermerger.FileMergeResult, 1) if err := covermerger.MergeCSVData(c, config, csvReader, ch); err != nil { diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go index 483af792e..5933b4ab4 100644 --- a/pkg/covermerger/bq_csv_reader.go +++ b/pkg/covermerger/bq_csv_reader.go @@ -16,28 +16,13 @@ import ( "github.com/google/uuid" ) -type bqCSVReader struct { - closers []io.Closer - gcsFiles []io.Reader -} - -func (r *bqCSVReader) Close() { - for _, c := range r.closers { - c.Close() - } -} - -func MakeBQCSVReader() *bqCSVReader { - return &bqCSVReader{} -} - -func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) error { +func InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) (io.ReadCloser, error) { if err := validator.AnyError("input validation failed", validator.NamespaceName(ns), validator.AnyOk(validator.EmptyStr(filePath), validator.KernelFilePath(filePath)), validator.AnyOk(validator.EmptyStr(commit), validator.CommitHash(commit)), ); err != nil { - return err + return nil, err } sessionUUID := 
uuid.New().String() gsBucket := "syzbot-temp" @@ -45,10 +30,10 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz" client, err := bigquery.NewClient(ctx, "syzkaller") if err != nil { - return fmt.Errorf("failed to initialize bigquery client: %w", err) + return nil, fmt.Errorf("failed to initialize bigquery client: %w", err) } if err := client.EnableStorageReadClient(ctx); err != nil { - return fmt.Errorf("failed to client.EnableStorageReadClient: %w", err) + return nil, fmt.Errorf("failed to client.EnableStorageReadClient: %w", err) } selectCommit := "" if commit != "" { @@ -77,48 +62,91 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st `, gsURI, ns, from.String(), to.String(), filePath, selectCommit)) job, err := q.Run(ctx) if err != nil { - return fmt.Errorf("err during bigquery.Run: %w", err) + return nil, fmt.Errorf("err during bigquery.Run: %w", err) } status, err := job.Wait(ctx) if err != nil { - return fmt.Errorf("err waiting for the bigquery.Job: %w", err) + return nil, fmt.Errorf("err waiting for the bigquery.Job: %w", err) } if status.Err() != nil { - return fmt.Errorf("bigquery job failed with status %w", status.Err()) + return nil, fmt.Errorf("bigquery job failed with status %w", status.Err()) } - return r.initGCSFileReaders(ctx, gsBucket, gsPath) + return initGCSMultiReader(ctx, gsBucket, gsPath) } -func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error { +func initGCSMultiReader(ctx context.Context, bucket, path string) (io.ReadCloser, error) { var gcsClient gcs.Client var err error if gcsClient, err = gcs.NewClient(ctx); err != nil { - return fmt.Errorf("err creating gcs client: %w", err) + return nil, fmt.Errorf("err creating gcs client: %w", err) } var gcsFiles []*gcs.Object if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil { - return fmt.Errorf("err enumerating gcs 
files: %w", err) + return nil, fmt.Errorf("err enumerating gcs files: %w", err) } + paths := []string{} for _, obj := range gcsFiles { - var readCloser io.ReadCloser - if readCloser, err = gcsClient.FileReader(bucket + "/" + obj.Path); err != nil { - return fmt.Errorf("failed to get %s reader: %w", obj.Path, err) - } - r.closers = append(r.closers, readCloser) - r.gcsFiles = append(r.gcsFiles, readCloser) + paths = append(paths, bucket+"/"+obj.Path) } - return nil + return &gcsGZIPMultiReader{ + gcsClient: gcsClient, + gcsFiles: paths, + }, nil } -func (r *bqCSVReader) Reader() (io.Reader, error) { - var readers []io.Reader - for _, file := range r.gcsFiles { - gzipReaderCloser, err := gzip.NewReader(file) - if err != nil { - return nil, fmt.Errorf("err calling gzip.NewReader: %w", err) +type gcsGZIPMultiReader struct { + gcsClient gcs.Client + gcsFiles []string + + curFileReader io.ReadCloser + curGZReadCloser io.ReadCloser +} + +func (mr *gcsGZIPMultiReader) Read(p []byte) (int, error) { + for len(mr.gcsFiles) > 0 { + if mr.curFileReader == nil { + var err error + if mr.curFileReader, err = mr.gcsClient.FileReader(mr.gcsFiles[0]); err != nil { + return 0, fmt.Errorf("failed to get %s reader: %w", mr.gcsFiles[0], err) + } + if mr.curGZReadCloser, err = gzip.NewReader(mr.curFileReader); err != nil { + mr.curGZReadCloser = nil // gzip.NewReader returns *Reader(nil) on corrupted header + return 0, fmt.Errorf("err calling gzip.NewReader: %w", err) + } + } + n, err := mr.curGZReadCloser.Read(p) + if err == io.EOF { + mr.gcsFiles = mr.gcsFiles[1:] + if err := mr.Close(); err != nil { + return 0, fmt.Errorf("mr.Close: %w", err) + } + } + if n > 0 || err != io.EOF { + if err == io.EOF && len(mr.gcsFiles) > 0 { + // Don't return EOF yet. More readers remain. 
+ err = nil + } + return n, err } - r.closers = append(r.closers, gzipReaderCloser) - readers = append(readers, gzipReaderCloser) } - return io.MultiReader(readers...), nil + return 0, io.EOF +} + +func (mr *gcsGZIPMultiReader) Close() error { + var err1, err2 error + if mr.curGZReadCloser != nil { + err1 = mr.curGZReadCloser.Close() + } + if mr.curFileReader != nil { + err2 = mr.curFileReader.Close() + } + mr.curFileReader = nil + mr.curGZReadCloser = nil + if err1 != nil { + return fmt.Errorf("mr.curGZReadCloser.Close: %w", err1) + } + if err2 != nil { + return fmt.Errorf("mr.curFileReader.Close: %w", err2) + } + return nil } diff --git a/pkg/covermerger/bq_csv_reader_test.go b/pkg/covermerger/bq_csv_reader_test.go new file mode 100644 index 000000000..6858acf30 --- /dev/null +++ b/pkg/covermerger/bq_csv_reader_test.go @@ -0,0 +1,117 @@ +// Copyright 2025 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. 
+ +package covermerger + +import ( + "bytes" + "compress/gzip" + "errors" + "fmt" + "io" + "testing" + + "github.com/google/syzkaller/pkg/gcs" + gcsmocks "github.com/google/syzkaller/pkg/gcs/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestGCSGZIPMultiReader_Read(t *testing.T) { + tests := []struct { + name string + inputFiles []string + inputBytes [][]byte + + wantBytes []byte + wantErr error + }{ + { + name: "single file, single read", + inputFiles: []string{"file1"}, + inputBytes: [][]byte{gzBytes("1")}, + wantBytes: []byte("1"), + wantErr: nil, + }, + { + name: "single file, multiple reads", + inputFiles: []string{"file1"}, + inputBytes: [][]byte{gzBytes("123")}, + wantBytes: []byte("123"), + wantErr: nil, + }, + { + name: "multiple files, multiple reads", + inputFiles: []string{"file1", "file2", "file3"}, + inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), gzBytes("789")}, + wantBytes: []byte("123456789"), + wantErr: nil, + }, + { + name: "multiple files, badbytes", + inputFiles: []string{"file1", "file2", "file3"}, + inputBytes: [][]byte{gzBytes("123"), gzBytes("456"), []byte("789")}, + wantBytes: []byte("123456"), + wantErr: fmt.Errorf("err calling gzip.NewReader: %w", errors.New("unexpected EOF")), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mr := &gcsGZIPMultiReader{ + gcsClient: makeGCSClientMock(t, test.inputFiles, test.inputBytes), + gcsFiles: test.inputFiles, + } + gotBytes, gotErr := io.ReadAll(mr) + assert.NoError(t, mr.Close()) + assert.Equal(t, test.wantErr, gotErr) + assert.Equal(t, test.wantBytes, gotBytes) + }) + } +} + +func makeGCSClientMock(t *testing.T, files []string, bytes [][]byte) gcs.Client { + gcsClientMock := gcsmocks.NewClient(t) + for i, file := range files { + rcMock := &readCloserMock{} + for _, b := range bytes[i] { + rcMock.On("Read", mock.Anything). + Run(func(args mock.Arguments) { + arg := args.Get(0).([]byte) + arg[0] = b + }). 
+ Return(1, nil).Once() + } + rcMock.On("Read", mock.Anything). + Return(0, io.EOF). + On("Close"). + Return(nil).Once() + + gcsClientMock.EXPECT(). + FileReader(file). + Return(rcMock, nil) + } + return gcsClientMock +} + +type readCloserMock struct { + mock.Mock +} + +func (m *readCloserMock) Read(p []byte) (n int, err error) { + args := m.Called(p) + return args.Int(0), args.Error(1) +} + +func (m *readCloserMock) Close() (err error) { + args := m.Called() + return args.Error(0) +} + +func gzBytes(str string) []byte { + buf := &bytes.Buffer{} + gzw := gzip.NewWriter(buf) + gzw.Write([]byte(str)) + gzw.Close() + return buf.Bytes() +} |
