diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2022-03-31 14:50:18 +0000 |
|---|---|---|
| committer | Aleksandr Nogikh <wp32pw@gmail.com> | 2022-04-29 17:16:33 +0200 |
| commit | 7f2625f8905f7d981475fa314c3d06c56c7bd4b0 (patch) | |
| tree | df492e86214a9d26fec980c1933d6628399a500e | |
| parent | 44a5ca633e186c5836010366c515a4017836121b (diff) | |
syz-manager: refactor vmLoop()
Move instance management routines out of vmLoop().
Move part of repro logic out of vmLoop().
| -rw-r--r-- | syz-manager/manager.go | 133 |
1 files changed, 95 insertions, 38 deletions
```diff
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index bb331b542..e34e59322 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -313,23 +313,15 @@ func (mgr *Manager) vmLoop() {
 	if instancesPerRepro > maxReproVMs && maxReproVMs > 0 {
 		instancesPerRepro = maxReproVMs
 	}
-	bootInstance := make(chan int)
-	go func() {
-		for i := 0; i < vmCount; i++ {
-			bootInstance <- i
-			time.Sleep(10 * time.Second * mgr.cfg.Timeouts.Scale)
-		}
-	}()
-	var instances []int
+	instances := SequentialResourcePool(vmCount, 10*time.Second*mgr.cfg.Timeouts.Scale)
 	runDone := make(chan *RunResult, 1)
 	pendingRepro := make(map[*Crash]bool)
 	reproducing := make(map[string]bool)
-	reproInstances := 0
 	var reproQueue []*Crash
 	reproDone := make(chan *ReproResult, 1)
 	stopPending := false
 	shutdown := vm.Shutdown
-	for shutdown != nil || len(instances) != vmCount {
+	for shutdown != nil || instances.Len() != vmCount {
 		mgr.mu.Lock()
 		phase := mgr.phase
 		mgr.mu.Unlock()
@@ -348,46 +340,39 @@ func (mgr *Manager) vmLoop() {
 		}
 
 		log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v",
-			phase, shutdown == nil, len(instances), vmCount, instances,
+			phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(),
 			len(pendingRepro), len(reproducing), len(reproQueue))
 
 		canRepro := func() bool {
 			return phase >= phaseTriagedHub && len(reproQueue) != 0 &&
-				reproInstances+instancesPerRepro <= maxReproVMs
+				(int(atomic.LoadUint32(&mgr.numReproducing))+1)*instancesPerRepro <= maxReproVMs
 		}
 
 		if shutdown != nil {
-			for canRepro() && len(instances) >= instancesPerRepro {
+			for canRepro() {
+				vmIndexes := instances.Take(instancesPerRepro)
+				if vmIndexes == nil {
+					break
+				}
 				last := len(reproQueue) - 1
 				crash := reproQueue[last]
 				reproQueue[last] = nil
 				reproQueue = reproQueue[:last]
-				vmIndexes := append([]int{}, instances[len(instances)-instancesPerRepro:]...)
-				instances = instances[:len(instances)-instancesPerRepro]
-				reproInstances += instancesPerRepro
 				atomic.AddUint32(&mgr.numReproducing, 1)
 				log.Logf(1, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes)
 				go func() {
-					features := mgr.checkResult.Features
-					res, stats, err := repro.Run(crash.Output, mgr.cfg, features, mgr.reporter, mgr.vmPool, vmIndexes)
-					reproDone <- &ReproResult{
-						instances: vmIndexes,
-						report0:   crash.Report,
-						res:       res,
-						stats:     stats,
-						err:       err,
-						hub:       crash.hub,
-					}
+					reproDone <- mgr.runRepro(crash, vmIndexes)
 				}()
 			}
-			for !canRepro() && len(instances) != 0 {
-				last := len(instances) - 1
-				idx := instances[last]
-				instances = instances[:last]
-				log.Logf(1, "loop: starting instance %v", idx)
+			for !canRepro() {
+				idx := instances.TakeOne()
+				if idx == nil {
+					break
+				}
+				log.Logf(1, "loop: starting instance %v", *idx)
 				go func() {
-					crash, err := mgr.runInstance(idx)
-					runDone <- &RunResult{idx, crash, err}
+					crash, err := mgr.runInstance(*idx)
+					runDone <- &RunResult{*idx, crash, err}
 				}()
 			}
 		}
@@ -399,8 +384,8 @@ func (mgr *Manager) vmLoop() {
 
 	wait:
 		select {
-		case idx := <-bootInstance:
-			instances = append(instances, idx)
+		case <-instances.Freed:
+			// An instance has been released.
 		case stopRequest <- true:
 			log.Logf(1, "loop: issued stop request")
 			stopPending = true
@@ -410,7 +395,7 @@ func (mgr *Manager) vmLoop() {
 				log.Logf(0, "%v", res.err)
 			}
 			stopPending = false
-			instances = append(instances, res.idx)
+			instances.Put(res.idx)
 			// On shutdown qemu crashes with "qemu: terminating on signal 2",
 			// which we detect as "lost connection". Don't save that as crash.
 			if shutdown != nil && res.crash != nil {
@@ -434,8 +419,7 @@ func (mgr *Manager) vmLoop() {
 				log.Logf(0, "repro failed: %v", res.err)
 			}
 			delete(reproducing, res.report0.Title)
-			instances = append(instances, res.instances...)
-			reproInstances -= instancesPerRepro
+			instances.Put(res.instances...)
 			if res.res == nil {
 				if !res.hub {
 					mgr.saveFailedRepro(res.report0, res.stats)
@@ -464,6 +448,79 @@ func (mgr *Manager) vmLoop() {
 	}
 }
 
+func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int) *ReproResult {
+	features := mgr.checkResult.Features
+	res, stats, err := repro.Run(crash.Output, mgr.cfg, features, mgr.reporter, mgr.vmPool, vmIndexes)
+	return &ReproResult{
+		instances: vmIndexes,
+		report0:   crash.Report,
+		res:       res,
+		stats:     stats,
+		err:       err,
+		hub:       crash.hub,
+	}
+}
+
+type ResourcePool struct {
+	ids   []int
+	mu    sync.RWMutex
+	Freed chan interface{}
+}
+
+func SequentialResourcePool(count int, delay time.Duration) *ResourcePool {
+	ret := &ResourcePool{Freed: make(chan interface{}, 1)}
+	go func() {
+		for i := 0; i < count; i++ {
+			ret.Put(i)
+			time.Sleep(delay)
+		}
+	}()
+	return ret
+}
+
+func (pool *ResourcePool) Put(ids ...int) {
+	pool.mu.Lock()
+	defer pool.mu.Unlock()
+	pool.ids = append(pool.ids, ids...)
+	// Notify the listener.
+	select {
+	case pool.Freed <- true:
+	default:
+	}
+}
+
+func (pool *ResourcePool) Len() int {
+	pool.mu.RLock()
+	defer pool.mu.RUnlock()
+	return len(pool.ids)
+}
+
+func (pool *ResourcePool) Snapshot() []int {
+	pool.mu.RLock()
+	defer pool.mu.RUnlock()
+	return append([]int{}, pool.ids...)
+}
+
+func (pool *ResourcePool) Take(cnt int) []int {
+	pool.mu.Lock()
+	defer pool.mu.Unlock()
+	totalItems := len(pool.ids)
+	if totalItems < cnt {
+		return nil
+	}
+	ret := append([]int{}, pool.ids[totalItems-cnt:]...)
+	pool.ids = pool.ids[:totalItems-cnt]
+	return ret
+}
+
+func (pool *ResourcePool) TakeOne() *int {
+	ret := pool.Take(1)
+	if ret == nil {
+		return nil
+	}
+	return &ret[0]
+}
+
 func (mgr *Manager) preloadCorpus() {
 	log.Logf(0, "loading corpus...")
 	corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true)
```
