author    Taras Madan <tarasmadan@google.com>  2024-12-18 13:33:01 +0100
committer Taras Madan <tarasmadan@google.com>  2024-12-19 14:38:21 +0000
commit    fef5bb5152a5013d0c4571cd0601f46824a9664b
tree      c8fc887f6bfe48ae314a55f439ca1e7cbea370c7
parent    f1c188dc6963bf016692df095d783e4945d84f7b
tools/syz-covermerger: upload coverage as jsonl
The previous implementation stored only a summary of the processed records. The summary was <1GB, so a single processing node was able to manipulate the data. The new implementation stores all the details of the records it reads, to make post-processing more flexible; this change was needed to get access to the source manager name and will help analyze other details.

Storing the full details requires 20GB of memory to process a single day's records. A CSV log interning experiment allowed merging with 10GB, but aggregating a quarter's data would still cost ~100 times more. The alternative is stream processing: we can process the data kernel-file-by-file, which reduces memory consumption by a factor of ~15,000. That approach is implemented here: we batch the coverage signals by file and store the per-file results in a GCS JSONL file. See https://jsonlines.org/ to learn about the JSONL format.
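For illustration, each line of the uploaded JSONL file is one JSON-encoded MergedCoverageRecord (defined in pkg/coveragedb below). A hypothetical record, with invented values, could look like:

  {"Manager":"ci-mock","FilePath":"kernel/fork.c","FileData":{"Instrumented":120,"Covered":95,"LinesInstrumented":[10,11,14],"HitCounts":[3,0,7]}}

And here is a minimal sketch of the new streaming call convention, assuming the caller owns the results channel and using gcsWriter as a stand-in for an io.Writer over the target GCS object:

  ch := make(chan *covermerger.FileMergeResult)
  done := make(chan struct{})
  go func() { // consumer: one JSON line per merged file
      defer close(done)
      enc := json.NewEncoder(gcsWriter)
      for fmr := range ch {
          _ = enc.Encode(fmr) // real code would map this to a coveragedb.MergedCoverageRecord
      }
  }()
  err := covermerger.MergeCSVData(config, csvReader, ch)
  close(ch) // MergeCSVData does not close the channel; all sends have completed once it returns
  <-done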
Diffstat (limited to 'pkg')
-rw-r--r--  pkg/cover/file.go                    18
-rw-r--r--  pkg/coveragedb/spanner.go            73
-rw-r--r--  pkg/covermerger/covermerger.go       41
-rw-r--r--  pkg/covermerger/covermerger_test.go  14
4 files changed, 84 insertions(+), 62 deletions(-)
diff --git a/pkg/cover/file.go b/pkg/cover/file.go
index ff2b7a0ec..2939935b9 100644
--- a/pkg/cover/file.go
+++ b/pkg/cover/file.go
@@ -78,14 +78,24 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
return nil, fmt.Errorf("failed to dbReader.Reader: %w", err)
}
- mergeResult, err := covermerger.MergeCSVData(config, csvReader)
- if err != nil {
+ ch := make(chan *covermerger.FileMergeResult, 1)
+ if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
return nil, fmt.Errorf("error merging coverage: %w", err)
}
- if _, exist := mergeResult[filePath]; !exist {
+
+ var mr *covermerger.MergeResult
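+ // MergeCSVData has already returned, so if a result was produced it is waiting in the 1-slot buffer; read it without blocking.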
+ select {
+ case fmr := <-ch:
+ if fmr != nil {
+ mr = fmr.MergeResult
+ }
+ default:
+ }
+
+ if mr == nil {
return nil, fmt.Errorf("no merge result for file %s", filePath)
}
- return mergeResult[filePath], nil
+ return mr, nil
}
func rendResult(content string, coverage *covermerger.MergeResult, renderConfig *CoverageRenderConfig) string {
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index db6d9ac79..e2a197e0c 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -4,8 +4,11 @@
package coveragedb
import (
+ "bufio"
"context"
+ "encoding/json"
"fmt"
+ "io"
"os"
"sync/atomic"
"time"
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
return spanner.NewClient(ctx, database)
}
-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
type Coverage struct {
Instrumented int64
Covered int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
}
}
-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
- template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
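+// MergedCoverageRecord is one line of the coverage JSONL: the merged coverage for a (manager, file path) pair.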
+type MergedCoverageRecord struct {
+ Manager string
+ FilePath string
+ FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+ sss []*subsystem.Subsystem) (int, error) {
client, err := NewClient(ctx, projectID)
if err != nil {
return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
session := uuid.New().String()
mutations := []*spanner.Mutation{}
- for manager, covMap := range manCovMap {
- for filePath, record := range covMap {
- mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
- subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
- mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
- // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
- // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
- // and implicit index mutations.
- // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
- if len(mutations) > 1000 {
- if _, err = client.Apply(ctx, mutations); err != nil {
- return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
- }
- rowsCreated += len(mutations)
- mutations = nil
+ jsonlScanner := bufio.NewScanner(jsonl)
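+ // Note: bufio.Scanner limits tokens to 64KB by default; records with long HitCounts arrays may need jsonlScanner.Buffer to raise the cap.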
+
+ for jsonlScanner.Scan() {
+ var mcr MergedCoverageRecord
+ if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+ return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+ }
+ mutations = append(mutations, fileRecordMutation(session, &mcr))
+ subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+ mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+ // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+ // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+ // and implicit index mutations.
+ // We keep the number of records low enough that the number of explicit mutations * 10 does not exceed the limit.
+ if len(mutations) > 1000 {
+ if _, err = client.Apply(ctx, mutations); err != nil {
+ return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
+ rowsCreated += len(mutations)
+ mutations = nil
}
}
+ if err := jsonlScanner.Err(); err != nil {
+ return rowsCreated, fmt.Errorf("jsonlScanner.Err: %w", err)
+ }
- mutations = append(mutations, historyMutation(session, template, totalRows))
+
+ mutations = append(mutations, historyMutation(session, descr))
if _, err = client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
return res, nil
}
-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
Session: session,
Time: time.Now(),
@@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
Commit: template.Commit,
Duration: template.Duration,
DateTo: template.DateTo,
- TotalRows: totalRows,
+ TotalRows: template.TotalRows,
})
if err != nil {
panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
@@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
return historyInsert
}
-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
Session: session,
- FilePath: filePath,
- Instrumented: record.Instrumented,
- Covered: record.Covered,
- LinesInstrumented: record.LinesInstrumented,
- HitCounts: record.HitCounts,
- Manager: manager,
+ FilePath: mcr.FilePath,
+ Instrumented: mcr.FileData.Instrumented,
+ Covered: mcr.FileData.Covered,
+ LinesInstrumented: mcr.FileData.LinesInstrumented,
+ HitCounts: mcr.FileData.HitCounts,
+ Manager: mcr.Manager,
})
if err != nil {
panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 394b606de..0f6ff43e4 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -10,7 +10,6 @@ import (
"fmt"
"io"
"strconv"
- "sync"
"github.com/google/syzkaller/pkg/log"
"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
return true
}
-type FilesMergeResults map[string]*MergeResult
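+// FileMergeResult is a single file's merge result annotated with its path, so it can be streamed over a channel.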
+type FileMergeResult struct {
+ FilePath string
+ *MergeResult
+}
-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
- return nil, fmt.Errorf("failed to read schema: %w", err)
+ return fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
@@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
}
errStreamChan <- nil
}()
- mergeResult, errMerging := mergeChanData(config, recordsChan)
+ errMerging := mergeChanData(config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
- return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+ return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
errMerging, errStream)
}
- return mergeResult, nil
+ return nil
}
type FileRecords struct {
@@ -177,33 +179,26 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
g, ctx := errgroup.WithContext(context.Background())
frecordChan := groupFileRecords(recordChan, ctx)
- stat := make(FilesMergeResults)
- var mu sync.Mutex
+
for i := 0; i < c.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+ mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
- } else {
- mu.Lock()
- if _, exist := stat[frecord.fileName]; exist {
- mu.Unlock()
- return fmt.Errorf("file %s was already processed", frecord.fileName)
- }
- stat[frecord.fileName] = mr
- mu.Unlock()
+ }
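+ // With an unbuffered results channel each send blocks on the consumer, so at most c.Jobs merge results are held in memory at once.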
+ results <- &FileMergeResult{
+ FilePath: frecord.fileName,
+ MergeResult: mr,
}
}
return nil
})
}
- if err := g.Wait(); err != nil {
- return nil, err
- }
- return stat, nil
+ return g.Wait()
}
func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index b63ae36be..4580c114f 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- aggregation, err := MergeCSVData(
+ ch := make(chan *FileMergeResult)
+ aggregation := make(map[string]*MergeResult)
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ for fmr := range ch {
+ aggregation[fmr.FilePath] = fmr.MergeResult
+ }
+ }()
+ err := MergeCSVData(
&Config{
Jobs: 2,
skipRepoClone: true,
@@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
},
strings.NewReader(test.bqTable),
+ ch,
)
+ close(ch) // MergeCSVData never closes the results channel; close it here to end the consumer goroutine
+ <-done // wait for the consumer to finish before reading aggregation
assert.Nil(t, err)
- var expectedAggregation FilesMergeResults
+ var expectedAggregation map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
if !test.checkDetails {
ignoreLineDetailsInTest(aggregation)
@@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
}
-func ignoreLineDetailsInTest(results FilesMergeResults) {
+func ignoreLineDetailsInTest(results map[string]*MergeResult) {
for _, mr := range results {
mr.LineDetails = nil
}