aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/covermerger
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-12-19 21:05:23 +0100
committerTaras Madan <tarasmadan@google.com>2024-12-19 21:57:15 +0000
commit0f61b4150f69bfee656d0398f8b4125a66a9512f (patch)
tree7cb40b53201338818416db3ff00d49c0aa6268f3 /pkg/covermerger
parentb8e81ee07dfe7221f7d4a54bb2a294ab84bed5f1 (diff)
pkg/covermerger: propagate context cancellation
The problem is a deadlock that occurs on a GCS storage error. The GCS client establishes its connection only once it has buffered enough data to write — approximately 16 MB. The error therefore surfaces on io.Writer access in the middle of the merge, terminating that errgroup goroutine while the other goroutines remain blocked. This commit propagates the termination signal to the other goroutines.
Diffstat (limited to 'pkg/covermerger')
-rw-r--r--pkg/covermerger/covermerger.go49
-rw-r--r--pkg/covermerger/covermerger_test.go1
2 files changed, 32 insertions, 18 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 839a85a8e..11ed5c043 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -59,11 +59,11 @@ type FileCoverageMerger interface {
// Returns (totalInstrumentedLines, totalCoveredLines, error).
func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer,
) (int, int, error) {
- eg := errgroup.Group{}
+ eg, c := errgroup.WithContext(context.Background())
mergeResults := make(chan *FileMergeResult)
eg.Go(func() error {
defer close(mergeResults)
- if err := MergeCSVData(config, csvReader, mergeResults); err != nil {
+ if err := MergeCSVData(c, config, csvReader, mergeResults); err != nil {
return fmt.Errorf("covermerger.MergeCSVData: %w", err)
}
return nil
@@ -223,7 +223,7 @@ type FileMergeResult struct {
*MergeResult
}
-func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
+func MergeCSVData(c context.Context, config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
@@ -231,10 +231,11 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes
} else {
schema = fields
}
- errStreamChan := make(chan error, 1)
+ errStreamChan := make(chan error, 2)
recordsChan := make(chan *FileRecord)
go func() {
defer close(recordsChan)
+ defer func() { errStreamChan <- nil }()
for {
fields, err := csvReader.Read()
if err == io.EOF {
@@ -253,11 +254,14 @@ func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeRes
errStreamChan <- fmt.Errorf("makeRecord: %w", err)
return
}
- recordsChan <- record
+ select {
+ case <-c.Done():
+ return
+ case recordsChan <- record:
+ }
}
- errStreamChan <- nil
}()
- errMerging := mergeChanData(config, recordsChan, results)
+ errMerging := mergeChanData(c, config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
@@ -271,20 +275,24 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
- g, ctx := errgroup.WithContext(context.Background())
- frecordChan := groupFileRecords(recordChan, ctx)
+func mergeChanData(c context.Context, cfg *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult,
+) error {
+ g := errgroup.Group{}
+ frecordChan := groupFileRecords(recordChan, c)
- for i := 0; i < c.Jobs; i++ {
+ for i := 0; i < cfg.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ mr, err := batchFileData(cfg, frecord.fileName, frecord.records)
if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
}
- results <- &FileMergeResult{
+ select {
+ case <-c.Done():
+ return nil
+ case results <- &FileMergeResult{
FilePath: frecord.fileName,
- MergeResult: mr,
+ MergeResult: mr}:
}
}
return nil
@@ -310,18 +318,23 @@ func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan F
targetFile = curTargetFile
}
if curTargetFile != targetFile {
- frecordChan <- FileRecords{
+ select {
+ case <-ctx.Done():
+ return
+ case frecordChan <- FileRecords{
fileName: targetFile,
- records: records,
+ records: records}:
}
records = nil
targetFile = curTargetFile
}
records = append(records, record)
}
- frecordChan <- FileRecords{
+ select {
+ case <-ctx.Done():
+ case frecordChan <- FileRecords{
fileName: targetFile,
- records: records,
+ records: records}:
}
}()
return frecordChan
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 29fee83a6..3d1b2d71a 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -246,6 +246,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
doneCh <- true
}()
assert.NoError(t, MergeCSVData(
+ context.Background(),
testConfig(test.baseRepo, test.baseCommit, test.workdir),
strings.NewReader(test.bqTable),
mergeResultsCh))