aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/coveragedb
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/coveragedb')
-rw-r--r--pkg/coveragedb/spanner.go73
1 files changed, 41 insertions, 32 deletions
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index db6d9ac79..e2a197e0c 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -4,8 +4,11 @@
package coveragedb
import (
+ "bufio"
"context"
+ "encoding/json"
"fmt"
+ "io"
"os"
"sync/atomic"
"time"
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
return spanner.NewClient(ctx, database)
}
-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
type Coverage struct {
Instrumented int64
Covered int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
}
}
-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
- template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
+type MergedCoverageRecord struct {
+ Manager string
+ FilePath string
+ FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+ sss []*subsystem.Subsystem) (int, error) {
client, err := NewClient(ctx, projectID)
if err != nil {
return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
session := uuid.New().String()
mutations := []*spanner.Mutation{}
- for manager, covMap := range manCovMap {
- for filePath, record := range covMap {
- mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
- subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
- mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
- // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
- // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
- // and implicit index mutations.
- // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
- if len(mutations) > 1000 {
- if _, err = client.Apply(ctx, mutations); err != nil {
- return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
- }
- rowsCreated += len(mutations)
- mutations = nil
+ jsonlScanner := bufio.NewScanner(jsonl)
+
+ for jsonlScanner.Scan() {
+ var mcr MergedCoverageRecord
+ if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+ return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+ }
+ mutations = append(mutations, fileRecordMutation(session, &mcr))
+ subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+ mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+ // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+ // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+ // and implicit index mutations.
+ // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
+ if len(mutations) > 1000 {
+ if _, err = client.Apply(ctx, mutations); err != nil {
+ return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
+ rowsCreated += len(mutations)
+ mutations = nil
}
}
- mutations = append(mutations, historyMutation(session, template, totalRows))
+
+ mutations = append(mutations, historyMutation(session, descr))
if _, err = client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
return res, nil
}
-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
Session: session,
Time: time.Now(),
@@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
Commit: template.Commit,
Duration: template.Duration,
DateTo: template.DateTo,
- TotalRows: totalRows,
+ TotalRows: template.TotalRows,
})
if err != nil {
panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
@@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
return historyInsert
}
-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
Session: session,
- FilePath: filePath,
- Instrumented: record.Instrumented,
- Covered: record.Covered,
- LinesInstrumented: record.LinesInstrumented,
- HitCounts: record.HitCounts,
- Manager: manager,
+ FilePath: mcr.FilePath,
+ Instrumented: mcr.FileData.Instrumented,
+ Covered: mcr.FileData.Covered,
+ LinesInstrumented: mcr.FileData.LinesInstrumented,
+ HitCounts: mcr.FileData.HitCounts,
+ Manager: mcr.Manager,
})
if err != nil {
panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))