aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/coveragedb
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-12-18 13:33:01 +0100
committerTaras Madan <tarasmadan@google.com>2024-12-19 14:38:21 +0000
commitfef5bb5152a5013d0c4571cd0601f46824a9664b (patch)
treec8fc887f6bfe48ae314a55f439ca1e7cbea370c7 /pkg/coveragedb
parentf1c188dc6963bf016692df095d783e4945d84f7b (diff)
tools/syz-covermerger: upload coverage as jsonl
The previous implementation stored only the summary of processed records. The summary was <1GB, and a single processing node was able to manipulate the data. The current implementation stores all the details about the records read, to make post-processing more flexible. This change was needed to get access to the source manager name and will help to analyze other details. This new implementation requires 20GB of memory to process a single day's records. A CSV log interning experiment allowed merging with 10GB. Quarterly data aggregation would cost ~100 times more. The alternative is stream processing: we can process data kernel-file-by-file, which reduces memory consumption by a factor of ~15000. This approach is implemented here. We batch coverage signals by file and store the per-file results in a GCS JSONL file. See https://jsonlines.org/ to learn about jsonl.
Diffstat (limited to 'pkg/coveragedb')
-rw-r--r--pkg/coveragedb/spanner.go73
1 file changed, 41 insertions, 32 deletions
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index db6d9ac79..e2a197e0c 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -4,8 +4,11 @@
package coveragedb
import (
+ "bufio"
"context"
+ "encoding/json"
"fmt"
+ "io"
"os"
"sync/atomic"
"time"
@@ -51,11 +54,6 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
return spanner.NewClient(ctx, database)
}
-type ManagersCoverage map[string]ManagerCoverage
-
-// ManagerCoverage is a file to coverage mapping.
-type ManagerCoverage map[string]*Coverage
-
type Coverage struct {
Instrumented int64
Covered int64
@@ -72,8 +70,14 @@ func (c *Coverage) AddLineHitCount(line, hitCount int) {
}
}
-func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
- template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) (int, error) {
+type MergedCoverageRecord struct {
+ Manager string
+ FilePath string
+ FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+ sss []*subsystem.Subsystem) (int, error) {
client, err := NewClient(ctx, projectID)
if err != nil {
return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
@@ -87,25 +91,30 @@ func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCo
session := uuid.New().String()
mutations := []*spanner.Mutation{}
- for manager, covMap := range manCovMap {
- for filePath, record := range covMap {
- mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
- subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
- mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
- // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
- // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
- // and implicit index mutations.
- // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
- if len(mutations) > 1000 {
- if _, err = client.Apply(ctx, mutations); err != nil {
- return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
- }
- rowsCreated += len(mutations)
- mutations = nil
+ jsonlScanner := bufio.NewScanner(jsonl)
+
+ for jsonlScanner.Scan() {
+ var mcr MergedCoverageRecord
+ if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
+ return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+ }
+ mutations = append(mutations, fileRecordMutation(session, &mcr))
+ subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+ mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+ // There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+ // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+ // and implicit index mutations.
+ // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
+ if len(mutations) > 1000 {
+ if _, err = client.Apply(ctx, mutations); err != nil {
+ return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
+ rowsCreated += len(mutations)
+ mutations = nil
}
}
- mutations = append(mutations, historyMutation(session, template, totalRows))
+
+ mutations = append(mutations, historyMutation(session, descr))
if _, err = client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
@@ -170,7 +179,7 @@ func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeri
return res, nil
}
-func historyMutation(session string, template *HistoryRecord, totalRows int64) *spanner.Mutation {
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
Session: session,
Time: time.Now(),
@@ -179,7 +188,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
Commit: template.Commit,
Duration: template.Duration,
DateTo: template.DateTo,
- TotalRows: totalRows,
+ TotalRows: template.TotalRows,
})
if err != nil {
panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
@@ -187,15 +196,15 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
return historyInsert
}
-func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
Session: session,
- FilePath: filePath,
- Instrumented: record.Instrumented,
- Covered: record.Covered,
- LinesInstrumented: record.LinesInstrumented,
- HitCounts: record.HitCounts,
- Manager: manager,
+ FilePath: mcr.FilePath,
+ Instrumented: mcr.FileData.Instrumented,
+ Covered: mcr.FileData.Covered,
+ LinesInstrumented: mcr.FileData.LinesInstrumented,
+ HitCounts: mcr.FileData.HitCounts,
+ Manager: mcr.Manager,
})
if err != nil {
panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))