aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/covermerger
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-12-18 13:33:01 +0100
committerTaras Madan <tarasmadan@google.com>2024-12-19 14:38:21 +0000
commitfef5bb5152a5013d0c4571cd0601f46824a9664b (patch)
treec8fc887f6bfe48ae314a55f439ca1e7cbea370c7 /pkg/covermerger
parentf1c188dc6963bf016692df095d783e4945d84f7b (diff)
tools/syz-covermerger: upload coverage as jsonl
The previous implementation stored only the summary of the processed records. The summary was <1GB, and a single processing node was able to manipulate the data. The current implementation stores all the details about the records read, to make post-processing more flexible. This change was needed to get access to the source manager name and will help to analyze other details. This new implementation requires 20GB of memory to process a single day's records. A CSV log interning experiment allowed merging with 10GB. Quarterly data aggregation would cost ~100 times more. The alternative is to use stream processing: we can process the data kernel-file-by-file, which reduces memory consumption by a factor of ~15000. This approach is implemented here. We batch coverage signals by file and store the per-file results in a GCS JSONL file. See https://jsonlines.org/ to learn about JSONL.
Diffstat (limited to 'pkg/covermerger')
-rw-r--r--pkg/covermerger/covermerger.go41
-rw-r--r--pkg/covermerger/covermerger_test.go14
2 files changed, 29 insertions, 26 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 394b606de..0f6ff43e4 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -10,7 +10,6 @@ import (
"fmt"
"io"
"strconv"
- "sync"
"github.com/google/syzkaller/pkg/log"
"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
return true
}
-type FilesMergeResults map[string]*MergeResult
+type FileMergeResult struct {
+ FilePath string
+ *MergeResult
+}
-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
- return nil, fmt.Errorf("failed to read schema: %w", err)
+ return fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
@@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
}
errStreamChan <- nil
}()
- mergeResult, errMerging := mergeChanData(config, recordsChan)
+ errMerging := mergeChanData(config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
- return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+ return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
errMerging, errStream)
}
- return mergeResult, nil
+ return nil
}
type FileRecords struct {
@@ -177,33 +179,26 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
g, ctx := errgroup.WithContext(context.Background())
frecordChan := groupFileRecords(recordChan, ctx)
- stat := make(FilesMergeResults)
- var mu sync.Mutex
+
for i := 0; i < c.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+ mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
- } else {
- mu.Lock()
- if _, exist := stat[frecord.fileName]; exist {
- mu.Unlock()
- return fmt.Errorf("file %s was already processed", frecord.fileName)
- }
- stat[frecord.fileName] = mr
- mu.Unlock()
+ }
+ results <- &FileMergeResult{
+ FilePath: frecord.fileName,
+ MergeResult: mr,
}
}
return nil
})
}
- if err := g.Wait(); err != nil {
- return nil, err
- }
- return stat, nil
+ return g.Wait()
}
func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index b63ae36be..4580c114f 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- aggregation, err := MergeCSVData(
+ ch := make(chan *FileMergeResult)
+ aggregation := make(map[string]*MergeResult)
+ go func() {
+ for fmr := range ch {
+ aggregation[fmr.FilePath] = fmr.MergeResult
+ }
+ }()
+ err := MergeCSVData(
&Config{
Jobs: 2,
skipRepoClone: true,
@@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
},
strings.NewReader(test.bqTable),
+ ch,
)
assert.Nil(t, err)
- var expectedAggregation FilesMergeResults
+ var expectedAggregation map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
if !test.checkDetails {
ignoreLineDetailsInTest(aggregation)
@@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
}
-func ignoreLineDetailsInTest(results FilesMergeResults) {
+func ignoreLineDetailsInTest(results map[string]*MergeResult) {
for _, mr := range results {
mr.LineDetails = nil
}