diff options
| author | Taras Madan <tarasmadan@google.com> | 2024-06-26 16:11:08 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2024-07-01 15:37:36 +0000 |
| commit | 08010a854ef401b7a00748b77170cfb02bee83cc (patch) | |
| tree | 8744f2b0620930111181b24ecc619c5985c37143 | |
| parent | a6f99ace4014896f81a2f101416fd5413579f2bd (diff) | |
covermerger: parallelize and use monorepo
| -rw-r--r-- | pkg/covermerger/covermerger.go | 111 | ||||
| -rw-r--r-- | pkg/covermerger/covermerger_test.go | 11 | ||||
| -rw-r--r-- | pkg/covermerger/file_line_merger.go | 4 | ||||
| -rw-r--r-- | pkg/covermerger/monorepo.go | 104 | ||||
| -rw-r--r-- | pkg/covermerger/repos.go | 89 | ||||
| -rw-r--r-- | tools/syz-covermerger/syz_covermerger.go | 3 |
6 files changed, 186 insertions, 136 deletions
diff --git a/pkg/covermerger/covermerger.go b/pkg/covermerger/covermerger.go index b5bc8cbdb..602a0a263 100644 --- a/pkg/covermerger/covermerger.go +++ b/pkg/covermerger/covermerger.go @@ -4,13 +4,16 @@ package covermerger import ( + "context" "encoding/csv" "fmt" "io" "log" "strconv" + "sync" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" ) const ( @@ -23,7 +26,6 @@ const ( ) type FileRecord map[string]string -type FileRecords []FileRecord type RepoBranchCommit struct { Repo string Branch string @@ -73,24 +75,15 @@ type FileCoverageMerger interface { Result() *MergeResult } -func batchFileData(c *Config, targetFilePath string, records FileRecords, processedFiles map[string]struct{}, -) (*MergeResult, error) { +func batchFileData(c *Config, targetFilePath string, records []FileRecord) (*MergeResult, error) { log.Printf("processing %d records for %s", len(records), targetFilePath) - if _, exists := processedFiles[targetFilePath]; exists { - return nil, fmt.Errorf("file was already processed, check the input ordering") - } - processedFiles[targetFilePath] = struct{}{} repoBranchCommitsMap := make(map[RepoBranchCommit]bool) for _, record := range records { repoBranchCommitsMap[record.RepoBranchCommit()] = true } repoBranchCommitsMap[c.Base] = true repoBranchCommits := maps.Keys(repoBranchCommitsMap) - getFiles := getFileVersions - if c.getFileVersionsMock != nil { - getFiles = c.getFileVersionsMock - } - fvs, err := getFiles(c, targetFilePath, repoBranchCommits) + fvs, err := c.FileVersProvider.GetFileVersions(c, targetFilePath, repoBranchCommits) if err != nil { return nil, fmt.Errorf("failed to getFileVersions: %w", err) } @@ -125,11 +118,11 @@ func makeRecord(fields, schema []string) FileRecord { } type Config struct { - Workdir string - skipRepoClone bool - Base RepoBranchCommit - getFileVersionsMock func(*Config, string, []RepoBranchCommit) (fileVersions, error) - repoCache repoCache + Jobs int + Workdir string + skipRepoClone bool + Base RepoBranchCommit + FileVersProvider fileVersProvider } func isSchema(fields, schema []string) bool { @@ -182,32 +175,70 @@ func MergeCSVData(config *Config, reader io.Reader) (map[string]*MergeResult, er return mergeResult, nil } -func mergeChanData(c *Config, recordsChan <-chan FileRecord) (map[string]*MergeResult, error) { +type FileRecords struct { + fileName string + records []FileRecord +} + +func mergeChanData(c *Config, recordChan <-chan FileRecord) (map[string]*MergeResult, error) { + g, ctx := errgroup.WithContext(context.Background()) + frecordChan := groupFileRecords(recordChan, ctx) stat := make(map[string]*MergeResult) - targetFile := "" - var records []FileRecord - processedFiles := map[string]struct{}{} - for record := range recordsChan { - curTargetFile := record[KeyFilePath] - if targetFile == "" { - targetFile = curTargetFile - } - if curTargetFile != targetFile { - var err error - if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil { - return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err) + var mu sync.Mutex + for i := 0; i < c.Jobs; i++ { + g.Go(func() error { + for frecord := range frecordChan { + if mr, err := batchFileData(c, frecord.fileName, frecord.records); err != nil { + return fmt.Errorf("failed to batchFileData(%s): %w", frecord.fileName, err) + } else { + mu.Lock() + if _, exist := stat[frecord.fileName]; exist { + mu.Unlock() + return fmt.Errorf("file %s was already processed", frecord.fileName) + } + stat[frecord.fileName] = mr + mu.Unlock() + } } - records = nil - targetFile = curTargetFile - } - records = append(records, record) + return nil + }) } - if records != nil { - var err error - if stat[targetFile], err = batchFileData(c, targetFile, records, processedFiles); err != nil { - return nil, fmt.Errorf("failed to batchFileData(%s): %w", targetFile, err) - } + if err := g.Wait(); err != nil { + return nil, err } - return stat, nil } + +func groupFileRecords(recordChan <-chan FileRecord, ctx context.Context) chan FileRecords { + frecordChan := make(chan FileRecords) + go func() { + defer close(frecordChan) + targetFile := "" + var records []FileRecord + for record := range recordChan { + select { + case <-ctx.Done(): + return + default: + } + curTargetFile := record[KeyFilePath] + if targetFile == "" { + targetFile = curTargetFile + } + if curTargetFile != targetFile { + frecordChan <- FileRecords{ + fileName: targetFile, + records: records, + } + records = nil + targetFile = curTargetFile + } + records = append(records, record) + } + frecordChan <- FileRecords{ + fileName: targetFile, + records: records, + } + }() + return frecordChan +} diff --git a/pkg/covermerger/covermerger_test.go b/pkg/covermerger/covermerger_test.go index 5d99e35df..becb3e9c9 100644 --- a/pkg/covermerger/covermerger_test.go +++ b/pkg/covermerger/covermerger_test.go @@ -121,6 +121,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, t.Run(test.name, func(t *testing.T) { aggregation, err := MergeCSVData( &Config{ + Jobs: 2, Workdir: test.workdir, skipRepoClone: true, Base: RepoBranchCommit{ @@ -128,7 +129,7 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, Branch: test.baseBranch, Commit: test.baseCommit, }, - getFileVersionsMock: mockGetFileVersions, + FileVersProvider: &fileVersProviderMock{}, }, strings.NewReader(test.bqTable), ) @@ -140,15 +141,15 @@ samp_time,1,360,arch,b1,ci-mock,git://repo,master,commit2,not_changed.c,func1,4, } } -func mockGetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit, +type fileVersProviderMock struct{} + +func (m *fileVersProviderMock) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit, ) (fileVersions, error) { res := make(fileVersions) for _, rbc := range rbcs { filePath := c.Workdir + "/repos/" + rbc.Commit + "/" + targetFilePath if bytes, err := os.ReadFile(filePath); err == nil { - res[rbc] = fileVersion{ - content: string(bytes), - } + res[rbc] = string(bytes) } } return res, nil diff --git a/pkg/covermerger/file_line_merger.go b/pkg/covermerger/file_line_merger.go index 29fd4200b..b53509df5 100644 --- a/pkg/covermerger/file_line_merger.go +++ b/pkg/covermerger/file_line_merger.go @@ -9,7 +9,7 @@ func makeFileLineCoverMerger( baseFileExists := false for rbc, fv := range fvs { if rbc == base { - baseFile = fv.content + baseFile = fv baseFileExists = true break } @@ -25,7 +25,7 @@ func makeFileLineCoverMerger( lostFrames: map[RepoBranchCommit]int64{}, } for rbc, fv := range fvs { - a.matchers[rbc] = makeLineToLineMatcher(fv.content, baseFile) + a.matchers[rbc] = makeLineToLineMatcher(fv, baseFile) } return a } diff --git a/pkg/covermerger/monorepo.go b/pkg/covermerger/monorepo.go new file mode 100644 index 000000000..5ed67e311 --- /dev/null +++ b/pkg/covermerger/monorepo.go @@ -0,0 +1,104 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package covermerger + +import ( + "fmt" + "log" + "sync" + + "github.com/google/syzkaller/pkg/vcs" + "github.com/google/syzkaller/sys/targets" +) + +type fileVersProvider interface { + GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit, + ) (fileVersions, error) +} + +type monoRepo struct { + branches map[RepoBranchCommit]struct{} + mu sync.RWMutex + repo vcs.Repo +} + +type fileVersions map[RepoBranchCommit]string + +func (mr *monoRepo) GetFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit, +) (fileVersions, error) { + mr.mu.RLock() + if !mr.allRepoBranchesPresent(rbcs) { + mr.mu.RUnlock() + if err := mr.cloneBranches(rbcs); err != nil { + return nil, fmt.Errorf("failed to clone repos: %w", err) + } + mr.mu.RLock() + } + defer mr.mu.RUnlock() + res := make(fileVersions) + for _, rbc := range rbcs { + fileBytes, err := mr.repo.Object(targetFilePath, rbc.Commit) + // It is ok if some file doesn't exist. It means we have repo FS diff. + if err != nil { + continue + } + res[rbc] = string(fileBytes) + } + return res, nil +} + +func (mr *monoRepo) allRepoBranchesPresent(rbcs []RepoBranchCommit) bool { + for _, rbc := range rbcs { + if !mr.repoBranchPresent(rbc) { + return false + } + } + return true +} + +func (mr *monoRepo) repoBranchPresent(rbc RepoBranchCommit) bool { + rbc.Commit = "" + _, ok := mr.branches[rbc] + return ok +} + +func (mr *monoRepo) addRepoBranch(rbc RepoBranchCommit) error { + rbc.Commit = "" + mr.branches[rbc] = struct{}{} + log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch) + if rbc.Repo == "" || rbc.Branch == "" { + panic("repo and branch are needed") + } + if _, err := mr.repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil { + return fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w", + rbc.Repo, rbc.Branch, err) + } + return nil +} + +func MakeMonoRepo(workdir string) fileVersProvider { + rbcPath := workdir + "/repos/linux_kernels" + mr := &monoRepo{ + branches: map[RepoBranchCommit]struct{}{}, + } + var err error + if mr.repo, err = vcs.NewRepo(targets.Linux, "none", rbcPath); err != nil { + panic(fmt.Sprintf("failed to create/open repo at %s: %s", rbcPath, err.Error())) + } + return mr +} + +func (mr *monoRepo) cloneBranches(rbcs []RepoBranchCommit) error { + mr.mu.Lock() + defer mr.mu.Unlock() + for _, rbc := range rbcs { + if mr.repoBranchPresent(rbc) { + continue + } + if err := mr.addRepoBranch(rbc); err != nil { + return err + } + } + return nil +} diff --git a/pkg/covermerger/repos.go b/pkg/covermerger/repos.go deleted file mode 100644 index 2c63e2d5d..000000000 --- a/pkg/covermerger/repos.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2024 syzkaller project authors. All rights reserved. -// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. - -package covermerger - -import ( - "fmt" - "log" - - "github.com/google/syzkaller/pkg/vcs" - "github.com/google/syzkaller/sys/targets" -) - -type fileVersion struct { - content string -} - -type fileVersions map[RepoBranchCommit]fileVersion - -func getFileVersions(c *Config, targetFilePath string, rbcs []RepoBranchCommit, -) (fileVersions, error) { - repos, err := cloneRepos(c, rbcs) - if err != nil { - return nil, fmt.Errorf("failed to clone repos: %w", err) - } - - res := make(fileVersions) - for _, rbc := range rbcs { - fileBytes, err := repos[rbc].Object(targetFilePath, rbc.Commit) - // It is ok if some file doesn't exist. It means we have repo FS diff. - if err != nil { - continue - } - res[rbc] = fileVersion{ - content: string(fileBytes), - } - } - return res, nil -} - -type repoCache struct { - cache map[RepoBranchCommit]vcs.Repo -} - -func (rc *repoCache) get(rbc RepoBranchCommit) vcs.Repo { - rbc.Commit = "" - if repo, ok := rc.cache[rbc]; ok { - return repo - } - return nil -} - -func (rc *repoCache) put(rbc RepoBranchCommit, repo vcs.Repo) { - rbc.Commit = "" - if rc.cache == nil { - rc.cache = map[RepoBranchCommit]vcs.Repo{} - } - rc.cache[rbc] = repo -} - -func cloneRepos(c *Config, rbcs []RepoBranchCommit) (map[RepoBranchCommit]vcs.Repo, error) { - cache := &c.repoCache - repos := make(map[RepoBranchCommit]vcs.Repo) - for _, rbc := range rbcs { - repos[rbc] = cache.get(rbc) - if repos[rbc] != nil { - continue - } - rbcPath := c.Workdir + "/repos/linux_kernels" - repo, err := vcs.NewRepo(targets.Linux, "none", rbcPath) - if err != nil { - return nil, fmt.Errorf("failed to create/open repo at %s: %w", rbcPath, err) - } - repos[rbc] = repo - cache.put(rbc, repo) - if c.skipRepoClone { - continue - } - log.Printf("cloning repo: %s, branch: %s", rbc.Repo, rbc.Branch) - if rbc.Branch == "" { - panic("repo and branch are needed") - } - if _, err = repo.CheckoutBranch(rbc.Repo, rbc.Branch); err != nil { - return nil, fmt.Errorf("failed to CheckoutBranch(repo %s, branch %s): %w", - rbc.Repo, rbc.Branch, err) - } - } - return repos, nil -} diff --git a/tools/syz-covermerger/syz_covermerger.go b/tools/syz-covermerger/syz_covermerger.go index afa518487..063e8c85b 100644 --- a/tools/syz-covermerger/syz_covermerger.go +++ b/tools/syz-covermerger/syz_covermerger.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "os" + "runtime" "sort" "cloud.google.com/go/civil" @@ -32,12 +33,14 @@ var ( func main() { flag.Parse() config := &covermerger.Config{ + Jobs: runtime.NumCPU(), Workdir: *flagWorkdir, Base: covermerger.RepoBranchCommit{ Repo: *flagRepo, Branch: *flagBranch, Commit: *flagCommit, }, + FileVersProvider: covermerger.MakeMonoRepo(*flagWorkdir), } mergeResult, err := covermerger.MergeCSVData(config, os.Stdin) if err != nil { |
