author     Aleksandr Nogikh <nogikh@google.com>  2022-03-31 14:50:18 +0000
committer  Aleksandr Nogikh <wp32pw@gmail.com>   2022-04-29 17:16:33 +0200
commit     7f2625f8905f7d981475fa314c3d06c56c7bd4b0 (patch)
tree       df492e86214a9d26fec980c1933d6628399a500e
parent     44a5ca633e186c5836010366c515a4017836121b (diff)
syz-manager: refactor vmLoop()
Move the instance management routines and part of the repro logic out of vmLoop().
-rw-r--r--  syz-manager/manager.go | 133
1 file changed, 95 insertions(+), 38 deletions(-)
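For context: this change replaces vmLoop()'s ad-hoc bootInstance channel and instances slice with a small ResourcePool type, defined at the bottom of the diff. Below is a minimal, self-contained sketch of how the new API fits together; the pool methods are copied from the diff, while the toy consumer loop in main() is ours and only stands in for the real vmLoop() logic.

package main

// Toy driver for the ResourcePool introduced by this commit. Put() returns
// resources to the pool and signals the Freed channel; Take() removes a
// batch if enough resources are available, returning nil otherwise.

import (
	"fmt"
	"sync"
	"time"
)

type ResourcePool struct {
	ids   []int
	mu    sync.RWMutex
	Freed chan interface{}
}

// SequentialResourcePool hands out ids 0..count-1, one every `delay`,
// mimicking staggered VM boot-up.
func SequentialResourcePool(count int, delay time.Duration) *ResourcePool {
	ret := &ResourcePool{Freed: make(chan interface{}, 1)}
	go func() {
		for i := 0; i < count; i++ {
			ret.Put(i)
			time.Sleep(delay)
		}
	}()
	return ret
}

func (pool *ResourcePool) Put(ids ...int) {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	pool.ids = append(pool.ids, ids...)
	select {
	case pool.Freed <- true: // wake up a waiting consumer, if any
	default: // a wakeup is already pending
	}
}

func (pool *ResourcePool) Take(cnt int) []int {
	pool.mu.Lock()
	defer pool.mu.Unlock()
	if len(pool.ids) < cnt {
		return nil // not enough free resources; caller retries later
	}
	ret := append([]int{}, pool.ids[len(pool.ids)-cnt:]...)
	pool.ids = pool.ids[:len(pool.ids)-cnt]
	return ret
}

func main() {
	pool := SequentialResourcePool(4, 50*time.Millisecond)
	for started := 0; started < 4; {
		if ids := pool.Take(1); ids != nil {
			fmt.Println("starting instance", ids[0])
			started++
			continue
		}
		<-pool.Freed // block until Put() reports a freed resource
	}
}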
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index bb331b542..e34e59322 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -313,23 +313,15 @@ func (mgr *Manager) vmLoop() {
if instancesPerRepro > maxReproVMs && maxReproVMs > 0 {
instancesPerRepro = maxReproVMs
}
- bootInstance := make(chan int)
- go func() {
- for i := 0; i < vmCount; i++ {
- bootInstance <- i
- time.Sleep(10 * time.Second * mgr.cfg.Timeouts.Scale)
- }
- }()
- var instances []int
+ instances := SequentialResourcePool(vmCount, 10*time.Second*mgr.cfg.Timeouts.Scale)
runDone := make(chan *RunResult, 1)
pendingRepro := make(map[*Crash]bool)
reproducing := make(map[string]bool)
- reproInstances := 0
var reproQueue []*Crash
reproDone := make(chan *ReproResult, 1)
stopPending := false
shutdown := vm.Shutdown
- for shutdown != nil || len(instances) != vmCount {
+ for shutdown != nil || instances.Len() != vmCount {
mgr.mu.Lock()
phase := mgr.phase
mgr.mu.Unlock()
@@ -348,46 +340,39 @@ func (mgr *Manager) vmLoop() {
}
log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v",
- phase, shutdown == nil, len(instances), vmCount, instances,
+ phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(),
len(pendingRepro), len(reproducing), len(reproQueue))
canRepro := func() bool {
return phase >= phaseTriagedHub && len(reproQueue) != 0 &&
- reproInstances+instancesPerRepro <= maxReproVMs
+ (int(atomic.LoadUint32(&mgr.numReproducing))+1)*instancesPerRepro <= maxReproVMs
}
if shutdown != nil {
- for canRepro() && len(instances) >= instancesPerRepro {
+ for canRepro() {
+ vmIndexes := instances.Take(instancesPerRepro)
+ if vmIndexes == nil {
+ break
+ }
last := len(reproQueue) - 1
crash := reproQueue[last]
reproQueue[last] = nil
reproQueue = reproQueue[:last]
- vmIndexes := append([]int{}, instances[len(instances)-instancesPerRepro:]...)
- instances = instances[:len(instances)-instancesPerRepro]
- reproInstances += instancesPerRepro
atomic.AddUint32(&mgr.numReproducing, 1)
log.Logf(1, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes)
go func() {
- features := mgr.checkResult.Features
- res, stats, err := repro.Run(crash.Output, mgr.cfg, features, mgr.reporter, mgr.vmPool, vmIndexes)
- reproDone <- &ReproResult{
- instances: vmIndexes,
- report0: crash.Report,
- res: res,
- stats: stats,
- err: err,
- hub: crash.hub,
- }
+ reproDone <- mgr.runRepro(crash, vmIndexes)
}()
}
- for !canRepro() && len(instances) != 0 {
- last := len(instances) - 1
- idx := instances[last]
- instances = instances[:last]
- log.Logf(1, "loop: starting instance %v", idx)
+ for !canRepro() {
+ idx := instances.TakeOne()
+ if idx == nil {
+ break
+ }
+ log.Logf(1, "loop: starting instance %v", *idx)
go func() {
- crash, err := mgr.runInstance(idx)
- runDone <- &RunResult{idx, crash, err}
+ crash, err := mgr.runInstance(*idx)
+ runDone <- &RunResult{*idx, crash, err}
}()
}
}
@@ -399,8 +384,8 @@ func (mgr *Manager) vmLoop() {
wait:
select {
- case idx := <-bootInstance:
- instances = append(instances, idx)
+ case <-instances.Freed:
+ // An instance has been released.
case stopRequest <- true:
log.Logf(1, "loop: issued stop request")
stopPending = true
@@ -410,7 +395,7 @@ func (mgr *Manager) vmLoop() {
log.Logf(0, "%v", res.err)
}
stopPending = false
- instances = append(instances, res.idx)
+ instances.Put(res.idx)
// On shutdown qemu crashes with "qemu: terminating on signal 2",
// which we detect as "lost connection". Don't save that as crash.
if shutdown != nil && res.crash != nil {
@@ -434,8 +419,7 @@ func (mgr *Manager) vmLoop() {
log.Logf(0, "repro failed: %v", res.err)
}
delete(reproducing, res.report0.Title)
- instances = append(instances, res.instances...)
- reproInstances -= instancesPerRepro
+ instances.Put(res.instances...)
if res.res == nil {
if !res.hub {
mgr.saveFailedRepro(res.report0, res.stats)
@@ -464,6 +448,79 @@ func (mgr *Manager) vmLoop() {
}
}
+func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int) *ReproResult {
+ features := mgr.checkResult.Features
+ res, stats, err := repro.Run(crash.Output, mgr.cfg, features, mgr.reporter, mgr.vmPool, vmIndexes)
+ return &ReproResult{
+ instances: vmIndexes,
+ report0: crash.Report,
+ res: res,
+ stats: stats,
+ err: err,
+ hub: crash.hub,
+ }
+}
+
+type ResourcePool struct {
+ ids []int
+ mu sync.RWMutex
+ Freed chan interface{}
+}
+
+func SequentialResourcePool(count int, delay time.Duration) *ResourcePool {
+ ret := &ResourcePool{Freed: make(chan interface{}, 1)}
+ go func() {
+ for i := 0; i < count; i++ {
+ ret.Put(i)
+ time.Sleep(delay)
+ }
+ }()
+ return ret
+}
+
+func (pool *ResourcePool) Put(ids ...int) {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ pool.ids = append(pool.ids, ids...)
+ // Notify the listener.
+ select {
+ case pool.Freed <- true:
+ default:
+ }
+}
+
+func (pool *ResourcePool) Len() int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+ return len(pool.ids)
+}
+
+func (pool *ResourcePool) Snapshot() []int {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
+ return append([]int{}, pool.ids...)
+}
+
+func (pool *ResourcePool) Take(cnt int) []int {
+ pool.mu.Lock()
+ defer pool.mu.Unlock()
+ totalItems := len(pool.ids)
+ if totalItems < cnt {
+ return nil
+ }
+ ret := append([]int{}, pool.ids[totalItems-cnt:]...)
+ pool.ids = pool.ids[:totalItems-cnt]
+ return ret
+}
+
+func (pool *ResourcePool) TakeOne() *int {
+ ret := pool.Take(1)
+ if ret == nil {
+ return nil
+ }
+ return &ret[0]
+}
+
func (mgr *Manager) preloadCorpus() {
log.Logf(0, "loading corpus...")
corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true)
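A design note on ResourcePool.Put() above: Freed is a buffered channel of capacity 1 and the send is non-blocking, so rapid successive Put() calls coalesce into a single wakeup. A receiver of Freed must therefore treat the signal as a hint and re-poll the pool, as vmLoop() does by looping back to Take()/TakeOne(), rather than assume one signal per freed resource. A minimal illustration of the idiom in isolation; the names here are ours, not from the commit:

package main

// Demonstrates the coalescing-wakeup idiom behind ResourcePool.Freed:
// a capacity-1 buffered channel plus a non-blocking send. Several quick
// notifications can collapse into one pending signal, so consumers must
// drain the underlying state in a loop after each wakeup.

import "fmt"

func main() {
	freed := make(chan interface{}, 1)
	notify := func() {
		select {
		case freed <- true: // buffer the wakeup
		default: // a wakeup is already pending; coalesce
		}
	}

	// Three back-to-back notifications...
	notify()
	notify()
	notify()

	// ...leave exactly one pending signal.
	<-freed
	select {
	case <-freed:
		fmt.Println("unexpected second signal")
	default:
		fmt.Println("three notifies coalesced into one wakeup")
	}
}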