aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-06-06 18:00:09 +0200
committerTaras Madan <tarasmadan@google.com>2024-06-26 15:04:16 +0000
commit5c045c049cdc7a5c511f4b387909ba2729843dce (patch)
treecef9b68368d58a5d58813108b1dbc5ffed75d996 /pkg
parent6271cc7c97723fed48c6a953662320a0effe1b19 (diff)
covermerger: add more control options and spanner integration
Diffstat (limited to 'pkg')
-rw-r--r--pkg/covermerger/covermerger.go164
-rw-r--r--pkg/covermerger/covermerger_test.go34
-rw-r--r--pkg/covermerger/deleted_file_merger.go2
-rw-r--r--pkg/covermerger/file_line_merger.go41
-rw-r--r--pkg/covermerger/repos.go86
5 files changed, 221 insertions, 106 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index aa47d83b1..b5bc8cbdb 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -7,22 +7,19 @@ import (
"encoding/csv"
"fmt"
"io"
+ "log"
"strconv"
"golang.org/x/exp/maps"
)
const (
- keyKernelRepo = "kernel_repo"
- keyKernelBranch = "kernel_branch"
- keyKernelCommit = "kernel_commit"
- keyFilePath = "file_path"
- keyStartLine = "sl"
- keyStartCol = "sc"
- keyEndLine = "el"
- keyEndCol = "ec"
- keyHitCount = "hit_count"
- keyArch = "arch"
+ KeyKernelRepo = "kernel_repo"
+ KeyKernelBranch = "kernel_branch"
+ KeyKernelCommit = "kernel_commit"
+ KeyFilePath = "file_path"
+ KeyStartLine = "sl"
+ KeyHitCount = "hit_count"
)
type FileRecord map[string]string
@@ -35,9 +32,9 @@ type RepoBranchCommit struct {
func (fr FileRecord) RepoBranchCommit() RepoBranchCommit {
return RepoBranchCommit{
- fr[keyKernelRepo],
- fr[keyKernelBranch],
- fr[keyKernelCommit],
+ fr[KeyKernelRepo],
+ fr[KeyKernelBranch],
+ fr[KeyKernelCommit],
}
}
@@ -48,65 +45,69 @@ type Frame struct {
EndCol int
}
-func (fr FileRecord) Frame() Frame {
- f := Frame{}
+func (fr FileRecord) Frame() (*Frame, error) {
+ f := &Frame{}
var err error
- if f.StartCol, err = strconv.Atoi(fr[keyStartCol]); err != nil {
- panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyStartCol]))
+ if f.StartLine, err = strconv.Atoi(fr[KeyStartLine]); err != nil {
+ return nil, fmt.Errorf("failed to Atoi(%s): %w", fr[KeyStartLine], err)
}
- if f.StartLine, err = strconv.Atoi(fr[keyStartLine]); err != nil {
- panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyStartLine]))
- }
- if f.EndCol, err = strconv.Atoi(fr[keyEndCol]); err != nil {
- panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyEndCol]))
- }
- if f.EndLine, err = strconv.Atoi(fr[keyEndLine]); err != nil {
- panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyEndLine]))
- }
- return f
+ return f, nil
}
-func (fr FileRecord) HitCount() int {
- if hitCount, err := strconv.Atoi(fr[keyHitCount]); err != nil {
- panic(fmt.Sprintf("failed to Atoi(%s)", fr[keyHitCount]))
+func (fr FileRecord) HitCount() (int, error) {
+ if hitCount, err := strconv.Atoi(fr[KeyHitCount]); err != nil {
+ return 0, fmt.Errorf("failed to Atoi(%s): %w", fr[KeyHitCount], err)
} else {
- return hitCount
+ return hitCount, nil
}
}
-func (fr FileRecord) Arch() string {
- return fr[keyArch]
-}
-
type MergeResult struct {
HitCounts map[int]int
FileExists bool
+ LostFrames map[RepoBranchCommit]int64
}
type FileCoverageMerger interface {
- AddRecord(rbc RepoBranchCommit, arch string, f Frame, hitCount int)
+ AddRecord(rbc RepoBranchCommit, f *Frame, hitCount int)
Result() *MergeResult
}
-func batchFileData(c *Config, targetFilePath string, records FileRecords, base RepoBranchCommit,
+func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
) (*MergeResult, error) {
+ log.Printf("processing %d records for %s", len(records), targetFilePath)
+ if _, exists := processedFiles[targetFilePath]; exists {
+ return nil, fmt.Errorf("file was already processed, check the input ordering")
+ }
+ processedFiles[targetFilePath] = struct{}{}
repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
for _, record := range records {
repoBranchCommitsMap[record.RepoBranchCommit()] = true
}
- repoBranchCommitsMap[base] = true
+ repoBranchCommitsMap[c.Base] = true
repoBranchCommits := maps.Keys(repoBranchCommitsMap)
- fileVersions, err := getFileVersions(c, targetFilePath, repoBranchCommits)
+ getFiles := getFileVersions
+ if c.getFileVersionsMock != nil {
+ getFiles = c.getFileVersionsMock
+ }
+ fvs, err := getFiles(c, targetFilePath, repoBranchCommits)
if err != nil {
return nil, fmt.Errorf("failed to getFileVersions: %w", err)
}
- merger := makeFileLineCoverMerger(fileVersions, base)
+ merger := makeFileLineCoverMerger(fvs, c.Base)
for _, record := range records {
+ var f *Frame
+ if f, err = record.Frame(); err != nil {
+ return nil, fmt.Errorf("error parsing records: %w", err)
+ }
+ var hitCount int
+ if hitCount, err = record.HitCount(); err != nil {
+ return nil, fmt.Errorf("error parsing records: %w", err)
+ }
merger.AddRecord(
record.RepoBranchCommit(),
- record.Arch(),
- record.Frame(),
- record.HitCount())
+ f,
+ hitCount)
}
return merger.Result(), nil
}
@@ -124,37 +125,76 @@ func makeRecord(fields, schema []string) FileRecord {
}
type Config struct {
- Workdir string
- skipRepoClone bool
+ Workdir string
+ skipRepoClone bool
+ Base RepoBranchCommit
+ getFileVersionsMock func(*Config, string, []RepoBranchCommit) (fileVersions, error)
+ repoCache repoCache
}
-func AggregateStreamData(c *Config, stream io.Reader, base RepoBranchCommit,
-) (map[string]*MergeResult, error) {
- stat := make(map[string]*MergeResult)
+func isSchema(fields, schema []string) bool {
+ if len(fields) != len(schema) {
+ return false
+ }
+ for i := 0; i < len(fields); i++ {
+ if fields[i] != schema[i] {
+ return false
+ }
+ }
+ return true
+}
+
+func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, error) {
var schema []string
- targetFile := ""
- var records FileRecords
- csvReader := csv.NewReader(stream)
+ csvReader := csv.NewReader(reader)
if fields, err := csvReader.Read(); err != nil {
return nil, fmt.Errorf("failed to read schema: %w", err)
} else {
schema = fields
}
- for {
- fields, err := csvReader.Read()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, fmt.Errorf("failed to read CSV line: %w", err)
+ errStdinReadChan := make(chan error, 1)
+ recordsChan := make(chan FileRecord)
+ go func() {
+ defer close(recordsChan)
+ for {
+ fields, err := csvReader.Read()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ errStdinReadChan <- fmt.Errorf("failed to read CSV line: %w", err)
+ return
+ }
+ if isSchema(fields, schema) {
+			// The input may be the merged CSV files with multiple schemas.
+ continue
+ }
+ recordsChan <- makeRecord(fields, schema)
}
- record := makeRecord(fields, schema)
- curTargetFile := record[keyFilePath]
+ errStdinReadChan <- nil
+ }()
+ mergeResult, errMerging := mergeChanData(config, recordsChan)
+ errStdinRead := <-errStdinReadChan
+ if errMerging != nil || errStdinRead != nil {
+ return nil, fmt.Errorf("errors merging stdin data:\nmerger err: %w\nstdin reader err: %w",
+ errMerging, errStdinRead)
+ }
+ return mergeResult, nil
+}
+
+func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeResult, error) {
+ stat := make(map[string]*MergeResult)
+ targetFile := ""
+ var records []FileRecord
+ processedFiles := map[string]struct{}{}
+ for record := range recordsChan {
+ curTargetFile := record[KeyFilePath]
if targetFile == "" {
targetFile = curTargetFile
}
if curTargetFile != targetFile {
- if stat[targetFile], err = batchFileData(c, targetFile, records, base); err != nil {
+ var err error
+ if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
}
records = nil
@@ -164,7 +204,7 @@ func AggregateStreamData(c *Config, stream io.Reader, base RepoBranchCommit,
}
if records != nil {
var err error
- if stat[targetFile], err = batchFileData(c, targetFile, records, base); err != nil {
+ if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
}
}
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 825402aa1..5d99e35df 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -119,25 +119,41 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
- aggregation, err := AggregateStreamData(
+ aggregation, err := MergeCSVData(
&Config{
Workdir: test.workdir,
skipRepoClone: true,
+ Base: RepoBranchCommit{
+ Repo: test.baseRepo,
+ Branch: test.baseBranch,
+ Commit: test.baseCommit,
+ },
+ getFileVersionsMock: mockGetFileVersions,
},
strings.NewReader(test.bqTable),
- RepoBranchCommit{
- Repo: test.baseRepo,
- Branch: test.baseBranch,
- Commit: test.baseCommit,
- })
+ )
assert.Nil(t, err)
- var simpleAggregationJSON map[string]*MergeResult
- assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &simpleAggregationJSON))
- assert.Equal(t, simpleAggregationJSON, aggregation)
+ var expectedAggregation map[string]*MergeResult
+ assert.Nil(t, json.Unmarshal([]byte(test.simpleAggregation), &expectedAggregation))
+ assert.Equal(t, expectedAggregation, aggregation)
})
}
}
+func mockGetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
+) (fileVersions, error) {
+ res := make(fileVersions)
+ for _, rbc := range rbcs {
+ filePath := c.Workdir + "/repos/" + rbc.Commit + "/" + targetFilePath
+ if bytes, err := os.ReadFile(filePath); err == nil {
+ res[rbc] = fileVersion{
+ content: string(bytes),
+ }
+ }
+ }
+ return res, nil
+}
+
func readFileOrFail(t *testing.T, path string) string {
absPath, err := filepath.Abs(path)
assert.Nil(t, err)
diff --git a/pkg/covermerger/deleted_file_merger.go b/pkg/covermerger/deleted_file_merger.go
index 7313c8dae..32513ecb1 100644
--- a/pkg/covermerger/deleted_file_merger.go
+++ b/pkg/covermerger/deleted_file_merger.go
@@ -6,7 +6,7 @@ package covermerger
type DeletedFileLineMerger struct {
}
-func (a *DeletedFileLineMerger) AddRecord(RepoBranchCommit, string, Frame, int) {
+func (a *DeletedFileLineMerger) AddRecord(RepoBranchCommit, *Frame, int) {
}
func (a *DeletedFileLineMerger) Result() *MergeResult {
diff --git a/pkg/covermerger/file_line_merger.go b/pkg/covermerger/file_line_merger.go
index 2cd2eea78..29fd4200b 100644
--- a/pkg/covermerger/file_line_merger.go
+++ b/pkg/covermerger/file_line_merger.go
@@ -4,12 +4,12 @@
package covermerger
func makeFileLineCoverMerger(
- fileVersions map[RepoBranchCommit]string, base RepoBranchCommit) FileCoverageMerger {
+ fvs fileVersions, base RepoBranchCommit) FileCoverageMerger {
baseFile := ""
baseFileExists := false
- for rbc, fileContent := range fileVersions {
+ for rbc, fv := range fvs {
if rbc == base {
- baseFile = fileContent
+ baseFile = fv.content
baseFileExists = true
break
}
@@ -18,33 +18,46 @@ func makeFileLineCoverMerger(
return &DeletedFileLineMerger{}
}
a := &FileLineCoverMerger{
- rbcToFile: fileVersions,
- baseFile: baseFile,
- hitCounts: make(map[int]int),
- matchers: make(map[RepoBranchCommit]*LineToLineMatcher),
+ rbcToFile: fvs,
+ baseFile: baseFile,
+ hitCounts: make(map[int]int),
+ matchers: make(map[RepoBranchCommit]*LineToLineMatcher),
+ lostFrames: map[RepoBranchCommit]int64{},
}
- for rbc, fileVersion := range fileVersions {
- a.matchers[rbc] = makeLineToLineMatcher(fileVersion, baseFile)
+ for rbc, fv := range fvs {
+ a.matchers[rbc] = makeLineToLineMatcher(fv.content, baseFile)
}
return a
}
type FileLineCoverMerger struct {
- rbcToFile map[RepoBranchCommit]string
- baseFile string
- hitCounts map[int]int
- matchers map[RepoBranchCommit]*LineToLineMatcher
+ rbcToFile fileVersions
+ baseFile string
+ hitCounts map[int]int
+ matchers map[RepoBranchCommit]*LineToLineMatcher
+ lostFrames map[RepoBranchCommit]int64
}
-func (a *FileLineCoverMerger) AddRecord(rbc RepoBranchCommit, arch string, f Frame, hitCount int) {
+func (a *FileLineCoverMerger) AddRecord(rbc RepoBranchCommit, f *Frame, hitCount int) {
+ if a.matchers[rbc] == nil {
+ if hitCount > 0 {
+ a.lostFrames[rbc]++
+ }
+ return
+ }
if targetLine := a.matchers[rbc].SameLinePos(f.StartLine); targetLine != -1 {
a.hitCounts[f.StartLine] += hitCount
}
}
func (a *FileLineCoverMerger) Result() *MergeResult {
+ lostFrames := a.lostFrames
+ if len(lostFrames) == 0 {
+ lostFrames = nil
+ }
return &MergeResult{
HitCounts: a.hitCounts,
FileExists: true,
+ LostFrames: lostFrames,
}
}
diff --git a/pkg/covermerger/repos.go b/pkg/covermerger/repos.go
index 77a2e557e..2c63e2d5d 100644
--- a/pkg/covermerger/repos.go
+++ b/pkg/covermerger/repos.go
@@ -5,39 +5,85 @@ package covermerger
import (
"fmt"
- "os"
+ "log"
"github.com/google/syzkaller/pkg/vcs"
"github.com/google/syzkaller/sys/targets"
)
+type fileVersion struct {
+ content string
+}
+
+type fileVersions map[RepoBranchCommit]fileVersion
+
func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
-) (map[RepoBranchCommit]string, error) {
- reposPath := c.Workdir + "/repos"
+) (fileVersions, error) {
+ repos, err := cloneRepos(c, rbcs)
+ if err != nil {
+ return nil, fmt.Errorf("failed to clone repos: %w", err)
+ }
+
+ res := make(fileVersions)
for _, rbc := range rbcs {
- commitPath := reposPath + "/" + rbc.Commit
- if _, err := os.Stat(commitPath); err == nil || c.skipRepoClone {
- continue
- }
- repo, err := vcs.NewRepo(targets.Linux, "none", commitPath)
+ fileBytes, err := repos[rbc].Object(targetFilePath, rbc.Commit)
+ // It is ok if some file doesn't exist. It means we have repo FS diff.
if err != nil {
- return nil, fmt.Errorf("failed to create new repo at %s: %w", commitPath, err)
+ continue
}
- if _, err = repo.CheckoutCommit(rbc.Repo, rbc.Commit); err != nil {
- return nil, fmt.Errorf("failed to get commit %s from repo %s to folder %s: %w",
- rbc.Commit, rbc.Repo, commitPath, err)
+ res[rbc] = fileVersion{
+ content: string(fileBytes),
}
}
+ return res, nil
+}
- res := make(map[RepoBranchCommit]string)
+type repoCache struct {
+ cache map[RepoBranchCommit]vcs.Repo
+}
+
+func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo {
+ rbc.Commit = ""
+ if repo, ok := rc.cache[rbc]; ok {
+ return repo
+ }
+ return nil
+}
+
+func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) {
+ rbc.Commit = ""
+ if rc.cache == nil {
+ rc.cache = map[RepoBranchCommit]vcs.Repo{}
+ }
+ rc.cache[rbc] = repo
+}
+
+func cloneRepos(c *Config, rbcs []RepoBranchCommit) (map[RepoBranchCommit]vcs.Repo, error) {
+ cache := &c.repoCache
+ repos := make(map[RepoBranchCommit]vcs.Repo)
for _, rbc := range rbcs {
- filePath := reposPath + "/" + rbc.Commit + "/" + targetFilePath
- fileBytes, err := os.ReadFile(filePath)
- // It is ok if some file doesn't exist. It means we have repo FS diff.
- if err == nil {
- res[rbc] = string(fileBytes)
+ repos[rbc] = cache.get(rbc)
+ if repos[rbc] != nil {
+ continue
+ }
+ rbcPath := c.Workdir + "/repos/linux_kernels"
+ repo, err := vcs.NewRepo(targets.Linux, "none", rbcPath)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create/open repo at %s: %w", rbcPath, err)
+ }
+ repos[rbc] = repo
+ cache.put(rbc, repo)
+ if c.skipRepoClone {
+ continue
+ }
+ log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch)
+ if rbc.Branch == "" {
+ panic("repo and branch are needed")
+ }
+ if _, err = repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil {
+ return nil, fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w",
+ rbc.Repo, rbc.Branch, err)
}
}
-
- return res, nil
+ return repos, nil
}