diff options
| -rw-r--r-- | pkg/manager/repro.go | 261 | ||||
| -rw-r--r-- | pkg/manager/repro_test.go (renamed from syz-manager/repro_test.go) | 38 | ||||
| -rw-r--r-- | syz-manager/http.go | 4 | ||||
| -rw-r--r-- | syz-manager/hub.go | 9 | ||||
| -rw-r--r-- | syz-manager/manager.go | 146 | ||||
| -rw-r--r-- | syz-manager/repro.go | 230 | ||||
| -rw-r--r-- | syz-manager/snapshot.go | 5 |
7 files changed, 349 insertions, 344 deletions
diff --git a/pkg/manager/repro.go b/pkg/manager/repro.go new file mode 100644 index 000000000..9858a4629 --- /dev/null +++ b/pkg/manager/repro.go @@ -0,0 +1,261 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package manager + +import ( + "context" + "fmt" + "maps" + "slices" + "sync" + + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/report" + "github.com/google/syzkaller/pkg/repro" + "github.com/google/syzkaller/pkg/stat" +) + +type ReproResult struct { + Crash *Crash // the original crash + Repro *repro.Result + Strace *repro.StraceResult + Stats *repro.Stats + Err error +} + +type Crash struct { + InstanceIndex int + FromHub bool // this crash was created based on a repro from syz-hub + FromDashboard bool // .. or from dashboard + Manual bool + *report.Report +} + +func (c *Crash) FullTitle() string { + if c.Report.Title != "" { + return c.Report.Title + } + // Just use some unique, but stable titles. + if c.FromDashboard { + return fmt.Sprintf("dashboard crash %p", c) + } else if c.FromHub { + return fmt.Sprintf("crash from hub %p", c) + } + panic("the crash is expected to have a report") +} + +type ReproManagerView interface { + RunRepro(crash *Crash) *ReproResult + NeedRepro(crash *Crash) bool + ResizeReproPool(size int) +} + +type ReproLoop struct { + statNumReproducing *stat.Val + statPending *stat.Val + + onlyOnce bool + mgr ReproManagerView + parallel chan struct{} + pingQueue chan struct{} + reproVMs int + + mu sync.Mutex + queue []*Crash + reproducing map[string]bool + attempted map[string]bool +} + +func NewReproLoop(mgr ReproManagerView, reproVMs int, onlyOnce bool) *ReproLoop { + ret := &ReproLoop{ + mgr: mgr, + onlyOnce: onlyOnce, + parallel: make(chan struct{}, reproVMs), + reproVMs: reproVMs, + reproducing: map[string]bool{}, + pingQueue: make(chan struct{}, 1), + attempted: map[string]bool{}, + } + ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced", + stat.Console, stat.NoGraph, func() int { + ret.mu.Lock() + defer ret.mu.Unlock() + return len(ret.reproducing) + }) + ret.statPending = stat.New("pending", "Number of pending repro tasks", + stat.Console, stat.NoGraph, func() int { + ret.mu.Lock() + defer ret.mu.Unlock() + return len(ret.queue) + }) + return ret +} + +// startReproduction() is assumed to be called only once. +// The agument is the maximum number of VMs dedicated to the bug reproduction. +func (r *ReproLoop) StartReproduction() { + count := 0 + for ; r.calculateReproVMs(count+1) <= r.reproVMs; count++ { + r.parallel <- struct{}{} + } + log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", r.reproVMs, count) +} + +func (r *ReproLoop) calculateReproVMs(repros int) int { + // Let's allocate 1.33 VMs per a reproducer thread. + if r.reproVMs == 1 && repros == 1 { + // With one exception -- if we have only one VM, let's still do one repro. + return 1 + } + return (repros*4 + 2) / 3 +} + +func (r *ReproLoop) CanReproMore() bool { + return len(r.parallel) != 0 +} + +func (r *ReproLoop) Reproducing() map[string]bool { + r.mu.Lock() + defer r.mu.Unlock() + return maps.Clone(r.reproducing) +} + +// Empty returns true if there are neither running nor planned bug reproductions. +func (r *ReproLoop) Empty() bool { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.reproducing) == 0 && len(r.queue) == 0 +} + +func (r *ReproLoop) Enqueue(crash *Crash) { + r.mu.Lock() + defer r.mu.Unlock() + + title := crash.FullTitle() + if r.onlyOnce && r.attempted[title] { + // Try to reproduce each bug at most 1 time in this mode. + // Since we don't upload bugs/repros to dashboard, it likely won't have + // the reproducer even if we succeeded last time, and will repeatedly + // say it needs a repro. + return + } + log.Logf(1, "scheduled a reproduction of '%v'", title) + r.attempted[title] = true + r.queue = append(r.queue, crash) + + // Ping the loop. + select { + case r.pingQueue <- struct{}{}: + default: + } +} + +func (r *ReproLoop) popCrash() *Crash { + r.mu.Lock() + defer r.mu.Unlock() + + newBetter := func(base, new *Crash) bool { + // First, serve manual requests. + if new.Manual != base.Manual { + return new.Manual + } + // Then, deprioritize hub reproducers. + if new.FromHub != base.FromHub { + return !new.FromHub + } + return false + } + + idx := -1 + for i, crash := range r.queue { + if r.reproducing[crash.FullTitle()] { + continue + } + if idx == -1 || newBetter(r.queue[idx], r.queue[i]) { + idx = i + } + } + if idx == -1 { + return nil + } + crash := r.queue[idx] + r.queue = slices.Delete(r.queue, idx, idx+1) + return crash +} + +func (r *ReproLoop) Loop(ctx context.Context) { + var wg sync.WaitGroup + defer wg.Wait() + + for { + crash := r.popCrash() + for crash == nil { + select { + case <-r.pingQueue: + crash = r.popCrash() + case <-ctx.Done(): + return + } + if crash == nil || !r.mgr.NeedRepro(crash) { + continue + } + } + + // Now wait until we can schedule another runner. + select { + case <-r.parallel: + case <-ctx.Done(): + return + } + + r.mu.Lock() + r.reproducing[crash.FullTitle()] = true + r.adjustPoolSizeLocked() + r.mu.Unlock() + + wg.Add(1) + go func() { + defer wg.Done() + + r.handle(crash) + + r.mu.Lock() + delete(r.reproducing, crash.FullTitle()) + r.adjustPoolSizeLocked() + r.mu.Unlock() + + r.parallel <- struct{}{} + r.pingQueue <- struct{}{} + }() + } +} + +func (r *ReproLoop) handle(crash *Crash) { + log.Logf(0, "start reproducing '%v'", crash.FullTitle()) + + res := r.mgr.RunRepro(crash) + + crepro := false + title := "" + if res.Repro != nil { + crepro = res.Repro.CRepro + title = res.Repro.Report.Title + } + log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v", + crash.FullTitle(), res.Repro != nil, crepro, title, crash.FromHub, crash.FromDashboard, + ) +} + +func (r *ReproLoop) adjustPoolSizeLocked() { + // Avoid the +-1 jitter by considering the repro queue size as well. + // We process same-titled crashes sequentially, so only count unique ones. + uniqueTitles := maps.Clone(r.reproducing) + for _, crash := range r.queue { + uniqueTitles[crash.FullTitle()] = true + } + + needRepros := len(uniqueTitles) + VMs := min(r.reproVMs, r.calculateReproVMs(needRepros)) + r.mgr.ResizeReproPool(VMs) +} diff --git a/syz-manager/repro_test.go b/pkg/manager/repro_test.go index 84ab94a6b..80d224fd5 100644 --- a/syz-manager/repro_test.go +++ b/pkg/manager/repro_test.go @@ -1,7 +1,7 @@ // Copyright 2024 syzkaller project authors. All rights reserved. // Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. -package main +package manager import ( "context" @@ -17,7 +17,7 @@ func TestReproManager(t *testing.T) { mock := &reproMgrMock{ run: make(chan runCallback), } - obj := newReproManager(mock, 3, false) + obj := NewReproLoop(mock, 3, false) ctx, done := context.WithCancel(context.Background()) complete := make(chan struct{}) @@ -53,8 +53,8 @@ func TestReproManager(t *testing.T) { assert.EqualValues(t, 3, mock.reserved.Load()) // Pretend that reproducers have finished. - called.ret <- &ReproResult{crash: &Crash{fromHub: true}} - called2.ret <- &ReproResult{crash: &Crash{fromHub: true}} + called.ret <- &ReproResult{Crash: &Crash{FromHub: true}} + called2.ret <- &ReproResult{Crash: &Crash{FromHub: true}} mock.onVMShutdown(t, obj) } @@ -63,22 +63,22 @@ func TestReproOrder(t *testing.T) { mock := &reproMgrMock{ run: make(chan runCallback), } - obj := newReproManager(mock, 3, false) + obj := NewReproLoop(mock, 3, false) // The right order is A B C. crashes := []*Crash{ { Report: &report.Report{Title: "A"}, - fromDashboard: true, - manual: true, + FromDashboard: true, + Manual: true, }, { Report: &report.Report{Title: "B"}, - fromDashboard: true, + FromDashboard: true, }, { Report: &report.Report{Title: "C"}, - fromHub: true, + FromHub: true, }, } @@ -101,7 +101,7 @@ func TestReproRWRace(t *testing.T) { mock := &reproMgrMock{ run: make(chan runCallback), } - obj := newReproManager(mock, 3, false) + obj := NewReproLoop(mock, 3, false) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -117,18 +117,18 @@ func TestReproRWRace(t *testing.T) { obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}}) obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}}) - assert.True(t, mock.needRepro(nil)) + assert.True(t, mock.NeedRepro(nil)) called := <-mock.run called.ret <- &ReproResult{} // Pretend that processRepro() is finished and // we've written "repro.prog" to the disk. mock.reproProgExist.Store(true) - assert.False(t, mock.needRepro(nil)) + assert.False(t, mock.NeedRepro(nil)) assert.True(t, obj.CanReproMore()) called2 := <-mock.run called2.ret <- &ReproResult{} - assert.False(t, mock.needRepro(nil)) + assert.False(t, mock.NeedRepro(nil)) assert.True(t, obj.CanReproMore()) // Reproducers may be still running. @@ -147,11 +147,11 @@ type runCallback struct { } // Wait until the number of reserved VMs goes to 0. -func (m *reproMgrMock) onVMShutdown(t *testing.T, reproMgr *reproManager) { +func (m *reproMgrMock) onVMShutdown(t *testing.T, reproLoop *ReproLoop) { for i := 0; i < 100; i++ { if m.reserved.Load() == 0 { - assert.True(t, reproMgr.CanReproMore()) - assert.True(t, reproMgr.Empty()) + assert.True(t, reproLoop.CanReproMore()) + assert.True(t, reproLoop.Empty()) return } time.Sleep(10 * time.Millisecond) @@ -159,7 +159,7 @@ func (m *reproMgrMock) onVMShutdown(t *testing.T, reproMgr *reproManager) { t.Fatal("reserved VMs must have dropped to 0") } -func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult { +func (m *reproMgrMock) RunRepro(crash *Crash) *ReproResult { retCh := make(chan *ReproResult) m.run <- runCallback{crash: crash, ret: retCh} ret := <-retCh @@ -167,10 +167,10 @@ func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult { return ret } -func (m *reproMgrMock) needRepro(crash *Crash) bool { +func (m *reproMgrMock) NeedRepro(crash *Crash) bool { return !m.reproProgExist.Load() } -func (m *reproMgrMock) resizeReproPool(VMs int) { +func (m *reproMgrMock) ResizeReproPool(VMs int) { m.reserved.Store(int64(VMs)) } diff --git a/syz-manager/http.go b/syz-manager/http.go index 34cca4d14..4e4b92d00 100644 --- a/syz-manager/http.go +++ b/syz-manager/http.go @@ -630,8 +630,8 @@ func (mgr *Manager) httpFilterPCs(w http.ResponseWriter, r *http.Request) { func (mgr *Manager) collectCrashes(workdir string) ([]*UICrashType, error) { // Note: mu is not locked here. var repros map[string]bool - if !mgr.cfg.VMLess && mgr.reproMgr != nil { - repros = mgr.reproMgr.Reproducing() + if !mgr.cfg.VMLess && mgr.reproLoop != nil { + repros = mgr.reproLoop.Reproducing() } crashdir := filepath.Join(workdir, "crashes") diff --git a/syz-manager/hub.go b/syz-manager/hub.go index 5fd2356f6..58fac33f2 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -14,6 +14,7 @@ import ( "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/fuzzer" "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/manager" "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/report" "github.com/google/syzkaller/pkg/report/crash" @@ -60,7 +61,7 @@ func (mgr *Manager) hubSyncLoop(keyGet keyGetter) { } if mgr.cfg.Reproduce && mgr.dash != nil { // Request reproducers from hub only if there is nothing else to reproduce. - hc.needMoreRepros = mgr.reproMgr.Empty + hc.needMoreRepros = mgr.reproLoop.Empty } hc.loop() } @@ -75,7 +76,7 @@ type HubConnector struct { fresh bool hubCorpus map[string]bool newRepros [][]byte - hubReproQueue chan *Crash + hubReproQueue chan *manager.Crash needMoreRepros func() bool keyGet keyGetter @@ -304,8 +305,8 @@ func (hc *HubConnector) processRepros(repros [][]byte) int { if hc.leak { typ = crash.MemoryLeak } - hc.hubReproQueue <- &Crash{ - fromHub: true, + hc.hubReproQueue <- &manager.Crash{ + FromHub: true, Report: &report.Report{ Type: typ, Output: repro, diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 90aa4198a..fc38b74ed 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -34,6 +34,7 @@ import ( "github.com/google/syzkaller/pkg/gce" "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/manager" "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/osutil" "github.com/google/syzkaller/pkg/report" @@ -112,15 +113,15 @@ type Manager struct { dataRaceFrames map[string]bool saturatedCalls map[string]bool - externalReproQueue chan *Crash - crashes chan *Crash + externalReproQueue chan *manager.Crash + crashes chan *manager.Crash benchMu sync.Mutex benchFile *os.File assetStorage *asset.Storage - reproMgr *reproManager + reproLoop *manager.ReproLoop Stats } @@ -153,27 +154,6 @@ const ( const currentDBVersion = 5 -type Crash struct { - instanceIndex int - fromHub bool // this crash was created based on a repro from syz-hub - fromDashboard bool // .. or from dashboard - manual bool - *report.Report -} - -func (c *Crash) FullTitle() string { - if c.Report.Title != "" { - return c.Report.Title - } - // Just use some unique, but stable titles. - if c.fromDashboard { - return fmt.Sprintf("dashboard crash %p", c) - } else if c.fromHub { - return fmt.Sprintf("crash from hub %p", c) - } - panic("the crash is expected to have a report") -} - func main() { if prog.GitRevision == "" { log.Fatalf("bad syz-manager build: build with make, run bin/syz-manager") @@ -249,8 +229,8 @@ func RunManager(mode Mode, cfg *mgrconfig.Config) { memoryLeakFrames: make(map[string]bool), dataRaceFrames: make(map[string]bool), fresh: true, - externalReproQueue: make(chan *Crash, 10), - crashes: make(chan *Crash, 10), + externalReproQueue: make(chan *manager.Crash, 10), + crashes: make(chan *manager.Crash, 10), saturatedCalls: make(map[string]bool), } @@ -313,10 +293,10 @@ func RunManager(mode Mode, cfg *mgrconfig.Config) { return } mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance) - mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro) + mgr.reproLoop = manager.NewReproLoop(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro) ctx := vm.ShutdownCtx() go mgr.processFuzzingResults(ctx) - go mgr.reproMgr.Loop(ctx) + go mgr.reproLoop.Loop(ctx) mgr.pool.Loop(ctx) } @@ -378,14 +358,6 @@ func (mgr *Manager) writeBench() { } } -type ReproResult struct { - crash *Crash // the original crash - repro *repro.Result - strace *repro.StraceResult - stats *repro.Stats - err error -} - func (mgr *Manager) processFuzzingResults(ctx context.Context) { for { select { @@ -394,7 +366,7 @@ func (mgr *Manager) processFuzzingResults(ctx context.Context) { case crash := <-mgr.crashes: needRepro := mgr.saveCrash(crash) if mgr.cfg.Reproduce && needRepro { - mgr.reproMgr.Enqueue(crash) + mgr.reproLoop.Enqueue(crash) } case err := <-mgr.pool.BootErrors: crash := mgr.convertBootError(err) @@ -402,14 +374,14 @@ func (mgr *Manager) processFuzzingResults(ctx context.Context) { mgr.saveCrash(crash) } case crash := <-mgr.externalReproQueue: - if mgr.needRepro(crash) { - mgr.reproMgr.Enqueue(crash) + if mgr.NeedRepro(crash) { + mgr.reproLoop.Enqueue(crash) } } } } -func (mgr *Manager) convertBootError(err error) *Crash { +func (mgr *Manager) convertBootError(err error) *manager.Crash { var bootErr vm.BootErrorer if errors.As(err, &bootErr) { title, output := bootErr.BootError() @@ -424,7 +396,7 @@ func (mgr *Manager) convertBootError(err error) *Crash { Output: output, } } - return &Crash{ + return &manager.Crash{ Report: rep, } } @@ -453,13 +425,13 @@ func reportReproError(err error) { log.Errorf("repro failed: %v", err) } -func (mgr *Manager) runRepro(crash *Crash) *ReproResult { +func (mgr *Manager) RunRepro(crash *manager.Crash) *manager.ReproResult { res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool) - ret := &ReproResult{ - crash: crash, - repro: res, - stats: stats, - err: err, + ret := &manager.ReproResult{ + Crash: crash, + Repro: res, + Stats: stats, + Err: err, } if err == nil && res != nil && mgr.cfg.StraceBin != "" { const straceAttempts = 2 @@ -471,7 +443,7 @@ func (mgr *Manager) runRepro(crash *Crash) *ReproResult { // We only want to save strace output if it resulted in the same bug. // Otherwise, it will be hard to reproduce on syzbot and will confuse users. if sameBug { - ret.strace = strace + ret.Strace = strace break } } @@ -482,17 +454,17 @@ func (mgr *Manager) runRepro(crash *Crash) *ReproResult { return ret } -func (mgr *Manager) processRepro(res *ReproResult) { - if res.err != nil { - reportReproError(res.err) +func (mgr *Manager) processRepro(res *manager.ReproResult) { + if res.Err != nil { + reportReproError(res.Err) } - if res.repro == nil { - if res.crash.Title == "" { + if res.Repro == nil { + if res.Crash.Title == "" { log.Logf(1, "repro '%v' not from dashboard, so not reporting the failure", - res.crash.FullTitle()) + res.Crash.FullTitle()) } else { - log.Logf(1, "report repro failure of '%v'", res.crash.Title) - mgr.saveFailedRepro(res.crash.Report, res.stats) + log.Logf(1, "report repro failure of '%v'", res.Crash.Title) + mgr.saveFailedRepro(res.Crash.Report, res.Stats) } } else { mgr.saveRepro(res) @@ -766,8 +738,8 @@ func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updIn rep.MachineInfo = machineInfo } if err == nil && rep != nil { - mgr.crashes <- &Crash{ - instanceIndex: inst.Index(), + mgr.crashes <- &manager.Crash{ + InstanceIndex: inst.Index(), Report: rep, } } @@ -820,7 +792,7 @@ func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, inj return rep, vmInfo, nil } -func (mgr *Manager) emailCrash(crash *Crash) { +func (mgr *Manager) emailCrash(crash *manager.Crash) { if len(mgr.cfg.EmailAddrs) == 0 { return } @@ -835,7 +807,7 @@ func (mgr *Manager) emailCrash(crash *Crash) { } } -func (mgr *Manager) saveCrash(crash *Crash) bool { +func (mgr *Manager) saveCrash(crash *manager.Crash) bool { if err := mgr.reporter.Symbolize(crash.Report); err != nil { log.Errorf("failed to symbolize report: %v", err) } @@ -856,7 +828,7 @@ func (mgr *Manager) saveCrash(crash *Crash) bool { if crash.Suppressed { flags += " [suppressed]" } - log.Logf(0, "VM %v: crash: %v%v", crash.instanceIndex, crash.Title, flags) + log.Logf(0, "VM %v: crash: %v%v", crash.InstanceIndex, crash.Title, flags) if mgr.mode == ModeSmokeTest { data, err := json.Marshal(crash.Report) @@ -949,12 +921,12 @@ func (mgr *Manager) saveCrash(crash *Crash) bool { writeOrRemove("tag", []byte(mgr.cfg.Tag)) writeOrRemove("report", crash.Report.Report) writeOrRemove("machineInfo", crash.MachineInfo) - return mgr.needRepro(crash) + return mgr.NeedRepro(crash) } const maxReproAttempts = 3 -func (mgr *Manager) needLocalRepro(crash *Crash) bool { +func (mgr *Manager) needLocalRepro(crash *manager.Crash) bool { if !mgr.cfg.Reproduce || crash.Corrupted || crash.Suppressed { return false } @@ -971,11 +943,11 @@ func (mgr *Manager) needLocalRepro(crash *Crash) bool { return false } -func (mgr *Manager) needRepro(crash *Crash) bool { +func (mgr *Manager) NeedRepro(crash *manager.Crash) bool { if !mgr.cfg.Reproduce { return false } - if crash.fromHub || crash.fromDashboard { + if crash.FromHub || crash.FromDashboard { return true } if !mgr.checkDone.Load() || (mgr.enabledFeatures&flatrpc.FeatureLeak != 0 && @@ -1043,13 +1015,13 @@ func (mgr *Manager) saveFailedRepro(rep *report.Report, stats *repro.Stats) { } } -func (mgr *Manager) saveRepro(res *ReproResult) { - repro := res.repro +func (mgr *Manager) saveRepro(res *manager.ReproResult) { + repro := res.Repro opts := fmt.Sprintf("# %+v\n", repro.Opts) progText := repro.Prog.Serialize() // Append this repro to repro list to send to hub if it didn't come from hub originally. - if !res.crash.fromHub { + if !res.Crash.FromHub { progForHub := []byte(fmt.Sprintf("# %+v\n# %v\n# %v\n%s", repro.Opts, repro.Report.Title, mgr.cfg.Tag, progText)) mgr.mu.Lock() @@ -1082,11 +1054,11 @@ func (mgr *Manager) saveRepro(res *ReproResult) { output := report.Output var crashFlags dashapi.CrashFlags - if res.strace != nil { + if res.Strace != nil { // If syzkaller managed to successfully run the repro with strace, send // the report and the output generated under strace. - report = res.strace.Report - output = res.strace.Output + report = res.Strace.Report + output = res.Strace.Output crashFlags = dashapi.CrashUnderStrace } @@ -1102,9 +1074,9 @@ func (mgr *Manager) saveRepro(res *ReproResult) { ReproOpts: repro.Opts.Serialize(), ReproSyz: progText, ReproC: cprogText, - ReproLog: truncateReproLog(res.stats.FullLog()), + ReproLog: truncateReproLog(res.Stats.FullLog()), Assets: mgr.uploadReproAssets(repro), - OriginalTitle: res.crash.Title, + OriginalTitle: res.Crash.Title, } setGuiltyFiles(dc, report) if _, err := mgr.dash.ReportCrash(dc); err != nil { @@ -1142,22 +1114,22 @@ func (mgr *Manager) saveRepro(res *ReproResult) { log.Logf(0, "failed to write crash asset: type %d, write error %v", typ, err) } }) - if res.strace != nil { + if res.Strace != nil { // Unlike dashboard reporting, we save strace output separately from the original log. - if res.strace.Error != nil { + if res.Strace.Error != nil { osutil.WriteFile(filepath.Join(dir, "strace.error"), - []byte(fmt.Sprintf("%v", res.strace.Error))) + []byte(fmt.Sprintf("%v", res.Strace.Error))) } - if len(res.strace.Output) > 0 { - osutil.WriteFile(filepath.Join(dir, "strace.log"), res.strace.Output) + if len(res.Strace.Output) > 0 { + osutil.WriteFile(filepath.Join(dir, "strace.log"), res.Strace.Output) } } - if reproLog := res.stats.FullLog(); len(reproLog) > 0 { + if reproLog := res.Stats.FullLog(); len(reproLog) > 0 { osutil.WriteFile(filepath.Join(dir, "repro.stats"), reproLog) } } -func (mgr *Manager) resizeReproPool(size int) { +func (mgr *Manager) ResizeReproPool(size int) { mgr.pool.ReserveForRun(size) } @@ -1534,11 +1506,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey)) } else { mgr.phase = phaseTriagedHub - mgr.reproMgr.StartReproduction() + mgr.reproLoop.StartReproduction() } } else if mgr.phase == phaseQueriedHub { mgr.phase = phaseTriagedHub - mgr.reproMgr.StartReproduction() + mgr.reproLoop.StartReproduction() } mgr.mu.Unlock() } @@ -1555,7 +1527,7 @@ func (mgr *Manager) hubIsUnreachable() { if mgr.phase == phaseTriagedCorpus { dash = mgr.dash mgr.phase = phaseTriagedHub - mgr.reproMgr.StartReproduction() + mgr.reproLoop.StartReproduction() log.Errorf("did not manage to connect to syz-hub; moving forward") } mgr.mu.Unlock() @@ -1646,7 +1618,7 @@ func (mgr *Manager) dashboardReporter() { func (mgr *Manager) dashboardReproTasks() { for range time.NewTicker(20 * time.Minute).C { - if !mgr.reproMgr.CanReproMore() { + if !mgr.reproLoop.CanReproMore() { // We don't need reproducers at the moment. continue } @@ -1656,9 +1628,9 @@ func (mgr *Manager) dashboardReproTasks() { continue } if len(resp.CrashLog) > 0 { - mgr.externalReproQueue <- &Crash{ - fromDashboard: true, - manual: resp.Type == dashapi.ManualLog, + mgr.externalReproQueue <- &manager.Crash{ + FromDashboard: true, + Manual: resp.Type == dashapi.ManualLog, Report: &report.Report{ Title: resp.Title, Output: resp.CrashLog, diff --git a/syz-manager/repro.go b/syz-manager/repro.go deleted file mode 100644 index 3232bd9f3..000000000 --- a/syz-manager/repro.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright 2024 syzkaller project authors. All rights reserved. -// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. - -package main - -import ( - "context" - "maps" - "slices" - "sync" - - "github.com/google/syzkaller/pkg/log" - "github.com/google/syzkaller/pkg/stat" -) - -type reproManagerView interface { - runRepro(crash *Crash) *ReproResult // TODO: consider moving runRepro() to repro.go. - needRepro(crash *Crash) bool - resizeReproPool(size int) -} - -type reproManager struct { - statNumReproducing *stat.Val - statPending *stat.Val - - onlyOnce bool - mgr reproManagerView - parallel chan struct{} - pingQueue chan struct{} - reproVMs int - - mu sync.Mutex - queue []*Crash - reproducing map[string]bool - attempted map[string]bool -} - -func newReproManager(mgr reproManagerView, reproVMs int, onlyOnce bool) *reproManager { - ret := &reproManager{ - mgr: mgr, - onlyOnce: onlyOnce, - parallel: make(chan struct{}, reproVMs), - reproVMs: reproVMs, - reproducing: map[string]bool{}, - pingQueue: make(chan struct{}, 1), - attempted: map[string]bool{}, - } - ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced", - stat.Console, stat.NoGraph, func() int { - ret.mu.Lock() - defer ret.mu.Unlock() - return len(ret.reproducing) - }) - ret.statPending = stat.New("pending", "Number of pending repro tasks", - stat.Console, stat.NoGraph, func() int { - ret.mu.Lock() - defer ret.mu.Unlock() - return len(ret.queue) - }) - return ret -} - -// startReproduction() is assumed to be called only once. -// The agument is the maximum number of VMs dedicated to the bug reproduction. -func (m *reproManager) StartReproduction() { - count := 0 - for ; m.calculateReproVMs(count+1) <= m.reproVMs; count++ { - m.parallel <- struct{}{} - } - log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", m.reproVMs, count) -} - -func (m *reproManager) calculateReproVMs(repros int) int { - // Let's allocate 1.33 VMs per a reproducer thread. - if m.reproVMs == 1 && repros == 1 { - // With one exception -- if we have only one VM, let's still do one repro. - return 1 - } - return (repros*4 + 2) / 3 -} - -func (m *reproManager) CanReproMore() bool { - return len(m.parallel) != 0 -} - -func (m *reproManager) Reproducing() map[string]bool { - m.mu.Lock() - defer m.mu.Unlock() - return maps.Clone(m.reproducing) -} - -// Empty returns true if there are neither running nor planned bug reproductions. -func (m *reproManager) Empty() bool { - m.mu.Lock() - defer m.mu.Unlock() - return len(m.reproducing) == 0 && len(m.queue) == 0 -} - -func (m *reproManager) Enqueue(crash *Crash) { - m.mu.Lock() - defer m.mu.Unlock() - - title := crash.FullTitle() - if m.onlyOnce && m.attempted[title] { - // Try to reproduce each bug at most 1 time in this mode. - // Since we don't upload bugs/repros to dashboard, it likely won't have - // the reproducer even if we succeeded last time, and will repeatedly - // say it needs a repro. - return - } - log.Logf(1, "scheduled a reproduction of '%v'", title) - m.attempted[title] = true - m.queue = append(m.queue, crash) - - // Ping the loop. - select { - case m.pingQueue <- struct{}{}: - default: - } -} - -func (m *reproManager) popCrash() *Crash { - m.mu.Lock() - defer m.mu.Unlock() - - newBetter := func(base, new *Crash) bool { - // First, serve manual requests. - if new.manual != base.manual { - return new.manual - } - // Then, deprioritize hub reproducers. - if new.fromHub != base.fromHub { - return !new.fromHub - } - return false - } - - idx := -1 - for i, crash := range m.queue { - if m.reproducing[crash.FullTitle()] { - continue - } - if idx == -1 || newBetter(m.queue[idx], m.queue[i]) { - idx = i - } - } - if idx == -1 { - return nil - } - crash := m.queue[idx] - m.queue = slices.Delete(m.queue, idx, idx+1) - return crash -} - -func (m *reproManager) Loop(ctx context.Context) { - var wg sync.WaitGroup - defer wg.Wait() - - for { - crash := m.popCrash() - for crash == nil { - select { - case <-m.pingQueue: - crash = m.popCrash() - case <-ctx.Done(): - return - } - if crash == nil || !m.mgr.needRepro(crash) { - continue - } - } - - // Now wait until we can schedule another runner. - select { - case <-m.parallel: - case <-ctx.Done(): - return - } - - m.mu.Lock() - m.reproducing[crash.FullTitle()] = true - m.adjustPoolSizeLocked() - m.mu.Unlock() - - wg.Add(1) - go func() { - defer wg.Done() - - m.handle(crash) - - m.mu.Lock() - delete(m.reproducing, crash.FullTitle()) - m.adjustPoolSizeLocked() - m.mu.Unlock() - - m.parallel <- struct{}{} - m.pingQueue <- struct{}{} - }() - } -} - -func (m *reproManager) handle(crash *Crash) { - log.Logf(0, "start reproducing '%v'", crash.FullTitle()) - - res := m.mgr.runRepro(crash) - - crepro := false - title := "" - if res.repro != nil { - crepro = res.repro.CRepro - title = res.repro.Report.Title - } - log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v", - crash.FullTitle(), res.repro != nil, crepro, title, crash.fromHub, crash.fromDashboard, - ) -} - -func (m *reproManager) adjustPoolSizeLocked() { - // Avoid the +-1 jitter by considering the repro queue size as well. - - // We process same-titled crashes sequentially, so only count unique ones. - uniqueTitles := maps.Clone(m.reproducing) - for _, crash := range m.queue { - uniqueTitles[crash.FullTitle()] = true - } - - needRepros := len(uniqueTitles) - VMs := min(m.reproVMs, m.calculateReproVMs(needRepros)) - m.mgr.resizeReproPool(VMs) -} diff --git a/syz-manager/snapshot.go b/syz-manager/snapshot.go index 1413ac744..962589926 100644 --- a/syz-manager/snapshot.go +++ b/syz-manager/snapshot.go @@ -10,10 +10,11 @@ import ( "fmt" "time" - "github.com/google/flatbuffers/go" + flatbuffers "github.com/google/flatbuffers/go" "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/manager" "github.com/google/syzkaller/vm" "github.com/google/syzkaller/vm/dispatcher" ) @@ -74,7 +75,7 @@ func (mgr *Manager) snapshotLoop(ctx context.Context, inst *vm.Instance) error { fmt.Fprintf(buf, "program:\n%s\n", req.Prog.Serialize()) buf.Write(rep.Output) rep.Output = buf.Bytes() - mgr.crashes <- &Crash{Report: rep} + mgr.crashes <- &manager.Crash{Report: rep} } req.Done(res) |
