author    Aleksandr Nogikh <nogikh@google.com>    2024-07-05 15:41:55 +0200
committer Aleksandr Nogikh <nogikh@google.com>    2024-07-11 09:03:37 +0000
commit    6ca1eb5a578bb1421ad0f3dbde675eb34647e6d7 (patch)
tree      935c35d1225bcd55b0d413796168c758734af0f9 /syz-manager/manager.go
parent    fc2b083017012b4afbd9324fc6525e34a19aa0b3 (diff)
all: transition to instance.Pool
Rely on instance.Pool to perform fuzzing and run bug reproductions. Extract the reproduction queue logic into a separate, testable class.
Diffstat (limited to 'syz-manager/manager.go')
-rw-r--r--    syz-manager/manager.go    377
1 file changed, 96 insertions, 281 deletions
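Before the diff itself, here is a condensed sketch of the new control flow this commit installs in RunManager: a dispatcher.Pool owns all VMs and runs the fuzzing callback on each of them by default, while a reproManager reserves instances for bug reproductions on demand. It uses only calls that appear in the diff below; error handling and unrelated setup are elided, so treat it as orientation rather than compilable code.

    ctx := vm.ShutdownCtx()
    // The pool boots VMs and runs mgr.fuzzerInstance on each by default.
    mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance)
    // VMs beyond the dedicated fuzzing share can be used for reproductions.
    mgr.reproMgr = newReproManager(mgr,
        mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro)
    go mgr.processFuzzingResults(ctx) // drains crashes, boot errors, repro results
    go mgr.reproMgr.Loop(ctx)         // schedules queued reproductions
    mgr.pool.Loop(ctx)                // blocks until shutdown

When the repro manager needs VMs, it calls back into resizeReproPool, which forwards to pool.ReserveForRun(size); judging by the diff, the pool then carves those instances out of the default fuzzing workload until the reproduction run finishes.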
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 396ffb97e..e1c2ad050 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -47,6 +47,7 @@ import (
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
"github.com/google/syzkaller/vm"
+ "github.com/google/syzkaller/vm/dispatcher"
)
var (
@@ -72,6 +73,7 @@ type Manager struct {
cfg *mgrconfig.Config
mode Mode
vmPool *vm.Pool
+ pool *dispatcher.Pool[*vm.Instance]
target *prog.Target
sysTarget *targets.Target
reporter *report.Reporter
@@ -108,9 +110,8 @@ type Manager struct {
dataRaceFrames map[string]bool
saturatedCalls map[string]bool
- needMoreRepros chan chan bool
externalReproQueue chan *Crash
- reproRequest chan chan map[string]bool
+ crashes chan *Crash
// For checking that files that we are using are not changing under us.
// Maps file name to modification time.
@@ -123,6 +124,8 @@ type Manager struct {
bootTime stats.AverageValue[time.Duration]
+ reproMgr *reproManager
+
Stats
}
@@ -239,12 +242,15 @@ func RunManager(cfg *mgrconfig.Config) {
fresh: true,
vmStop: make(chan bool),
externalReproQueue: make(chan *Crash, 10),
- needMoreRepros: make(chan chan bool),
- reproRequest: make(chan chan map[string]bool),
+ crashes: make(chan *Crash, 10),
usedFiles: make(map[string]time.Time),
saturatedCalls: make(map[string]bool),
}
+ if *flagDebug {
+ mgr.cfg.Procs = 1
+ }
+
mgr.initStats()
if mode == ModeFuzzing || mode == ModeCorpusTriage {
go mgr.preloadCorpus()
@@ -297,7 +303,13 @@ func RunManager(cfg *mgrconfig.Config) {
<-vm.Shutdown
return
}
- mgr.vmLoop()
+ ctx := vm.ShutdownCtx()
+ mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance)
+ mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro)
+ go mgr.processFuzzingResults(ctx)
+ go mgr.checkUsedFiles()
+ go mgr.reproMgr.Loop(ctx)
+ mgr.pool.Loop(ctx)
}
// Exit successfully in special operation modes.
@@ -358,14 +370,7 @@ func (mgr *Manager) writeBench() {
}
}
-type RunResult struct {
- idx int
- crash *Crash
- err error
-}
-
type ReproResult struct {
- instances []int
report0 *report.Report // the original report we started reproducing
repro *repro.Result
strace *repro.StraceResult
@@ -376,135 +381,25 @@ type ReproResult struct {
originalTitle string // crash title before we started bug reproduction
}
-// Manager needs to be refactored (#605).
-// nolint: gocyclo, gocognit, funlen
-func (mgr *Manager) vmLoop() {
- log.Logf(0, "booting test machines...")
- log.Logf(0, "wait for the connection from test machine...")
- instancesPerRepro := 3
- vmCount := mgr.vmPool.Count()
- maxReproVMs := vmCount - mgr.cfg.FuzzingVMs
- if instancesPerRepro > maxReproVMs && maxReproVMs > 0 {
- instancesPerRepro = maxReproVMs
- }
- instances := SequentialResourcePool(vmCount, 5*time.Second)
- runDone := make(chan *RunResult, 1)
- pendingRepro := make(map[*Crash]bool)
- reproducing := make(map[string]bool)
- attemptedRepros := make(map[string]bool)
- var reproQueue []*Crash
- reproDone := make(chan *ReproResult, 1)
- stopPending := false
- shutdown := vm.Shutdown
- for shutdown != nil || instances.Len() != vmCount {
- mgr.mu.Lock()
- phase := mgr.phase
- mgr.mu.Unlock()
-
- for crash := range pendingRepro {
- if reproducing[crash.Title] {
- continue
- }
- delete(pendingRepro, crash)
- if !mgr.needRepro(crash) {
- continue
- }
- if mgr.cfg.DashboardOnlyRepro && attemptedRepros[crash.Title] {
- // Try to reproduce each bug at most 1 time in this mode.
- // Since we don't upload bugs/repros to dashboard, it likely won't have
- // the reproducer even if we succeeded last time, and will repeatedly
- // say it needs a repro.
- continue
- }
- log.Logf(1, "loop: add to repro queue '%v'", crash.Title)
- attemptedRepros[crash.Title] = true
- reproducing[crash.Title] = true
- reproQueue = append(reproQueue, crash)
- }
-
- log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v",
- phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(),
- len(pendingRepro), len(reproducing), len(reproQueue))
-
- canRepro := func() bool {
- return phase >= phaseTriagedHub && len(reproQueue) != 0 &&
- (mgr.statNumReproducing.Val()+1)*instancesPerRepro <= maxReproVMs
- }
-
- if shutdown != nil {
- for canRepro() {
- vmIndexes := instances.Take(instancesPerRepro)
- if vmIndexes == nil {
- break
- }
- last := len(reproQueue) - 1
- crash := reproQueue[last]
- reproQueue[last] = nil
- reproQueue = reproQueue[:last]
- mgr.statNumReproducing.Add(1)
- log.Logf(0, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes)
- go func() {
- reproDone <- mgr.runRepro(crash, vmIndexes, instances.Put)
- }()
- }
- for !canRepro() {
- idx := instances.TakeOne()
- if idx == nil {
- break
- }
- log.Logf(1, "loop: starting instance %v", *idx)
- go func() {
- crash, err := mgr.runInstance(*idx)
- runDone <- &RunResult{*idx, crash, err}
- }()
- }
- }
-
- var stopRequest chan bool
- if !stopPending && canRepro() {
- stopRequest = mgr.vmStop
- }
-
- wait:
+func (mgr *Manager) processFuzzingResults(ctx context.Context) {
+ for {
select {
- case <-instances.Freed:
- // An instance has been released.
- case stopRequest <- true:
- log.Logf(1, "loop: issued stop request")
- stopPending = true
- case res := <-runDone:
- log.Logf(1, "loop: instance %v finished, crash=%v", res.idx, res.crash != nil)
- if res.err != nil && shutdown != nil {
- log.Logf(0, "%v", res.err)
- }
- stopPending = false
- instances.Put(res.idx)
- // On shutdown qemu crashes with "qemu: terminating on signal 2",
- // which we detect as "lost connection". Don't save that as crash.
- if shutdown != nil && res.crash != nil {
- needRepro := mgr.saveCrash(res.crash)
- if needRepro {
- log.Logf(1, "loop: add pending repro for '%v'", res.crash.Title)
- pendingRepro[res.crash] = true
- }
+ case <-ctx.Done():
+ return
+ case crash := <-mgr.crashes:
+ needRepro := mgr.saveCrash(crash)
+ if mgr.cfg.Reproduce && needRepro {
+ mgr.reproMgr.Enqueue(crash)
}
- case res := <-reproDone:
- mgr.statNumReproducing.Add(-1)
- crepro := false
- title := ""
- if res.repro != nil {
- crepro = res.repro.CRepro
- title = res.repro.Report.Title
+ case err := <-mgr.pool.BootErrors:
+ crash := mgr.convertBootError(err)
+ if crash != nil {
+ mgr.saveCrash(crash)
}
- log.Logf(0, "loop: repro on %+v finished '%v', repro=%v crepro=%v desc='%v'"+
- " hub=%v from_dashboard=%v",
- res.instances, res.report0.Title, res.repro != nil, crepro, title,
- res.fromHub, res.fromDashboard,
- )
+ case res := <-mgr.reproMgr.Done:
if res.err != nil {
reportReproError(res.err)
}
- delete(reproducing, res.report0.Title)
if res.repro == nil {
if res.fromHub {
log.Logf(1, "repro '%v' came from syz-hub, not reporting the failure",
@@ -516,25 +411,34 @@ func (mgr *Manager) vmLoop() {
} else {
mgr.saveRepro(res)
}
- case <-shutdown:
- log.Logf(1, "loop: shutting down...")
- shutdown = nil
case crash := <-mgr.externalReproQueue:
- log.Logf(1, "loop: got repro request")
- pendingRepro[crash] = true
- case reply := <-mgr.needMoreRepros:
- reply <- phase >= phaseTriagedHub &&
- len(reproQueue)+len(pendingRepro)+len(reproducing) == 0
- goto wait
- case reply := <-mgr.reproRequest:
- repros := make(map[string]bool)
- for title := range reproducing {
- repros[title] = true
+ if mgr.needRepro(crash) {
+ mgr.reproMgr.Enqueue(crash)
+ }
+ }
+ }
+}
+
+func (mgr *Manager) convertBootError(err error) *Crash {
+ var bootErr vm.BootErrorer
+ if errors.As(err, &bootErr) {
+ title, output := bootErr.BootError()
+ rep := mgr.reporter.Parse(output)
+ if rep != nil && rep.Type == crash_pkg.UnexpectedReboot {
+ // Avoid detecting any boot crash as "unexpected kernel reboot".
+ rep = mgr.reporter.ParseFrom(output, rep.SkipPos)
+ }
+ if rep == nil {
+ rep = &report.Report{
+ Title: title,
+ Output: output,
}
- reply <- repros
- goto wait
+ }
+ return &Crash{
+ Report: rep,
}
}
+ return nil
}
func reportReproError(err error) {
@@ -560,10 +464,9 @@ func reportReproError(err error) {
log.Errorf("repro failed: %v", err)
}
-func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(...int)) *ReproResult {
- res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.vmPool, vmIndexes)
+func (mgr *Manager) runRepro(crash *Crash) *ReproResult {
+ res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool)
ret := &ReproResult{
- instances: vmIndexes,
report0: crash.Report,
repro: res,
stats: stats,
@@ -573,13 +476,9 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(..
originalTitle: crash.Title,
}
if err == nil && res != nil && mgr.cfg.StraceBin != "" {
- // We need only one instance to get strace output, release the rest.
- putInstances(vmIndexes[1:]...)
- defer putInstances(vmIndexes[0])
-
const straceAttempts = 2
for i := 1; i <= straceAttempts; i++ {
- strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.vmPool, vmIndexes[0])
+ strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.pool)
sameBug := strace.IsSameBug(res)
log.Logf(0, "strace run attempt %d/%d for '%s': same bug %v, error %v",
i, straceAttempts, res.Report.Title, sameBug, strace.Error)
@@ -590,72 +489,10 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(..
break
}
}
- } else {
- putInstances(vmIndexes...)
}
return ret
}
-type ResourcePool struct {
- ids []int
- mu sync.RWMutex
- Freed chan interface{}
-}
-
-func SequentialResourcePool(count int, delay time.Duration) *ResourcePool {
- ret := &ResourcePool{Freed: make(chan interface{}, 1)}
- go func() {
- for i := 0; i < count; i++ {
- ret.Put(i)
- time.Sleep(delay)
- }
- }()
- return ret
-}
-
-func (pool *ResourcePool) Put(ids ...int) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- pool.ids = append(pool.ids, ids...)
- // Notify the listener.
- select {
- case pool.Freed <- true:
- default:
- }
-}
-
-func (pool *ResourcePool) Len() int {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
- return len(pool.ids)
-}
-
-func (pool *ResourcePool) Snapshot() []int {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
- return append([]int{}, pool.ids...)
-}
-
-func (pool *ResourcePool) Take(cnt int) []int {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- totalItems := len(pool.ids)
- if totalItems < cnt {
- return nil
- }
- ret := append([]int{}, pool.ids[totalItems-cnt:]...)
- pool.ids = pool.ids[:totalItems-cnt]
- return ret
-}
-
-func (pool *ResourcePool) TakeOne() *int {
- ret := pool.Take(1)
- if ret == nil {
- return nil
- }
- return &ret[0]
-}
-
func (mgr *Manager) preloadCorpus() {
corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true)
if err != nil {
@@ -857,13 +694,13 @@ func containsDisabled(p *prog.Prog, enabled map[*prog.Syscall]bool) bool {
return false
}
-func (mgr *Manager) runInstance(index int) (*Crash, error) {
- mgr.checkUsedFiles()
+func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
+ index := inst.Index()
instanceName := fmt.Sprintf("vm-%d", index)
injectExec := make(chan bool, 10)
- mgr.serv.CreateInstance(instanceName, injectExec)
+ mgr.serv.CreateInstance(instanceName, injectExec, updInfo)
- rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectExec)
+ rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, instanceName, injectExec)
lastExec, machineInfo := mgr.serv.ShutdownInstance(instanceName, rep != nil)
if rep != nil {
prependExecuting(rep, lastExec)
@@ -872,48 +709,21 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) {
}
rep.MachineInfo = machineInfo
}
-
- // Error that is not a VM crash.
- if err != nil {
- return nil, err
- }
- // No crash.
- if rep == nil {
- return nil, nil
+ if err == nil && rep != nil {
+ mgr.crashes <- &Crash{
+ instanceName: instanceName,
+ Report: rep,
+ }
}
- crash := &Crash{
- instanceName: instanceName,
- Report: rep,
+ if err != nil {
+ log.Logf(1, "%s: failed with error: %v", instanceName, err)
}
- return crash, nil
}
-func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec <-chan bool) (
- *report.Report, []byte, error) {
+func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, instanceName string,
+ injectExec <-chan bool) (*report.Report, []byte, error) {
start := time.Now()
- inst, err := mgr.vmPool.Create(index)
- if err != nil {
- var bootErr vm.BootErrorer
- if errors.As(err, &bootErr) {
- title, output := bootErr.BootError()
- rep := mgr.reporter.Parse(output)
- if rep != nil && rep.Type == crash_pkg.UnexpectedReboot {
- // Avoid detecting any boot crash as "unexpected kernel reboot".
- rep = mgr.reporter.ParseFrom(output, rep.SkipPos)
- }
- if rep == nil {
- rep = &report.Report{
- Title: title,
- Output: output,
- }
- }
- return rep, nil, nil
- }
- return nil, nil, fmt.Errorf("failed to create instance: %w", err)
- }
- defer inst.Close()
-
fwdAddr, err := inst.Forward(mgr.serv.Port)
if err != nil {
return nil, nil, fmt.Errorf("failed to setup port forwarding: %w", err)
@@ -929,10 +739,6 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec
}
}
- if *flagDebug {
- mgr.cfg.Procs = 1
- }
-
// Run the fuzzer binary.
mgr.bootTime.Save(time.Since(start))
start = time.Now()
@@ -940,7 +746,7 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec
addrPort := strings.Split(fwdAddr, ":")
cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, instanceName, addrPort[0], addrPort[1])
_, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd,
- vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectExecuting(injectExec),
+ vm.ExitTimeout, vm.StopContext(ctx), vm.InjectExecuting(injectExec),
vm.EarlyFinishCb(func() {
// Depending on the crash type and kernel config, fuzzing may continue
// running for several seconds even after kernel has printed a crash report.
@@ -1314,6 +1120,10 @@ func (mgr *Manager) saveRepro(res *ReproResult) {
}
}
+func (mgr *Manager) resizeReproPool(size int) {
+ mgr.pool.ReserveForRun(size)
+}
+
func (mgr *Manager) uploadReproAssets(repro *repro.Result) []dashapi.NewAsset {
if mgr.assetStorage == nil {
return nil
@@ -1659,7 +1469,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
if mgr.cfg.Cover {
// Distribute new max signal over all instances.
newSignal := fuzzer.Cover.GrabSignalDelta()
- log.Logf(2, "distributing %d new signal", len(newSignal))
+ log.Logf(3, "distributing %d new signal", len(newSignal))
if len(newSignal) != 0 {
mgr.serv.DistributeSignalDelta(newSignal)
}
@@ -1680,9 +1490,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey))
} else {
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
}
} else if mgr.phase == phaseQueriedHub {
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
}
mgr.mu.Unlock()
}
@@ -1695,6 +1507,7 @@ func (mgr *Manager) hubIsUnreachable() {
if mgr.phase == phaseTriagedCorpus {
dash = mgr.dash
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
log.Errorf("did not manage to connect to syz-hub; moving forward")
}
mgr.mu.Unlock()
@@ -1730,17 +1543,19 @@ func (mgr *Manager) collectUsedFiles() {
}
func (mgr *Manager) checkUsedFiles() {
- for f, mod := range mgr.usedFiles {
- stat, err := os.Stat(f)
- if err != nil {
- log.Fatalf("failed to stat %v: %v", f, err)
- }
- if mod != stat.ModTime() {
- log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+
- "this can lead to arbitrary syz-manager misbehavior\n"+
- "modification time has changed: %v -> %v\n"+
- "don't modify files that syz-manager uses. exiting to prevent harm",
- f, mod, stat.ModTime())
+ for range time.NewTicker(30 * time.Second).C {
+ for f, mod := range mgr.usedFiles {
+ stat, err := os.Stat(f)
+ if err != nil {
+ log.Fatalf("failed to stat %v: %v", f, err)
+ }
+ if mod != stat.ModTime() {
+ log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+
+ "this can lead to arbitrary syz-manager misbehavior\n"+
+ "modification time has changed: %v -> %v\n"+
+ "don't modify files that syz-manager uses. exiting to prevent harm",
+ f, mod, stat.ModTime())
+ }
}
}
}
@@ -1786,10 +1601,9 @@ func (mgr *Manager) dashboardReporter() {
}
func (mgr *Manager) dashboardReproTasks() {
+ seq := 0
for range time.NewTicker(20 * time.Minute).C {
- needReproReply := make(chan bool)
- mgr.needMoreRepros <- needReproReply
- if !<-needReproReply {
+ if !mgr.reproMgr.CanReproMore() {
// We don't need reproducers at the moment.
continue
}
@@ -1801,7 +1615,8 @@ func (mgr *Manager) dashboardReproTasks() {
if len(resp.CrashLog) > 0 {
title := resp.Title
if title == "" {
- title = "repro from the dashboard"
+ seq++
+ title = fmt.Sprintf("repro #%d from the dashboard", seq)
}
mgr.externalReproQueue <- &Crash{
fromDashboard: true,