From fef5bb5152a5013d0c4571cd0601f46824a9664b Mon Sep 17 00:00:00 2001 From: Taras Madan Date: Wed, 18 Dec 2024 13:33:01 +0100 Subject: tools/syz-covermerger: upload coverage as jsonl Previous implementation store only the summary of processed records. The summary was <1GB and single processing node was able to manipulate the data. Current implementation stores all the details about records read to make post-processing more flexible. This change was needed to get access to the source manager name and will help to analyze other details. This new implementation requires 20GB mem to process single day records. CSV log interning experiment allowed to merge using 10G. Quarter data aggregation will cost ~100 times more. The alternative is to use stream processing. We can process data kernel-file-by-file. It allows to /15000 memory consumption. This approach is implemented here. We're batching coverage signals by file and store per-file results in GCS JSONL file. See https://jsonlines.org/ to learn about jsonl. --- pkg/covermerger/covermerger.go | 41 ++++++++++++++++--------------------- pkg/covermerger/covermerger_test.go | 14 ++++++++++--- 2 files changed, 29 insertions(+), 26 deletions(-) (limited to 'pkg/covermerger') diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go index 394b606de..0f6ff43e4 100644 --- a/pkg/covermerger/covermerger.go +++ b/pkg/covermerger/covermerger.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "strconv" - "sync" "github.com/google/syzkaller/pkg/log" "golang.org/x/exp/maps" @@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool { return true } -type FilesMergeResults map[string]*MergeResult +type FileMergeResult struct { + FilePath string + *MergeResult +} -func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) { +func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error { var schema []string csvReader := csv.NewReader(reader) if fields, err := csvReader.Read(); err != nil { - return nil, fmt.Errorf("failed to read schema: %w", err) + return fmt.Errorf("failed to read schema: %w", err) } else { schema = fields } @@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) { } errStreamChan <- nil }() - mergeResult, errMerging := mergeChanData(config, recordsChan) + errMerging := mergeChanData(config, recordsChan, results) errStream := <-errStreamChan if errMerging != nil || errStream != nil { - return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w", + return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w", errMerging, errStream) } - return mergeResult, nil + return nil } type FileRecords struct { @@ -177,33 +179,26 @@ type FileRecords struct { records []*FileRecord } -func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) { +func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error { g, ctx := errgroup.WithContext(context.Background()) frecordChan := groupFileRecords(recordChan, ctx) - stat := make(FilesMergeResults) - var mu sync.Mutex + for i := 0; i < c.Jobs; i++ { g.Go(func() error { for frecord := range frecordChan { - if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil { + mr, err := batchFileData(c, frecord.fileName, frecord.records) + if err != nil { return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err) - } else { - mu.Lock() - if _, exist := stat[frecord.fileName]; exist { - mu.Unlock() - return fmt.Errorf("file %s was already processed", frecord.fileName) - } - stat[frecord.fileName] = mr - mu.Unlock() + } + results <- &FileMergeResult{ + FilePath: frecord.fileName, + MergeResult: mr, } } return nil }) } - if err := g.Wait(); err != nil { - return nil, err - } - return stat, nil + return g.Wait() } func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords { diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index b63ae36be..4580c114f 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - aggregation, err := MergeCSVData( + ch := make(chan *FileMergeResult) + aggregation := make(map[string]*MergeResult) + go func() { + for fmr := range ch { + aggregation[fmr.FilePath] = fmr.MergeResult + } + }() + err := MergeCSVData( &Config{ Jobs: 2, skipRepoClone: true, @@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, FileVersProvider: &fileVersProviderMock{Workdir: test.workdir}, }, strings.NewReader(test.bqTable), + ch, ) assert.Nil(t, err) - var expectedAggregation FilesMergeResults + var expectedAggregation map[string]*MergeResult assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation)) if !test.checkDetails { ignoreLineDetailsInTest(aggregation) @@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } } -func ignoreLineDetailsInTest(results FilesMergeResults) { +func ignoreLineDetailsInTest(results map[string]*MergeResult) { for _, mr := range results { mr.LineDetails = nil } -- cgit mrf-deployment