From 5d3c618ce7bc487350983451f940dbdf2571c428 Mon Sep 17 00:00:00 2001
From: Taras Madan
Date: Wed, 18 Dec 2024 15:28:18 +0100
Subject: pkg/coveragedb: test SaveMergeResult

1. Make interface testable.
2. Add Spanner interfaces.
3. Generate mocks for proxy interfaces.
4. Test SaveMergeResult.
5. Test MergeCSVWriteJSONL and coveragedb.SaveMergeResult integration.
---
 pkg/cover/heatmap.go                           |   3 +-
 pkg/coveragedb/coveragedb.go                   | 350 ++++++++++++++++++++
 pkg/coveragedb/coveragedb_mock_test.go         | 121 +++++++
 pkg/coveragedb/mocks/SpannerClient.go          |  93 ++++++
 pkg/coveragedb/spanner.go                      | 354 ---------------------
 pkg/coveragedb/spannerclient/spanner_client.go |  79 +++++
 pkg/covermerger/covermerger.go                 |  92 ++++++
 pkg/covermerger/covermerger_test.go            | 121 +++++--
 8 files changed, 829 insertions(+), 384 deletions(-)
 create mode 100644 pkg/coveragedb/coveragedb.go
 create mode 100644 pkg/coveragedb/coveragedb_mock_test.go
 create mode 100644 pkg/coveragedb/mocks/SpannerClient.go
 delete mode 100644 pkg/coveragedb/spanner.go
 create mode 100644 pkg/coveragedb/spannerclient/spanner_client.go

diff --git a/pkg/cover/heatmap.go b/pkg/cover/heatmap.go
index 0d7c93cd5..ef4f7ffab 100644
--- a/pkg/cover/heatmap.go
+++ b/pkg/cover/heatmap.go
@@ -14,6 +14,7 @@ import (
 	"cloud.google.com/go/spanner"
 	"github.com/google/syzkaller/pkg/coveragedb"
+	"github.com/google/syzkaller/pkg/coveragedb/spannerclient"
 	_ "github.com/google/syzkaller/pkg/subsystem/lists"
 	"golang.org/x/exp/maps"
 	"google.golang.org/api/iterator"
@@ -185,7 +186,7 @@ where
 func filesCoverageWithDetails(ctx context.Context, projectID, ns, subsystem string,
 	timePeriods []coveragedb.TimePeriod,
 ) ([]*fileCoverageWithDetails, error) {
-	client, err := coveragedb.NewClient(ctx, projectID)
+	client, err := spannerclient.NewClient(ctx, projectID)
 	if err != nil {
 		return nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
 	}
diff --git a/pkg/coveragedb/coveragedb.go b/pkg/coveragedb/coveragedb.go
new file mode 100644
index 000000000..0a6175ad5
--- /dev/null
+++ b/pkg/coveragedb/coveragedb.go
@@ -0,0 +1,350 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package coveragedb
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"sync/atomic"
+	"time"
+
+	"cloud.google.com/go/civil"
+	"cloud.google.com/go/spanner"
+	"github.com/google/syzkaller/pkg/coveragedb/spannerclient"
+	"github.com/google/syzkaller/pkg/subsystem"
+	_ "github.com/google/syzkaller/pkg/subsystem/lists"
+	"github.com/google/uuid"
+	"golang.org/x/sync/errgroup"
+	"google.golang.org/api/iterator"
+)
+
+type FilesRecord struct {
+	Session           string
+	FilePath          string
+	Instrumented      int64
+	Covered           int64
+	LinesInstrumented []int64
+	HitCounts         []int64
+	Manager           string // "*" means "collected from all managers"
+}
+
+type FileSubsystems struct {
+	Namespace  string
+	FilePath   string
+	Subsystems []string
+}
+
+type HistoryRecord struct {
+	Session   string
+	Time      time.Time
+	Namespace string
+	Repo      string
+	Commit    string
+	Duration  int64
+	DateTo    civil.Date
+	TotalRows int64
+}
+
+type Coverage struct {
+	Instrumented      int64
+	Covered           int64
+	LinesInstrumented []int64
+	HitCounts         []int64
+}
+
+func (c *Coverage) AddLineHitCount(line, hitCount int) {
+	c.Instrumented++
+	c.LinesInstrumented = append(c.LinesInstrumented, int64(line))
+	c.HitCounts = append(c.HitCounts, int64(hitCount))
+	if hitCount > 0 {
+		c.Covered++
+	}
+}
+
+type MergedCoverageRecord struct {
+	Manager  string
+	FilePath string
+	FileData *Coverage
+}
+
+func SaveMergeResult(ctx context.Context, client spannerclient.SpannerClient, descr *HistoryRecord, dec *json.Decoder,
+	sss []*subsystem.Subsystem) (int, error) {
+	var rowsCreated int
+	ssMatcher := subsystem.MakePathMatcher(sss)
+	ssCache := make(map[string][]string)
+
+	session := uuid.New().String()
+	mutations := []*spanner.Mutation{}
+
+	var mcr MergedCoverageRecord
+	for {
+		err := dec.Decode(&mcr)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return rowsCreated, fmt.Errorf("dec.Decode(MergedCoverageRecord): %w", err)
+		}
+		if mcr.FileData == nil {
+			return rowsCreated, errors.New("field MergedCoverageRecord.FileData can't be nil")
+		}
+		mutations = append(mutations, fileRecordMutation(session, &mcr))
+		subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
+		mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
+		// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
+		// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
+		// and implicit index mutations.
+		// We keep the number of records low enough that the number of explicit mutations * 10
+		// does not exceed the limit.
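+		// Worked example with the numbers above: 6k explicit mutations * 10 = 60k,
+		// which still leaves headroom below the 80k transaction limit.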
+		if len(mutations) >= 1000 {
+			if _, err := client.Apply(ctx, mutations); err != nil {
+				return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
+			}
+			rowsCreated += len(mutations)
+			mutations = nil
+		}
+	}
+
+	mutations = append(mutations, historyMutation(session, descr))
+	if _, err := client.Apply(ctx, mutations); err != nil {
+		return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
+	}
+	rowsCreated += len(mutations)
+	return rowsCreated, nil
+}
+
+type linesCoverage struct {
+	LinesInstrumented []int64
+	HitCounts         []int64
+}
+
+func linesCoverageStmt(ns, filepath, commit string, timePeriod TimePeriod) spanner.Statement {
+	return spanner.Statement{
+		SQL: `
+select
+  linesinstrumented,
+  hitcounts
+from merge_history
+  join files
+    on merge_history.session = files.session
+where
+  namespace=$1 and dateto=$2 and duration=$3 and filepath=$4 and commit=$5 and manager='*'`,
+		Params: map[string]interface{}{
+			"p1": ns,
+			"p2": timePeriod.DateTo,
+			"p3": timePeriod.Days,
+			"p4": filepath,
+			"p5": commit,
+		},
+	}
+}
+
+func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeriod,
+) (map[int]int, error) {
+	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
+	client, err := spannerclient.NewClient(ctx, projectID)
+	if err != nil {
+		return nil, fmt.Errorf("spanner.NewClient: %w", err)
+	}
+	defer client.Close()
+
+	stmt := linesCoverageStmt(ns, file, commit, tp)
+	iter := client.Single().Query(ctx, stmt)
+	defer iter.Stop()
+
+	row, err := iter.Next()
+	if err == iterator.Done {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("iter.Next: %w", err)
+	}
+	var r linesCoverage
+	if err = row.ToStruct(&r); err != nil {
+		return nil, fmt.Errorf("failed to row.ToStruct() spanner DB: %w", err)
+	}
+
+	res := map[int]int{}
+	for i, instrLine := range r.LinesInstrumented {
+		res[int(instrLine)] = int(r.HitCounts[i])
+	}
+	return res, nil
+}
+
+func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
+	historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
+		Session:   session,
+		Time:      time.Now(),
+		Namespace: template.Namespace,
+		Repo:      template.Repo,
+		Commit:    template.Commit,
+		Duration:  template.Duration,
+		DateTo:    template.DateTo,
+		TotalRows: template.TotalRows,
+	})
+	if err != nil {
+		panic(fmt.Sprintf("failed to spanner.InsertOrUpdateStruct(): %s", err.Error()))
+	}
+	return historyInsert
+}
+
+func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
+	insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
+		Session:           session,
+		FilePath:          mcr.FilePath,
+		Instrumented:      mcr.FileData.Instrumented,
+		Covered:           mcr.FileData.Covered,
+		LinesInstrumented: mcr.FileData.LinesInstrumented,
+		HitCounts:         mcr.FileData.HitCounts,
+		Manager:           mcr.Manager,
+	})
+	if err != nil {
+		panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
+	}
+	return insert
+}
+
+func fileSubsystemsMutation(ns, filePath string, subsystems []string) *spanner.Mutation {
+	insert, err := spanner.InsertOrUpdateStruct("file_subsystems", &FileSubsystems{
+		Namespace:  ns,
+		FilePath:   filePath,
+		Subsystems: subsystems,
+	})
+	if err != nil {
+		panic(fmt.Sprintf("failed to fileSubsystemsMutation(): %s", err.Error()))
+	}
+	return insert
+}
+
+func fileSubsystems(filePath string, ssMatcher *subsystem.PathMatcher, ssCache map[string][]string) []string {
+	sss, cached := ssCache[filePath]
+	if !cached {
+		for _, match := range ssMatcher.Match(filePath) {
+			sss = append(sss, match.Name)
+		}
+		ssCache[filePath] = sss
+	}
+	return sss
+}
+
+func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []int64, error) {
+	client, err := spannerclient.NewClient(ctx, projectID)
+	if err != nil {
+		return nil, nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
+	}
+	defer client.Close()
+
+	stmt := spanner.Statement{
+		SQL: `
+select
+  dateto,
+  duration as days,
+  totalrows
+from merge_history
+where
+  namespace=$1`,
+		Params: map[string]interface{}{
+			"p1": ns,
+		},
+	}
+	iter := client.Single().Query(ctx, stmt)
+	defer iter.Stop()
+	var periods []TimePeriod
+	var totalRows []int64
+	for {
+		row, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return nil, nil, fmt.Errorf("failed to iter.Next() spanner DB: %w", err)
+		}
+		var r struct {
+			Days      int64
+			DateTo    civil.Date
+			TotalRows int64
+		}
+		if err = row.ToStruct(&r); err != nil {
+			return nil, nil, fmt.Errorf("failed to row.ToStruct() spanner DB: %w", err)
+		}
+		periods = append(periods, TimePeriod{DateTo: r.DateTo, Days: int(r.Days)})
+		totalRows = append(totalRows, r.TotalRows)
+	}
+	return periods, totalRows, nil
+}
+
+// DeleteGarbage removes orphaned file entries from the database.
+//
+// It identifies files in the "files" table that are not referenced by any entries in the "merge_history" table,
+// indicating they are no longer associated with an active merge session.
+//
+// To avoid exceeding Spanner transaction limits, orphaned files are deleted in batches of 10,000.
+// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
+//
+// Returns the number of orphaned file entries successfully deleted.
+func DeleteGarbage(ctx context.Context) (int64, error) {
+	batchSize := 10_000
+	client, err := spannerclient.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
+	if err != nil {
+		return 0, fmt.Errorf("coveragedb.NewClient: %w", err)
+	}
+	defer client.Close()
+
+	iter := client.Single().Query(ctx, spanner.Statement{
+		SQL: `SELECT session, filepath
+		      FROM files
+		      WHERE NOT EXISTS (
+		        SELECT 1
+		        FROM merge_history
+		        WHERE merge_history.session = files.session
+		      )`})
+	defer iter.Stop()
+
+	var totalDeleted atomic.Int64
+	eg, _ := errgroup.WithContext(ctx)
+	var batch []spanner.Key
+	for {
+		row, err := iter.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return 0, fmt.Errorf("iter.Next: %w", err)
+		}
+		var r struct {
+			Session  string
+			Filepath string
+		}
+		if err = row.ToStruct(&r); err != nil {
+			return 0, fmt.Errorf("row.ToStruct: %w", err)
+		}
+		batch = append(batch, spanner.Key{r.Session, r.Filepath})
+		if len(batch) > batchSize {
+			goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
+			batch = nil
+		}
+	}
+	goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
+	if err = eg.Wait(); err != nil {
+		return 0, fmt.Errorf("spanner.Delete: %w", err)
+	}
+	return totalDeleted.Load(), nil
+}
+
+func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client spannerclient.SpannerClient,
+	totalDeleted *atomic.Int64) {
+	ks := spanner.KeySetFromKeys(batch...)
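+	// ks covers every key in the batch; ksSize is remembered so the goroutine
+	// below can add the batch size to the running total once the delete succeeds.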
+	ksSize := len(batch)
+	eg.Go(func() error {
+		mutation := spanner.Delete("files", ks)
+		_, err := client.Apply(ctx, []*spanner.Mutation{mutation})
+		if err == nil {
+			totalDeleted.Add(int64(ksSize))
+		}
+		return err
+	})
+}
diff --git a/pkg/coveragedb/coveragedb_mock_test.go b/pkg/coveragedb/coveragedb_mock_test.go
new file mode 100644
index 000000000..8f20a43ed
--- /dev/null
+++ b/pkg/coveragedb/coveragedb_mock_test.go
@@ -0,0 +1,121 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package coveragedb
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"strings"
+	"testing"
+	"time"
+
+	"cloud.google.com/go/spanner"
+	"github.com/google/syzkaller/pkg/coveragedb/mocks"
+	"github.com/google/syzkaller/pkg/subsystem"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+//go:generate ../../tools/mockery.sh --name SpannerClient -r
+
+type spannerMockTune func(*testing.T, *mocks.SpannerClient)
+
+func TestSaveMergeResult(t *testing.T) {
+	tests := []struct {
+		name     string
+		sss      []*subsystem.Subsystem
+		jsonl    io.Reader
+		descr    *HistoryRecord
+		mockTune spannerMockTune
+		wantErr  bool
+		wantRows int
+	}{
+		{
+			name:    "empty jsonl",
+			jsonl:   strings.NewReader(`{}`),
+			wantErr: true,
+		},
+		{
+			name:    "wrong jsonl content",
+			jsonl:   strings.NewReader(`{a}`),
+			wantErr: true,
+		},
+		{
+			name:     "1 record, Ok",
+			jsonl:    strings.NewReader(`{"FileData":{}}`),
+			descr:    &HistoryRecord{},
+			wantRows: 3, // 1 in files, 1 in file_subsystems and 1 in merge_history
+			mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+				m.
+					On("Apply", mock.Anything, mock.Anything).
+					Return(time.Now(), nil).
+					Once()
+			},
+		},
+		{
+			name: "2 records, Ok",
+			jsonl: strings.NewReader(`	{"FileData":{}}
+	{"FileData":{}}`),
+			descr:    &HistoryRecord{},
+			wantRows: 5,
+			mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+				m.
+					On("Apply",
+						mock.Anything,
+						mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+							// 2 in files, 2 in file_subsystems and 1 in merge_history
+							return len(ms) == 5
+						})).
+					Return(time.Now(), nil).
+					Once()
+			},
+		},
+		{
+			name:     "2k records, Ok",
+			jsonl:    strings.NewReader(strings.Repeat("{\"FileData\":{}}\n", 2000)),
+			descr:    &HistoryRecord{},
+			wantRows: 4001,
+			mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+				m.
+					On("Apply",
+						mock.Anything,
+						mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+							// 2k in files, 2k in file_subsystems
+							return len(ms) == 1000
+						})).
+					Return(time.Now(), nil).
+					Times(4).
+					On("Apply",
+						mock.Anything,
+						mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+							// And 1 in merge_history.
+							return len(ms) == 1
+						})).
+					Return(time.Now(), nil).
+					Once()
+			},
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			t.Parallel()
+			spannerMock := mocks.NewSpannerClient(t)
+			if test.mockTune != nil {
+				test.mockTune(t, spannerMock)
+			}
+			gotRows, err := SaveMergeResult(
+				context.Background(),
+				spannerMock,
+				test.descr,
+				json.NewDecoder(test.jsonl), test.sss)
+			if test.wantErr {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+			assert.Equal(t, test.wantRows, gotRows)
+		})
+	}
+}
diff --git a/pkg/coveragedb/mocks/SpannerClient.go b/pkg/coveragedb/mocks/SpannerClient.go
new file mode 100644
index 000000000..cb4fa1be3
--- /dev/null
+++ b/pkg/coveragedb/mocks/SpannerClient.go
@@ -0,0 +1,93 @@
+// Code generated by mockery v2.45.1. DO NOT EDIT.
+
+package mocks
+
+import (
+	context "context"
+
+	spanner "cloud.google.com/go/spanner"
+	mock "github.com/stretchr/testify/mock"
+
+	spannerclient "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
+
+	time "time"
+)
+
+// SpannerClient is an autogenerated mock type for the SpannerClient type
+type SpannerClient struct {
+	mock.Mock
+}
+
+// Apply provides a mock function with given fields: ctx, ms, opts
+func (_m *SpannerClient) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (time.Time, error) {
+	_va := make([]interface{}, len(opts))
+	for _i := range opts {
+		_va[_i] = opts[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, ctx, ms)
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	if len(ret) == 0 {
+		panic("no return value specified for Apply")
+	}
+
+	var r0 time.Time
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) (time.Time, error)); ok {
+		return rf(ctx, ms, opts...)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) time.Time); ok {
+		r0 = rf(ctx, ms, opts...)
+	} else {
+		r0 = ret.Get(0).(time.Time)
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) error); ok {
+		r1 = rf(ctx, ms, opts...)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// Close provides a mock function with given fields:
+func (_m *SpannerClient) Close() {
+	_m.Called()
+}
+
+// Single provides a mock function with given fields:
+func (_m *SpannerClient) Single() spannerclient.ReadOnlyTransaction {
+	ret := _m.Called()
+
+	if len(ret) == 0 {
+		panic("no return value specified for Single")
+	}
+
+	var r0 spannerclient.ReadOnlyTransaction
+	if rf, ok := ret.Get(0).(func() spannerclient.ReadOnlyTransaction); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(spannerclient.ReadOnlyTransaction)
+		}
+	}
+
+	return r0
+}
+
+// NewSpannerClient creates a new instance of SpannerClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewSpannerClient(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *SpannerClient {
+	mock := &SpannerClient{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/spanner.go
deleted file mode 100644
index e2a197e0c..000000000
--- a/pkg/coveragedb/spanner.go
+++ /dev/null
@@ -1,354 +0,0 @@
-// Copyright 2024 syzkaller project authors. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package coveragedb
-
-import (
-	"bufio"
-	"context"
-	"encoding/json"
-	"fmt"
-	"io"
-	"os"
-	"sync/atomic"
-	"time"
-
-	"cloud.google.com/go/civil"
-	"cloud.google.com/go/spanner"
-	"github.com/google/syzkaller/pkg/subsystem"
-	_ "github.com/google/syzkaller/pkg/subsystem/lists"
-	"github.com/google/uuid"
-	"golang.org/x/sync/errgroup"
-	"google.golang.org/api/iterator"
-)
-
-type FilesRecord struct {
-	Session           string
-	FilePath          string
-	Instrumented      int64
-	Covered           int64
-	LinesInstrumented []int64
-	HitCounts         []int64
-	Manager           string // "*" means "collected from all managers"
-}
-
-type FileSubsystems struct {
-	Namespace  string
-	FilePath   string
-	Subsystems []string
-}
-
-type HistoryRecord struct {
-	Session   string
-	Time      time.Time
-	Namespace string
-	Repo      string
-	Commit    string
-	Duration  int64
-	DateTo    civil.Date
-	TotalRows int64
-}
-
-func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
-	database := "projects/" + projectID + "/instances/syzbot/databases/coverage"
-	return spanner.NewClient(ctx, database)
-}
-
-type Coverage struct {
-	Instrumented      int64
-	Covered           int64
-	LinesInstrumented []int64
-	HitCounts         []int64
-}
-
-func (c *Coverage) AddLineHitCount(line, hitCount int) {
-	c.Instrumented++
-	c.LinesInstrumented = append(c.LinesInstrumented, int64(line))
-	c.HitCounts = append(c.HitCounts, int64(hitCount))
-	if hitCount > 0 {
-		c.Covered++
-	}
-}
-
-type MergedCoverageRecord struct {
-	Manager  string
-	FilePath string
-	FileData *Coverage
-}
-
-func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
-	sss []*subsystem.Subsystem) (int, error) {
-	client, err := NewClient(ctx, projectID)
-	if err != nil {
-		return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
-	}
-	defer client.Close()
-	var rowsCreated int
-
-	ssMatcher := subsystem.MakePathMatcher(sss)
-	ssCache := make(map[string][]string)
-
-	session := uuid.New().String()
-	mutations := []*spanner.Mutation{}
-
-	jsonlScanner := bufio.NewScanner(jsonl)
-
-	for jsonlScanner.Scan() {
-		var mcr MergedCoverageRecord
-		if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
-			return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
-		}
-		mutations = append(mutations, fileRecordMutation(session, &mcr))
-		subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
-		mutations = append(mutations, fileSubsystemsMutation(descr.Namespace, mcr.FilePath, subsystems))
-		// There is a limit on the number of mutations per transaction (80k) imposed by the DB.
-		// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
-		// and implicit index mutations.
-		// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
-		if len(mutations) > 1000 {
-			if _, err = client.Apply(ctx, mutations); err != nil {
-				return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
-			}
-			rowsCreated += len(mutations)
-			mutations = nil
-		}
-	}
-
-	mutations = append(mutations, historyMutation(session, descr))
-	if _, err = client.Apply(ctx, mutations); err != nil {
-		return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
-	}
-	return rowsCreated, nil
-}
-
-type linesCoverage struct {
-	LinesInstrumented []int64
-	HitCounts         []int64
-}
-
-func linesCoverageStmt(ns, filepath, commit string, timePeriod TimePeriod) spanner.Statement {
-	return spanner.Statement{
-		SQL: `
-select
-  linesinstrumented,
-  hitcounts
-from merge_history
-  join files
-    on merge_history.session = files.session
-where
-  namespace=$1 and dateto=$2 and duration=$3 and filepath=$4 and commit=$5 and manager='*'`,
-		Params: map[string]interface{}{
-			"p1": ns,
-			"p2": timePeriod.DateTo,
-			"p3": timePeriod.Days,
-			"p4": filepath,
-			"p5": commit,
-		},
-	}
-}
-
-func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeriod,
-) (map[int]int, error) {
-	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
-	client, err := NewClient(ctx, projectID)
-	if err != nil {
-		return nil, fmt.Errorf("spanner.NewClient: %w", err)
-	}
-	defer client.Close()
-
-	stmt := linesCoverageStmt(ns, file, commit, tp)
-	iter := client.Single().Query(ctx, stmt)
-	defer iter.Stop()
-
-	row, err := iter.Next()
-	if err == iterator.Done {
-		return nil, nil
-	}
-	if err != nil {
-		return nil, fmt.Errorf("iter.Next: %w", err)
-	}
-	var r linesCoverage
-	if err = row.ToStruct(&r); err != nil {
-		return nil, fmt.Errorf("failed to row.ToStruct() spanner DB: %w", err)
-	}
-
-	res := map[int]int{}
-	for i, instrLine := range r.LinesInstrumented {
-		res[int(instrLine)] = int(r.HitCounts[i])
-	}
-	return res, nil
-}
-
-func historyMutation(session string, template *HistoryRecord) *spanner.Mutation {
-	historyInsert, err := spanner.InsertOrUpdateStruct("merge_history", &HistoryRecord{
-		Session:   session,
-		Time:      time.Now(),
-		Namespace: template.Namespace,
-		Repo:      template.Repo,
-		Commit:    template.Commit,
-		Duration:  template.Duration,
-		DateTo:    template.DateTo,
-		TotalRows: template.TotalRows,
-	})
-	if err != nil {
-		panic(fmt.Sprintf("failed to spanner.InsertStruct(): %s", err.Error()))
-	}
-	return historyInsert
-}
-
-func fileRecordMutation(session string, mcr *MergedCoverageRecord) *spanner.Mutation {
-	insert, err := spanner.InsertOrUpdateStruct("files", &FilesRecord{
-		Session:           session,
-		FilePath:          mcr.FilePath,
-		Instrumented:      mcr.FileData.Instrumented,
-		Covered:           mcr.FileData.Covered,
-		LinesInstrumented: mcr.FileData.LinesInstrumented,
-		HitCounts:         mcr.FileData.HitCounts,
-		Manager:           mcr.Manager,
-	})
-	if err != nil {
-		panic(fmt.Sprintf("failed to fileRecordMutation: %v", err))
-	}
-	return insert
-}
-
-func fileSubsystemsMutation(ns, filePath string, subsystems []string) *spanner.Mutation {
-	insert, err := spanner.InsertOrUpdateStruct("file_subsystems", &FileSubsystems{
-		Namespace:  ns,
-		FilePath:   filePath,
-		Subsystems: subsystems,
-	})
-	if err != nil {
-		panic(fmt.Sprintf("failed to fileSubsystemsMutation(): %s", err.Error()))
-	}
-	return insert
-}
-
-func fileSubsystems(filePath string, ssMatcher *subsystem.PathMatcher, ssCache map[string][]string) []string {
-	sss, cached := ssCache[filePath]
-	if !cached {
-		for _, match := range ssMatcher.Match(filePath) {
-			sss = append(sss, match.Name)
-		}
-		ssCache[filePath] = sss
-	}
-	return sss
-}
-
-func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []int64, error) {
-	client, err := NewClient(ctx, projectID)
-	if err != nil {
-		return nil, nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
-	}
-	defer client.Close()
-
-	stmt := spanner.Statement{
-		SQL: `
-select
-  dateto,
-  duration as days,
-  totalrows
-from merge_history
-where
-  namespace=$1`,
-		Params: map[string]interface{}{
-			"p1": ns,
-		},
-	}
-	iter := client.Single().Query(ctx, stmt)
-	defer iter.Stop()
-	var periods []TimePeriod
-	var totalRows []int64
-	for {
-		row, err := iter.Next()
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return nil, nil, fmt.Errorf("failed to iter.Next() spanner DB: %w", err)
-		}
-		var r struct {
-			Days      int64
-			DateTo    civil.Date
-			TotalRows int64
-		}
-		if err = row.ToStruct(&r); err != nil {
-			return nil, nil, fmt.Errorf("failed to row.ToStruct() spanner DB: %w", err)
-		}
-		periods = append(periods, TimePeriod{DateTo: r.DateTo, Days: int(r.Days)})
-		totalRows = append(totalRows, r.TotalRows)
-	}
-	return periods, totalRows, nil
-}
-
-// DeleteGarbage removes orphaned file entries from the database.
-//
-// It identifies files in the "files" table that are not referenced by any entries in the "merge_history" table,
-// indicating they are no longer associated with an active merge session.
-//
-// To avoid exceeding Spanner transaction limits, orphaned files are deleted in batches of 10,000.
-// Note that in case of an error during batch deletion, some files may be deleted but not counted in the total.
-//
-// Returns the number of orphaned file entries successfully deleted.
-func DeleteGarbage(ctx context.Context) (int64, error) {
-	batchSize := 10_000
-	client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
-	if err != nil {
-		return 0, fmt.Errorf("coveragedb.NewClient: %w", err)
-	}
-	defer client.Close()
-
-	iter := client.Single().Query(ctx, spanner.Statement{
-		SQL: `SELECT session, filepath
-		      FROM files
-		      WHERE NOT EXISTS (
-		        SELECT 1
-		        FROM merge_history
-		        WHERE merge_history.session = files.session
-		      )`})
-	defer iter.Stop()
-
-	var totalDeleted atomic.Int64
-	eg, _ := errgroup.WithContext(ctx)
-	var batch []spanner.Key
-	for {
-		row, err := iter.Next()
-		if err == iterator.Done {
-			break
-		}
-		if err != nil {
-			return 0, fmt.Errorf("iter.Next: %w", err)
-		}
-		var r struct {
-			Session  string
-			Filepath string
-		}
-		if err = row.ToStruct(&r); err != nil {
-			return 0, fmt.Errorf("row.ToStruct: %w", err)
-		}
-		batch = append(batch, spanner.Key{r.Session, r.Filepath})
-		if len(batch) > batchSize {
-			goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
-			batch = nil
-		}
-	}
-	goSpannerDelete(ctx, batch, eg, client, &totalDeleted)
-	if err = eg.Wait(); err != nil {
-		return 0, fmt.Errorf("spanner.Delete: %w", err)
-	}
-	return totalDeleted.Load(), nil
-}
-
-func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client,
-	totalDeleted *atomic.Int64) {
-	ks := spanner.KeySetFromKeys(batch...)
-	ksSize := len(batch)
-	eg.Go(func() error {
-		mutation := spanner.Delete("files", ks)
-		_, err := client.Apply(ctx, []*spanner.Mutation{mutation})
-		if err == nil {
-			totalDeleted.Add(int64(ksSize))
-		}
-		return err
-	})
-}
diff --git a/pkg/coveragedb/spannerclient/spanner_client.go b/pkg/coveragedb/spannerclient/spanner_client.go
new file mode 100644
index 000000000..4f655c6ed
--- /dev/null
+++ b/pkg/coveragedb/spannerclient/spanner_client.go
@@ -0,0 +1,79 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package spannerclient
+
+import (
+	"context"
+	"time"
+
+	"cloud.google.com/go/spanner"
+)
+
+type SpannerClient interface {
+	Close()
+	Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error)
+	Single() ReadOnlyTransaction
+}
+
+type ReadOnlyTransaction interface {
+	Query(ctx context.Context, statement spanner.Statement) RowIterator
+}
+
+type RowIterator interface {
+	Next() (Row, error)
+	Stop()
+}
+
+type Row interface {
+	ToStruct(p interface{}) error
+}
+
+type SpannerClientProxy struct {
+	client *spanner.Client
+}
+
+func (proxy *SpannerClientProxy) Close() {
+	proxy.client.Close()
+}
+
+func (proxy *SpannerClientProxy) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption,
+) (commitTimestamp time.Time, err error) {
+	return proxy.client.Apply(ctx, ms, opts...)
+}
+
+func (proxy *SpannerClientProxy) Single() ReadOnlyTransaction {
+	return &SpannerReadOnlyTransactionProxy{
+		readOnlyTransaction: proxy.client.Single(),
+	}
+}
+
+type SpannerReadOnlyTransactionProxy struct {
+	readOnlyTransaction *spanner.ReadOnlyTransaction
+}
+
+func (proxy *SpannerReadOnlyTransactionProxy) Query(ctx context.Context, statement spanner.Statement) RowIterator {
+	return &SpannerRowIteratorProxy{
+		rowIterator: proxy.readOnlyTransaction.Query(ctx, statement),
+	}
+}
+
+type SpannerRowIteratorProxy struct {
+	rowIterator *spanner.RowIterator
+}
+
+func (proxy *SpannerRowIteratorProxy) Next() (Row, error) {
+	return proxy.rowIterator.Next()
+}
+
+func (proxy *SpannerRowIteratorProxy) Stop() {
+	proxy.rowIterator.Stop()
+}
+
+func NewClient(ctx context.Context, projectID string) (SpannerClient, error) {
+	database := "projects/" + projectID + "/instances/syzbot/databases/coverage"
+	client, err := spanner.NewClient(ctx, database)
+	return &SpannerClientProxy{
+		client: client,
+	}, err
+}
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 0f6ff43e4..839a85a8e 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -4,13 +4,17 @@
 package covermerger
 
 import (
+	"compress/gzip"
 	"context"
 	"encoding/csv"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
+	"slices"
 	"strconv"
 
+	"github.com/google/syzkaller/pkg/coveragedb"
 	"github.com/google/syzkaller/pkg/log"
 	"golang.org/x/exp/maps"
 	"golang.org/x/sync/errgroup"
@@ -50,6 +54,94 @@ type FileCoverageMerger interface {
 	Result() *MergeResult
 }
 
+// MergeCSVWriteJSONL merges the input CSV and generates JSONL records.
+// The number of lines generated is [count(managers)+1] * [count(kernel_files)].
+// Returns (totalInstrumentedLines, totalCoveredLines, error).
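+// If w is non-nil, the JSONL stream is gzip-compressed and starts with the
+// HistoryRecord descriptor, followed by one MergedCoverageRecord per line;
+// a nil w computes the totals without writing anything.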
+func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer,
+) (int, int, error) {
+	eg := errgroup.Group{}
+	mergeResults := make(chan *FileMergeResult)
+	eg.Go(func() error {
+		defer close(mergeResults)
+		if err := MergeCSVData(config, csvReader, mergeResults); err != nil {
+			return fmt.Errorf("covermerger.MergeCSVData: %w", err)
+		}
+		return nil
+	})
+	var totalInstrumentedLines, totalCoveredLines int
+	eg.Go(func() error {
+		var encoder *json.Encoder
+		if w != nil {
+			gzw := gzip.NewWriter(w)
+			defer gzw.Close()
+			encoder = json.NewEncoder(gzw)
+		}
+		if encoder != nil {
+			if err := encoder.Encode(descr); err != nil {
+				return fmt.Errorf("encoder.Encode(HistoryRecord): %w", err)
+			}
+		}
+		for fileMergeResult := range mergeResults {
+			dashCoverageRecords := mergedCoverageRecords(fileMergeResult)
+			if encoder != nil {
+				for _, record := range dashCoverageRecords {
+					if err := encoder.Encode(record); err != nil {
+						return fmt.Errorf("encoder.Encode(MergedCoverageRecord): %w", err)
+					}
+				}
+			}
+			for _, hitCount := range fileMergeResult.HitCounts {
+				totalInstrumentedLines++
+				if hitCount > 0 {
+					totalCoveredLines++
+				}
+			}
+		}
+		return nil
+	})
+	if err := eg.Wait(); err != nil {
+		return 0, 0, fmt.Errorf("eg.Wait: %w", err)
+	}
+	return totalInstrumentedLines, totalCoveredLines, nil
+}
+
+const allManagers = "*"
+
+func mergedCoverageRecords(fmr *FileMergeResult) []*coveragedb.MergedCoverageRecord {
+	if !fmr.FileExists {
+		return nil
+	}
+	lines := maps.Keys(fmr.HitCounts)
+	slices.Sort(lines)
+	mgrStat := make(map[string]*coveragedb.Coverage)
+	mgrStat[allManagers] = &coveragedb.Coverage{}
+
+	for _, line := range lines {
+		mgrStat[allManagers].AddLineHitCount(line, fmr.HitCounts[line])
+		managerHitCounts := map[string]int{}
+		for _, lineDetail := range fmr.LineDetails[line] {
+			manager := lineDetail.Manager
+			managerHitCounts[manager] += lineDetail.HitCount
+		}
+		for manager, managerHitCount := range managerHitCounts {
+			if _, ok := mgrStat[manager]; !ok {
+				mgrStat[manager] = &coveragedb.Coverage{}
+			}
+			mgrStat[manager].AddLineHitCount(line, managerHitCount)
+		}
+	}
+
+	res := []*coveragedb.MergedCoverageRecord{}
+	for managerName, managerCoverage := range mgrStat {
+		res = append(res, &coveragedb.MergedCoverageRecord{
+			Manager:  managerName,
+			FilePath: fmr.FilePath,
+			FileData: managerCoverage,
+		})
+	}
+	return res
+}
+
 func batchFileData(c *Config, targetFilePath string, records []*FileRecord) (*MergeResult, error) {
 	log.Logf(1, "processing %d records for %s", len(records), targetFilePath)
 	repoCommitsMap := make(map[RepoCommit]bool)
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 4580c114f..29fee83a6 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -4,18 +4,76 @@
 package covermerger
 
 import (
+	"compress/gzip"
+	"context"
 	"encoding/json"
+	"io"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
+	"cloud.google.com/go/civil"
+	"cloud.google.com/go/spanner"
+	"github.com/google/syzkaller/pkg/coveragedb"
+	"github.com/google/syzkaller/pkg/coveragedb/mocks"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"golang.org/x/sync/errgroup"
 )
 
+var testsPath = "testdata/integration"
+var defaultTestWorkdir = testsPath + "/all/test-workdir-covermerger"
+
+func TestMergeCSVWriteJSONL_and_coveragedb_SaveMergeResult(t *testing.T) {
+	rc, wc := io.Pipe()
+	eg := errgroup.Group{}
+	eg.Go(func() error {
+		defer wc.Close()
+		totalInstrumented, totalCovered, err := MergeCSVWriteJSONL(
+			testConfig(
+				"git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git",
+				"fe46a7dd189e25604716c03576d05ac8a5209743",
+				testsPath+"/aesni-intel_glue/test-workdir-covermerger"),
+			&coveragedb.HistoryRecord{
+				DateTo: civil.DateOf(time.Now()),
+			},
+			strings.NewReader(readFileOrFail(t, testsPath+"/aesni-intel_glue/bqTable.txt")),
+			wc)
+		assert.Equal(t, 48, totalInstrumented)
+		assert.Equal(t, 45, totalCovered)
+		return err
+	})
+	eg.Go(func() error {
+		defer rc.Close()
+		gzrc, err := gzip.NewReader(rc)
+		assert.NoError(t, err)
+		defer gzrc.Close()
+
+		spannerMock := mocks.NewSpannerClient(t)
+		spannerMock.
+			On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+				// 1 file * (5 managers + 1 manager total) * 2 (to update files and file_subsystems) + 1 merge_history
+				return len(ms) == 13
+			})).
+			Return(time.Now(), nil).
+			Once()
+
+		decoder := json.NewDecoder(gzrc)
+		decoder.DisallowUnknownFields()
+
+		descr := new(coveragedb.HistoryRecord)
+		assert.NoError(t, decoder.Decode(descr))
+
+		_, err = coveragedb.SaveMergeResult(context.Background(), spannerMock, descr, decoder, nil)
+		return err
+	})
+	assert.NoError(t, eg.Wait())
+}
+
 // nolint: lll
 func TestAggregateStreamData(t *testing.T) {
-	testsPath := "testdata/integration"
 	type Test struct {
 		name              string
 		workdir           string
@@ -36,7 +94,7 @@ func TestAggregateStreamData(t *testing.T) {
 		},
 		{
 			name:    "code deleted",
-			workdir: testsPath + "/all/test-workdir-covermerger",
+			workdir: defaultTestWorkdir,
 			bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2,0,2,-1,1,true,1`,
 			simpleAggregation: `{
@@ -53,7 +111,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2,
 		},
 		{
 			name:    "file deleted",
-			workdir: testsPath + "/all/test-workdir-covermerger",
+			workdir: defaultTestWorkdir,
 			bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2,0,2,-1,1,true,1`,
 			simpleAggregation: `{
@@ -68,7 +126,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2,
 		},
 		{
 			name:    "covered line changed",
-			workdir: testsPath + "/all/test-workdir-covermerger",
+			workdir: defaultTestWorkdir,
 			bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,2,0,2,-1,1,true,1
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3,0,3,-1,1,true,1`,
@@ -99,7 +157,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3,
 		},
 		{
 			name:    "add line",
-			workdir: testsPath + "/all/test-workdir-covermerger",
+			workdir: defaultTestWorkdir,
 			bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2,-1,1,true,1`,
 			simpleAggregation: `{
@@ -129,7 +187,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2
 		},
 		{
 			name:    "instrumented lines w/o coverage are reported",
-			workdir: testsPath + "/all/test-workdir-covermerger",
+			workdir: defaultTestWorkdir,
 			bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,not_changed.c,func1,3,0,3,-1,0,true,1
 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,0,4,-1,0,true,1`,
@@ -172,34 +230,27 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			ch := make(chan *FileMergeResult)
-			aggregation := make(map[string]*MergeResult)
+			mergeResultsCh := make(chan *FileMergeResult)
+			doneCh := make(chan bool)
 			go func() {
-				for fmr := range ch {
+				aggregation := make(map[string]*MergeResult)
+				for fmr := range mergeResultsCh {
 					aggregation[fmr.FilePath] = fmr.MergeResult
 				}
+				if !test.checkDetails {
+					ignoreLineDetailsInTest(aggregation)
+				}
+				var expectedAggregation map[string]*MergeResult
+				assert.NoError(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
+				assert.Equal(t, expectedAggregation, aggregation)
+				doneCh <- true
 			}()
-			err := MergeCSVData(
-				&Config{
-					Jobs:          2,
-					skipRepoClone: true,
-					Base: RepoCommit{
-						Repo:   test.baseRepo,
-						Commit: test.baseCommit,
-					},
-					FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
-				},
+			assert.NoError(t, MergeCSVData(
+				testConfig(test.baseRepo, test.baseCommit, test.workdir),
 				strings.NewReader(test.bqTable),
-				ch,
-			)
-
-			assert.Nil(t, err)
-			var expectedAggregation map[string]*MergeResult
-			assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
-			if !test.checkDetails {
-				ignoreLineDetailsInTest(aggregation)
-			}
-			assert.Equal(t, expectedAggregation, aggregation)
+				mergeResultsCh))
+			close(mergeResultsCh)
+			<-doneCh
 		})
 	}
 }
@@ -233,3 +284,15 @@ func readFileOrFail(t *testing.T, path string) string {
 	assert.Nil(t, err)
 	return string(content)
 }
+
+func testConfig(repo, commit, workdir string) *Config {
+	return &Config{
+		Jobs:          2,
+		skipRepoClone: true,
+		Base: RepoCommit{
+			Repo:   repo,
+			Commit: commit,
+		},
+		FileVersProvider: &fileVersProviderMock{Workdir: workdir},
+	}
+}
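
A minimal sketch of the seam this patch introduces (the helper below is hypothetical; the packages, types and signatures are the ones added in this patch). Production callers construct the real proxy with spannerclient.NewClient, while tests pass the generated mocks.NewSpannerClient(t) instead, as coveragedb_mock_test.go does:

package example

import (
	"context"
	"encoding/json"
	"os"

	"github.com/google/syzkaller/pkg/coveragedb"
	"github.com/google/syzkaller/pkg/coveragedb/spannerclient"
)

// saveExample wires the real Spanner proxy into SaveMergeResult.
// A test would substitute the generated mock for the client argument.
func saveExample(ctx context.Context, descr *coveragedb.HistoryRecord, dec *json.Decoder) (int, error) {
	client, err := spannerclient.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
	if err != nil {
		return 0, err
	}
	defer client.Close()
	return coveragedb.SaveMergeResult(ctx, client, descr, dec, nil)
}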