aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/covermerger
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/covermerger')
-rw-r--r--pkg/covermerger/covermerger.go41
-rw-r--r--pkg/covermerger/covermerger_test.go14
2 files changed, 29 insertions, 26 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 394b606de..0f6ff43e4 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -10,7 +10,6 @@ import (
"fmt"
"io"
"strconv"
- "sync"
"github.com/google/syzkaller/pkg/log"
"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
return true
}
-type FilesMergeResults map[string]*MergeResult
+type FileMergeResult struct {
+ FilePath string
+ *MergeResult
+}
-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
- return nil, fmt.Errorf("failed to read schema: %w", err)
+ return fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
@@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
}
errStreamChan <- nil
}()
- mergeResult, errMerging := mergeChanData(config, recordsChan)
+ errMerging := mergeChanData(config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
- return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+ return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
errMerging, errStream)
}
- return mergeResult, nil
+ return nil
}
type FileRecords struct {
@@ -177,33 +179,26 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
g, ctx := errgroup.WithContext(context.Background())
frecordChan := groupFileRecords(recordChan, ctx)
- stat := make(FilesMergeResults)
- var mu sync.Mutex
+
for i := 0; i < c.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+ mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
- } else {
- mu.Lock()
- if _, exist := stat[frecord.fileName]; exist {
- mu.Unlock()
- return fmt.Errorf("file %s was already processed", frecord.fileName)
- }
- stat[frecord.fileName] = mr
- mu.Unlock()
+ }
+ results <- &FileMergeResult{
+ FilePath: frecord.fileName,
+ MergeResult: mr,
}
}
return nil
})
}
- if err := g.Wait(); err != nil {
- return nil, err
- }
- return stat, nil
+ return g.Wait()
}
func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index b63ae36be..4580c114f 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- aggregation, err := MergeCSVData(
+ ch := make(chan *FileMergeResult)
+ aggregation := make(map[string]*MergeResult)
+ go func() {
+ for fmr := range ch {
+ aggregation[fmr.FilePath] = fmr.MergeResult
+ }
+ }()
+ err := MergeCSVData(
&Config{
Jobs: 2,
skipRepoClone: true,
@@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
},
strings.NewReader(test.bqTable),
+ ch,
)
assert.Nil(t, err)
- var expectedAggregation FilesMergeResults
+ var expectedAggregation map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
if !test.checkDetails {
ignoreLineDetailsInTest(aggregation)
@@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
}
-func ignoreLineDetailsInTest(results FilesMergeResults) {
+func ignoreLineDetailsInTest(results map[string]*MergeResult) {
for _, mr := range results {
mr.LineDetails = nil
}