diff options
| author | Taras Madan <tarasmadan@google.com> | 2024-12-18 15:28:18 +0100 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2024-12-19 14:38:21 +0000 |
| commit | 5d3c618ce7bc487350983451f940dbdf2571c428 (patch) | |
| tree | bc76e6374067a5f551d94502811e82c3b2263612 | |
| parent | fef5bb5152a5013d0c4571cd0601f46824a9664b (diff) | |
pkg/coveragedb: test SaveMergeResult
1. Make interface testable.
2. Add Spanner interfaces.
3. Generate mocks for proxy interfaces.
4. Test SaveMergeResult.
5. Test MergeCSVWriteJSONL and coveragedb.SaveMergeResult integration.
| -rw-r--r-- | Makefile | 1 | ||||
| -rw-r--r-- | dashboard/app/api.go | 13 | ||||
| -rw-r--r-- | dashboard/app/entities_spanner.go | 3 | ||||
| -rw-r--r-- | pkg/cover/heatmap.go | 3 | ||||
| -rw-r--r-- | pkg/coveragedb/coveragedb.go (renamed from pkg/coveragedb/spanner.go) | 48 | ||||
| -rw-r--r-- | pkg/coveragedb/coveragedb_mock_test.go | 121 | ||||
| -rw-r--r-- | pkg/coveragedb/mocks/SpannerClient.go | 93 | ||||
| -rw-r--r-- | pkg/coveragedb/spannerclient/spanner_client.go | 79 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger.go | 92 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger_test.go | 121 | ||||
| -rw-r--r-- | tools/syz-covermerger/syz_covermerger.go | 108 |
11 files changed, 532 insertions, 150 deletions
@@ -237,6 +237,7 @@ generate: generate_go: format_cpp $(GO) generate ./executor ./pkg/ifuzz ./pkg/build ./pkg/rpcserver $(GO) generate ./vm/proxyapp + $(GO) generate ./pkg/coveragedb generate_rpc: flatc -o pkg/flatrpc --warnings-as-errors --gen-object-api --filename-suffix "" --go --gen-onefile --go-namespace flatrpc pkg/flatrpc/flatrpc.fbs diff --git a/dashboard/app/api.go b/dashboard/app/api.go index ecdecd6bf..9800d1b40 100644 --- a/dashboard/app/api.go +++ b/dashboard/app/api.go @@ -26,6 +26,7 @@ import ( "github.com/google/syzkaller/pkg/asset" "github.com/google/syzkaller/pkg/auth" "github.com/google/syzkaller/pkg/coveragedb" + "github.com/google/syzkaller/pkg/coveragedb/spannerclient" "github.com/google/syzkaller/pkg/debugtracer" "github.com/google/syzkaller/pkg/email" "github.com/google/syzkaller/pkg/gcs" @@ -1944,15 +1945,21 @@ func apiCreateUploadURL(c context.Context, payload io.Reader) (interface{}, erro // Second+ records are coveragedb.MergedCoverageRecord. func apiSaveCoverage(c context.Context, payload io.Reader) (interface{}, error) { descr := new(coveragedb.HistoryRecord) - if err := json.NewDecoder(payload).Decode(descr); err != nil { - return 0, fmt.Errorf("json.NewDecoder(dashapi.MergedCoverageDescription).Decode: %w", err) + jsonDec := json.NewDecoder(payload) + if err := jsonDec.Decode(descr); err != nil { + return 0, fmt.Errorf("json.NewDecoder(coveragedb.HistoryRecord).Decode: %w", err) } var sss []*subsystem.Subsystem if service := getNsConfig(c, descr.Namespace).Subsystems.Service; service != nil { sss = service.List() log.Infof(c, "found %d subsystems for %s namespace", len(sss), descr.Namespace) } - rowsCreated, err := coveragedb.SaveMergeResult(c, appengine.AppID(context.Background()), descr, payload, sss) + client, err := spannerclient.NewClient(c, appengine.AppID(context.Background())) + if err != nil { + return 0, fmt.Errorf("coveragedb.NewClient() failed: %s", err.Error()) + } + defer client.Close() + rowsCreated, err := coveragedb.SaveMergeResult(c, client, descr, jsonDec, sss) if err != nil { log.Errorf(c, "error storing coverage for ns %s, date %s: %v", descr.Namespace, descr.DateTo.String(), err) diff --git a/dashboard/app/entities_spanner.go b/dashboard/app/entities_spanner.go index ff154986e..e3f3bad9e 100644 --- a/dashboard/app/entities_spanner.go +++ b/dashboard/app/entities_spanner.go @@ -11,6 +11,7 @@ import ( "cloud.google.com/go/civil" "cloud.google.com/go/spanner" "github.com/google/syzkaller/pkg/coveragedb" + "github.com/google/syzkaller/pkg/coveragedb/spannerclient" "google.golang.org/api/iterator" ) @@ -25,7 +26,7 @@ type CoverageHistory struct { // MergedCoverage uses dates, not time. func MergedCoverage(ctx context.Context, ns, periodType string) (*CoverageHistory, error) { projectID := os.Getenv("GOOGLE_CLOUD_PROJECT") - client, err := coveragedb.NewClient(ctx, projectID) + client, err := spannerclient.NewClient(ctx, projectID) if err != nil { return nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error()) } diff --git a/pkg/cover/heatmap.go b/pkg/cover/heatmap.go index 0d7c93cd5..ef4f7ffab 100644 --- a/pkg/cover/heatmap.go +++ b/pkg/cover/heatmap.go @@ -14,6 +14,7 @@ import ( "cloud.google.com/go/spanner" "github.com/google/syzkaller/pkg/coveragedb" + "github.com/google/syzkaller/pkg/coveragedb/spannerclient" _ "github.com/google/syzkaller/pkg/subsystem/lists" "golang.org/x/exp/maps" "google.golang.org/api/iterator" @@ -185,7 +186,7 @@ where func filesCoverageWithDetails(ctx context.Context, projectID, ns, subsystem string, timePeriods []coveragedb.TimePeriod, ) ([]*fileCoverageWithDetails, error) { - client, err := coveragedb.NewClient(ctx, projectID) + client, err := spannerclient.NewClient(ctx, projectID) if err != nil { return nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error()) } diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/coveragedb.go index e2a197e0c..0a6175ad5 100644 --- a/pkg/coveragedb/spanner.go +++ b/pkg/coveragedb/coveragedb.go @@ -4,9 +4,9 @@ package coveragedb import ( - "bufio" "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -15,6 +15,7 @@ import ( "cloud.google.com/go/civil" "cloud.google.com/go/spanner" + "github.com/google/syzkaller/pkg/coveragedb/spannerclient" "github.com/google/syzkaller/pkg/subsystem" _ "github.com/google/syzkaller/pkg/subsystem/lists" "github.com/google/uuid" @@ -49,11 +50,6 @@ type HistoryRecord struct { TotalRows int64 } -func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) { - database := "projects/" + projectID + "/instances/syzbot/databases/coverage" - return spanner.NewClient(ctx, database) -} - type Coverage struct { Instrumented int64 Covered int64 @@ -76,27 +72,26 @@ type MergedCoverageRecord struct { FileData *Coverage } -func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader, +func SaveMergeResult(ctx context.Context, client spannerclient.SpannerClient, descr *HistoryRecord, dec *json.Decoder, sss []*subsystem.Subsystem) (int, error) { - client, err := NewClient(ctx, projectID) - if err != nil { - return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error()) - } - defer client.Close() var rowsCreated int - ssMatcher := subsystem.MakePathMatcher(sss) ssCache := make(map[string][]string) session := uuid.New().String() mutations := []*spanner.Mutation{} - jsonlScanner := bufio.NewScanner(jsonl) - - for jsonlScanner.Scan() { - var mcr MergedCoverageRecord - if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil { - return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err) + var mcr MergedCoverageRecord + for { + err := dec.Decode(&mcr) + if err == io.EOF { + break + } + if err != nil { + return rowsCreated, fmt.Errorf("dec.Decode(MergedCoverageRecord): %w", err) + } + if mcr.FileData == nil { + return rowsCreated, errors.New("field MergedCoverageRecord.FileData can't be nil") } mutations = append(mutations, fileRecordMutation(session, &mcr)) subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache) @@ -105,8 +100,8 @@ func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord // This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations) // and implicit index mutations. // We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit. - if len(mutations) > 1000 { - if _, err = client.Apply(ctx, mutations); err != nil { + if len(mutations) >= 1000 { + if _, err := client.Apply(ctx, mutations); err != nil { return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error()) } rowsCreated += len(mutations) @@ -115,9 +110,10 @@ func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord } mutations = append(mutations, historyMutation(session, descr)) - if _, err = client.Apply(ctx, mutations); err != nil { + if _, err := client.Apply(ctx, mutations); err != nil { return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error()) } + rowsCreated += len(mutations) return rowsCreated, nil } @@ -150,7 +146,7 @@ where func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeriod, ) (map[int]int, error) { projectID := os.Getenv("GOOGLE_CLOUD_PROJECT") - client, err := NewClient(ctx, projectID) + client, err := spannerclient.NewClient(ctx, projectID) if err != nil { return nil, fmt.Errorf("spanner.NewClient: %w", err) } @@ -236,7 +232,7 @@ func fileSubsystems(filePath string, ssMatcher *subsystem.PathMatcher, ssCache m } func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []int64, error) { - client, err := NewClient(ctx, projectID) + client, err := spannerclient.NewClient(ctx, projectID) if err != nil { return nil, nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error()) } @@ -292,7 +288,7 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in // Returns the number of orphaned file entries successfully deleted. func DeleteGarbage(ctx context.Context) (int64, error) { batchSize := 10_000 - client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT")) + client, err := spannerclient.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT")) if err != nil { return 0, fmt.Errorf("coveragedb.NewClient: %w", err) } @@ -339,7 +335,7 @@ func DeleteGarbage(ctx context.Context) (int64, error) { return totalDeleted.Load(), nil } -func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client, +func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client spannerclient.SpannerClient, totalDeleted *atomic.Int64) { ks := spanner.KeySetFromKeys(batch...) ksSize := len(batch) diff --git a/pkg/coveragedb/coveragedb_mock_test.go b/pkg/coveragedb/coveragedb_mock_test.go new file mode 100644 index 000000000..8f20a43ed --- /dev/null +++ b/pkg/coveragedb/coveragedb_mock_test.go @@ -0,0 +1,121 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package coveragedb + +import ( + "context" + "encoding/json" + "io" + "strings" + "testing" + "time" + + "cloud.google.com/go/spanner" + "github.com/google/syzkaller/pkg/coveragedb/mocks" + "github.com/google/syzkaller/pkg/subsystem" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +//go:generate ../../tools/mockery.sh --name SpannerClient -r + +type spannerMockTune func(*testing.T, *mocks.SpannerClient) + +func TestSaveMergeResult(t *testing.T) { + tests := []struct { + name string + sss []*subsystem.Subsystem + jsonl io.Reader + descr *HistoryRecord + mockTune spannerMockTune + wantErr bool + wantRows int + }{ + { + name: "empty jsonl", + jsonl: strings.NewReader(`{}`), + wantErr: true, + }, + { + name: "wrong jsonl content", + jsonl: strings.NewReader(`{a}`), + wantErr: true, + }, + { + name: "1 record, Ok", + jsonl: strings.NewReader(`{"FileData":{}}`), + descr: &HistoryRecord{}, + wantRows: 3, // 1 in files, 1 in file_subsystems and 1 in merge_history + mockTune: func(t *testing.T, m *mocks.SpannerClient) { + m. + On("Apply", mock.Anything, mock.Anything). + Return(time.Now(), nil). + Once() + }, + }, + { + name: "2 records, Ok", + jsonl: strings.NewReader(` {"FileData":{}} + {"FileData":{}}`), + descr: &HistoryRecord{}, + wantRows: 5, + mockTune: func(t *testing.T, m *mocks.SpannerClient) { + m. + On("Apply", + mock.Anything, + mock.MatchedBy(func(ms []*spanner.Mutation) bool { + // 2 in files, 2 in file_subsystems and 1 in merge_history + return len(ms) == 5 + })). + Return(time.Now(), nil). + Once() + }, + }, + { + name: "2k records, Ok", + jsonl: strings.NewReader(strings.Repeat("{\"FileData\":{}}\n", 2000)), + descr: &HistoryRecord{}, + wantRows: 4001, + mockTune: func(t *testing.T, m *mocks.SpannerClient) { + m. + On("Apply", + mock.Anything, + mock.MatchedBy(func(ms []*spanner.Mutation) bool { + // 2k in files, 2k in file_subsystems + return len(ms) == 1000 + })). + Return(time.Now(), nil). + Times(4). + On("Apply", + mock.Anything, + mock.MatchedBy(func(ms []*spanner.Mutation) bool { + // And 1 in merge_history. + return len(ms) == 1 + })). + Return(time.Now(), nil). + Once() + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + spannerMock := mocks.NewSpannerClient(t) + if test.mockTune != nil { + test.mockTune(t, spannerMock) + } + gotRows, err := SaveMergeResult( + context.Background(), + spannerMock, + test.descr, + json.NewDecoder(test.jsonl), test.sss) + if test.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, test.wantRows, gotRows) + }) + } +} diff --git a/pkg/coveragedb/mocks/SpannerClient.go b/pkg/coveragedb/mocks/SpannerClient.go new file mode 100644 index 000000000..cb4fa1be3 --- /dev/null +++ b/pkg/coveragedb/mocks/SpannerClient.go @@ -0,0 +1,93 @@ +// Code generated by mockery v2.45.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + spanner "cloud.google.com/go/spanner" + mock "github.com/stretchr/testify/mock" + + spannerclient "github.com/google/syzkaller/pkg/coveragedb/spannerclient" + + time "time" +) + +// SpannerClient is an autogenerated mock type for the SpannerClient type +type SpannerClient struct { + mock.Mock +} + +// Apply provides a mock function with given fields: ctx, ms, opts +func (_m *SpannerClient) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (time.Time, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, ms) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for Apply") + } + + var r0 time.Time + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) (time.Time, error)); ok { + return rf(ctx, ms, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) time.Time); ok { + r0 = rf(ctx, ms, opts...) + } else { + r0 = ret.Get(0).(time.Time) + } + + if rf, ok := ret.Get(1).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) error); ok { + r1 = rf(ctx, ms, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Close provides a mock function with given fields: +func (_m *SpannerClient) Close() { + _m.Called() +} + +// Single provides a mock function with given fields: +func (_m *SpannerClient) Single() spannerclient.ReadOnlyTransaction { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Single") + } + + var r0 spannerclient.ReadOnlyTransaction + if rf, ok := ret.Get(0).(func() spannerclient.ReadOnlyTransaction); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(spannerclient.ReadOnlyTransaction) + } + } + + return r0 +} + +// NewSpannerClient creates a new instance of SpannerClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewSpannerClient(t interface { + mock.TestingT + Cleanup(func()) +}) *SpannerClient { + mock := &SpannerClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/coveragedb/spannerclient/spanner_client.go b/pkg/coveragedb/spannerclient/spanner_client.go new file mode 100644 index 000000000..4f655c6ed --- /dev/null +++ b/pkg/coveragedb/spannerclient/spanner_client.go @@ -0,0 +1,79 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package spannerclient + +import ( + "context" + "time" + + "cloud.google.com/go/spanner" +) + +type SpannerClient interface { + Close() + Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error) + Single() ReadOnlyTransaction +} + +type ReadOnlyTransaction interface { + Query(ctx context.Context, statement spanner.Statement) RowIterator +} + +type RowIterator interface { + Next() (Row, error) + Stop() +} + +type Row interface { + ToStruct(p interface{}) error +} + +type SpannerClientProxy struct { + client *spanner.Client +} + +func (proxy *SpannerClientProxy) Close() { + proxy.client.Close() +} + +func (proxy *SpannerClientProxy) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption, +) (commitTimestamp time.Time, err error) { + return proxy.client.Apply(ctx, ms, opts...) +} + +func (proxy *SpannerClientProxy) Single() ReadOnlyTransaction { + return &SpannerReadOnlyTransactionProxy{ + readOnlyTransaction: proxy.client.Single(), + } +} + +type SpannerReadOnlyTransactionProxy struct { + readOnlyTransaction *spanner.ReadOnlyTransaction +} + +func (proxy *SpannerReadOnlyTransactionProxy) Query(ctx context.Context, statement spanner.Statement) RowIterator { + return &SpannerRowIteratorProxy{ + rowIterator: proxy.readOnlyTransaction.Query(ctx, statement), + } +} + +type SpannerRowIteratorProxy struct { + rowIterator *spanner.RowIterator +} + +func (proxy *SpannerRowIteratorProxy) Next() (Row, error) { + return proxy.rowIterator.Next() +} + +func (proxy *SpannerRowIteratorProxy) Stop() { + proxy.rowIterator.Stop() +} + +func NewClient(ctx context.Context, projectID string) (SpannerClient, error) { + database := "projects/" + projectID + "/instances/syzbot/databases/coverage" + client, err := spanner.NewClient(ctx, database) + return &SpannerClientProxy{ + client: client, + }, err +} diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go index 0f6ff43e4..839a85a8e 100644 --- a/pkg/covermerger/covermerger.go +++ b/pkg/covermerger/covermerger.go @@ -4,13 +4,17 @@ package covermerger import ( + "compress/gzip" "context" "encoding/csv" + "encoding/json" "errors" "fmt" "io" + "slices" "strconv" + "github.com/google/syzkaller/pkg/coveragedb" "github.com/google/syzkaller/pkg/log" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -50,6 +54,94 @@ type FileCoverageMerger interface { Result() *MergeResult } +// MergeCSVWriteJSONL mergers input CSV and generates JSONL records. +// The amount of lines generated is [count(managers)+1] * [count(kernel_files)]. +// Returns (totalInstrumentedLines, totalCoveredLines, error). +func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer, +) (int, int, error) { + eg := errgroup.Group{} + mergeResults := make(chan *FileMergeResult) + eg.Go(func() error { + defer close(mergeResults) + if err := MergeCSVData(config, csvReader, mergeResults); err != nil { + return fmt.Errorf("covermerger.MergeCSVData: %w", err) + } + return nil + }) + var totalInstrumentedLines, totalCoveredLines int + eg.Go(func() error { + var encoder *json.Encoder + if w != nil { + gzw := gzip.NewWriter(w) + defer gzw.Close() + encoder = json.NewEncoder(gzw) + } + if encoder != nil { + if err := encoder.Encode(descr); err != nil { + return fmt.Errorf("encoder.Encode(MergedCoverageDescription): %w", err) + } + } + for fileMergeResult := range mergeResults { + dashCoverageRecords := mergedCoverageRecords(fileMergeResult) + if encoder != nil { + for _, record := range dashCoverageRecords { + if err := encoder.Encode(record); err != nil { + return fmt.Errorf("encoder.Encode(MergedCoverageRecord): %w", err) + } + } + } + for _, hitCount := range fileMergeResult.HitCounts { + totalInstrumentedLines++ + if hitCount > 0 { + totalCoveredLines++ + } + } + } + return nil + }) + if err := eg.Wait(); err != nil { + return 0, 0, fmt.Errorf("eg.Wait: %w", err) + } + return totalInstrumentedLines, totalCoveredLines, nil +} + +const allManagers = "*" + +func mergedCoverageRecords(fmr *FileMergeResult) []*coveragedb.MergedCoverageRecord { + if !fmr.FileExists { + return nil + } + lines := maps.Keys(fmr.HitCounts) + slices.Sort(lines) + mgrStat := make(map[string]*coveragedb.Coverage) + mgrStat[allManagers] = &coveragedb.Coverage{} + + for _, line := range lines { + mgrStat[allManagers].AddLineHitCount(line, fmr.HitCounts[line]) + managerHitCounts := map[string]int{} + for _, lineDetail := range fmr.LineDetails[line] { + manager := lineDetail.Manager + managerHitCounts[manager] += lineDetail.HitCount + } + for manager, managerHitCount := range managerHitCounts { + if _, ok := mgrStat[manager]; !ok { + mgrStat[manager] = &coveragedb.Coverage{} + } + mgrStat[manager].AddLineHitCount(line, managerHitCount) + } + } + + res := []*coveragedb.MergedCoverageRecord{} + for managerName, managerCoverage := range mgrStat { + res = append(res, &coveragedb.MergedCoverageRecord{ + Manager: managerName, + FilePath: fmr.FilePath, + FileData: managerCoverage, + }) + } + return res +} + func batchFileData(c *Config, targetFilePath string, records []*FileRecord) (*MergeResult, error) { log.Logf(1, "processing %d records for %s", len(records), targetFilePath) repoCommitsMap := make(map[RepoCommit]bool) diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index 4580c114f..29fee83a6 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -4,18 +4,76 @@ package covermerger import ( + "compress/gzip" + "context" "encoding/json" + "io" "os" "path/filepath" "strings" "testing" + "time" + "cloud.google.com/go/civil" + "cloud.google.com/go/spanner" + "github.com/google/syzkaller/pkg/coveragedb" + "github.com/google/syzkaller/pkg/coveragedb/mocks" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "golang.org/x/sync/errgroup" ) +var testsPath = "testdata/integration" +var defaultTestWorkdir = testsPath + "/all/test-workdir-covermerger" + +func TestMergeCSVWriteJSONL_and_coveragedb_SaveMergeResult(t *testing.T) { + rc, wc := io.Pipe() + eg := errgroup.Group{} + eg.Go(func() error { + defer wc.Close() + totalInstrumented, totalCovered, err := MergeCSVWriteJSONL( + testConfig( + "git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git", + "fe46a7dd189e25604716c03576d05ac8a5209743", + testsPath+"/aesni-intel_glue/test-workdir-covermerger"), + &coveragedb.HistoryRecord{ + DateTo: civil.DateOf(time.Now()), + }, + strings.NewReader(readFileOrFail(t, testsPath+"/aesni-intel_glue/bqTable.txt")), + wc) + assert.Equal(t, 48, totalInstrumented) + assert.Equal(t, 45, totalCovered) + return err + }) + eg.Go(func() error { + defer rc.Close() + gzrc, err := gzip.NewReader(rc) + assert.NoError(t, err) + defer gzrc.Close() + + spannerMock := mocks.NewSpannerClient(t) + spannerMock. + On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool { + // 1 file * (5 managers + 1 manager total) x 2 (to update files and subsystems) + 1 merge_history + return len(ms) == 13 + })). + Return(time.Now(), nil). + Once() + + decoder := json.NewDecoder(gzrc) + decoder.DisallowUnknownFields() + + descr := new(coveragedb.HistoryRecord) + assert.NoError(t, decoder.Decode(descr)) + + _, err = coveragedb.SaveMergeResult(context.Background(), spannerMock, descr, decoder, nil) + return err + }) + assert.NoError(t, eg.Wait()) +} + // nolint: lll func TestAggregateStreamData(t *testing.T) { - testsPath := "testdata/integration" type Test struct { name string workdir string @@ -36,7 +94,7 @@ func TestAggregateStreamData(t *testing.T) { }, { name: "code deleted", - workdir: testsPath + "/all/test-workdir-covermerger", + workdir: defaultTestWorkdir, bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2,0,2,-1,1,true,1`, simpleAggregation: `{ @@ -53,7 +111,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2, }, { name: "file deleted", - workdir: testsPath + "/all/test-workdir-covermerger", + workdir: defaultTestWorkdir, bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2,0,2,-1,1,true,1`, simpleAggregation: `{ @@ -68,7 +126,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2, }, { name: "covered line changed", - workdir: testsPath + "/all/test-workdir-covermerger", + workdir: defaultTestWorkdir, bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,2,0,2,-1,1,true,1 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3,0,3,-1,1,true,1`, @@ -99,7 +157,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3, }, { name: "add line", - workdir: testsPath + "/all/test-workdir-covermerger", + workdir: defaultTestWorkdir, bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2,-1,1,true,1`, simpleAggregation: `{ @@ -129,7 +187,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2 }, { name: "instrumented lines w/o coverage are reported", - workdir: testsPath + "/all/test-workdir-covermerger", + workdir: defaultTestWorkdir, bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,not_changed.c,func1,3,0,3,-1,0,true,1 samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,0,4,-1,0,true,1`, @@ -172,34 +230,27 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ch := make(chan *FileMergeResult) - aggregation := make(map[string]*MergeResult) + mergeResultsCh := make(chan *FileMergeResult) + doneCh := make(chan bool) go func() { - for fmr := range ch { + aggregation := make(map[string]*MergeResult) + for fmr := range mergeResultsCh { aggregation[fmr.FilePath] = fmr.MergeResult } + if !test.checkDetails { + ignoreLineDetailsInTest(aggregation) + } + var expectedAggregation map[string]*MergeResult + assert.NoError(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation)) + assert.Equal(t, expectedAggregation, aggregation) + doneCh <- true }() - err := MergeCSVData( - &Config{ - Jobs: 2, - skipRepoClone: true, - Base: RepoCommit{ - Repo: test.baseRepo, - Commit: test.baseCommit, - }, - FileVersProvider: &fileVersProviderMock{Workdir: test.workdir}, - }, + assert.NoError(t, MergeCSVData( + testConfig(test.baseRepo, test.baseCommit, test.workdir), strings.NewReader(test.bqTable), - ch, - ) - - assert.Nil(t, err) - var expectedAggregation map[string]*MergeResult - assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation)) - if !test.checkDetails { - ignoreLineDetailsInTest(aggregation) - } - assert.Equal(t, expectedAggregation, aggregation) + mergeResultsCh)) + close(mergeResultsCh) + <-doneCh }) } } @@ -233,3 +284,15 @@ func readFileOrFail(t *testing.T, path string) string { assert.Nil(t, err) return string(content) } + +func testConfig(repo, commit, workdir string) *Config { + return &Config{ + Jobs: 2, + skipRepoClone: true, + Base: RepoCommit{ + Repo: repo, + Commit: commit, + }, + FileVersProvider: &fileVersProviderMock{Workdir: workdir}, + } +} diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go index a58ef5d8a..9814f6ce7 100644 --- a/tools/syz-covermerger/syz_covermerger.go +++ b/tools/syz-covermerger/syz_covermerger.go @@ -4,14 +4,11 @@ package main import ( - "compress/gzip" "context" - "encoding/json" "flag" "fmt" "io" "runtime" - "slices" "cloud.google.com/go/civil" "github.com/google/syzkaller/dashboard/dashapi" @@ -20,8 +17,6 @@ import ( "github.com/google/syzkaller/pkg/gcs" "github.com/google/syzkaller/pkg/log" _ "github.com/google/syzkaller/pkg/subsystem/lists" - "golang.org/x/exp/maps" - "golang.org/x/sync/errgroup" ) var ( @@ -109,57 +104,28 @@ func do() error { return fmt.Errorf("gcsClient.FileWriter: %w", err) } } - eg := errgroup.Group{} - mergeResults := make(chan *covermerger.FileMergeResult) - eg.Go(func() error { - defer close(mergeResults) - if err := covermerger.MergeCSVData(config, csvReader, mergeResults); err != nil { - return fmt.Errorf("covermerger.MergeCSVData: %w", err) - } - return nil - }) - var totalInstrumentedLines, totalCoveredLines int - eg.Go(func() error { - encoder := json.NewEncoder(gzip.NewWriter(wc)) - if encoder != nil { - if err := encoder.Encode(&coveragedb.HistoryRecord{ - Namespace: *flagNamespace, - Repo: *flagRepo, - Commit: *flagCommit, - Duration: *flagDuration, - DateTo: dateTo, - TotalRows: *flagTotalRows, - }); err != nil { - return fmt.Errorf("encoder.Encode(MergedCoverageDescription): %w", err) - } - } - for fileMergeResult := range mergeResults { - dashCoverageRecords := mergedCoverageRecords(fileMergeResult) - if encoder != nil { - for _, record := range dashCoverageRecords { - if err := encoder.Encode(record); err != nil { - return fmt.Errorf("encoder.Encode(MergedCoverageRecord): %w", err) - } - } - } - for _, hitCount := range fileMergeResult.HitCounts { - totalInstrumentedLines++ - if hitCount > 0 { - totalCoveredLines++ - } - } - } - if err := wc.Close(); err != nil { - return fmt.Errorf("wc.Close: %w", err) - } - return nil - }) - if err := eg.Wait(); err != nil { - return fmt.Errorf("eg.Wait: %w", err) + totalInstrumentedLines, totalCoveredLines, err := covermerger.MergeCSVWriteJSONL( + config, + &coveragedb.HistoryRecord{ + Namespace: *flagNamespace, + Repo: *flagRepo, + Commit: *flagCommit, + Duration: *flagDuration, + DateTo: dateTo, + TotalRows: *flagTotalRows, + }, + csvReader, + wc) + if err != nil { + return fmt.Errorf("covermerger.MergeCSVWriteJSONL: %w", err) + } + if err := wc.Close(); err != nil { + return fmt.Errorf("wc.Close: %w", err) } printCoverage(totalInstrumentedLines, totalCoveredLines) if *flagToDashAPI != "" { + // Merging may take hours. It is better to create new connection instead of reuse. dash, err := dashapi.New(*flagDashboardClientName, *flagToDashAPI, "") if err != nil { return fmt.Errorf("dashapi.New: %w", err) @@ -181,41 +147,3 @@ func printCoverage(instrumented, covered int) { fmt.Printf("total instrumented(%d), covered(%d), %.2f%%\n", instrumented, covered, coverage*100) } - -const allManagers = "*" - -func mergedCoverageRecords(fmr *covermerger.FileMergeResult, -) []*coveragedb.MergedCoverageRecord { - if !fmr.FileExists { - return nil - } - lines := maps.Keys(fmr.HitCounts) - slices.Sort(lines) - mgrStat := make(map[string]*coveragedb.Coverage) - mgrStat[allManagers] = &coveragedb.Coverage{} - - for _, line := range lines { - mgrStat[allManagers].AddLineHitCount(line, fmr.HitCounts[line]) - managerHitCounts := map[string]int{} - for _, lineDetail := range fmr.LineDetails[line] { - manager := lineDetail.Manager - managerHitCounts[manager] += lineDetail.HitCount - } - for manager, managerHitCount := range managerHitCounts { - if _, ok := mgrStat[manager]; !ok { - mgrStat[manager] = &coveragedb.Coverage{} - } - mgrStat[manager].AddLineHitCount(line, managerHitCount) - } - } - - res := []*coveragedb.MergedCoverageRecord{} - for managerName, managerCoverage := range mgrStat { - res = append(res, &coveragedb.MergedCoverageRecord{ - Manager: managerName, - FilePath: fmr.FilePath, - FileData: managerCoverage, - }) - } - return res -} |
