diff options
Diffstat (limited to 'pkg/covermerger')
| -rw-r--r-- | pkg/covermerger/covermerger.go | 41 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger_test.go | 14 |
2 files changed, 29 insertions, 26 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go index 394b606de..0f6ff43e4 100644 --- a/pkg/covermerger/covermerger.go +++ b/pkg/covermerger/covermerger.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "strconv" - "sync" "github.com/google/syzkaller/pkg/log" "golang.org/x/exp/maps" @@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool { return true } -type FilesMergeResults map[string]*MergeResult +type FileMergeResult struct { + FilePath string + *MergeResult +} -func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) { +func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error { var schema []string csvReader := csv.NewReader(reader) if fields, err := csvReader.Read(); err != nil { - return nil, fmt.Errorf("failed to read schema: %w", err) + return fmt.Errorf("failed to read schema: %w", err) } else { schema = fields } @@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) { } errStreamChan <- nil }() - mergeResult, errMerging := mergeChanData(config, recordsChan) + errMerging := mergeChanData(config, recordsChan, results) errStream := <-errStreamChan if errMerging != nil || errStream != nil { - return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w", + return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w", errMerging, errStream) } - return mergeResult, nil + return nil } type FileRecords struct { @@ -177,33 +179,26 @@ type FileRecords struct { records []*FileRecord } -func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) { +func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error { g, ctx := errgroup.WithContext(context.Background()) frecordChan := groupFileRecords(recordChan, ctx) - stat := make(FilesMergeResults) - var mu sync.Mutex + for i := 0; i < c.Jobs; i++ { g.Go(func() error { for frecord := range frecordChan { - if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil { + mr, err := batchFileData(c, frecord.fileName, frecord.records) + if err != nil { return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err) - } else { - mu.Lock() - if _, exist := stat[frecord.fileName]; exist { - mu.Unlock() - return fmt.Errorf("file %s was already processed", frecord.fileName) - } - stat[frecord.fileName] = mr - mu.Unlock() + } + results <- &FileMergeResult{ + FilePath: frecord.fileName, + MergeResult: mr, } } return nil }) } - if err := g.Wait(); err != nil { - return nil, err - } - return stat, nil + return g.Wait() } func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords { diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index b63ae36be..4580c114f 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - aggregation, err := MergeCSVData( + ch := make(chan *FileMergeResult) + aggregation := make(map[string]*MergeResult) + go func() { + for fmr := range ch { + aggregation[fmr.FilePath] = fmr.MergeResult + } + }() + err := MergeCSVData( &Config{ Jobs: 2, skipRepoClone: true, @@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, FileVersProvider: &fileVersProviderMock{Workdir: test.workdir}, }, strings.NewReader(test.bqTable), + ch, ) assert.Nil(t, err) - var expectedAggregation FilesMergeResults + var expectedAggregation map[string]*MergeResult assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation)) if !test.checkDetails { ignoreLineDetailsInTest(aggregation) @@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } } -func ignoreLineDetailsInTest(results FilesMergeResults) { +func ignoreLineDetailsInTest(results map[string]*MergeResult) { for _, mr := range results { mr.LineDetails = nil } |
