diff options
Diffstat (limited to 'pkg/coveragedb')
| -rw-r--r-- | pkg/coveragedb/spanner.go | 73 |
1 files changed, 41 insertions, 32 deletions
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go index db6d9ac79..e2a197e0c 100644 --- a/pkg/coveragedb/spanner.go +++ b/pkg/coveragedb/spanner.go @@ -4,8 +4,11 @@ package coveragedb import ( + "bufio" "context" + "encoding/json" "fmt" + "io" "os" "sync/atomic" "time" @@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) { return spanner.NewClient(ctx, database) } -type ManagersCoverage map[string]ManagerCoverage - -// ManagerCoverage is a file to coverage mapping. -type ManagerCoverage map[string]*Coverage - type Coverage struct { Instrumented int64 Covered int64 @@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) { } } -func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage, - template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) { +type MergedCoverageRecord struct { + Manager string + FilePath string + FileData *Coverage +} + +func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader, + sss []*subsystem.Subsystem) (int, error) { client, err := NewClient(ctx, projectID) if err != nil { return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error()) @@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo session := uuid.New().String() mutations := []*spanner.Mutation{} - for manager, covMap := range manCovMap { - for filePath, record := range covMap { - mutations = append(mutations, fileRecordMutation(session, manager, filePath, record)) - subsystems := fileSubsystems(filePath, ssMatcher, ssCache) - mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems)) - // There is a limit on the number of mutations per transaction (80k) imposed by the DB. - // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations) - // and implicit index mutations. - // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit. - if len(mutations) > 1000 { - if _, err = client.Apply(ctx, mutations); err != nil { - return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error()) - } - rowsCreated += len(mutations) - mutations = nil + jsonlScanner := bufio.NewScanner(jsonl) + + for jsonlScanner.Scan() { + var mcr MergedCoverageRecord + if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil { + return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err) + } + mutations = append(mutations, fileRecordMutation(session, &mcr)) + subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache) + mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems)) + // There is a limit on the number of mutations per transaction (80k) imposed by the DB. + // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations) + // and implicit index mutations. + // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit. + if len(mutations) > 1000 { + if _, err = client.Apply(ctx, mutations); err != nil { + return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error()) } + rowsCreated += len(mutations) + mutations = nil } } - mutations = append(mutations, historyMutation(session, template, totalRows)) + + mutations = append(mutations, historyMutation(session, descr)) if _, err = client.Apply(ctx, mutations); err != nil { return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error()) } @@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri return res, nil } -func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation { +func historyMutation(session string, template *HistoryRecord) *spanner.Mutation { historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{ Session: session, Time: time.Now(), @@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) * Commit: template.Commit, Duration: template.Duration, DateTo: template.DateTo, - TotalRows: totalRows, + TotalRows: template.TotalRows, }) if err != nil { panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error())) @@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) * return historyInsert } -func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation { +func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation { insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{ Session: session, - FilePath: filePath, - Instrumented: record.Instrumented, - Covered: record.Covered, - LinesInstrumented: record.LinesInstrumented, - HitCounts: record.HitCounts, - Manager: manager, + FilePath: mcr.FilePath, + Instrumented: mcr.FileData.Instrumented, + Covered: mcr.FileData.Covered, + LinesInstrumented: mcr.FileData.LinesInstrumented, + HitCounts: mcr.FileData.HitCounts, + Manager: mcr.Manager, }) if err != nil { panic(fmt.Sprintf("failed to fileRecordMutation: %v", err)) |
