Diffstat (limited to 'pkg')
-rw-r--r--   pkg/cover/file.go                     18
-rw-r--r--   pkg/coveragedb/spanner.go             73
-rw-r--r--   pkg/covermerger/covermerger.go        41
-rw-r--r--   pkg/covermerger/covermerger_test.go   14
4 files changed, 84 insertions, 62 deletions
diff --git a/pkg/cover/file.go b/pkg/cover/file.go
index ff2b7a0ec..2939935b9 100644
--- a/pkg/cover/file.go
+++ b/pkg/cover/file.go
@@ -78,14 +78,24 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
 	if err != nil {
 		return nil, fmt.Errorf("failed to dbReader.Reader: %w", err)
 	}
-	mergeResult, err := covermerger.MergeCSVData(config, csvReader)
-	if err != nil {
+	ch := make(chan *covermerger.FileMergeResult, 1)
+	if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
 		return nil, fmt.Errorf("error merging coverage: %w", err)
 	}
-	if _, exist := mergeResult[filePath]; !exist {
+
+	var mr *covermerger.MergeResult
+	select {
+	case fmr := <-ch:
+		if fmr != nil {
+			mr = fmr.MergeResult
+		}
+	default:
+	}
+
+	if mr == nil {
 		return nil, fmt.Errorf("no merge result for file %s", filePath)
 	}
-	return mergeResult[filePath], nil
+	return mr, nil
 }
 
 func rendResult(content string, coverage *covermerger.MergeResult, renderConfig *CoverageRenderConfig) string {
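Note the pattern above: the result channel has capacity 1 and is drained with a non-blocking select only after MergeCSVData returns. That works because this path appears to request the rows for a single file (the reader is scoped by filePath), so at most one FileMergeResult is ever sent; with two or more files, the second send inside MergeCSVData would block on the full channel and the call would never return. A general-purpose consumption pattern is sketched after the pkg/covermerger/covermerger.go diff below.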
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index db6d9ac79..e2a197e0c 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -4,8 +4,11 @@ package coveragedb
 
 import (
+	"bufio"
 	"context"
+	"encoding/json"
 	"fmt"
+	"io"
 	"os"
 	"sync/atomic"
 	"time"
 
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
 	return spanner.NewClient(ctx, database)
 }
 
-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
 type Coverage struct {
 	Instrumented int64
 	Covered      int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
 	}
 }
 
-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
-	template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
+type MergedCoverageRecord struct {
+	Manager  string
+	FilePath string
+	FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+	sss []*subsystem.Subsystem) (int, error) {
 	client, err := NewClient(ctx, projectID)
 	if err != nil {
 		return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
 	session := uuid.New().String()
 	mutations := []*spanner.Mutation{}
-
-	for manager, covMap := range manCovMap {
-		for filePath, record := range covMap {
-			mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
-			subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
-			mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
-			// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
-			// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
-			// and implicit index mutations.
-			// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
-			if len(mutations) > 1000 {
-				if _, err = client.Apply(ctx, mutations); err != nil {
-					return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
-				}
-				rowsCreated += len(mutations)
-				mutations = nil
-			}
+	jsonlScanner := bufio.NewScanner(jsonl)
+
+	for jsonlScanner.Scan() {
+		var mcr MergedCoverageRecord
+		if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+			return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+		}
+		mutations = append(mutations, fileRecordMutation(session, &mcr))
+		subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+		mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+		// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+		// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+		// and implicit index mutations.
+		// We keep the number of records low enough that the number of explicit mutations * 10 does not exceed the limit.
+		if len(mutations) > 1000 {
+			if _, err = client.Apply(ctx, mutations); err != nil {
+				return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
+			}
+			rowsCreated += len(mutations)
+			mutations = nil
 		}
 	}
-	mutations = append(mutations, historyMutation(session, template, totalRows))
+
+	mutations = append(mutations, historyMutation(session, descr))
 	if _, err = client.Apply(ctx, mutations); err != nil {
 		return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
 	}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
 	return res, nil
 }
 
-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
 	historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
 		Session:   session,
 		Time:      time.Now(),
@@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
 		Commit:    template.Commit,
 		Duration:  template.Duration,
 		DateTo:    template.DateTo,
-		TotalRows: totalRows,
+		TotalRows: template.TotalRows,
 	})
 	if err != nil {
 		panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
@@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
 	return historyInsert
 }
 
-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
 	insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
 		Session:           session,
-		FilePath:          filePath,
-		Instrumented:      record.Instrumented,
-		Covered:           record.Covered,
-		LinesInstrumented: record.LinesInstrumented,
-		HitCounts:         record.HitCounts,
-		Manager:           manager,
+		FilePath:          mcr.FilePath,
+		Instrumented:      mcr.FileData.Instrumented,
+		Covered:           mcr.FileData.Covered,
+		LinesInstrumented: mcr.FileData.LinesInstrumented,
+		HitCounts:         mcr.FileData.HitCounts,
+		Manager:           mcr.Manager,
 	})
 	if err != nil {
 		panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
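For reference, a minimal sketch (not part of the commit) of producing the JSONL stream that the reworked SaveMergeResult consumes: one JSON-encoded MergedCoverageRecord per line. The types below are local mirrors of the coveragedb definitions above, trimmed to the fields visible in this diff, and the manager and file names are invented.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// Local mirrors of the coveragedb types from the diff above,
// reduced to the fields this diff shows.
type Coverage struct {
	Instrumented int64
	Covered      int64
}

type MergedCoverageRecord struct {
	Manager  string
	FilePath string
	FileData *Coverage
}

func main() {
	var buf bytes.Buffer
	// json.Encoder appends '\n' after every record: one JSONL line each,
	// matching what the bufio.Scanner in SaveMergeResult reads back.
	enc := json.NewEncoder(&buf)
	records := []MergedCoverageRecord{
		// Invented example values.
		{Manager: "ci-mock", FilePath: "mm/memory.c", FileData: &Coverage{Instrumented: 100, Covered: 77}},
		{Manager: "ci-mock", FilePath: "kernel/fork.c", FileData: &Coverage{Instrumented: 50, Covered: 8}},
	}
	for i := range records {
		if err := enc.Encode(&records[i]); err != nil {
			panic(err)
		}
	}
	// &buf satisfies the `jsonl io.Reader` parameter of SaveMergeResult.
	fmt.Print(buf.String())
}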
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 394b606de..0f6ff43e4 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -10,7 +10,6 @@ import (
 	"fmt"
 	"io"
 	"strconv"
-	"sync"
 
 	"github.com/google/syzkaller/pkg/log"
 	"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
 	return true
 }
 
-type FilesMergeResults map[string]*MergeResult
+type FileMergeResult struct {
+	FilePath string
+	*MergeResult
+}
 
-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
 	var schema []string
 	csvReader := csv.NewReader(reader)
 	if fields, err := csvReader.Read(); err != nil {
-		return nil, fmt.Errorf("failed to read schema: %w", err)
+		return fmt.Errorf("failed to read schema: %w", err)
 	} else {
 		schema = fields
 	}
@@ -163,47 +165,40 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
 		}
 		errStreamChan <- nil
 	}()
-	mergeResult, errMerging := mergeChanData(config, recordsChan)
+	errMerging := mergeChanData(config, recordsChan, results)
 	errStream := <-errStreamChan
 	if errMerging != nil || errStream != nil {
-		return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+		return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
 			errMerging, errStream)
 	}
-	return mergeResult, nil
+	return nil
 }
 
 type FileRecords struct {
 	fileName string
 	records  []*FileRecord
 }
 
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
 	g, ctx := errgroup.WithContext(context.Background())
 	frecordChan := groupFileRecords(recordChan, ctx)
-	stat := make(FilesMergeResults)
-	var mu sync.Mutex
+
 	for i := 0; i < c.Jobs; i++ {
 		g.Go(func() error {
 			for frecord := range frecordChan {
-				if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+				mr, err := batchFileData(c, frecord.fileName, frecord.records)
+				if err != nil {
 					return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
-				} else {
-					mu.Lock()
-					if _, exist := stat[frecord.fileName]; exist {
-						mu.Unlock()
-						return fmt.Errorf("file %s was already processed", frecord.fileName)
-					}
-					stat[frecord.fileName] = mr
-					mu.Unlock()
+				}
+				results <- &FileMergeResult{
+					FilePath:    frecord.fileName,
+					MergeResult: mr,
 				}
 			}
 			return nil
 		})
 	}
-	if err := g.Wait(); err != nil {
-		return nil, err
-	}
-	return stat, nil
+	return g.Wait()
 }
 
 func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
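The new MergeCSVData contract, as the code above shows: every per-file result is sent on the results channel, and the function returns only after all worker goroutines have finished, but the callee never closes the channel. A caller therefore has to drain the channel concurrently and close it itself once the call returns. A hedged sketch of that pattern follows; mergeAll is a hypothetical helper, not part of the commit, and it assumes the io and covermerger packages are imported.

// mergeAll is a hypothetical helper showing how a caller of the new API
// can collect every file's result. MergeCSVData only sends on the channel
// and never closes it, so the caller owns both the drain and the close.
func mergeAll(config *covermerger.Config, reader io.Reader) (map[string]*covermerger.MergeResult, error) {
	ch := make(chan *covermerger.FileMergeResult)
	done := make(chan struct{})
	merged := make(map[string]*covermerger.MergeResult)
	go func() {
		defer close(done)
		for fmr := range ch {
			merged[fmr.FilePath] = fmr.MergeResult
		}
	}()
	err := covermerger.MergeCSVData(config, reader, ch)
	close(ch) // safe: all sends have completed once MergeCSVData returns
	<-done    // wait for the drain goroutine before reading `merged`
	return merged, err
}

The updated test below uses the same drain-in-a-goroutine shape, minus the close and the done synchronization that a production caller reading the map afterwards would want.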
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index b63ae36be..4580c114f 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			aggregation, err := MergeCSVData(
+			ch := make(chan *FileMergeResult)
+			aggregation := make(map[string]*MergeResult)
+			go func() {
+				for fmr := range ch {
+					aggregation[fmr.FilePath] = fmr.MergeResult
+				}
+			}()
+			err := MergeCSVData(
 				&Config{
 					Jobs:          2,
 					skipRepoClone: true,
@@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
 					FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
 				},
 				strings.NewReader(test.bqTable),
+				ch,
 			)
 			assert.Nil(t, err)
-			var expectedAggregation FilesMergeResults
+			var expectedAggregation map[string]*MergeResult
 			assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
 			if !test.checkDetails {
 				ignoreLineDetailsInTest(aggregation)
 			}
@@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
 	}
 }
 
-func ignoreLineDetailsInTest(results FilesMergeResults) {
+func ignoreLineDetailsInTest(results map[string]*MergeResult) {
 	for _, mr := range results {
 		mr.LineDetails = nil
 	}
