aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-12-18 15:28:18 +0100
committerTaras Madan <tarasmadan@google.com>2024-12-19 14:38:21 +0000
commit5d3c618ce7bc487350983451f940dbdf2571c428 (patch)
treebc76e6374067a5f551d94502811e82c3b2263612
parentfef5bb5152a5013d0c4571cd0601f46824a9664b (diff)
pkg/coveragedb: test SaveMergeResult
1. Make interface testable. 2. Add Spanner interfaces. 3. Generate mocks for proxy interfaces. 4. Test SaveMergeResult. 5. Test MergeCSVWriteJSONL and coveragedb.SaveMergeResult integration.
-rw-r--r--Makefile1
-rw-r--r--dashboard/app/api.go13
-rw-r--r--dashboard/app/entities_spanner.go3
-rw-r--r--pkg/cover/heatmap.go3
-rw-r--r--pkg/coveragedb/coveragedb.go (renamed from pkg/coveragedb/spanner.go)48
-rw-r--r--pkg/coveragedb/coveragedb_mock_test.go121
-rw-r--r--pkg/coveragedb/mocks/SpannerClient.go93
-rw-r--r--pkg/coveragedb/spannerclient/spanner_client.go79
-rw-r--r--pkg/covermerger/covermerger.go92
-rw-r--r--pkg/covermerger/covermerger_test.go121
-rw-r--r--tools/syz-covermerger/syz_covermerger.go108
11 files changed, 532 insertions, 150 deletions
diff --git a/Makefile b/Makefile
index 39e0804bf..b76e0918d 100644
--- a/Makefile
+++ b/Makefile
@@ -237,6 +237,7 @@ generate:
generate_go: format_cpp
$(GO) generate ./executor ./pkg/ifuzz ./pkg/build ./pkg/rpcserver
$(GO) generate ./vm/proxyapp
+ $(GO) generate ./pkg/coveragedb
generate_rpc:
flatc -o pkg/flatrpc --warnings-as-errors --gen-object-api --filename-suffix "" --go --gen-onefile --go-namespace flatrpc pkg/flatrpc/flatrpc.fbs
diff --git a/dashboard/app/api.go b/dashboard/app/api.go
index ecdecd6bf..9800d1b40 100644
--- a/dashboard/app/api.go
+++ b/dashboard/app/api.go
@@ -26,6 +26,7 @@ import (
"github.com/google/syzkaller/pkg/asset"
"github.com/google/syzkaller/pkg/auth"
"github.com/google/syzkaller/pkg/coveragedb"
+ "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
"github.com/google/syzkaller/pkg/debugtracer"
"github.com/google/syzkaller/pkg/email"
"github.com/google/syzkaller/pkg/gcs"
@@ -1944,15 +1945,21 @@ func apiCreateUploadURL(c context.Context, payload io.Reader) (interface{}, erro
// Second+ records are coveragedb.MergedCoverageRecord.
func apiSaveCoverage(c context.Context, payload io.Reader) (interface{}, error) {
descr := new(coveragedb.HistoryRecord)
- if err := json.NewDecoder(payload).Decode(descr); err != nil {
- return 0, fmt.Errorf("json.NewDecoder(dashapi.MergedCoverageDescription).Decode: %w", err)
+ jsonDec := json.NewDecoder(payload)
+ if err := jsonDec.Decode(descr); err != nil {
+ return 0, fmt.Errorf("json.NewDecoder(coveragedb.HistoryRecord).Decode: %w", err)
}
var sss []*subsystem.Subsystem
if service := getNsConfig(c, descr.Namespace).Subsystems.Service; service != nil {
sss = service.List()
log.Infof(c, "found %d subsystems for %s namespace", len(sss), descr.Namespace)
}
- rowsCreated, err := coveragedb.SaveMergeResult(c, appengine.AppID(context.Background()), descr, payload, sss)
+ client, err := spannerclient.NewClient(c, appengine.AppID(context.Background()))
+ if err != nil {
+ return 0, fmt.Errorf("coveragedb.NewClient() failed: %s", err.Error())
+ }
+ defer client.Close()
+ rowsCreated, err := coveragedb.SaveMergeResult(c, client, descr, jsonDec, sss)
if err != nil {
log.Errorf(c, "error storing coverage for ns %s, date %s: %v",
descr.Namespace, descr.DateTo.String(), err)
diff --git a/dashboard/app/entities_spanner.go b/dashboard/app/entities_spanner.go
index ff154986e..e3f3bad9e 100644
--- a/dashboard/app/entities_spanner.go
+++ b/dashboard/app/entities_spanner.go
@@ -11,6 +11,7 @@ import (
"cloud.google.com/go/civil"
"cloud.google.com/go/spanner"
"github.com/google/syzkaller/pkg/coveragedb"
+ "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
"google.golang.org/api/iterator"
)
@@ -25,7 +26,7 @@ type CoverageHistory struct {
// MergedCoverage uses dates, not time.
func MergedCoverage(ctx context.Context, ns, periodType string) (*CoverageHistory, error) {
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
- client, err := coveragedb.NewClient(ctx, projectID)
+ client, err := spannerclient.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
}
diff --git a/pkg/cover/heatmap.go b/pkg/cover/heatmap.go
index 0d7c93cd5..ef4f7ffab 100644
--- a/pkg/cover/heatmap.go
+++ b/pkg/cover/heatmap.go
@@ -14,6 +14,7 @@ import (
"cloud.google.com/go/spanner"
"github.com/google/syzkaller/pkg/coveragedb"
+ "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
_ "github.com/google/syzkaller/pkg/subsystem/lists"
"golang.org/x/exp/maps"
"google.golang.org/api/iterator"
@@ -185,7 +186,7 @@ where
func filesCoverageWithDetails(ctx context.Context, projectID, ns, subsystem string, timePeriods []coveragedb.TimePeriod,
) ([]*fileCoverageWithDetails, error) {
- client, err := coveragedb.NewClient(ctx, projectID)
+ client, err := spannerclient.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
}
diff --git a/pkg/coveragedb/spanner.go b/pkg/coveragedb/coveragedb.go
index e2a197e0c..0a6175ad5 100644
--- a/pkg/coveragedb/spanner.go
+++ b/pkg/coveragedb/coveragedb.go
@@ -4,9 +4,9 @@
package coveragedb
import (
- "bufio"
"context"
"encoding/json"
+ "errors"
"fmt"
"io"
"os"
@@ -15,6 +15,7 @@ import (
"cloud.google.com/go/civil"
"cloud.google.com/go/spanner"
+ "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
"github.com/google/syzkaller/pkg/subsystem"
_ "github.com/google/syzkaller/pkg/subsystem/lists"
"github.com/google/uuid"
@@ -49,11 +50,6 @@ type HistoryRecord struct {
TotalRows int64
}
-func NewClient(ctx context.Context, projectID string) (*spanner.Client, error) {
- database := "projects/" + projectID + "/instances/syzbot/databases/coverage"
- return spanner.NewClient(ctx, database)
-}
-
type Coverage struct {
Instrumented int64
Covered int64
@@ -76,27 +72,26 @@ type MergedCoverageRecord struct {
FileData *Coverage
}
-func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord, jsonl io.Reader,
+func SaveMergeResult(ctx context.Context, client spannerclient.SpannerClient, descr *HistoryRecord, dec *json.Decoder,
sss []*subsystem.Subsystem) (int, error) {
- client, err := NewClient(ctx, projectID)
- if err != nil {
- return 0, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
- }
- defer client.Close()
var rowsCreated int
-
ssMatcher := subsystem.MakePathMatcher(sss)
ssCache := make(map[string][]string)
session := uuid.New().String()
mutations := []*spanner.Mutation{}
- jsonlScanner := bufio.NewScanner(jsonl)
-
- for jsonlScanner.Scan() {
- var mcr MergedCoverageRecord
- if err := json.Unmarshal([]byte(jsonlScanner.Text()), &mcr); err != nil {
- return rowsCreated, fmt.Errorf("json.Unmarshal(MergedCoverageRecord): %w", err)
+ var mcr MergedCoverageRecord
+ for {
+ err := dec.Decode(&mcr)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return rowsCreated, fmt.Errorf("dec.Decode(MergedCoverageRecord): %w", err)
+ }
+ if mcr.FileData == nil {
+ return rowsCreated, errors.New("field MergedCoverageRecord.FileData can't be nil")
}
mutations = append(mutations, fileRecordMutation(session, &mcr))
subsystems := fileSubsystems(mcr.FilePath, ssMatcher, ssCache)
@@ -105,8 +100,8 @@ func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord
// This includes both explicit mutations of the fields (6 fields * 1k records = 6k mutations)
// and implicit index mutations.
// We keep the number of records low enough for the number of explicit mutations * 10 does not exceed the limit.
- if len(mutations) > 1000 {
- if _, err = client.Apply(ctx, mutations); err != nil {
+ if len(mutations) >= 1000 {
+ if _, err := client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
rowsCreated += len(mutations)
@@ -115,9 +110,10 @@ func SaveMergeResult(ctx context.Context, projectID string, descr *HistoryRecord
}
mutations = append(mutations, historyMutation(session, descr))
- if _, err = client.Apply(ctx, mutations); err != nil {
+ if _, err := client.Apply(ctx, mutations); err != nil {
return rowsCreated, fmt.Errorf("failed to spanner.Apply(inserts): %s", err.Error())
}
+ rowsCreated += len(mutations)
return rowsCreated, nil
}
@@ -150,7 +146,7 @@ where
func ReadLinesHitCount(ctx context.Context, ns, commit, file string, tp TimePeriod,
) (map[int]int, error) {
projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
- client, err := NewClient(ctx, projectID)
+ client, err := spannerclient.NewClient(ctx, projectID)
if err != nil {
return nil, fmt.Errorf("spanner.NewClient: %w", err)
}
@@ -236,7 +232,7 @@ func fileSubsystems(filePath string, ssMatcher *subsystem.PathMatcher, ssCache m
}
func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []int64, error) {
- client, err := NewClient(ctx, projectID)
+ client, err := spannerclient.NewClient(ctx, projectID)
if err != nil {
return nil, nil, fmt.Errorf("spanner.NewClient() failed: %s", err.Error())
}
@@ -292,7 +288,7 @@ func NsDataMerged(ctx context.Context, projectID, ns string) ([]TimePeriod, []in
// Returns the number of orphaned file entries successfully deleted.
func DeleteGarbage(ctx context.Context) (int64, error) {
batchSize := 10_000
- client, err := NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
+ client, err := spannerclient.NewClient(ctx, os.Getenv("GOOGLE_CLOUD_PROJECT"))
if err != nil {
return 0, fmt.Errorf("coveragedb.NewClient: %w", err)
}
@@ -339,7 +335,7 @@ func DeleteGarbage(ctx context.Context) (int64, error) {
return totalDeleted.Load(), nil
}
-func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client *spanner.Client,
+func goSpannerDelete(ctx context.Context, batch []spanner.Key, eg *errgroup.Group, client spannerclient.SpannerClient,
totalDeleted *atomic.Int64) {
ks := spanner.KeySetFromKeys(batch...)
ksSize := len(batch)
diff --git a/pkg/coveragedb/coveragedb_mock_test.go b/pkg/coveragedb/coveragedb_mock_test.go
new file mode 100644
index 000000000..8f20a43ed
--- /dev/null
+++ b/pkg/coveragedb/coveragedb_mock_test.go
@@ -0,0 +1,121 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package coveragedb
+
+import (
+ "context"
+ "encoding/json"
+ "io"
+ "strings"
+ "testing"
+ "time"
+
+ "cloud.google.com/go/spanner"
+ "github.com/google/syzkaller/pkg/coveragedb/mocks"
+ "github.com/google/syzkaller/pkg/subsystem"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+//go:generate ../../tools/mockery.sh --name SpannerClient -r
+
+type spannerMockTune func(*testing.T, *mocks.SpannerClient)
+
+func TestSaveMergeResult(t *testing.T) {
+ tests := []struct {
+ name string
+ sss []*subsystem.Subsystem
+ jsonl io.Reader
+ descr *HistoryRecord
+ mockTune spannerMockTune
+ wantErr bool
+ wantRows int
+ }{
+ {
+ name: "empty jsonl",
+ jsonl: strings.NewReader(`{}`),
+ wantErr: true,
+ },
+ {
+ name: "wrong jsonl content",
+ jsonl: strings.NewReader(`{a}`),
+ wantErr: true,
+ },
+ {
+ name: "1 record, Ok",
+ jsonl: strings.NewReader(`{"FileData":{}}`),
+ descr: &HistoryRecord{},
+ wantRows: 3, // 1 in files, 1 in file_subsystems and 1 in merge_history
+ mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+ m.
+ On("Apply", mock.Anything, mock.Anything).
+ Return(time.Now(), nil).
+ Once()
+ },
+ },
+ {
+ name: "2 records, Ok",
+ jsonl: strings.NewReader(` {"FileData":{}}
+ {"FileData":{}}`),
+ descr: &HistoryRecord{},
+ wantRows: 5,
+ mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+ m.
+ On("Apply",
+ mock.Anything,
+ mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+ // 2 in files, 2 in file_subsystems and 1 in merge_history
+ return len(ms) == 5
+ })).
+ Return(time.Now(), nil).
+ Once()
+ },
+ },
+ {
+ name: "2k records, Ok",
+ jsonl: strings.NewReader(strings.Repeat("{\"FileData\":{}}\n", 2000)),
+ descr: &HistoryRecord{},
+ wantRows: 4001,
+ mockTune: func(t *testing.T, m *mocks.SpannerClient) {
+ m.
+ On("Apply",
+ mock.Anything,
+ mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+ // 2k in files, 2k in file_subsystems
+ return len(ms) == 1000
+ })).
+ Return(time.Now(), nil).
+ Times(4).
+ On("Apply",
+ mock.Anything,
+ mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+ // And 1 in merge_history.
+ return len(ms) == 1
+ })).
+ Return(time.Now(), nil).
+ Once()
+ },
+ },
+ }
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ t.Parallel()
+ spannerMock := mocks.NewSpannerClient(t)
+ if test.mockTune != nil {
+ test.mockTune(t, spannerMock)
+ }
+ gotRows, err := SaveMergeResult(
+ context.Background(),
+ spannerMock,
+ test.descr,
+ json.NewDecoder(test.jsonl), test.sss)
+ if test.wantErr {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ }
+ assert.Equal(t, test.wantRows, gotRows)
+ })
+ }
+}
diff --git a/pkg/coveragedb/mocks/SpannerClient.go b/pkg/coveragedb/mocks/SpannerClient.go
new file mode 100644
index 000000000..cb4fa1be3
--- /dev/null
+++ b/pkg/coveragedb/mocks/SpannerClient.go
@@ -0,0 +1,93 @@
+// Code generated by mockery v2.45.1. DO NOT EDIT.
+
+package mocks
+
+import (
+ context "context"
+
+ spanner "cloud.google.com/go/spanner"
+ mock "github.com/stretchr/testify/mock"
+
+ spannerclient "github.com/google/syzkaller/pkg/coveragedb/spannerclient"
+
+ time "time"
+)
+
+// SpannerClient is an autogenerated mock type for the SpannerClient type
+type SpannerClient struct {
+ mock.Mock
+}
+
+// Apply provides a mock function with given fields: ctx, ms, opts
+func (_m *SpannerClient) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (time.Time, error) {
+ _va := make([]interface{}, len(opts))
+ for _i := range opts {
+ _va[_i] = opts[_i]
+ }
+ var _ca []interface{}
+ _ca = append(_ca, ctx, ms)
+ _ca = append(_ca, _va...)
+ ret := _m.Called(_ca...)
+
+ if len(ret) == 0 {
+ panic("no return value specified for Apply")
+ }
+
+ var r0 time.Time
+ var r1 error
+ if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) (time.Time, error)); ok {
+ return rf(ctx, ms, opts...)
+ }
+ if rf, ok := ret.Get(0).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) time.Time); ok {
+ r0 = rf(ctx, ms, opts...)
+ } else {
+ r0 = ret.Get(0).(time.Time)
+ }
+
+ if rf, ok := ret.Get(1).(func(context.Context, []*spanner.Mutation, ...spanner.ApplyOption) error); ok {
+ r1 = rf(ctx, ms, opts...)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Close provides a mock function with given fields:
+func (_m *SpannerClient) Close() {
+ _m.Called()
+}
+
+// Single provides a mock function with given fields:
+func (_m *SpannerClient) Single() spannerclient.ReadOnlyTransaction {
+ ret := _m.Called()
+
+ if len(ret) == 0 {
+ panic("no return value specified for Single")
+ }
+
+ var r0 spannerclient.ReadOnlyTransaction
+ if rf, ok := ret.Get(0).(func() spannerclient.ReadOnlyTransaction); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(spannerclient.ReadOnlyTransaction)
+ }
+ }
+
+ return r0
+}
+
+// NewSpannerClient creates a new instance of SpannerClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewSpannerClient(t interface {
+ mock.TestingT
+ Cleanup(func())
+}) *SpannerClient {
+ mock := &SpannerClient{}
+ mock.Mock.Test(t)
+
+ t.Cleanup(func() { mock.AssertExpectations(t) })
+
+ return mock
+}
diff --git a/pkg/coveragedb/spannerclient/spanner_client.go b/pkg/coveragedb/spannerclient/spanner_client.go
new file mode 100644
index 000000000..4f655c6ed
--- /dev/null
+++ b/pkg/coveragedb/spannerclient/spanner_client.go
@@ -0,0 +1,79 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package spannerclient
+
+import (
+ "context"
+ "time"
+
+ "cloud.google.com/go/spanner"
+)
+
+type SpannerClient interface {
+ Close()
+ Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption) (commitTimestamp time.Time, err error)
+ Single() ReadOnlyTransaction
+}
+
+type ReadOnlyTransaction interface {
+ Query(ctx context.Context, statement spanner.Statement) RowIterator
+}
+
+type RowIterator interface {
+ Next() (Row, error)
+ Stop()
+}
+
+type Row interface {
+ ToStruct(p interface{}) error
+}
+
+type SpannerClientProxy struct {
+ client *spanner.Client
+}
+
+func (proxy *SpannerClientProxy) Close() {
+ proxy.client.Close()
+}
+
+func (proxy *SpannerClientProxy) Apply(ctx context.Context, ms []*spanner.Mutation, opts ...spanner.ApplyOption,
+) (commitTimestamp time.Time, err error) {
+ return proxy.client.Apply(ctx, ms, opts...)
+}
+
+func (proxy *SpannerClientProxy) Single() ReadOnlyTransaction {
+ return &SpannerReadOnlyTransactionProxy{
+ readOnlyTransaction: proxy.client.Single(),
+ }
+}
+
+type SpannerReadOnlyTransactionProxy struct {
+ readOnlyTransaction *spanner.ReadOnlyTransaction
+}
+
+func (proxy *SpannerReadOnlyTransactionProxy) Query(ctx context.Context, statement spanner.Statement) RowIterator {
+ return &SpannerRowIteratorProxy{
+ rowIterator: proxy.readOnlyTransaction.Query(ctx, statement),
+ }
+}
+
+type SpannerRowIteratorProxy struct {
+ rowIterator *spanner.RowIterator
+}
+
+func (proxy *SpannerRowIteratorProxy) Next() (Row, error) {
+ return proxy.rowIterator.Next()
+}
+
+func (proxy *SpannerRowIteratorProxy) Stop() {
+ proxy.rowIterator.Stop()
+}
+
+func NewClient(ctx context.Context, projectID string) (SpannerClient, error) {
+ database := "projects/" + projectID + "/instances/syzbot/databases/coverage"
+ client, err := spanner.NewClient(ctx, database)
+ return &SpannerClientProxy{
+ client: client,
+ }, err
+}
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index 0f6ff43e4..839a85a8e 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -4,13 +4,17 @@
package covermerger
import (
+ "compress/gzip"
"context"
"encoding/csv"
+ "encoding/json"
"errors"
"fmt"
"io"
+ "slices"
"strconv"
+ "github.com/google/syzkaller/pkg/coveragedb"
"github.com/google/syzkaller/pkg/log"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
@@ -50,6 +54,94 @@ type FileCoverageMerger interface {
Result() *MergeResult
}
+// MergeCSVWriteJSONL merges the input CSV and generates JSONL records.
+// The number of lines generated is [count(managers)+1] * [count(kernel_files)].
+// Returns (totalInstrumentedLines, totalCoveredLines, error).
+func MergeCSVWriteJSONL(config *Config, descr *coveragedb.HistoryRecord, csvReader io.Reader, w io.Writer,
+) (int, int, error) {
+ eg := errgroup.Group{}
+ mergeResults := make(chan *FileMergeResult)
+ eg.Go(func() error {
+ defer close(mergeResults)
+ if err := MergeCSVData(config, csvReader, mergeResults); err != nil {
+ return fmt.Errorf("covermerger.MergeCSVData: %w", err)
+ }
+ return nil
+ })
+ var totalInstrumentedLines, totalCoveredLines int
+ eg.Go(func() error {
+ var encoder *json.Encoder
+ if w != nil {
+ gzw := gzip.NewWriter(w)
+ defer gzw.Close()
+ encoder = json.NewEncoder(gzw)
+ }
+ if encoder != nil {
+ if err := encoder.Encode(descr); err != nil {
+ return fmt.Errorf("encoder.Encode(MergedCoverageDescription): %w", err)
+ }
+ }
+ for fileMergeResult := range mergeResults {
+ dashCoverageRecords := mergedCoverageRecords(fileMergeResult)
+ if encoder != nil {
+ for _, record := range dashCoverageRecords {
+ if err := encoder.Encode(record); err != nil {
+ return fmt.Errorf("encoder.Encode(MergedCoverageRecord): %w", err)
+ }
+ }
+ }
+ for _, hitCount := range fileMergeResult.HitCounts {
+ totalInstrumentedLines++
+ if hitCount > 0 {
+ totalCoveredLines++
+ }
+ }
+ }
+ return nil
+ })
+ if err := eg.Wait(); err != nil {
+ return 0, 0, fmt.Errorf("eg.Wait: %w", err)
+ }
+ return totalInstrumentedLines, totalCoveredLines, nil
+}
+
+const allManagers = "*"
+
+func mergedCoverageRecords(fmr *FileMergeResult) []*coveragedb.MergedCoverageRecord {
+ if !fmr.FileExists {
+ return nil
+ }
+ lines := maps.Keys(fmr.HitCounts)
+ slices.Sort(lines)
+ mgrStat := make(map[string]*coveragedb.Coverage)
+ mgrStat[allManagers] = &coveragedb.Coverage{}
+
+ for _, line := range lines {
+ mgrStat[allManagers].AddLineHitCount(line, fmr.HitCounts[line])
+ managerHitCounts := map[string]int{}
+ for _, lineDetail := range fmr.LineDetails[line] {
+ manager := lineDetail.Manager
+ managerHitCounts[manager] += lineDetail.HitCount
+ }
+ for manager, managerHitCount := range managerHitCounts {
+ if _, ok := mgrStat[manager]; !ok {
+ mgrStat[manager] = &coveragedb.Coverage{}
+ }
+ mgrStat[manager].AddLineHitCount(line, managerHitCount)
+ }
+ }
+
+ res := []*coveragedb.MergedCoverageRecord{}
+ for managerName, managerCoverage := range mgrStat {
+ res = append(res, &coveragedb.MergedCoverageRecord{
+ Manager: managerName,
+ FilePath: fmr.FilePath,
+ FileData: managerCoverage,
+ })
+ }
+ return res
+}
+
func batchFileData(c *Config, targetFilePath string, records []*FileRecord) (*MergeResult, error) {
log.Logf(1, "processing %d records for %s", len(records), targetFilePath)
repoCommitsMap := make(map[RepoCommit]bool)
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 4580c114f..29fee83a6 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -4,18 +4,76 @@
package covermerger
import (
+ "compress/gzip"
+ "context"
"encoding/json"
+ "io"
"os"
"path/filepath"
"strings"
"testing"
+ "time"
+ "cloud.google.com/go/civil"
+ "cloud.google.com/go/spanner"
+ "github.com/google/syzkaller/pkg/coveragedb"
+ "github.com/google/syzkaller/pkg/coveragedb/mocks"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+ "golang.org/x/sync/errgroup"
)
+var testsPath = "testdata/integration"
+var defaultTestWorkdir = testsPath + "/all/test-workdir-covermerger"
+
+func TestMergeCSVWriteJSONL_and_coveragedb_SaveMergeResult(t *testing.T) {
+ rc, wc := io.Pipe()
+ eg := errgroup.Group{}
+ eg.Go(func() error {
+ defer wc.Close()
+ totalInstrumented, totalCovered, err := MergeCSVWriteJSONL(
+ testConfig(
+ "git://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git",
+ "fe46a7dd189e25604716c03576d05ac8a5209743",
+ testsPath+"/aesni-intel_glue/test-workdir-covermerger"),
+ &coveragedb.HistoryRecord{
+ DateTo: civil.DateOf(time.Now()),
+ },
+ strings.NewReader(readFileOrFail(t, testsPath+"/aesni-intel_glue/bqTable.txt")),
+ wc)
+ assert.Equal(t, 48, totalInstrumented)
+ assert.Equal(t, 45, totalCovered)
+ return err
+ })
+ eg.Go(func() error {
+ defer rc.Close()
+ gzrc, err := gzip.NewReader(rc)
+ assert.NoError(t, err)
+ defer gzrc.Close()
+
+ spannerMock := mocks.NewSpannerClient(t)
+ spannerMock.
+ On("Apply", mock.Anything, mock.MatchedBy(func(ms []*spanner.Mutation) bool {
+ // 1 file * (5 managers + 1 manager total) x 2 (to update files and subsystems) + 1 merge_history
+ return len(ms) == 13
+ })).
+ Return(time.Now(), nil).
+ Once()
+
+ decoder := json.NewDecoder(gzrc)
+ decoder.DisallowUnknownFields()
+
+ descr := new(coveragedb.HistoryRecord)
+ assert.NoError(t, decoder.Decode(descr))
+
+ _, err = coveragedb.SaveMergeResult(context.Background(), spannerMock, descr, decoder, nil)
+ return err
+ })
+ assert.NoError(t, eg.Wait())
+}
+
// nolint: lll
func TestAggregateStreamData(t *testing.T) {
- testsPath := "testdata/integration"
type Test struct {
name string
workdir string
@@ -36,7 +94,7 @@ func TestAggregateStreamData(t *testing.T) {
},
{
name: "code deleted",
- workdir: testsPath + "/all/test-workdir-covermerger",
+ workdir: defaultTestWorkdir,
bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2,0,2,-1,1,true,1`,
simpleAggregation: `{
@@ -53,7 +111,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_code.c,func1,2,
},
{
name: "file deleted",
- workdir: testsPath + "/all/test-workdir-covermerger",
+ workdir: defaultTestWorkdir,
bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2,0,2,-1,1,true,1`,
simpleAggregation: `{
@@ -68,7 +126,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,delete_file.c,func1,2,
},
{
name: "covered line changed",
- workdir: testsPath + "/all/test-workdir-covermerger",
+ workdir: defaultTestWorkdir,
bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,2,0,2,-1,1,true,1
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3,0,3,-1,1,true,1`,
@@ -99,7 +157,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,change_line.c,func1,3,
},
{
name: "add line",
- workdir: testsPath + "/all/test-workdir-covermerger",
+ workdir: defaultTestWorkdir,
bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2,-1,1,true,1`,
simpleAggregation: `{
@@ -129,7 +187,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,add_line.c,func1,2,0,2
},
{
name: "instrumented lines w/o coverage are reported",
- workdir: testsPath + "/all/test-workdir-covermerger",
+ workdir: defaultTestWorkdir,
bqTable: `timestamp,version,fuzzing_minutes,arch,build_id,manager,kernel_repo,kernel_branch,kernel_commit,file_path,func_name,sl,sc,el,ec,hit_count,inline,pc
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit1,not_changed.c,func1,3,0,3,-1,0,true,1
samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,0,4,-1,0,true,1`,
@@ -172,34 +230,27 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- ch := make(chan *FileMergeResult)
- aggregation := make(map[string]*MergeResult)
+ mergeResultsCh := make(chan *FileMergeResult)
+ doneCh := make(chan bool)
go func() {
- for fmr := range ch {
+ aggregation := make(map[string]*MergeResult)
+ for fmr := range mergeResultsCh {
aggregation[fmr.FilePath] = fmr.MergeResult
}
+ if !test.checkDetails {
+ ignoreLineDetailsInTest(aggregation)
+ }
+ var expectedAggregation map[string]*MergeResult
+ assert.NoError(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
+ assert.Equal(t, expectedAggregation, aggregation)
+ doneCh <- true
}()
- err := MergeCSVData(
- &Config{
- Jobs: 2,
- skipRepoClone: true,
- Base: RepoCommit{
- Repo: test.baseRepo,
- Commit: test.baseCommit,
- },
- FileVersProvider: &fileVersProviderMock{Workdir: test.workdir},
- },
+ assert.NoError(t, MergeCSVData(
+ testConfig(test.baseRepo, test.baseCommit, test.workdir),
strings.NewReader(test.bqTable),
- ch,
- )
-
- assert.Nil(t, err)
- var expectedAggregation map[string]*MergeResult
- assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
- if !test.checkDetails {
- ignoreLineDetailsInTest(aggregation)
- }
- assert.Equal(t, expectedAggregation, aggregation)
+ mergeResultsCh))
+ close(mergeResultsCh)
+ <-doneCh
})
}
}
@@ -233,3 +284,15 @@ func readFileOrFail(t *testing.T, path string) string {
assert.Nil(t, err)
return string(content)
}
+
+func testConfig(repo, commit, workdir string) *Config {
+ return &Config{
+ Jobs: 2,
+ skipRepoClone: true,
+ Base: RepoCommit{
+ Repo: repo,
+ Commit: commit,
+ },
+ FileVersProvider: &fileVersProviderMock{Workdir: workdir},
+ }
+}
diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go
index a58ef5d8a..9814f6ce7 100644
--- a/tools/syz-covermerger/syz_covermerger.go
+++ b/tools/syz-covermerger/syz_covermerger.go
@@ -4,14 +4,11 @@
package main
import (
- "compress/gzip"
"context"
- "encoding/json"
"flag"
"fmt"
"io"
"runtime"
- "slices"
"cloud.google.com/go/civil"
"github.com/google/syzkaller/dashboard/dashapi"
@@ -20,8 +17,6 @@ import (
"github.com/google/syzkaller/pkg/gcs"
"github.com/google/syzkaller/pkg/log"
_ "github.com/google/syzkaller/pkg/subsystem/lists"
- "golang.org/x/exp/maps"
- "golang.org/x/sync/errgroup"
)
var (
@@ -109,57 +104,28 @@ func do() error {
return fmt.Errorf("gcsClient.FileWriter: %w", err)
}
}
- eg := errgroup.Group{}
- mergeResults := make(chan *covermerger.FileMergeResult)
- eg.Go(func() error {
- defer close(mergeResults)
- if err := covermerger.MergeCSVData(config, csvReader, mergeResults); err != nil {
- return fmt.Errorf("covermerger.MergeCSVData: %w", err)
- }
- return nil
- })
- var totalInstrumentedLines, totalCoveredLines int
- eg.Go(func() error {
- encoder := json.NewEncoder(gzip.NewWriter(wc))
- if encoder != nil {
- if err := encoder.Encode(&coveragedb.HistoryRecord{
- Namespace: *flagNamespace,
- Repo: *flagRepo,
- Commit: *flagCommit,
- Duration: *flagDuration,
- DateTo: dateTo,
- TotalRows: *flagTotalRows,
- }); err != nil {
- return fmt.Errorf("encoder.Encode(MergedCoverageDescription): %w", err)
- }
- }
- for fileMergeResult := range mergeResults {
- dashCoverageRecords := mergedCoverageRecords(fileMergeResult)
- if encoder != nil {
- for _, record := range dashCoverageRecords {
- if err := encoder.Encode(record); err != nil {
- return fmt.Errorf("encoder.Encode(MergedCoverageRecord): %w", err)
- }
- }
- }
- for _, hitCount := range fileMergeResult.HitCounts {
- totalInstrumentedLines++
- if hitCount > 0 {
- totalCoveredLines++
- }
- }
- }
- if err := wc.Close(); err != nil {
- return fmt.Errorf("wc.Close: %w", err)
- }
- return nil
- })
- if err := eg.Wait(); err != nil {
- return fmt.Errorf("eg.Wait: %w", err)
+ totalInstrumentedLines, totalCoveredLines, err := covermerger.MergeCSVWriteJSONL(
+ config,
+ &coveragedb.HistoryRecord{
+ Namespace: *flagNamespace,
+ Repo: *flagRepo,
+ Commit: *flagCommit,
+ Duration: *flagDuration,
+ DateTo: dateTo,
+ TotalRows: *flagTotalRows,
+ },
+ csvReader,
+ wc)
+ if err != nil {
+ return fmt.Errorf("covermerger.MergeCSVWriteJSONL: %w", err)
+ }
+ if err := wc.Close(); err != nil {
+ return fmt.Errorf("wc.Close: %w", err)
}
printCoverage(totalInstrumentedLines, totalCoveredLines)
if *flagToDashAPI != "" {
+	// Merging may take hours. It is better to create a new connection instead of reusing the old one.
dash, err := dashapi.New(*flagDashboardClientName, *flagToDashAPI, "")
if err != nil {
return fmt.Errorf("dashapi.New: %w", err)
@@ -181,41 +147,3 @@ func printCoverage(instrumented, covered int) {
fmt.Printf("total instrumented(%d), covered(%d), %.2f%%\n",
instrumented, covered, coverage*100)
}
-
-const allManagers = "*"
-
-func mergedCoverageRecords(fmr *covermerger.FileMergeResult,
-) []*coveragedb.MergedCoverageRecord {
- if !fmr.FileExists {
- return nil
- }
- lines := maps.Keys(fmr.HitCounts)
- slices.Sort(lines)
- mgrStat := make(map[string]*coveragedb.Coverage)
- mgrStat[allManagers] = &coveragedb.Coverage{}
-
- for _, line := range lines {
- mgrStat[allManagers].AddLineHitCount(line, fmr.HitCounts[line])
- managerHitCounts := map[string]int{}
- for _, lineDetail := range fmr.LineDetails[line] {
- manager := lineDetail.Manager
- managerHitCounts[manager] += lineDetail.HitCount
- }
- for manager, managerHitCount := range managerHitCounts {
- if _, ok := mgrStat[manager]; !ok {
- mgrStat[manager] = &coveragedb.Coverage{}
- }
- mgrStat[manager].AddLineHitCount(line, managerHitCount)
- }
- }
-
- res := []*coveragedb.MergedCoverageRecord{}
- for managerName, managerCoverage := range mgrStat {
- res = append(res, &coveragedb.MergedCoverageRecord{
- Manager: managerName,
- FilePath: fmr.FilePath,
- FileData: managerCoverage,
- })
- }
- return res
-}