aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/cover/file.go2
-rw-r--r--pkg/covermerger/covermerger.go49
-rw-r--r--pkg/covermerger/covermerger_test.go1
3 files changed, 33 insertions, 19 deletions
diff --git a/pkg/cover/file.go b/pkg/cover/file.go
index 2939935b9..a0561dfe7 100644
--- a/pkg/cover/file.go
+++ b/pkg/cover/file.go
@@ -79,7 +79,7 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
}
ch := make(chan *covermerger.FileMergeResult, 1)
- if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
+ if err := covermerger.MergeCSVData(c, config, csvReader, ch); err != nil {
return nil, fmt.Errorf("error merging coverage: %w", err)
}
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 839a85a8e..11ed5c043 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -59,11 +59,11 @@ type FileCoverageMerger interface {
// Returns (totalInstrumentedLines, totalCoveredLines, error).
func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer,
) (int, int, error) {
- eg := errgroup.Group{}
+ eg, c := errgroup.WithContext(context.Background())
mergeResults := make(chan *FileMergeResult)
eg.Go(func() error {
defer close(mergeResults)
- if err := MergeCSVData(config, csvReader, mergeResults); err != nil {
+ if err := MergeCSVData(c, config, csvReader, mergeResults); err != nil {
return fmt.Errorf("covermerger.MergeCSVData: %w", err)
}
return nil
@@ -223,7 +223,7 @@ type FileMergeResult struct {
*MergeResult
}
-func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
+func MergeCSVData(c context.Context, config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
@@ -231,10 +231,11 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes
} else {
schema = fields
}
- errStreamChan := make(chan error, 1)
+ errStreamChan := make(chan error, 2)
recordsChan := make(chan *FileRecord)
go func() {
defer close(recordsChan)
+ defer func() { errStreamChan <- nil }()
for {
fields, err := csvReader.Read()
if err == io.EOF {
@@ -253,11 +254,14 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes
errStreamChan <- fmt.Errorf("makeRecord: %w", err)
return
}
- recordsChan <- record
+ select {
+ case <-c.Done():
+ return
+ case recordsChan <- record:
+ }
}
- errStreamChan <- nil
}()
- errMerging := mergeChanData(config, recordsChan, results)
+ errMerging := mergeChanData(c, config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
@@ -271,20 +275,24 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
- g, ctx := errgroup.WithContext(context.Background())
- frecordChan := groupFileRecords(recordChan, ctx)
+func mergeChanData(c context.Context, cfg *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult,
+) error {
+ g := errgroup.Group{}
+ frecordChan := groupFileRecords(recordChan, c)
- for i := 0; i < c.Jobs; i++ {
+ for i := 0; i < cfg.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ mr, err := batchFileData(cfg, frecord.fileName, frecord.records)
if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
}
- results <- &FileMergeResult{
+ select {
+ case <-c.Done():
+ return nil
+ case results <- &FileMergeResult{
FilePath: frecord.fileName,
- MergeResult: mr,
+ MergeResult: mr}:
}
}
return nil
@@ -310,18 +318,23 @@ func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan F
targetFile = curTargetFile
}
if curTargetFile != targetFile {
- frecordChan <- FileRecords{
+ select {
+ case <-ctx.Done():
+ return
+ case frecordChan <- FileRecords{
fileName: targetFile,
- records: records,
+ records: records}:
}
records = nil
targetFile = curTargetFile
}
records = append(records, record)
}
- frecordChan <- FileRecords{
+ select {
+ case <-ctx.Done():
+ case frecordChan <- FileRecords{
fileName: targetFile,
- records: records,
+ records: records}:
}
}()
return frecordChan
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 29fee83a6..3d1b2d71a 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -246,6 +246,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
doneCh <- true
}()
assert.NoError(t, MergeCSVData(
+ context.Background(),
testConfig(test.baseRepo, test.baseCommit, test.workdir),
strings.NewReader(test.bqTable),
mergeResultsCh))