diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-07-05 15:41:55 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-07-11 09:03:37 +0000 |
| commit | 6ca1eb5a578bb1421ad0f3dbde675eb34647e6d7 (patch) | |
| tree | 935c35d1225bcd55b0d413796168c758734af0f9 /syz-manager/manager.go | |
| parent | fc2b083017012b4afbd9324fc6525e34a19aa0b3 (diff) | |
all: transition to instance.Pool
Rely on instance.Pool to perform fuzzing and do bug reproductions.
Extract the reproduction queue logic to a separate testable class.
Diffstat (limited to 'syz-manager/manager.go')
| -rw-r--r-- | syz-manager/manager.go | 377 |
1 files changed, 96 insertions, 281 deletions
diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 396ffb97e..e1c2ad050 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -47,6 +47,7 @@ import ( "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" "github.com/google/syzkaller/vm" + "github.com/google/syzkaller/vm/dispatcher" ) var ( @@ -72,6 +73,7 @@ type Manager struct { cfg *mgrconfig.Config mode Mode vmPool *vm.Pool + pool *dispatcher.Pool[*vm.Instance] target *prog.Target sysTarget *targets.Target reporter *report.Reporter @@ -108,9 +110,8 @@ type Manager struct { dataRaceFrames map[string]bool saturatedCalls map[string]bool - needMoreRepros chan chan bool externalReproQueue chan *Crash - reproRequest chan chan map[string]bool + crashes chan *Crash // For checking that files that we are using are not changing under us. // Maps file name to modification time. @@ -123,6 +124,8 @@ type Manager struct { bootTime stats.AverageValue[time.Duration] + reproMgr *reproManager + Stats } @@ -239,12 +242,15 @@ func RunManager(cfg *mgrconfig.Config) { fresh: true, vmStop: make(chan bool), externalReproQueue: make(chan *Crash, 10), - needMoreRepros: make(chan chan bool), - reproRequest: make(chan chan map[string]bool), + crashes: make(chan *Crash, 10), usedFiles: make(map[string]time.Time), saturatedCalls: make(map[string]bool), } + if *flagDebug { + mgr.cfg.Procs = 1 + } + mgr.initStats() if mode == ModeFuzzing || mode == ModeCorpusTriage { go mgr.preloadCorpus() @@ -297,7 +303,13 @@ func RunManager(cfg *mgrconfig.Config) { <-vm.Shutdown return } - mgr.vmLoop() + ctx := vm.ShutdownCtx() + mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance) + mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro) + go mgr.processFuzzingResults(ctx) + go mgr.checkUsedFiles() + go mgr.reproMgr.Loop(ctx) + mgr.pool.Loop(ctx) } // Exit successfully in special operation modes. 
@@ -358,14 +370,7 @@ func (mgr *Manager) writeBench() { } } -type RunResult struct { - idx int - crash *Crash - err error -} - type ReproResult struct { - instances []int report0 *report.Report // the original report we started reproducing repro *repro.Result strace *repro.StraceResult @@ -376,135 +381,25 @@ type ReproResult struct { originalTitle string // crash title before we started bug reproduction } -// Manager needs to be refactored (#605). -// nolint: gocyclo, gocognit, funlen -func (mgr *Manager) vmLoop() { - log.Logf(0, "booting test machines...") - log.Logf(0, "wait for the connection from test machine...") - instancesPerRepro := 3 - vmCount := mgr.vmPool.Count() - maxReproVMs := vmCount - mgr.cfg.FuzzingVMs - if instancesPerRepro > maxReproVMs && maxReproVMs > 0 { - instancesPerRepro = maxReproVMs - } - instances := SequentialResourcePool(vmCount, 5*time.Second) - runDone := make(chan *RunResult, 1) - pendingRepro := make(map[*Crash]bool) - reproducing := make(map[string]bool) - attemptedRepros := make(map[string]bool) - var reproQueue []*Crash - reproDone := make(chan *ReproResult, 1) - stopPending := false - shutdown := vm.Shutdown - for shutdown != nil || instances.Len() != vmCount { - mgr.mu.Lock() - phase := mgr.phase - mgr.mu.Unlock() - - for crash := range pendingRepro { - if reproducing[crash.Title] { - continue - } - delete(pendingRepro, crash) - if !mgr.needRepro(crash) { - continue - } - if mgr.cfg.DashboardOnlyRepro && attemptedRepros[crash.Title] { - // Try to reproduce each bug at most 1 time in this mode. - // Since we don't upload bugs/repros to dashboard, it likely won't have - // the reproducer even if we succeeded last time, and will repeatedly - // say it needs a repro. 
- continue - } - log.Logf(1, "loop: add to repro queue '%v'", crash.Title) - attemptedRepros[crash.Title] = true - reproducing[crash.Title] = true - reproQueue = append(reproQueue, crash) - } - - log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v", - phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(), - len(pendingRepro), len(reproducing), len(reproQueue)) - - canRepro := func() bool { - return phase >= phaseTriagedHub && len(reproQueue) != 0 && - (mgr.statNumReproducing.Val()+1)*instancesPerRepro <= maxReproVMs - } - - if shutdown != nil { - for canRepro() { - vmIndexes := instances.Take(instancesPerRepro) - if vmIndexes == nil { - break - } - last := len(reproQueue) - 1 - crash := reproQueue[last] - reproQueue[last] = nil - reproQueue = reproQueue[:last] - mgr.statNumReproducing.Add(1) - log.Logf(0, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes) - go func() { - reproDone <- mgr.runRepro(crash, vmIndexes, instances.Put) - }() - } - for !canRepro() { - idx := instances.TakeOne() - if idx == nil { - break - } - log.Logf(1, "loop: starting instance %v", *idx) - go func() { - crash, err := mgr.runInstance(*idx) - runDone <- &RunResult{*idx, crash, err} - }() - } - } - - var stopRequest chan bool - if !stopPending && canRepro() { - stopRequest = mgr.vmStop - } - - wait: +func (mgr *Manager) processFuzzingResults(ctx context.Context) { + for { select { - case <-instances.Freed: - // An instance has been released. - case stopRequest <- true: - log.Logf(1, "loop: issued stop request") - stopPending = true - case res := <-runDone: - log.Logf(1, "loop: instance %v finished, crash=%v", res.idx, res.crash != nil) - if res.err != nil && shutdown != nil { - log.Logf(0, "%v", res.err) - } - stopPending = false - instances.Put(res.idx) - // On shutdown qemu crashes with "qemu: terminating on signal 2", - // which we detect as "lost connection". Don't save that as crash. 
- if shutdown != nil && res.crash != nil { - needRepro := mgr.saveCrash(res.crash) - if needRepro { - log.Logf(1, "loop: add pending repro for '%v'", res.crash.Title) - pendingRepro[res.crash] = true - } + case <-ctx.Done(): + return + case crash := <-mgr.crashes: + needRepro := mgr.saveCrash(crash) + if mgr.cfg.Reproduce && needRepro { + mgr.reproMgr.Enqueue(crash) } - case res := <-reproDone: - mgr.statNumReproducing.Add(-1) - crepro := false - title := "" - if res.repro != nil { - crepro = res.repro.CRepro - title = res.repro.Report.Title + case err := <-mgr.pool.BootErrors: + crash := mgr.convertBootError(err) + if crash != nil { + mgr.saveCrash(crash) } - log.Logf(0, "loop: repro on %+v finished '%v', repro=%v crepro=%v desc='%v'"+ - " hub=%v from_dashboard=%v", - res.instances, res.report0.Title, res.repro != nil, crepro, title, - res.fromHub, res.fromDashboard, - ) + case res := <-mgr.reproMgr.Done: if res.err != nil { reportReproError(res.err) } - delete(reproducing, res.report0.Title) if res.repro == nil { if res.fromHub { log.Logf(1, "repro '%v' came from syz-hub, not reporting the failure", @@ -516,25 +411,34 @@ func (mgr *Manager) vmLoop() { } else { mgr.saveRepro(res) } - case <-shutdown: - log.Logf(1, "loop: shutting down...") - shutdown = nil case crash := <-mgr.externalReproQueue: - log.Logf(1, "loop: got repro request") - pendingRepro[crash] = true - case reply := <-mgr.needMoreRepros: - reply <- phase >= phaseTriagedHub && - len(reproQueue)+len(pendingRepro)+len(reproducing) == 0 - goto wait - case reply := <-mgr.reproRequest: - repros := make(map[string]bool) - for title := range reproducing { - repros[title] = true + if mgr.needRepro(crash) { + mgr.reproMgr.Enqueue(crash) + } + } + } +} + +func (mgr *Manager) convertBootError(err error) *Crash { + var bootErr vm.BootErrorer + if errors.As(err, &bootErr) { + title, output := bootErr.BootError() + rep := mgr.reporter.Parse(output) + if rep != nil && rep.Type == crash_pkg.UnexpectedReboot { + // 
Avoid detecting any boot crash as "unexpected kernel reboot". + rep = mgr.reporter.ParseFrom(output, rep.SkipPos) + } + if rep == nil { + rep = &report.Report{ + Title: title, + Output: output, } - reply <- repros - goto wait + } + return &Crash{ + Report: rep, } } + return nil } func reportReproError(err error) { @@ -560,10 +464,9 @@ func reportReproError(err error) { log.Errorf("repro failed: %v", err) } -func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(...int)) *ReproResult { - res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.vmPool, vmIndexes) +func (mgr *Manager) runRepro(crash *Crash) *ReproResult { + res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool) ret := &ReproResult{ - instances: vmIndexes, report0: crash.Report, repro: res, stats: stats, @@ -573,13 +476,9 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(.. originalTitle: crash.Title, } if err == nil && res != nil && mgr.cfg.StraceBin != "" { - // We need only one instance to get strace output, release the rest. - putInstances(vmIndexes[1:]...) - defer putInstances(vmIndexes[0]) - const straceAttempts = 2 for i := 1; i <= straceAttempts; i++ { - strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.vmPool, vmIndexes[0]) + strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.pool) sameBug := strace.IsSameBug(res) log.Logf(0, "strace run attempt %d/%d for '%s': same bug %v, error %v", i, straceAttempts, res.Report.Title, sameBug, strace.Error) @@ -590,72 +489,10 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(.. break } } - } else { - putInstances(vmIndexes...) 
} return ret } -type ResourcePool struct { - ids []int - mu sync.RWMutex - Freed chan interface{} -} - -func SequentialResourcePool(count int, delay time.Duration) *ResourcePool { - ret := &ResourcePool{Freed: make(chan interface{}, 1)} - go func() { - for i := 0; i < count; i++ { - ret.Put(i) - time.Sleep(delay) - } - }() - return ret -} - -func (pool *ResourcePool) Put(ids ...int) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.ids = append(pool.ids, ids...) - // Notify the listener. - select { - case pool.Freed <- true: - default: - } -} - -func (pool *ResourcePool) Len() int { - pool.mu.RLock() - defer pool.mu.RUnlock() - return len(pool.ids) -} - -func (pool *ResourcePool) Snapshot() []int { - pool.mu.RLock() - defer pool.mu.RUnlock() - return append([]int{}, pool.ids...) -} - -func (pool *ResourcePool) Take(cnt int) []int { - pool.mu.Lock() - defer pool.mu.Unlock() - totalItems := len(pool.ids) - if totalItems < cnt { - return nil - } - ret := append([]int{}, pool.ids[totalItems-cnt:]...) 
- pool.ids = pool.ids[:totalItems-cnt] - return ret -} - -func (pool *ResourcePool) TakeOne() *int { - ret := pool.Take(1) - if ret == nil { - return nil - } - return &ret[0] -} - func (mgr *Manager) preloadCorpus() { corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true) if err != nil { @@ -857,13 +694,13 @@ func containsDisabled(p *prog.Prog, enabled map[*prog.Syscall]bool) bool { return false } -func (mgr *Manager) runInstance(index int) (*Crash, error) { - mgr.checkUsedFiles() +func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { + index := inst.Index() instanceName := fmt.Sprintf("vm-%d", index) injectExec := make(chan bool, 10) - mgr.serv.CreateInstance(instanceName, injectExec) + mgr.serv.CreateInstance(instanceName, injectExec, updInfo) - rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectExec) + rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, instanceName, injectExec) lastExec, machineInfo := mgr.serv.ShutdownInstance(instanceName, rep != nil) if rep != nil { prependExecuting(rep, lastExec) @@ -872,48 +709,21 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) { } rep.MachineInfo = machineInfo } - - // Error that is not a VM crash. - if err != nil { - return nil, err - } - // No crash. 
- if rep == nil { - return nil, nil + if err == nil && rep != nil { + mgr.crashes <- &Crash{ + instanceName: instanceName, + Report: rep, + } } - crash := &Crash{ - instanceName: instanceName, - Report: rep, + if err != nil { + log.Logf(1, "%s: failed with error: %v", instanceName, err) } - return crash, nil } -func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec <-chan bool) ( - *report.Report, []byte, error) { +func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, instanceName string, + injectExec <-chan bool) (*report.Report, []byte, error) { start := time.Now() - inst, err := mgr.vmPool.Create(index) - if err != nil { - var bootErr vm.BootErrorer - if errors.As(err, &bootErr) { - title, output := bootErr.BootError() - rep := mgr.reporter.Parse(output) - if rep != nil && rep.Type == crash_pkg.UnexpectedReboot { - // Avoid detecting any boot crash as "unexpected kernel reboot". - rep = mgr.reporter.ParseFrom(output, rep.SkipPos) - } - if rep == nil { - rep = &report.Report{ - Title: title, - Output: output, - } - } - return rep, nil, nil - } - return nil, nil, fmt.Errorf("failed to create instance: %w", err) - } - defer inst.Close() - fwdAddr, err := inst.Forward(mgr.serv.Port) if err != nil { return nil, nil, fmt.Errorf("failed to setup port forwarding: %w", err) @@ -929,10 +739,6 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec } } - if *flagDebug { - mgr.cfg.Procs = 1 - } - // Run the fuzzer binary. 
mgr.bootTime.Save(time.Since(start)) start = time.Now() @@ -940,7 +746,7 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec addrPort := strings.Split(fwdAddr, ":") cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, instanceName, addrPort[0], addrPort[1]) _, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd, - vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectExecuting(injectExec), + vm.ExitTimeout, vm.StopContext(ctx), vm.InjectExecuting(injectExec), vm.EarlyFinishCb(func() { // Depending on the crash type and kernel config, fuzzing may continue // running for several seconds even after kernel has printed a crash report. @@ -1314,6 +1120,10 @@ func (mgr *Manager) saveRepro(res *ReproResult) { } } +func (mgr *Manager) resizeReproPool(size int) { + mgr.pool.ReserveForRun(size) +} + func (mgr *Manager) uploadReproAssets(repro *repro.Result) []dashapi.NewAsset { if mgr.assetStorage == nil { return nil @@ -1659,7 +1469,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { if mgr.cfg.Cover { // Distribute new max signal over all instances. 
newSignal := fuzzer.Cover.GrabSignalDelta() - log.Logf(2, "distributing %d new signal", len(newSignal)) + log.Logf(3, "distributing %d new signal", len(newSignal)) if len(newSignal) != 0 { mgr.serv.DistributeSignalDelta(newSignal) } @@ -1680,9 +1490,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey)) } else { mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() } } else if mgr.phase == phaseQueriedHub { mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() } mgr.mu.Unlock() } @@ -1695,6 +1507,7 @@ func (mgr *Manager) hubIsUnreachable() { if mgr.phase == phaseTriagedCorpus { dash = mgr.dash mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() log.Errorf("did not manage to connect to syz-hub; moving forward") } mgr.mu.Unlock() @@ -1730,17 +1543,19 @@ func (mgr *Manager) collectUsedFiles() { } func (mgr *Manager) checkUsedFiles() { - for f, mod := range mgr.usedFiles { - stat, err := os.Stat(f) - if err != nil { - log.Fatalf("failed to stat %v: %v", f, err) - } - if mod != stat.ModTime() { - log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+ - "this can lead to arbitrary syz-manager misbehavior\n"+ - "modification time has changed: %v -> %v\n"+ - "don't modify files that syz-manager uses. exiting to prevent harm", - f, mod, stat.ModTime()) + for range time.NewTicker(30 * time.Second).C { + for f, mod := range mgr.usedFiles { + stat, err := os.Stat(f) + if err != nil { + log.Fatalf("failed to stat %v: %v", f, err) + } + if mod != stat.ModTime() { + log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+ + "this can lead to arbitrary syz-manager misbehavior\n"+ + "modification time has changed: %v -> %v\n"+ + "don't modify files that syz-manager uses. 
exiting to prevent harm", + f, mod, stat.ModTime()) + } } } } @@ -1786,10 +1601,9 @@ func (mgr *Manager) dashboardReporter() { } func (mgr *Manager) dashboardReproTasks() { + seq := 0 for range time.NewTicker(20 * time.Minute).C { - needReproReply := make(chan bool) - mgr.needMoreRepros <- needReproReply - if !<-needReproReply { + if !mgr.reproMgr.CanReproMore() { // We don't need reproducers at the moment. continue } @@ -1801,7 +1615,8 @@ func (mgr *Manager) dashboardReproTasks() { if len(resp.CrashLog) > 0 { title := resp.Title if title == "" { - title = "repro from the dashboard" + seq++ + title = fmt.Sprintf("repro #%d from the dashboard", seq) } mgr.externalReproQueue <- &Crash{ fromDashboard: true, |
