From 941924ebb4fc1ba15d5d3f12730ce32d7a6ace5f Mon Sep 17 00:00:00 2001
From: Taras Madan
Date: Thu, 5 Dec 2024 10:08:44 +0100
Subject: pkg/covermerger: aggregate data per-manager

This enables us to see each manager's unique coverage.
---
 pkg/cover/heatmap.go                |  2 +-
 pkg/coveragedb/spanner.go           | 49 +++++++++++++++++++++++++------------
 pkg/covermerger/bq_csv_reader.go    |  6 ++---
 pkg/covermerger/covermerger.go      | 12 ++++++---
 pkg/covermerger/covermerger_test.go |  2 +-
 5 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/pkg/cover/heatmap.go b/pkg/cover/heatmap.go
index d4d29fbd9..0d7c93cd5 100644
--- a/pkg/cover/heatmap.go
+++ b/pkg/cover/heatmap.go
@@ -169,7 +169,7 @@ from merge_history
   join file_subsystems
     on merge_history.namespace = file_subsystems.namespace and files.filepath = file_subsystems.filepath
 where
-  merge_history.namespace=$1 and dateto=$2 and duration=$3`,
+  merge_history.namespace=$1 and dateto=$2 and duration=$3 and manager='*'`,
 		Params: map[string]interface{}{
 			"p1": ns,
 			"p2": timePeriod.DateTo,
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
index b0c974fb1..5bbe4cbbb 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/spanner.go
@@ -26,6 +26,7 @@ type FilesRecord struct {
 	Covered           int64
 	LinesInstrumented []int64
 	HitCounts         []int64
+	Manager           string // "*" means "collected from all managers"
 }
 
 type FileSubsystems struct {
@@ -50,6 +51,11 @@ func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
 	return spanner.NewClient(ctx, database)
 }
 
+type ManagersCoverage map[string]ManagerCoverage
+
+// ManagerCoverage is a file path to coverage mapping.
+type ManagerCoverage map[string]*Coverage
+
 type Coverage struct {
 	Instrumented      int64
 	Covered           int64
@@ -57,7 +63,16 @@ type Coverage struct {
 	HitCounts         []int64
 }
 
-func SaveMergeResult(ctx context.Context, projectID string, covMap map[string]*Coverage,
+func (c *Coverage) AddLineHitCount(line, hitCount int) {
+	c.Instrumented++
+	c.LinesInstrumented = append(c.LinesInstrumented, int64(line))
+	c.HitCounts = append(c.HitCounts, int64(hitCount))
+	if hitCount > 0 {
+		c.Covered++
+	}
+}
+
+func SaveMergeResult(ctx context.Context, projectID string, manCovMap ManagersCoverage,
 	template *HistoryRecord, totalRows int64, sss []*subsystem.Subsystem) error {
 	client, err := NewClient(ctx, projectID)
 	if err != nil {
@@ -70,19 +85,22 @@ func SaveMergeResult(ctx context.Context, projectID string, covMap map[string]*C
 	session := uuid.New().String()
 	mutations := []*spanner.Mutation{}
-	for filePath, record := range covMap {
-		mutations = append(mutations, fileRecordMutation(session, filePath, record))
-		subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
-		mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
-		// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
-		// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
-		// and implicit index mutations.
-		// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
-		if len(mutations) > 1000 {
-			if _, err = client.Apply(ctx, mutations); err != nil {
-				return fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
+
+	for manager, covMap := range manCovMap {
+		for filePath, record := range covMap {
+			mutations = append(mutations, fileRecordMutation(session, manager, filePath, record))
+			subsystems := fileSubsystems(filePath, ssMatcher, ssCache)
+			mutations = append(mutations, fileSubsystemsMutation(template.Namespace, filePath, subsystems))
+			// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+			// This includes both explicit mutations of the fields (7 fields * 1k records = 7k mutations)
+			// and implicit index mutations.
+			// We keep the number of records low enough that the number of explicit mutations * 10 does not exceed the limit.
+			if len(mutations) > 1000 {
+				if _, err = client.Apply(ctx, mutations); err != nil {
+					return fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
+				}
+				mutations = nil
 			}
-			mutations = nil
 		}
 	}
 	mutations = append(mutations, historyMutation(session, template, totalRows))
@@ -107,7 +125,7 @@ from merge_history
   join files
     on merge_history.session = files.session
 where
-  namespace=$1 and dateto=$2 and duration=$3 and filepath=$4 and commit=$5`,
+  namespace=$1 and dateto=$2 and duration=$3 and filepath=$4 and commit=$5 and manager='*'`,
 	Params: map[string]interface{}{
 		"p1": ns,
 		"p2": timePeriod.DateTo,
@@ -167,7 +185,7 @@ func historyMutation(session string, template *HistoryRecord, totalRows int64) *
 	return historyInsert
 }
 
-func fileRecordMutation(session, filePath string, record *Coverage) *spanner.Mutation {
+func fileRecordMutation(session, manager, filePath string, record *Coverage) *spanner.Mutation {
 	insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
 		Session:  session,
 		FilePath: filePath,
@@ -175,6 +193,7 @@ func fileRecordMutation(session, filePath string, record *Coverage) *spanner.Mut
 		Covered:           record.Covered,
 		LinesInstrumented: record.LinesInstrumented,
 		HitCounts:         record.HitCounts,
+		Manager:           manager,
 	})
 	if err != nil {
 		panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
diff --git a/pkg/covermerger/bq_csv_reader.go b/pkg/covermerger/bq_csv_reader.go
index 187da1812..ef9a4e879 100644
--- a/pkg/covermerger/bq_csv_reader.go
+++ b/pkg/covermerger/bq_csv_reader.go
@@ -64,15 +64,15 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
 		compression = "GZIP")
 	AS (
 		SELECT
-			kernel_repo, kernel_branch, kernel_commit, file_path, sl, SUM(hit_count) as hit_count
+			kernel_repo, kernel_branch, kernel_commit, file_path, manager, sl, SUM(hit_count) as hit_count
 		FROM syzkaller.syzbot_coverage.`+"`%s`"+`
 		WHERE
 			TIMESTAMP_TRUNC(timestamp, DAY) >= "%s" AND
 			TIMESTAMP_TRUNC(timestamp, DAY) <= "%s" AND
 			version = 1 AND
 			starts_with(file_path, "%s") %s
-		GROUP BY file_path, kernel_commit, kernel_repo, kernel_branch, sl
-		ORDER BY file_path
+		GROUP BY file_path, manager, kernel_commit, kernel_repo, kernel_branch, sl
+		ORDER BY file_path, manager
 	);
 `, gsURI, ns, from.String(), to.String(), filePath, selectCommit))
 	job, err := q.Run(ctx)
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 5af6162a3..8fc058ddd 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -24,6 +24,7 @@ const (
 	KeyFilePath  = "file_path"
 	KeyStartLine = "sl"
 	KeyHitCount  = "hit_count"
+	KeyManager   = "manager"
 )
 
 type FileRecord struct {
@@ -31,6 +32,7 @@ type FileRecord struct {
 	RepoCommit
 	StartLine int
 	HitCount  int
+	Manager   string
 }
 
 type RepoCommit struct {
@@ -87,6 +89,8 @@ func makeRecord(fields, schema []string) (*FileRecord, error) {
 			record.StartLine, err = readIntField(key, val)
 		case KeyHitCount:
 			record.HitCount, err = readIntField(key, val)
+		case KeyManager:
+			record.Manager = val
 		}
 		if err != nil {
 			return nil, err
@@ -124,7 +128,9 @@ func isSchema(fields, schema []string) bool {
 	return true
 }
 
-func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, error) {
+type FilesMergeResults map[string]*MergeResult
+
+func MergeCSVData(config *Config, reader io.Reader) (FilesMergeResults, error) {
 	var schema []string
 	csvReader := csv.NewReader(reader)
 	if fields, err := csvReader.Read(); err != nil {
@@ -172,10 +178,10 @@ type FileRecords struct {
 	records []*FileRecord
 }
 
-func mergeChanData(c *Config, recordChan <-chan *FileRecord) (map[string]*MergeResult, error) {
+func mergeChanData(c *Config, recordChan <-chan *FileRecord) (FilesMergeResults, error) {
 	g, ctx := errgroup.WithContext(context.Background())
 	frecordChan := groupFileRecords(recordChan, ctx)
-	stat := make(map[string]*MergeResult)
+	stat := make(FilesMergeResults)
 	var mu sync.Mutex
 	for i := 0; i < c.Jobs; i++ {
 		g.Go(func() error {
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 286f58737..a8b2a3ca8 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -125,7 +125,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
 				strings.NewReader(test.bqTable),
 			)
 			assert.Nil(t, err)
-			var expectedAggregation map[string]*MergeResult
+			var expectedAggregation FilesMergeResults
 			assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
 			assert.Equal(t, expectedAggregation, aggregation)
 		})
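
To make the new data model concrete, here is a minimal, self-contained Go sketch (not part of the patch) of how merged per-line hit counts could be folded into the ManagersCoverage shape that SaveMergeResult now persists. The toManagersCoverage helper and its input layout are assumptions made for illustration; Coverage, AddLineHitCount, ManagerCoverage, ManagersCoverage, and the "*" rollup convention come from the patch itself.

package main

import "fmt"

// Coverage and AddLineHitCount mirror the pkg/coveragedb additions above.
type Coverage struct {
	Instrumented      int64
	Covered           int64
	LinesInstrumented []int64
	HitCounts         []int64
}

func (c *Coverage) AddLineHitCount(line, hitCount int) {
	c.Instrumented++
	c.LinesInstrumented = append(c.LinesInstrumented, int64(line))
	c.HitCounts = append(c.HitCounts, int64(hitCount))
	if hitCount > 0 {
		c.Covered++
	}
}

// ManagerCoverage is a file path to coverage mapping.
type ManagerCoverage map[string]*Coverage

// ManagersCoverage keys per-file coverage by manager name.
type ManagersCoverage map[string]ManagerCoverage

// toManagersCoverage is a hypothetical helper, not from the patch.
// Input shape: manager -> file path -> start line -> merged hit count.
// It also synthesizes the "*" pseudo-manager by summing hit counts across
// managers, so a line counts as covered under "*" if any manager hit it.
func toManagersCoverage(hits map[string]map[string]map[int]int) ManagersCoverage {
	// Sum per-line hit counts across managers for the "*" rollup.
	total := map[string]map[int]int{}
	for _, files := range hits {
		for file, lines := range files {
			if total[file] == nil {
				total[file] = map[int]int{}
			}
			for line, hitCount := range lines {
				total[file][line] += hitCount
			}
		}
	}
	res := ManagersCoverage{}
	fill := func(manager string, files map[string]map[int]int) {
		res[manager] = ManagerCoverage{}
		for file, lines := range files {
			cov := &Coverage{}
			for line, hitCount := range lines {
				cov.AddLineHitCount(line, hitCount)
			}
			res[manager][file] = cov
		}
	}
	for manager, files := range hits {
		fill(manager, files)
	}
	fill("*", total)
	return res
}

func main() {
	hits := map[string]map[string]map[int]int{
		"ci-mock":     {"not_changed.c": {4: 1}},
		"ci-upstream": {"not_changed.c": {4: 0}},
	}
	mc := toManagersCoverage(hits)
	fmt.Println(mc["ci-mock"]["not_changed.c"].Covered)     // 1
	fmt.Println(mc["ci-upstream"]["not_changed.c"].Covered) // 0
	fmt.Println(mc["*"]["not_changed.c"].Covered)           // 1: hit by at least one manager
}

The "*" rollup is what keeps the existing dashboard behavior intact: queries that previously read all rows now filter on manager='*' (as in the heatmap.go and spanner.go hunks) and see the same totals, while the per-manager rows make it possible to compare one manager's coverage against the rollup to find coverage unique to that manager.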