diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-07-05 15:41:55 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-07-11 09:03:37 +0000 |
| commit | 6ca1eb5a578bb1421ad0f3dbde675eb34647e6d7 (patch) | |
| tree | 935c35d1225bcd55b0d413796168c758734af0f9 /syz-manager | |
| parent | fc2b083017012b4afbd9324fc6525e34a19aa0b3 (diff) | |
all: transition to instance.Pool
Rely on instance.Pool to perform fuzzing and do bug reproductions.
Extract the reproduction queue logic to a separate testable class.
Diffstat (limited to 'syz-manager')
| -rw-r--r-- | syz-manager/http.go | 73 | ||||
| -rw-r--r-- | syz-manager/hub.go | 12 | ||||
| -rw-r--r-- | syz-manager/manager.go | 377 | ||||
| -rw-r--r-- | syz-manager/repro.go | 209 | ||||
| -rw-r--r-- | syz-manager/repro_test.go | 94 | ||||
| -rw-r--r-- | syz-manager/stats.go | 17 |
6 files changed, 455 insertions, 327 deletions
diff --git a/syz-manager/http.go b/syz-manager/http.go index ab54f86e5..f51014667 100644 --- a/syz-manager/http.go +++ b/syz-manager/http.go @@ -24,10 +24,10 @@ import ( "github.com/google/syzkaller/pkg/html/pages" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpcserver" "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/pkg/vcs" "github.com/google/syzkaller/prog" + "github.com/google/syzkaller/vm/dispatcher" "github.com/gorilla/handlers" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -172,40 +172,55 @@ func (mgr *Manager) httpVMs(w http.ResponseWriter, r *http.Request) { } // TODO: we could also query vmLoop for VMs that are idle (waiting to start reproducing), // and query the exact bug that is being reproduced by a VM. - for name, state := range mgr.serv.VMState() { + for id, state := range mgr.pool.State() { + name := fmt.Sprintf("#%d", id) info := UIVMInfo{ Name: name, State: "unknown", - Since: time.Since(state.Timestamp), + Since: time.Since(state.LastUpdate), } switch state.State { - case rpcserver.StateOffline: - info.State = "reproducing" - case rpcserver.StateBooting: + case dispatcher.StateOffline: + info.State = "offline" + case dispatcher.StateBooting: info.State = "booting" - case rpcserver.StateFuzzing: - info.State = "fuzzing" - info.MachineInfo = fmt.Sprintf("/vm?type=machine-info&name=%v", name) - info.RunnerStatus = fmt.Sprintf("/vm?type=runner-status&name=%v", name) - case rpcserver.StateStopping: - info.State = "crashed" + case dispatcher.StateWaiting: + info.State = "waiting" + case dispatcher.StateRunning: + info.State = "running: " + state.Status + } + if state.Reserved { + info.State = "[reserved] " + info.State + } + if state.MachineInfo != nil { + info.MachineInfo = fmt.Sprintf("/vm?type=machine-info&id=%d", id) + } + if state.DetailedStatus != nil { + info.DetailedStatus = 
fmt.Sprintf("/vm?type=detailed-status&id=%v", id) } data.VMs = append(data.VMs, info) } - sort.Slice(data.VMs, func(i, j int) bool { - return data.VMs[i].Name < data.VMs[j].Name - }) executeTemplate(w, vmsTemplate, data) } func (mgr *Manager) httpVM(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", ctTextPlain) - vm := r.FormValue("name") + id, err := strconv.Atoi(r.FormValue("id")) + infos := mgr.pool.State() + if err != nil || id < 0 || id >= len(infos) { + http.Error(w, "invalid instance id", http.StatusBadRequest) + return + } + info := infos[id] switch r.FormValue("type") { case "machine-info": - w.Write(mgr.serv.MachineInfo(vm)) - case "runner-status": - w.Write(mgr.serv.RunnerStatus(vm)) + if info.MachineInfo != nil { + w.Write(info.MachineInfo()) + } + case "detailed-status": + if info.DetailedStatus != nil { + w.Write(info.DetailedStatus()) + } default: w.Write([]byte("unknown info type")) } @@ -619,10 +634,8 @@ func (mgr *Manager) httpFilterPCs(w http.ResponseWriter, r *http.Request) { func (mgr *Manager) collectCrashes(workdir string) ([]*UICrashType, error) { // Note: mu is not locked here. 
var repros map[string]bool - if !mgr.cfg.VMLess { - reproReply := make(chan map[string]bool) - mgr.reproRequest <- reproReply - repros = <-reproReply + if !mgr.cfg.VMLess && mgr.reproMgr != nil { + repros = mgr.reproMgr.Reproducing() } crashdir := filepath.Join(workdir, "crashes") @@ -777,11 +790,11 @@ type UIVMData struct { } type UIVMInfo struct { - Name string - State string - Since time.Duration - MachineInfo string - RunnerStatus string + Name string + State string + Since time.Duration + MachineInfo string + DetailedStatus string } type UISyscallsData struct { @@ -918,7 +931,7 @@ var vmsTemplate = pages.Create(` <th><a onclick="return sortTable(this, 'State', textSort)" href="#">State</a></th> <th><a onclick="return sortTable(this, 'Since', timeSort)" href="#">Since</a></th> <th><a onclick="return sortTable(this, 'Machine Info', timeSort)" href="#">Machine Info</a></th> - <th><a onclick="return sortTable(this, 'Runner Status', timeSort)" href="#">Runner Status</a></th> + <th><a onclick="return sortTable(this, 'Status', timeSort)" href="#">Status</a></th> </tr> {{range $vm := $.VMs}} <tr> @@ -926,7 +939,7 @@ var vmsTemplate = pages.Create(` <td>{{$vm.State}}</td> <td>{{formatDuration $vm.Since}}</td> <td>{{optlink $vm.MachineInfo "info"}}</td> - <td>{{optlink $vm.RunnerStatus "status"}}</td> + <td>{{optlink $vm.DetailedStatus "status"}}</td> </tr> {{end}} </table> diff --git a/syz-manager/hub.go b/syz-manager/hub.go index 218617060..7f0c847a0 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "strings" + "sync/atomic" "time" "github.com/google/syzkaller/pkg/auth" @@ -59,7 +60,7 @@ func (mgr *Manager) hubSyncLoop(keyGet keyGetter) { statRecvReproDrop: stats.Create("hub recv repro drop", "", stats.NoGraph), } if mgr.cfg.Reproduce && mgr.dash != nil { - hc.needMoreRepros = mgr.needMoreRepros + hc.needMoreRepros = mgr.reproMgr.CanReproMore } hc.loop() } @@ -75,8 +76,9 @@ type HubConnector struct { hubCorpus 
map[string]bool newRepros [][]byte hubReproQueue chan *Crash - needMoreRepros chan chan bool + needMoreRepros func() bool keyGet keyGetter + reproSeq atomic.Int64 statSendProgAdd *stats.Val statSendProgDel *stats.Val @@ -199,9 +201,7 @@ func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus []*corpus.Item) erro a.Del = append(a.Del, sig) } if hc.needMoreRepros != nil { - needReproReply := make(chan bool) - hc.needMoreRepros <- needReproReply - a.NeedRepros = <-needReproReply + a.NeedRepros = hc.needMoreRepros() } a.Repros = hc.newRepros for { @@ -307,7 +307,7 @@ func (hc *HubConnector) processRepros(repros [][]byte) int { hc.hubReproQueue <- &Crash{ fromHub: true, Report: &report.Report{ - Title: "external repro", + Title: fmt.Sprintf("external repro #%d", hc.reproSeq.Add(1)), Type: typ, Output: repro, }, diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 396ffb97e..e1c2ad050 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -47,6 +47,7 @@ import ( "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" "github.com/google/syzkaller/vm" + "github.com/google/syzkaller/vm/dispatcher" ) var ( @@ -72,6 +73,7 @@ type Manager struct { cfg *mgrconfig.Config mode Mode vmPool *vm.Pool + pool *dispatcher.Pool[*vm.Instance] target *prog.Target sysTarget *targets.Target reporter *report.Reporter @@ -108,9 +110,8 @@ type Manager struct { dataRaceFrames map[string]bool saturatedCalls map[string]bool - needMoreRepros chan chan bool externalReproQueue chan *Crash - reproRequest chan chan map[string]bool + crashes chan *Crash // For checking that files that we are using are not changing under us. // Maps file name to modification time. 
@@ -123,6 +124,8 @@ type Manager struct { bootTime stats.AverageValue[time.Duration] + reproMgr *reproManager + Stats } @@ -239,12 +242,15 @@ func RunManager(cfg *mgrconfig.Config) { fresh: true, vmStop: make(chan bool), externalReproQueue: make(chan *Crash, 10), - needMoreRepros: make(chan chan bool), - reproRequest: make(chan chan map[string]bool), + crashes: make(chan *Crash, 10), usedFiles: make(map[string]time.Time), saturatedCalls: make(map[string]bool), } + if *flagDebug { + mgr.cfg.Procs = 1 + } + mgr.initStats() if mode == ModeFuzzing || mode == ModeCorpusTriage { go mgr.preloadCorpus() @@ -297,7 +303,13 @@ func RunManager(cfg *mgrconfig.Config) { <-vm.Shutdown return } - mgr.vmLoop() + ctx := vm.ShutdownCtx() + mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance) + mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro) + go mgr.processFuzzingResults(ctx) + go mgr.checkUsedFiles() + go mgr.reproMgr.Loop(ctx) + mgr.pool.Loop(ctx) } // Exit successfully in special operation modes. @@ -358,14 +370,7 @@ func (mgr *Manager) writeBench() { } } -type RunResult struct { - idx int - crash *Crash - err error -} - type ReproResult struct { - instances []int report0 *report.Report // the original report we started reproducing repro *repro.Result strace *repro.StraceResult @@ -376,135 +381,25 @@ type ReproResult struct { originalTitle string // crash title before we started bug reproduction } -// Manager needs to be refactored (#605). 
-// nolint: gocyclo, gocognit, funlen -func (mgr *Manager) vmLoop() { - log.Logf(0, "booting test machines...") - log.Logf(0, "wait for the connection from test machine...") - instancesPerRepro := 3 - vmCount := mgr.vmPool.Count() - maxReproVMs := vmCount - mgr.cfg.FuzzingVMs - if instancesPerRepro > maxReproVMs && maxReproVMs > 0 { - instancesPerRepro = maxReproVMs - } - instances := SequentialResourcePool(vmCount, 5*time.Second) - runDone := make(chan *RunResult, 1) - pendingRepro := make(map[*Crash]bool) - reproducing := make(map[string]bool) - attemptedRepros := make(map[string]bool) - var reproQueue []*Crash - reproDone := make(chan *ReproResult, 1) - stopPending := false - shutdown := vm.Shutdown - for shutdown != nil || instances.Len() != vmCount { - mgr.mu.Lock() - phase := mgr.phase - mgr.mu.Unlock() - - for crash := range pendingRepro { - if reproducing[crash.Title] { - continue - } - delete(pendingRepro, crash) - if !mgr.needRepro(crash) { - continue - } - if mgr.cfg.DashboardOnlyRepro && attemptedRepros[crash.Title] { - // Try to reproduce each bug at most 1 time in this mode. - // Since we don't upload bugs/repros to dashboard, it likely won't have - // the reproducer even if we succeeded last time, and will repeatedly - // say it needs a repro. 
- continue - } - log.Logf(1, "loop: add to repro queue '%v'", crash.Title) - attemptedRepros[crash.Title] = true - reproducing[crash.Title] = true - reproQueue = append(reproQueue, crash) - } - - log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v", - phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(), - len(pendingRepro), len(reproducing), len(reproQueue)) - - canRepro := func() bool { - return phase >= phaseTriagedHub && len(reproQueue) != 0 && - (mgr.statNumReproducing.Val()+1)*instancesPerRepro <= maxReproVMs - } - - if shutdown != nil { - for canRepro() { - vmIndexes := instances.Take(instancesPerRepro) - if vmIndexes == nil { - break - } - last := len(reproQueue) - 1 - crash := reproQueue[last] - reproQueue[last] = nil - reproQueue = reproQueue[:last] - mgr.statNumReproducing.Add(1) - log.Logf(0, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes) - go func() { - reproDone <- mgr.runRepro(crash, vmIndexes, instances.Put) - }() - } - for !canRepro() { - idx := instances.TakeOne() - if idx == nil { - break - } - log.Logf(1, "loop: starting instance %v", *idx) - go func() { - crash, err := mgr.runInstance(*idx) - runDone <- &RunResult{*idx, crash, err} - }() - } - } - - var stopRequest chan bool - if !stopPending && canRepro() { - stopRequest = mgr.vmStop - } - - wait: +func (mgr *Manager) processFuzzingResults(ctx context.Context) { + for { select { - case <-instances.Freed: - // An instance has been released. - case stopRequest <- true: - log.Logf(1, "loop: issued stop request") - stopPending = true - case res := <-runDone: - log.Logf(1, "loop: instance %v finished, crash=%v", res.idx, res.crash != nil) - if res.err != nil && shutdown != nil { - log.Logf(0, "%v", res.err) - } - stopPending = false - instances.Put(res.idx) - // On shutdown qemu crashes with "qemu: terminating on signal 2", - // which we detect as "lost connection". Don't save that as crash. 
- if shutdown != nil && res.crash != nil { - needRepro := mgr.saveCrash(res.crash) - if needRepro { - log.Logf(1, "loop: add pending repro for '%v'", res.crash.Title) - pendingRepro[res.crash] = true - } + case <-ctx.Done(): + return + case crash := <-mgr.crashes: + needRepro := mgr.saveCrash(crash) + if mgr.cfg.Reproduce && needRepro { + mgr.reproMgr.Enqueue(crash) } - case res := <-reproDone: - mgr.statNumReproducing.Add(-1) - crepro := false - title := "" - if res.repro != nil { - crepro = res.repro.CRepro - title = res.repro.Report.Title + case err := <-mgr.pool.BootErrors: + crash := mgr.convertBootError(err) + if crash != nil { + mgr.saveCrash(crash) } - log.Logf(0, "loop: repro on %+v finished '%v', repro=%v crepro=%v desc='%v'"+ - " hub=%v from_dashboard=%v", - res.instances, res.report0.Title, res.repro != nil, crepro, title, - res.fromHub, res.fromDashboard, - ) + case res := <-mgr.reproMgr.Done: if res.err != nil { reportReproError(res.err) } - delete(reproducing, res.report0.Title) if res.repro == nil { if res.fromHub { log.Logf(1, "repro '%v' came from syz-hub, not reporting the failure", @@ -516,25 +411,34 @@ func (mgr *Manager) vmLoop() { } else { mgr.saveRepro(res) } - case <-shutdown: - log.Logf(1, "loop: shutting down...") - shutdown = nil case crash := <-mgr.externalReproQueue: - log.Logf(1, "loop: got repro request") - pendingRepro[crash] = true - case reply := <-mgr.needMoreRepros: - reply <- phase >= phaseTriagedHub && - len(reproQueue)+len(pendingRepro)+len(reproducing) == 0 - goto wait - case reply := <-mgr.reproRequest: - repros := make(map[string]bool) - for title := range reproducing { - repros[title] = true + if mgr.needRepro(crash) { + mgr.reproMgr.Enqueue(crash) + } + } + } +} + +func (mgr *Manager) convertBootError(err error) *Crash { + var bootErr vm.BootErrorer + if errors.As(err, &bootErr) { + title, output := bootErr.BootError() + rep := mgr.reporter.Parse(output) + if rep != nil && rep.Type == crash_pkg.UnexpectedReboot { + // 
Avoid detecting any boot crash as "unexpected kernel reboot". + rep = mgr.reporter.ParseFrom(output, rep.SkipPos) + } + if rep == nil { + rep = &report.Report{ + Title: title, + Output: output, } - reply <- repros - goto wait + } + return &Crash{ + Report: rep, } } + return nil } func reportReproError(err error) { @@ -560,10 +464,9 @@ func reportReproError(err error) { log.Errorf("repro failed: %v", err) } -func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(...int)) *ReproResult { - res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.vmPool, vmIndexes) +func (mgr *Manager) runRepro(crash *Crash) *ReproResult { + res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool) ret := &ReproResult{ - instances: vmIndexes, report0: crash.Report, repro: res, stats: stats, @@ -573,13 +476,9 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(.. originalTitle: crash.Title, } if err == nil && res != nil && mgr.cfg.StraceBin != "" { - // We need only one instance to get strace output, release the rest. - putInstances(vmIndexes[1:]...) - defer putInstances(vmIndexes[0]) - const straceAttempts = 2 for i := 1; i <= straceAttempts; i++ { - strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.vmPool, vmIndexes[0]) + strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.pool) sameBug := strace.IsSameBug(res) log.Logf(0, "strace run attempt %d/%d for '%s': same bug %v, error %v", i, straceAttempts, res.Report.Title, sameBug, strace.Error) @@ -590,72 +489,10 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(.. break } } - } else { - putInstances(vmIndexes...) 
} return ret } -type ResourcePool struct { - ids []int - mu sync.RWMutex - Freed chan interface{} -} - -func SequentialResourcePool(count int, delay time.Duration) *ResourcePool { - ret := &ResourcePool{Freed: make(chan interface{}, 1)} - go func() { - for i := 0; i < count; i++ { - ret.Put(i) - time.Sleep(delay) - } - }() - return ret -} - -func (pool *ResourcePool) Put(ids ...int) { - pool.mu.Lock() - defer pool.mu.Unlock() - pool.ids = append(pool.ids, ids...) - // Notify the listener. - select { - case pool.Freed <- true: - default: - } -} - -func (pool *ResourcePool) Len() int { - pool.mu.RLock() - defer pool.mu.RUnlock() - return len(pool.ids) -} - -func (pool *ResourcePool) Snapshot() []int { - pool.mu.RLock() - defer pool.mu.RUnlock() - return append([]int{}, pool.ids...) -} - -func (pool *ResourcePool) Take(cnt int) []int { - pool.mu.Lock() - defer pool.mu.Unlock() - totalItems := len(pool.ids) - if totalItems < cnt { - return nil - } - ret := append([]int{}, pool.ids[totalItems-cnt:]...) 
- pool.ids = pool.ids[:totalItems-cnt] - return ret -} - -func (pool *ResourcePool) TakeOne() *int { - ret := pool.Take(1) - if ret == nil { - return nil - } - return &ret[0] -} - func (mgr *Manager) preloadCorpus() { corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true) if err != nil { @@ -857,13 +694,13 @@ func containsDisabled(p *prog.Prog, enabled map[*prog.Syscall]bool) bool { return false } -func (mgr *Manager) runInstance(index int) (*Crash, error) { - mgr.checkUsedFiles() +func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { + index := inst.Index() instanceName := fmt.Sprintf("vm-%d", index) injectExec := make(chan bool, 10) - mgr.serv.CreateInstance(instanceName, injectExec) + mgr.serv.CreateInstance(instanceName, injectExec, updInfo) - rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectExec) + rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, instanceName, injectExec) lastExec, machineInfo := mgr.serv.ShutdownInstance(instanceName, rep != nil) if rep != nil { prependExecuting(rep, lastExec) @@ -872,48 +709,21 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) { } rep.MachineInfo = machineInfo } - - // Error that is not a VM crash. - if err != nil { - return nil, err - } - // No crash. 
- if rep == nil { - return nil, nil + if err == nil && rep != nil { + mgr.crashes <- &Crash{ + instanceName: instanceName, + Report: rep, + } } - crash := &Crash{ - instanceName: instanceName, - Report: rep, + if err != nil { + log.Logf(1, "%s: failed with error: %v", instanceName, err) } - return crash, nil } -func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec <-chan bool) ( - *report.Report, []byte, error) { +func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, instanceName string, + injectExec <-chan bool) (*report.Report, []byte, error) { start := time.Now() - inst, err := mgr.vmPool.Create(index) - if err != nil { - var bootErr vm.BootErrorer - if errors.As(err, &bootErr) { - title, output := bootErr.BootError() - rep := mgr.reporter.Parse(output) - if rep != nil && rep.Type == crash_pkg.UnexpectedReboot { - // Avoid detecting any boot crash as "unexpected kernel reboot". - rep = mgr.reporter.ParseFrom(output, rep.SkipPos) - } - if rep == nil { - rep = &report.Report{ - Title: title, - Output: output, - } - } - return rep, nil, nil - } - return nil, nil, fmt.Errorf("failed to create instance: %w", err) - } - defer inst.Close() - fwdAddr, err := inst.Forward(mgr.serv.Port) if err != nil { return nil, nil, fmt.Errorf("failed to setup port forwarding: %w", err) @@ -929,10 +739,6 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec } } - if *flagDebug { - mgr.cfg.Procs = 1 - } - // Run the fuzzer binary. 
mgr.bootTime.Save(time.Since(start)) start = time.Now() @@ -940,7 +746,7 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec addrPort := strings.Split(fwdAddr, ":") cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, instanceName, addrPort[0], addrPort[1]) _, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd, - vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectExecuting(injectExec), + vm.ExitTimeout, vm.StopContext(ctx), vm.InjectExecuting(injectExec), vm.EarlyFinishCb(func() { // Depending on the crash type and kernel config, fuzzing may continue // running for several seconds even after kernel has printed a crash report. @@ -1314,6 +1120,10 @@ func (mgr *Manager) saveRepro(res *ReproResult) { } } +func (mgr *Manager) resizeReproPool(size int) { + mgr.pool.ReserveForRun(size) +} + func (mgr *Manager) uploadReproAssets(repro *repro.Result) []dashapi.NewAsset { if mgr.assetStorage == nil { return nil @@ -1659,7 +1469,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { if mgr.cfg.Cover { // Distribute new max signal over all instances. 
newSignal := fuzzer.Cover.GrabSignalDelta() - log.Logf(2, "distributing %d new signal", len(newSignal)) + log.Logf(3, "distributing %d new signal", len(newSignal)) if len(newSignal) != 0 { mgr.serv.DistributeSignalDelta(newSignal) } @@ -1680,9 +1490,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey)) } else { mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() } } else if mgr.phase == phaseQueriedHub { mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() } mgr.mu.Unlock() } @@ -1695,6 +1507,7 @@ func (mgr *Manager) hubIsUnreachable() { if mgr.phase == phaseTriagedCorpus { dash = mgr.dash mgr.phase = phaseTriagedHub + mgr.reproMgr.StartReproduction() log.Errorf("did not manage to connect to syz-hub; moving forward") } mgr.mu.Unlock() @@ -1730,17 +1543,19 @@ func (mgr *Manager) collectUsedFiles() { } func (mgr *Manager) checkUsedFiles() { - for f, mod := range mgr.usedFiles { - stat, err := os.Stat(f) - if err != nil { - log.Fatalf("failed to stat %v: %v", f, err) - } - if mod != stat.ModTime() { - log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+ - "this can lead to arbitrary syz-manager misbehavior\n"+ - "modification time has changed: %v -> %v\n"+ - "don't modify files that syz-manager uses. exiting to prevent harm", - f, mod, stat.ModTime()) + for range time.NewTicker(30 * time.Second).C { + for f, mod := range mgr.usedFiles { + stat, err := os.Stat(f) + if err != nil { + log.Fatalf("failed to stat %v: %v", f, err) + } + if mod != stat.ModTime() { + log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+ + "this can lead to arbitrary syz-manager misbehavior\n"+ + "modification time has changed: %v -> %v\n"+ + "don't modify files that syz-manager uses. 
exiting to prevent harm", + f, mod, stat.ModTime()) + } } } } @@ -1786,10 +1601,9 @@ func (mgr *Manager) dashboardReporter() { } func (mgr *Manager) dashboardReproTasks() { + seq := 0 for range time.NewTicker(20 * time.Minute).C { - needReproReply := make(chan bool) - mgr.needMoreRepros <- needReproReply - if !<-needReproReply { + if !mgr.reproMgr.CanReproMore() { // We don't need reproducers at the moment. continue } @@ -1801,7 +1615,8 @@ func (mgr *Manager) dashboardReproTasks() { if len(resp.CrashLog) > 0 { title := resp.Title if title == "" { - title = "repro from the dashboard" + seq++ + title = fmt.Sprintf("repro #%d from the dashboard", seq) } mgr.externalReproQueue <- &Crash{ fromDashboard: true, diff --git a/syz-manager/repro.go b/syz-manager/repro.go new file mode 100644 index 000000000..1f5712819 --- /dev/null +++ b/syz-manager/repro.go @@ -0,0 +1,209 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package main + +import ( + "context" + "maps" + "slices" + "sync" + + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/stats" +) + +type reproManagerView interface { + runRepro(crash *Crash) *ReproResult // TODO: consider moving runRepro() to repro.go. 
+ needRepro(crash *Crash) bool + resizeReproPool(size int) +} + +type reproManager struct { + Done chan *ReproResult + + statNumReproducing *stats.Val + statPending *stats.Val + + onlyOnce bool + mgr reproManagerView + parallel chan struct{} + pingQueue chan struct{} + reproVMs int + + mu sync.Mutex + queue []*Crash + reproducing map[string]bool +} + +func newReproManager(mgr reproManagerView, reproVMs int, onlyOnce bool) *reproManager { + ret := &reproManager{ + Done: make(chan *ReproResult, 10), + + mgr: mgr, + onlyOnce: onlyOnce, + parallel: make(chan struct{}, reproVMs), + reproVMs: reproVMs, + reproducing: map[string]bool{}, + pingQueue: make(chan struct{}, 1), + } + ret.statNumReproducing = stats.Create("reproducing", "Number of crashes being reproduced", + stats.Console, stats.NoGraph, func() int { + ret.mu.Lock() + defer ret.mu.Unlock() + return len(ret.reproducing) + }) + ret.statPending = stats.Create("pending", "Number of pending repro tasks", + stats.Console, stats.NoGraph, func() int { + ret.mu.Lock() + defer ret.mu.Unlock() + return len(ret.queue) + }) + return ret +} + +// startReproduction() is assumed to be called only once. +// The agument is the maximum number of VMs dedicated to the bug reproduction. +func (m *reproManager) StartReproduction() { + log.Logf(1, "starting reproductions (max %d VMs)", m.reproVMs) + + for count := 1; m.calculateReproVMs(count) <= m.reproVMs; count++ { + m.parallel <- struct{}{} + } +} + +func (m *reproManager) calculateReproVMs(repros int) int { + // Let's allocate 1.33 VMs per a reproducer thread. + if m.reproVMs == 1 && repros == 1 { + // With one exception -- if we have only one VM, let's still do one repro. 
+ return 1 + } + return (repros*4 + 2) / 3 +} + +func (m *reproManager) CanReproMore() bool { + return len(m.parallel) != 0 +} + +func (m *reproManager) Reproducing() map[string]bool { + m.mu.Lock() + defer m.mu.Unlock() + return maps.Clone(m.reproducing) +} + +func (m *reproManager) Enqueue(crash *Crash) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.onlyOnce { + // Try to reproduce each bug at most 1 time in this mode. + // Since we don't upload bugs/repros to dashboard, it likely won't have + // the reproducer even if we succeeded last time, and will repeatedly + // say it needs a repro. + for _, queued := range m.queue { + if queued.Title == crash.Title { + return + } + } + } + log.Logf(1, "scheduled a reproduction of '%v'", crash.Title) + m.queue = append(m.queue, crash) + + // Ping the loop. + select { + case m.pingQueue <- struct{}{}: + default: + } +} + +func (m *reproManager) popCrash() *Crash { + m.mu.Lock() + defer m.mu.Unlock() + + for i, crash := range m.queue { + if m.reproducing[crash.Title] { + continue + } + m.queue = slices.Delete(m.queue, i, i+1) + return crash + } + return nil +} + +func (m *reproManager) Loop(ctx context.Context) { + var wg sync.WaitGroup + defer wg.Wait() + + for { + crash := m.popCrash() + for crash == nil { + select { + case <-m.pingQueue: + crash = m.popCrash() + case <-ctx.Done(): + return + } + if !m.mgr.needRepro(crash) { + continue + } + } + + // Now wait until we can schedule another runner. 
+ select { + case <-m.parallel: + case <-ctx.Done(): + return + } + + m.mu.Lock() + m.reproducing[crash.Title] = true + m.adjustPoolSizeLocked() + m.mu.Unlock() + + wg.Add(1) + go func() { + defer wg.Done() + + m.handle(crash) + + m.mu.Lock() + delete(m.reproducing, crash.Title) + m.adjustPoolSizeLocked() + m.mu.Unlock() + + m.parallel <- struct{}{} + m.pingQueue <- struct{}{} + }() + } +} + +func (m *reproManager) handle(crash *Crash) { + log.Logf(0, "start reproducing '%v'", crash.Title) + + res := m.mgr.runRepro(crash) + + crepro := false + title := "" + if res.repro != nil { + crepro = res.repro.CRepro + title = res.repro.Report.Title + } + log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v", + res.report0.Title, res.repro != nil, crepro, title, res.fromHub, res.fromDashboard, + ) + m.Done <- res +} + +func (m *reproManager) adjustPoolSizeLocked() { + // Avoid the +-1 jitter by considering the repro queue size as well. + + // We process same-titled crashes sequentially, so only count unique ones. + uniqueTitles := make(map[string]bool) + for _, crash := range m.queue { + uniqueTitles[crash.Title] = true + } + + needRepros := len(m.reproducing) + len(uniqueTitles) + VMs := min(m.reproVMs, m.calculateReproVMs(needRepros)) + m.mgr.resizeReproPool(VMs) +} diff --git a/syz-manager/repro_test.go b/syz-manager/repro_test.go new file mode 100644 index 000000000..61c6dc4b4 --- /dev/null +++ b/syz-manager/repro_test.go @@ -0,0 +1,94 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. 
+ +package main + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/google/syzkaller/pkg/report" + "github.com/stretchr/testify/assert" +) + +func TestReproManager(t *testing.T) { + mock := &reproMgrMock{ + run: make(chan runCallback), + } + obj := newReproManager(mock, 3, false) + + ctx, done := context.WithCancel(context.Background()) + complete := make(chan struct{}) + go func() { + obj.Loop(ctx) + close(complete) + }() + + defer func() { + done() + <-complete + }() + + // No reproductions until we've signaled to start. + assert.False(t, obj.CanReproMore()) + obj.StartReproduction() + + // No reproducers -- we can definitely take more. + assert.True(t, obj.CanReproMore()) + obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}}) + called := <-mock.run + assert.Equal(t, "A", called.crash.Title) + + // One reproducer is running -- we can take one more. + assert.True(t, obj.CanReproMore()) + assert.EqualValues(t, 2, mock.reserved.Load()) + obj.Enqueue(&Crash{Report: &report.Report{Title: "B"}}) + called2 := <-mock.run + assert.Equal(t, "B", called2.crash.Title) + + assert.False(t, obj.CanReproMore()) + assert.Len(t, obj.Reproducing(), 2) + assert.EqualValues(t, 3, mock.reserved.Load()) + + // Pretend that reproducers have finished. + called.ret <- &ReproResult{report0: &report.Report{}} + called2.ret <- &ReproResult{report0: &report.Report{}} + + // Wait until the number of reserved VMs goes to 0. 
+ for i := 0; i < 100; i++ { + if mock.reserved.Load() == 0 { + assert.True(t, obj.CanReproMore()) + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatal("reserved VMs must have dropped to 0") +} + +type reproMgrMock struct { + reserved atomic.Int64 + run chan runCallback +} + +type runCallback struct { + crash *Crash + ret chan *ReproResult +} + +func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult { + retCh := make(chan *ReproResult) + m.run <- runCallback{crash: crash, ret: retCh} + ret := <-retCh + close(retCh) + return ret +} + +func (m *reproMgrMock) needRepro(crash *Crash) bool { + return true +} + +func (m *reproMgrMock) resizeReproPool(VMs int) { + m.reserved.Store(int64(VMs)) +} diff --git a/syz-manager/stats.go b/syz-manager/stats.go index cffa7e4c1..dced8fc87 100644 --- a/syz-manager/stats.go +++ b/syz-manager/stats.go @@ -13,19 +13,16 @@ import ( ) type Stats struct { - statNumReproducing *stats.Val - statCrashes *stats.Val - statCrashTypes *stats.Val - statSuppressed *stats.Val - statUptime *stats.Val - statFuzzingTime *stats.Val - statAvgBootTime *stats.Val - statCoverFiltered *stats.Val + statCrashes *stats.Val + statCrashTypes *stats.Val + statSuppressed *stats.Val + statUptime *stats.Val + statFuzzingTime *stats.Val + statAvgBootTime *stats.Val + statCoverFiltered *stats.Val } func (mgr *Manager) initStats() { - mgr.statNumReproducing = stats.Create("reproducing", "Number of crashes being reproduced", - stats.Console, stats.NoGraph) mgr.statCrashes = stats.Create("crashes", "Total number of VM crashes", stats.Simple, stats.Prometheus("syz_crash_total")) mgr.statCrashTypes = stats.Create("crash types", "Number of unique crashes types", |
