aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'pkg')
-rw-r--r--pkg/covermerger/covermerger.go111
-rw-r--r--pkg/covermerger/covermerger_test.go11
-rw-r--r--pkg/covermerger/file_line_merger.go4
-rw-r--r--pkg/covermerger/monorepo.go104
-rw-r--r--pkg/covermerger/repos.go89
5 files changed, 183 insertions, 136 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go
index b5bc8cbdb..602a0a263 100644
--- a/pkg/covermerger/covermerger.go
+++ b/pkg/covermerger/covermerger.go
@@ -4,13 +4,16 @@
package covermerger
import (
+ "context"
"encoding/csv"
"fmt"
"io"
"log"
"strconv"
+ "sync"
"golang.org/x/exp/maps"
+ "golang.org/x/sync/errgroup"
)
const (
@@ -23,7 +26,6 @@ const (
)
type FileRecord map[string]string
-type FileRecords []FileRecord
type RepoBranchCommit struct {
Repo string
Branch string
@@ -73,24 +75,15 @@ type FileCoverageMerger interface {
Result() *MergeResult
}
-func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{},
-) (*MergeResult, error) {
+func batchFileData(c *Config, targetFilePath string, records []FileRecord) (*MergeResult, error) {
log.Printf("processing %d records for %s", len(records), targetFilePath)
- if _, exists := processedFiles[targetFilePath]; exists {
- return nil, fmt.Errorf("file was already processed, check the input ordering")
- }
- processedFiles[targetFilePath] = struct{}{}
repoBranchCommitsMap := make(map[RepoBranchCommit]bool)
for _, record := range records {
repoBranchCommitsMap[record.RepoBranchCommit()] = true
}
repoBranchCommitsMap[c.Base] = true
repoBranchCommits := maps.Keys(repoBranchCommitsMap)
- getFiles := getFileVersions
- if c.getFileVersionsMock != nil {
- getFiles = c.getFileVersionsMock
- }
- fvs, err := getFiles(c, targetFilePath, repoBranchCommits)
+ fvs, err := c.FileVersProvider.GetFileVersions(c, targetFilePath, repoBranchCommits)
if err != nil {
return nil, fmt.Errorf("failed to getFileVersions: %w", err)
}
@@ -125,11 +118,11 @@ func makeRecord(fields, schema []string) FileRecord {
}
type Config struct {
- Workdir string
- skipRepoClone bool
- Base RepoBranchCommit
- getFileVersionsMock func(*Config, string, []RepoBranchCommit) (fileVersions, error)
- repoCache repoCache
+ Jobs int
+ Workdir string
+ skipRepoClone bool
+ Base RepoBranchCommit
+ FileVersProvider fileVersProvider
}
func isSchema(fields, schema []string) bool {
@@ -182,32 +175,70 @@ func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, er
return mergeResult, nil
}
-func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeResult, error) {
+type FileRecords struct {
+ fileName string
+ records []FileRecord
+}
+
+func mergeChanData(c *Config, recordChan <-chan FileRecord) (map[string]*MergeResult, error) {
+ g, ctx := errgroup.WithContext(context.Background())
+ frecordChan := groupFileRecords(recordChan, ctx)
stat := make(map[string]*MergeResult)
- targetFile := ""
- var records []FileRecord
- processedFiles := map[string]struct{}{}
- for record := range recordsChan {
- curTargetFile := record[KeyFilePath]
- if targetFile == "" {
- targetFile = curTargetFile
- }
- if curTargetFile != targetFile {
- var err error
- if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
- return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
+ var mu sync.Mutex
+ for i := 0; i < c.Jobs; i++ {
+ g.Go(func() error {
+ for frecord := range frecordChan {
+ if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil {
+ return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err)
+ } else {
+ mu.Lock()
+ if _, exist := stat[frecord.fileName]; exist {
+ mu.Unlock()
+ return fmt.Errorf("file %s was already processed", frecord.fileName)
+ }
+ stat[frecord.fileName] = mr
+ mu.Unlock()
+ }
}
- records = nil
- targetFile = curTargetFile
- }
- records = append(records, record)
+ return nil
+ })
}
- if records != nil {
- var err error
- if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil {
- return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err)
- }
+ if err := g.Wait(); err != nil {
+ return nil, err
}
-
return stat, nil
}
+
+func groupFileRecords(recordChan <-chan FileRecord, ctx context.Context) chan FileRecords {
+ frecordChan := make(chan FileRecords)
+ go func() {
+ defer close(frecordChan)
+ targetFile := ""
+ var records []FileRecord
+ for record := range recordChan {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ }
+ curTargetFile := record[KeyFilePath]
+ if targetFile == "" {
+ targetFile = curTargetFile
+ }
+ if curTargetFile != targetFile {
+ frecordChan <- FileRecords{
+ fileName: targetFile,
+ records: records,
+ }
+ records = nil
+ targetFile = curTargetFile
+ }
+ records = append(records, record)
+ }
+ frecordChan <- FileRecords{
+ fileName: targetFile,
+ records: records,
+ }
+ }()
+ return frecordChan
+}
diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go
index 5d99e35df..becb3e9c9 100644
--- a/pkg/covermerger/covermerger_test.go
+++ b/pkg/covermerger/covermerger_test.go
@@ -121,6 +121,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
t.Run(test.name, func(t *testing.T) {
aggregation, err := MergeCSVData(
&Config{
+ Jobs: 2,
Workdir: test.workdir,
skipRepoClone: true,
Base: RepoBranchCommit{
@@ -128,7 +129,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
Branch: test.baseBranch,
Commit: test.baseCommit,
},
- getFileVersionsMock: mockGetFileVersions,
+ FileVersProvider: &fileVersProviderMock{},
},
strings.NewReader(test.bqTable),
)
@@ -140,15 +141,15 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4,
}
}
-func mockGetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
+type fileVersProviderMock struct{}
+
+func (m *fileVersProviderMock) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
) (fileVersions, error) {
res := make(fileVersions)
for _, rbc := range rbcs {
filePath := c.Workdir + "/repos/" + rbc.Commit + "/" + targetFilePath
if bytes, err := os.ReadFile(filePath); err == nil {
- res[rbc] = fileVersion{
- content: string(bytes),
- }
+ res[rbc] = string(bytes)
}
}
return res, nil
diff --git a/pkg/covermerger/file_line_merger.go b/pkg/covermerger/file_line_merger.go
index 29fd4200b..b53509df5 100644
--- a/pkg/covermerger/file_line_merger.go
+++ b/pkg/covermerger/file_line_merger.go
@@ -9,7 +9,7 @@ func makeFileLineCoverMerger(
baseFileExists := false
for rbc, fv := range fvs {
if rbc == base {
- baseFile = fv.content
+ baseFile = fv
baseFileExists = true
break
}
@@ -25,7 +25,7 @@ func makeFileLineCoverMerger(
lostFrames: map[RepoBranchCommit]int64{},
}
for rbc, fv := range fvs {
- a.matchers[rbc] = makeLineToLineMatcher(fv.content, baseFile)
+ a.matchers[rbc] = makeLineToLineMatcher(fv, baseFile)
}
return a
}
diff --git a/pkg/covermerger/monorepo.go b/pkg/covermerger/monorepo.go
new file mode 100644
index 000000000..5ed67e311
--- /dev/null
+++ b/pkg/covermerger/monorepo.go
@@ -0,0 +1,104 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package covermerger
+
+import (
+ "fmt"
+ "log"
+ "sync"
+
+ "github.com/google/syzkaller/pkg/vcs"
+ "github.com/google/syzkaller/sys/targets"
+)
+
+type fileVersProvider interface {
+ GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
+ ) (fileVersions, error)
+}
+
+type monoRepo struct {
+ branches map[RepoBranchCommit]struct{}
+ mu sync.RWMutex
+ repo vcs.Repo
+}
+
+type fileVersions map[RepoBranchCommit]string
+
+func (mr *monoRepo) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
+) (fileVersions, error) {
+ mr.mu.RLock()
+ if !mr.allRepoBranchesPresent(rbcs) {
+ mr.mu.RUnlock()
+ if err := mr.cloneBranches(rbcs); err != nil {
+ return nil, fmt.Errorf("failed to clone repos: %w", err)
+ }
+ mr.mu.RLock()
+ }
+ defer mr.mu.RUnlock()
+ res := make(fileVersions)
+ for _, rbc := range rbcs {
+ fileBytes, err := mr.repo.Object(targetFilePath, rbc.Commit)
+ // It is ok if some file doesn't exist. It means we have repo FS diff.
+ if err != nil {
+ continue
+ }
+ res[rbc] = string(fileBytes)
+ }
+ return res, nil
+}
+
+func (mr *monoRepo) allRepoBranchesPresent(rbcs []RepoBranchCommit) bool {
+ for _, rbc := range rbcs {
+ if !mr.repoBranchPresent(rbc) {
+ return false
+ }
+ }
+ return true
+}
+
+func (mr *monoRepo) repoBranchPresent(rbc RepoBranchCommit) bool {
+ rbc.Commit = ""
+ _, ok := mr.branches[rbc]
+ return ok
+}
+
+func (mr *monoRepo) addRepoBranch(rbc RepoBranchCommit) error {
+ rbc.Commit = ""
+ mr.branches[rbc] = struct{}{}
+ log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch)
+ if rbc.Repo == "" || rbc.Branch == "" {
+ panic("repo and branch are needed")
+ }
+ if _, err := mr.repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil {
+ return fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w",
+ rbc.Repo, rbc.Branch, err)
+ }
+ return nil
+}
+
+func MakeMonoRepo(workdir string) fileVersProvider {
+ rbcPath := workdir + "/repos/linux_kernels"
+ mr := &monoRepo{
+ branches: map[RepoBranchCommit]struct{}{},
+ }
+ var err error
+ if mr.repo, err = vcs.NewRepo(targets.Linux, "none", rbcPath); err != nil {
+ panic(fmt.Sprintf("failed to create/open repo at %s: %s", rbcPath, err.Error()))
+ }
+ return mr
+}
+
+func (mr *monoRepo) cloneBranches(rbcs []RepoBranchCommit) error {
+ mr.mu.Lock()
+ defer mr.mu.Unlock()
+ for _, rbc := range rbcs {
+ if mr.repoBranchPresent(rbc) {
+ continue
+ }
+ if err := mr.addRepoBranch(rbc); err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/pkg/covermerger/repos.go b/pkg/covermerger/repos.go
deleted file mode 100644
index 2c63e2d5d..000000000
--- a/pkg/covermerger/repos.go
+++ /dev/null
@@ -1,89 +0,0 @@
-// Copyright 2024 syzkaller project authors. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package covermerger
-
-import (
- "fmt"
- "log"
-
- "github.com/google/syzkaller/pkg/vcs"
- "github.com/google/syzkaller/sys/targets"
-)
-
-type fileVersion struct {
- content string
-}
-
-type fileVersions map[RepoBranchCommit]fileVersion
-
-func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit,
-) (fileVersions, error) {
- repos, err := cloneRepos(c, rbcs)
- if err != nil {
- return nil, fmt.Errorf("failed to clone repos: %w", err)
- }
-
- res := make(fileVersions)
- for _, rbc := range rbcs {
- fileBytes, err := repos[rbc].Object(targetFilePath, rbc.Commit)
- // It is ok if some file doesn't exist. It means we have repo FS diff.
- if err != nil {
- continue
- }
- res[rbc] = fileVersion{
- content: string(fileBytes),
- }
- }
- return res, nil
-}
-
-type repoCache struct {
- cache map[RepoBranchCommit]vcs.Repo
-}
-
-func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo {
- rbc.Commit = ""
- if repo, ok := rc.cache[rbc]; ok {
- return repo
- }
- return nil
-}
-
-func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) {
- rbc.Commit = ""
- if rc.cache == nil {
- rc.cache = map[RepoBranchCommit]vcs.Repo{}
- }
- rc.cache[rbc] = repo
-}
-
-func cloneRepos(c *Config, rbcs []RepoBranchCommit) (map[RepoBranchCommit]vcs.Repo, error) {
- cache := &c.repoCache
- repos := make(map[RepoBranchCommit]vcs.Repo)
- for _, rbc := range rbcs {
- repos[rbc] = cache.get(rbc)
- if repos[rbc] != nil {
- continue
- }
- rbcPath := c.Workdir + "/repos/linux_kernels"
- repo, err := vcs.NewRepo(targets.Linux, "none", rbcPath)
- if err != nil {
- return nil, fmt.Errorf("failed to create/open repo at %s: %w", rbcPath, err)
- }
- repos[rbc] = repo
- cache.put(rbc, repo)
- if c.skipRepoClone {
- continue
- }
- log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch)
- if rbc.Branch == "" {
- panic("repo and branch are needed")
- }
- if _, err = repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil {
- return nil, fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w",
- rbc.Repo, rbc.Branch, err)
- }
- }
- return repos, nil
-}