aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/cover/file.go18
-rw-r--r--pkg/coveragedb/spanner.go73
-rw-r--r--pkg/covermerger/covermerger.go41
-rw-r--r--pkg/covermerger/covermerger_test.go14
4 files changed, 84 insertions, 62 deletions
diff --git a/pkg/cover/file.go b/pkg/cover/file.go
index ff2b7a0ec..2939935b9 100644
--- a/pkg/cover/file.go
+++ b/pkg/cover/file.go
@@ -78,14 +78,24 @@ func GetMergeResult(c context.Context, ns, repo, forCommit, sourceCommit, filePa
return nil, fmt.Errorf("failed to dbReader.Reader: %w", err)
}
- mergeResult, err := covermerger.MergeCSVData(config, csvReader)
- if err != nil {
+ ch := make(chan *covermerger.FileMergeResult, 1)
+ if err := covermerger.MergeCSVData(config, csvReader, ch); err != nil {
return nil, fmt.Errorf("error merging coverage: %w", err)
}
- if _, exist := mergeResult[filePath]; !exist {
+
+ var mr *covermerger.MergeResult
+ select {
+ case fmr := <-ch:
+ if fmr != nil {
+ mr = fmr.MergeResult
+ }
+ default:
+ }
+
+ if mr != nil {
return nil, fmt.Errorf("no merge result for file %s", filePath)
}
- return mergeResult[filePath], nil
+ return mr, nil
}
func rendResult(content string, coverage *covermerger.MergeResult, renderConfig *CoverageRenderConfig) string {
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index db6d9ac79..e2a197e0c 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -4,8 +4,11 @@
package coveragedb
import (
+ "bufio"
"context"
+ "encoding/json"
"fmt"
+ "io"
"os"
"sync/atomic"
"time"
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
return spanner.NewClient(ctx, database)
}
-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
type Coverage struct {
Instrumented int64
Covered int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
}
}
-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
- template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
+type MergedCoverageRecord struct {
+ Manager string
+ FilePath string
+ FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+ sss []*subsystem.Subsystem) (int, error) {
client, err := NewClient(ctx, projectID)
if err != nil {
return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
session := uuid.New().String()
mutations := []*spanner.Mutation{}
- for manager, covMap := range manCovMap {
- for filePath, record := range covMap {
- mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
- subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
- mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
- // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
- // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
- // and implicit index mutations.
- // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
- if len(mutations) > 1000 {
- if _, err = client.Apply(ctx, mutations); err != nil {
- return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
- }
- rowsCreated += len(mutations)
- mutations = nil
+ jsonlScanner := bufio.NewScanner(jsonl)
+
+ for jsonlScanner.Scan() {
+ var mcr MergedCoverageRecord
+ if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+ return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+ }
+ mutations = append(mutations, fileRecordMutation(session, &mcr))
+ subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+ mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+ // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+ // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+ // and implicit index mutations.
+ // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
+ if len(mutations) > 1000 {
+ if _, err = client.Apply(ctx, mutations); err != nil {
+ return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
+ rowsCreated += len(mutations)
+ mutations = nil
}
}
- mutations = append(mutations, historyMutation(session, template, totalRows))
+
+ mutations = append(mutations, historyMutation(session, descr))
if _, err = client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
return res, nil
}
-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
Session: session,
Time: time.Now(),
@@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
Commit: template.Commit,
Duration: template.Duration,
DateTo: template.DateTo,
- TotalRows: totalRows,
+ TotalRows: template.TotalRows,
})
if err != nil {
panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
@@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
return historyInsert
}
-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
Session: session,
- FilePath: filePath,
- Instrumented: record.Instrumented,
- Covered: record.Covered,
- LinesInstrumented: record.LinesInstrumented,
- HitCounts: record.HitCounts,
- Manager: manager,
+ FilePath: mcr.FilePath,
+ Instrumented: mcr.FileData.Instrumented,
+ Covered: mcr.FileData.Covered,
+ LinesInstrumented: mcr.FileData.LinesInstrumented,
+ HitCounts: mcr.FileData.HitCounts,
+ Manager: mcr.Manager,
})
if err != nil {
panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 394b606de..0f6ff43e4 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -10,7 +10,6 @@ import (
"fmt"
"io"
"strconv"
- "sync"
"github.com/google/syzkaller/pkg/log"
"golang.org/x/exp/maps"
@@ -127,13 +126,16 @@ func isSchema(fields, schema []string) bool {
return true
}
-type FilesMergeResults map[string]*MergeResult
+type FileMergeResult struct {
+ FilePath string
+ *MergeResult
+}
-func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
+func MergeCSVData(config *Config, reader io.Reader, results chan<- *FileMergeResult) error {
var schema []string
csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
- return nil, fmt.Errorf("failed to read schema: %w", err)
+ return fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
@@ -163,13 +165,13 @@ func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
}
errStreamChan <- nil
}()
- mergeResult, errMerging := mergeChanData(config, recordsChan)
+ errMerging := mergeChanData(config, recordsChan, results)
errStream := <-errStreamChan
if errMerging != nil || errStream != nil {
- return nil, fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
+ return fmt.Errorf("errors merging stream data:\nmerger err: %w\nstream reader err: %w",
errMerging, errStream)
}
- return mergeResult, nil
+ return nil
}
type FileRecords struct {
@@ -177,33 +179,26 @@ type FileRecords struct {
records []*FileRecord
}
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord, results chan<- *FileMergeResult) error {
g, ctx := errgroup.WithContext(context.Background())
frecordChan := groupFileRecords(recordChan, ctx)
- stat := make(FilesMergeResults)
- var mu sync.Mutex
+
for i := 0; i < c.Jobs; i++ {
g.Go(func() error {
for frecord := range frecordChan {
- if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+ mr, err := batchFileData(c, frecord.fileName, frecord.records)
+ if err != nil {
return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
- } else {
- mu.Lock()
- if _, exist := stat[frecord.fileName]; exist {
- mu.Unlock()
- return fmt.Errorf("file %s was already processed", frecord.fileName)
- }
- stat[frecord.fileName] = mr
- mu.Unlock()
+ }
+ results <- &FileMergeResult{
+ FilePath: frecord.fileName,
+ MergeResult: mr,
}
}
return nil
})
}
- if err := g.Wait(); err != nil {
- return nil, err
- }
- return stat, nil
+ return g.Wait()
}
func groupFileRecords(recordChan <-chan *FileRecord, ctx context.Context) chan FileRecords {
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index b63ae36be..4580c114f 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -172,7 +172,14 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- aggregation, err := MergeCSVData(
+ ch := make(chan *FileMergeResult)
+ aggregation := make(map[string]*MergeResult)
+ go func() {
+ for fmr := range ch {
+ aggregation[fmr.FilePath] = fmr.MergeResult
+ }
+ }()
+ err := MergeCSVData(
&Config{
Jobs: 2,
skipRepoClone: true,
@@ -183,10 +190,11 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
},
strings.NewReader(test.bqTable),
+ ch,
)
assert.Nil(t, err)
- var expectedAggregation FilesMergeResults
+ var expectedAggregation map[string]*MergeResult
assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
if !test.checkDetails {
ignoreLineDetailsInTest(aggregation)
@@ -196,7 +204,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
}
-func ignoreLineDetailsInTest(results FilesMergeResults) {
+func ignoreLineDetailsInTest(results map[string]*MergeResult) {
for _, mr := range results {
mr.LineDetails = nil
}