diff options
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/cover/file.go | 2 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger.go | 49 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger_test.go | 1 |
3 files changed, 33 insertions, 19 deletions
diff --git a/pkg/cover/file.go b/pkg/cover/file.go index 2939935b9..a0561dfe7 100644 --- a/pkg/cover/file.go +++ b/pkg/cover/file.go @@ -79,7 +79,7 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa } ch := make(chan *covermerger.FileMergeResult, 1) - if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil { + if err := covermerger.MergeCSVData(c, config, csvReader, ch); err != nil { return nil, fmt.Errorf("error merging coverage: %w", err) } diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go index 839a85a8e..11ed5c043 100644 --- a/pkg/covermerger/covermerger.go +++ b/pkg/covermerger/covermerger.go @@ -59,11 +59,11 @@ type FileCoverageMerger interface { // Returns (totalInstrumentedLines, totalCoveredLines, error). func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer, ) (int, int, error) { - eg := errgroup.Group{} + eg, c := errgroup.WithContext(context.Background()) mergeResults := make(chan *FileMergeResult) eg.Go(func() error { defer close(mergeResults) - if err := MergeCSVData(config, csvReader, mergeResults); err != nil { + if err := MergeCSVData(c, config, csvReader, mergeResults); err != nil { return fmt.Errorf("covermerger.MergeCSVData: %w", err) } return nil @@ -223,7 +223,7 @@ type FileMergeResult struct { *MergeResult } -func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error { +func MergeCSVData(c context.Context, config *Config, reader io.Reader, results chan<- *FileMergeResult) error { var schema []string csvReader := csv.NewReader(reader) if fields, err := csvReader.Read(); err != nil { @@ -231,10 +231,11 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes } else { schema = fields } - errStreamChan := make(chan error, 1) + errStreamChan := make(chan error, 2) recordsChan := make(chan *FileRecord) go func() { defer close(recordsChan) + defer func() { errStreamChan <- nil }() for { fields, err := csvReader.Read() if err == io.EOF { @@ -253,11 +254,14 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes errStreamChan <- fmt.Errorf("makeRecord: %w", err) return } - recordsChan <- record + select { + case <-c.Done(): + return + case recordsChan <- record: + } } - errStreamChan <- nil }() - errMerging := mergeChanData(config, recordsChan, results) + errMerging := mergeChanData(c, config, recordsChan, results) errStream := <-errStreamChan if errMerging != nil || errStream != nil { return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w", @@ -271,20 +275,24 @@ type FileRecords struct { records []*FileRecord } -func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error { - g, ctx := errgroup.WithContext(context.Background()) - frecordChan := groupFileRecords(recordChan, ctx) +func mergeChanData(c context.Context, cfg *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult, +) error { + g := errgroup.Group{} + frecordChan := groupFileRecords(recordChan, c) - for i := 0; i < c.Jobs; i++ { + for i := 0; i < cfg.Jobs; i++ { g.Go(func() error { for frecord := range frecordChan { - mr, err := batchFileData(c, frecord.fileName, frecord.records) + mr, err := batchFileData(cfg, frecord.fileName, frecord.records) if err != nil { return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err) } - results <- &FileMergeResult{ + select { + case <-c.Done(): + return nil + case results <- &FileMergeResult{ FilePath: frecord.fileName, - MergeResult: mr, + MergeResult: mr}: } } return nil @@ -310,18 +318,23 @@ func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan F targetFile = curTargetFile } if curTargetFile != targetFile { - frecordChan <- FileRecords{ + select { + case <-ctx.Done(): + return + case frecordChan <- FileRecords{ fileName: targetFile, - records: records, + records: records}: } records = nil targetFile = curTargetFile } records = append(records, record) } - frecordChan <- FileRecords{ + select { + case <-ctx.Done(): + case frecordChan <- FileRecords{ fileName: targetFile, - records: records, + records: records}: } }() return frecordChan diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index 29fee83a6..3d1b2d71a 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -246,6 +246,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, doneCh <- true }() assert.NoError(t, MergeCSVData( + context.Background(), testConfig(test.baseRepo, test.baseCommit, test.workdir), strings.NewReader(test.bqTable), mergeResultsCh)) |
