author    Aleksandr Nogikh <nogikh@google.com>    2024-07-05 15:41:55 +0200
committer Aleksandr Nogikh <nogikh@google.com>    2024-07-11 09:03:37 +0000
commit    6ca1eb5a578bb1421ad0f3dbde675eb34647e6d7 (patch)
tree      935c35d1225bcd55b0d413796168c758734af0f9 /syz-manager
parent    fc2b083017012b4afbd9324fc6525e34a19aa0b3 (diff)
all: transition to instance.Pool
Rely on instance.Pool to perform fuzzing and run bug reproductions. Extract the reproduction queue logic into a separate, testable class.
Diffstat (limited to 'syz-manager')
-rw-r--r--  syz-manager/http.go         73
-rw-r--r--  syz-manager/hub.go          12
-rw-r--r--  syz-manager/manager.go     377
-rw-r--r--  syz-manager/repro.go       209
-rw-r--r--  syz-manager/repro_test.go   94
-rw-r--r--  syz-manager/stats.go        17
6 files changed, 455 insertions, 327 deletions
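Note: the reproduction scheduler extracted by this commit (syz-manager/repro.go below) is built around two small channels: a buffered parallel channel acting as a semaphore of reproduction slots, and a one-slot pingQueue that wakes the scheduling loop when new crashes arrive. A minimal, self-contained sketch of that pattern follows; the names and toy workload are ours, not the commit's code.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	const slots = 2
	parallel := make(chan struct{}, slots) // semaphore: one token per repro slot
	ping := make(chan struct{}, 1)         // coalesced wake-up for the scheduler
	for i := 0; i < slots; i++ {
		parallel <- struct{}{}
	}

	var mu sync.Mutex
	var queue []string
	pop := func() string {
		mu.Lock()
		defer mu.Unlock()
		if len(queue) == 0 {
			return ""
		}
		crash := queue[0]
		queue = queue[1:]
		return crash
	}
	enqueue := func(title string) {
		mu.Lock()
		queue = append(queue, title)
		mu.Unlock()
		select {
		case ping <- struct{}{}: // non-blocking: one pending ping is enough
		default:
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	var wg sync.WaitGroup
	go func() {
		for {
			crash := pop()
			for crash == "" {
				select {
				case <-ping:
					crash = pop()
				case <-ctx.Done():
					return
				}
			}
			select {
			case <-parallel: // wait for a free repro slot
			case <-ctx.Done():
				return
			}
			wg.Add(1)
			go func(crash string) {
				defer wg.Done()
				fmt.Println("reproducing", crash)
				time.Sleep(50 * time.Millisecond) // stand-in for the repro work
				parallel <- struct{}{}            // release the slot
			}(crash)
		}
	}()

	for _, title := range []string{"crash A", "crash B", "crash C"} {
		enqueue(title)
	}
	time.Sleep(200 * time.Millisecond) // let the toy workload drain
	wg.Wait()
}

The non-blocking send into the ping channel coalesces redundant wake-ups, so enqueueing never blocks on the scheduler.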
diff --git a/syz-manager/http.go b/syz-manager/http.go
index ab54f86e5..f51014667 100644
--- a/syz-manager/http.go
+++ b/syz-manager/http.go
@@ -24,10 +24,10 @@ import (
"github.com/google/syzkaller/pkg/html/pages"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/osutil"
- "github.com/google/syzkaller/pkg/rpcserver"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/pkg/vcs"
"github.com/google/syzkaller/prog"
+ "github.com/google/syzkaller/vm/dispatcher"
"github.com/gorilla/handlers"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -172,40 +172,55 @@ func (mgr *Manager) httpVMs(w http.ResponseWriter, r *http.Request) {
}
// TODO: we could also query vmLoop for VMs that are idle (waiting to start reproducing),
// and query the exact bug that is being reproduced by a VM.
- for name, state := range mgr.serv.VMState() {
+ for id, state := range mgr.pool.State() {
+ name := fmt.Sprintf("#%d", id)
info := UIVMInfo{
Name: name,
State: "unknown",
- Since: time.Since(state.Timestamp),
+ Since: time.Since(state.LastUpdate),
}
switch state.State {
- case rpcserver.StateOffline:
- info.State = "reproducing"
- case rpcserver.StateBooting:
+ case dispatcher.StateOffline:
+ info.State = "offline"
+ case dispatcher.StateBooting:
info.State = "booting"
- case rpcserver.StateFuzzing:
- info.State = "fuzzing"
- info.MachineInfo = fmt.Sprintf("/vm?type=machine-info&name=%v", name)
- info.RunnerStatus = fmt.Sprintf("/vm?type=runner-status&name=%v", name)
- case rpcserver.StateStopping:
- info.State = "crashed"
+ case dispatcher.StateWaiting:
+ info.State = "waiting"
+ case dispatcher.StateRunning:
+ info.State = "running: " + state.Status
+ }
+ if state.Reserved {
+ info.State = "[reserved] " + info.State
+ }
+ if state.MachineInfo != nil {
+ info.MachineInfo = fmt.Sprintf("/vm?type=machine-info&id=%d", id)
+ }
+ if state.DetailedStatus != nil {
+ info.DetailedStatus = fmt.Sprintf("/vm?type=detailed-status&id=%v", id)
}
data.VMs = append(data.VMs, info)
}
- sort.Slice(data.VMs, func(i, j int) bool {
- return data.VMs[i].Name < data.VMs[j].Name
- })
executeTemplate(w, vmsTemplate, data)
}
func (mgr *Manager) httpVM(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", ctTextPlain)
- vm := r.FormValue("name")
+ id, err := strconv.Atoi(r.FormValue("id"))
+ infos := mgr.pool.State()
+ if err != nil || id < 0 || id >= len(infos) {
+ http.Error(w, "invalid instance id", http.StatusBadRequest)
+ return
+ }
+ info := infos[id]
switch r.FormValue("type") {
case "machine-info":
- w.Write(mgr.serv.MachineInfo(vm))
- case "runner-status":
- w.Write(mgr.serv.RunnerStatus(vm))
+ if info.MachineInfo != nil {
+ w.Write(info.MachineInfo())
+ }
+ case "detailed-status":
+ if info.DetailedStatus != nil {
+ w.Write(info.DetailedStatus())
+ }
default:
w.Write([]byte("unknown info type"))
}
@@ -619,10 +634,8 @@ func (mgr *Manager) httpFilterPCs(w http.ResponseWriter, r *http.Request) {
func (mgr *Manager) collectCrashes(workdir string) ([]*UICrashType, error) {
// Note: mu is not locked here.
var repros map[string]bool
- if !mgr.cfg.VMLess {
- reproReply := make(chan map[string]bool)
- mgr.reproRequest <- reproReply
- repros = <-reproReply
+ if !mgr.cfg.VMLess && mgr.reproMgr != nil {
+ repros = mgr.reproMgr.Reproducing()
}
crashdir := filepath.Join(workdir, "crashes")
@@ -777,11 +790,11 @@ type UIVMData struct {
}
type UIVMInfo struct {
- Name string
- State string
- Since time.Duration
- MachineInfo string
- RunnerStatus string
+ Name string
+ State string
+ Since time.Duration
+ MachineInfo string
+ DetailedStatus string
}
type UISyscallsData struct {
@@ -918,7 +931,7 @@ var vmsTemplate = pages.Create(`
<th><a onclick="return sortTable(this, 'State', textSort)" href="#">State</a></th>
<th><a onclick="return sortTable(this, 'Since', timeSort)" href="#">Since</a></th>
<th><a onclick="return sortTable(this, 'Machine Info', timeSort)" href="#">Machine Info</a></th>
- <th><a onclick="return sortTable(this, 'Runner Status', timeSort)" href="#">Runner Status</a></th>
+ <th><a onclick="return sortTable(this, 'Status', timeSort)" href="#">Status</a></th>
</tr>
{{range $vm := $.VMs}}
<tr>
@@ -926,7 +939,7 @@ var vmsTemplate = pages.Create(`
<td>{{$vm.State}}</td>
<td>{{formatDuration $vm.Since}}</td>
<td>{{optlink $vm.MachineInfo "info"}}</td>
- <td>{{optlink $vm.RunnerStatus "status"}}</td>
+ <td>{{optlink $vm.DetailedStatus "status"}}</td>
</tr>
{{end}}
</table>
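Note: httpVM now addresses VMs by pool index rather than by name, with a combined parse-and-bounds check before the state slice is indexed. A standalone sketch of the same validation pattern, assuming a plain string slice standing in for mgr.pool.State():

package main

import (
	"fmt"
	"log"
	"net/http"
	"strconv"
)

func main() {
	state := []string{"fuzzing", "booting"} // stand-in for mgr.pool.State()
	http.HandleFunc("/vm", func(w http.ResponseWriter, r *http.Request) {
		// Atoi failure and an out-of-range index share one 400 path,
		// and the slice is only indexed after the id is known valid.
		id, err := strconv.Atoi(r.FormValue("id"))
		if err != nil || id < 0 || id >= len(state) {
			http.Error(w, "invalid instance id", http.StatusBadRequest)
			return
		}
		fmt.Fprintf(w, "vm #%d: %s\n", id, state[id])
	})
	log.Fatal(http.ListenAndServe("localhost:8080", nil))
}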
diff --git a/syz-manager/hub.go b/syz-manager/hub.go
index 218617060..7f0c847a0 100644
--- a/syz-manager/hub.go
+++ b/syz-manager/hub.go
@@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"strings"
+ "sync/atomic"
"time"
"github.com/google/syzkaller/pkg/auth"
@@ -59,7 +60,7 @@ func (mgr *Manager) hubSyncLoop(keyGet keyGetter) {
statRecvReproDrop: stats.Create("hub recv repro drop", "", stats.NoGraph),
}
if mgr.cfg.Reproduce && mgr.dash != nil {
- hc.needMoreRepros = mgr.needMoreRepros
+ hc.needMoreRepros = mgr.reproMgr.CanReproMore
}
hc.loop()
}
@@ -75,8 +76,9 @@ type HubConnector struct {
hubCorpus map[string]bool
newRepros [][]byte
hubReproQueue chan *Crash
- needMoreRepros chan chan bool
+ needMoreRepros func() bool
keyGet keyGetter
+ reproSeq atomic.Int64
statSendProgAdd *stats.Val
statSendProgDel *stats.Val
@@ -199,9 +201,7 @@ func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus []*corpus.Item) erro
a.Del = append(a.Del, sig)
}
if hc.needMoreRepros != nil {
- needReproReply := make(chan bool)
- hc.needMoreRepros <- needReproReply
- a.NeedRepros = <-needReproReply
+ a.NeedRepros = hc.needMoreRepros()
}
a.Repros = hc.newRepros
for {
@@ -307,7 +307,7 @@ func (hc *HubConnector) processRepros(repros [][]byte) int {
hc.hubReproQueue <- &Crash{
fromHub: true,
Report: &report.Report{
- Title: "external repro",
+ Title: fmt.Sprintf("external repro #%d", hc.reproSeq.Add(1)),
Type: typ,
Output: repro,
},
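Note: the main change in this file replaces the chan chan bool request/reply handshake with a plain func() bool callback, so a hub sync no longer needs a live manager event loop to answer "do we need more repros?". In the commit the callback is reproManager.CanReproMore, which only reads len(m.parallel); channel length reads are safe without extra locking. A minimal toy contrasting the two shapes (not the commit's code):

package main

import "fmt"

// Old pattern: each query is a reply channel sent to an event loop
// that owns the state.
func serveNeedRepros(requests chan chan bool) {
	needMore := true // state owned by the loop
	for reply := range requests {
		reply <- needMore
	}
}

func main() {
	// Removed code's shape: request/reply handshake over channels.
	requests := make(chan chan bool)
	go serveNeedRepros(requests)
	reply := make(chan bool)
	requests <- reply
	fmt.Println("via channel handshake:", <-reply)
	close(requests)

	// New shape: a plain callback invoked directly by the caller.
	needMoreRepros := func() bool { return true }
	fmt.Println("via direct callback:", needMoreRepros())
}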
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 396ffb97e..e1c2ad050 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -47,6 +47,7 @@ import (
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
"github.com/google/syzkaller/vm"
+ "github.com/google/syzkaller/vm/dispatcher"
)
var (
@@ -72,6 +73,7 @@ type Manager struct {
cfg *mgrconfig.Config
mode Mode
vmPool *vm.Pool
+ pool *dispatcher.Pool[*vm.Instance]
target *prog.Target
sysTarget *targets.Target
reporter *report.Reporter
@@ -108,9 +110,8 @@ type Manager struct {
dataRaceFrames map[string]bool
saturatedCalls map[string]bool
- needMoreRepros chan chan bool
externalReproQueue chan *Crash
- reproRequest chan chan map[string]bool
+ crashes chan *Crash
// For checking that files that we are using are not changing under us.
// Maps file name to modification time.
@@ -123,6 +124,8 @@ type Manager struct {
bootTime stats.AverageValue[time.Duration]
+ reproMgr *reproManager
+
Stats
}
@@ -239,12 +242,15 @@ func RunManager(cfg *mgrconfig.Config) {
fresh: true,
vmStop: make(chan bool),
externalReproQueue: make(chan *Crash, 10),
- needMoreRepros: make(chan chan bool),
- reproRequest: make(chan chan map[string]bool),
+ crashes: make(chan *Crash, 10),
usedFiles: make(map[string]time.Time),
saturatedCalls: make(map[string]bool),
}
+ if *flagDebug {
+ mgr.cfg.Procs = 1
+ }
+
mgr.initStats()
if mode == ModeFuzzing || mode == ModeCorpusTriage {
go mgr.preloadCorpus()
@@ -297,7 +303,13 @@ func RunManager(cfg *mgrconfig.Config) {
<-vm.Shutdown
return
}
- mgr.vmLoop()
+ ctx := vm.ShutdownCtx()
+ mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance)
+ mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro)
+ go mgr.processFuzzingResults(ctx)
+ go mgr.checkUsedFiles()
+ go mgr.reproMgr.Loop(ctx)
+ mgr.pool.Loop(ctx)
}
// Exit successfully in special operation modes.
@@ -358,14 +370,7 @@ func (mgr *Manager) writeBench() {
}
}
-type RunResult struct {
- idx int
- crash *Crash
- err error
-}
-
type ReproResult struct {
- instances []int
report0 *report.Report // the original report we started reproducing
repro *repro.Result
strace *repro.StraceResult
@@ -376,135 +381,25 @@ type ReproResult struct {
originalTitle string // crash title before we started bug reproduction
}
-// Manager needs to be refactored (#605).
-// nolint: gocyclo, gocognit, funlen
-func (mgr *Manager) vmLoop() {
- log.Logf(0, "booting test machines...")
- log.Logf(0, "wait for the connection from test machine...")
- instancesPerRepro := 3
- vmCount := mgr.vmPool.Count()
- maxReproVMs := vmCount - mgr.cfg.FuzzingVMs
- if instancesPerRepro > maxReproVMs && maxReproVMs > 0 {
- instancesPerRepro = maxReproVMs
- }
- instances := SequentialResourcePool(vmCount, 5*time.Second)
- runDone := make(chan *RunResult, 1)
- pendingRepro := make(map[*Crash]bool)
- reproducing := make(map[string]bool)
- attemptedRepros := make(map[string]bool)
- var reproQueue []*Crash
- reproDone := make(chan *ReproResult, 1)
- stopPending := false
- shutdown := vm.Shutdown
- for shutdown != nil || instances.Len() != vmCount {
- mgr.mu.Lock()
- phase := mgr.phase
- mgr.mu.Unlock()
-
- for crash := range pendingRepro {
- if reproducing[crash.Title] {
- continue
- }
- delete(pendingRepro, crash)
- if !mgr.needRepro(crash) {
- continue
- }
- if mgr.cfg.DashboardOnlyRepro && attemptedRepros[crash.Title] {
- // Try to reproduce each bug at most 1 time in this mode.
- // Since we don't upload bugs/repros to dashboard, it likely won't have
- // the reproducer even if we succeeded last time, and will repeatedly
- // say it needs a repro.
- continue
- }
- log.Logf(1, "loop: add to repro queue '%v'", crash.Title)
- attemptedRepros[crash.Title] = true
- reproducing[crash.Title] = true
- reproQueue = append(reproQueue, crash)
- }
-
- log.Logf(1, "loop: phase=%v shutdown=%v instances=%v/%v %+v repro: pending=%v reproducing=%v queued=%v",
- phase, shutdown == nil, instances.Len(), vmCount, instances.Snapshot(),
- len(pendingRepro), len(reproducing), len(reproQueue))
-
- canRepro := func() bool {
- return phase >= phaseTriagedHub && len(reproQueue) != 0 &&
- (mgr.statNumReproducing.Val()+1)*instancesPerRepro <= maxReproVMs
- }
-
- if shutdown != nil {
- for canRepro() {
- vmIndexes := instances.Take(instancesPerRepro)
- if vmIndexes == nil {
- break
- }
- last := len(reproQueue) - 1
- crash := reproQueue[last]
- reproQueue[last] = nil
- reproQueue = reproQueue[:last]
- mgr.statNumReproducing.Add(1)
- log.Logf(0, "loop: starting repro of '%v' on instances %+v", crash.Title, vmIndexes)
- go func() {
- reproDone <- mgr.runRepro(crash, vmIndexes, instances.Put)
- }()
- }
- for !canRepro() {
- idx := instances.TakeOne()
- if idx == nil {
- break
- }
- log.Logf(1, "loop: starting instance %v", *idx)
- go func() {
- crash, err := mgr.runInstance(*idx)
- runDone <- &RunResult{*idx, crash, err}
- }()
- }
- }
-
- var stopRequest chan bool
- if !stopPending && canRepro() {
- stopRequest = mgr.vmStop
- }
-
- wait:
+func (mgr *Manager) processFuzzingResults(ctx context.Context) {
+ for {
select {
- case <-instances.Freed:
- // An instance has been released.
- case stopRequest <- true:
- log.Logf(1, "loop: issued stop request")
- stopPending = true
- case res := <-runDone:
- log.Logf(1, "loop: instance %v finished, crash=%v", res.idx, res.crash != nil)
- if res.err != nil && shutdown != nil {
- log.Logf(0, "%v", res.err)
- }
- stopPending = false
- instances.Put(res.idx)
- // On shutdown qemu crashes with "qemu: terminating on signal 2",
- // which we detect as "lost connection". Don't save that as crash.
- if shutdown != nil && res.crash != nil {
- needRepro := mgr.saveCrash(res.crash)
- if needRepro {
- log.Logf(1, "loop: add pending repro for '%v'", res.crash.Title)
- pendingRepro[res.crash] = true
- }
+ case <-ctx.Done():
+ return
+ case crash := <-mgr.crashes:
+ needRepro := mgr.saveCrash(crash)
+ if mgr.cfg.Reproduce && needRepro {
+ mgr.reproMgr.Enqueue(crash)
}
- case res := <-reproDone:
- mgr.statNumReproducing.Add(-1)
- crepro := false
- title := ""
- if res.repro != nil {
- crepro = res.repro.CRepro
- title = res.repro.Report.Title
+ case err := <-mgr.pool.BootErrors:
+ crash := mgr.convertBootError(err)
+ if crash != nil {
+ mgr.saveCrash(crash)
}
- log.Logf(0, "loop: repro on %+v finished '%v', repro=%v crepro=%v desc='%v'"+
- " hub=%v from_dashboard=%v",
- res.instances, res.report0.Title, res.repro != nil, crepro, title,
- res.fromHub, res.fromDashboard,
- )
+ case res := <-mgr.reproMgr.Done:
if res.err != nil {
reportReproError(res.err)
}
- delete(reproducing, res.report0.Title)
if res.repro == nil {
if res.fromHub {
log.Logf(1, "repro '%v' came from syz-hub, not reporting the failure",
@@ -516,25 +411,34 @@ func (mgr *Manager) vmLoop() {
} else {
mgr.saveRepro(res)
}
- case <-shutdown:
- log.Logf(1, "loop: shutting down...")
- shutdown = nil
case crash := <-mgr.externalReproQueue:
- log.Logf(1, "loop: got repro request")
- pendingRepro[crash] = true
- case reply := <-mgr.needMoreRepros:
- reply <- phase >= phaseTriagedHub &&
- len(reproQueue)+len(pendingRepro)+len(reproducing) == 0
- goto wait
- case reply := <-mgr.reproRequest:
- repros := make(map[string]bool)
- for title := range reproducing {
- repros[title] = true
+ if mgr.needRepro(crash) {
+ mgr.reproMgr.Enqueue(crash)
+ }
+ }
+ }
+}
+
+func (mgr *Manager) convertBootError(err error) *Crash {
+ var bootErr vm.BootErrorer
+ if errors.As(err, &bootErr) {
+ title, output := bootErr.BootError()
+ rep := mgr.reporter.Parse(output)
+ if rep != nil && rep.Type == crash_pkg.UnexpectedReboot {
+ // Avoid detecting any boot crash as "unexpected kernel reboot".
+ rep = mgr.reporter.ParseFrom(output, rep.SkipPos)
+ }
+ if rep == nil {
+ rep = &report.Report{
+ Title: title,
+ Output: output,
}
- reply <- repros
- goto wait
+ }
+ return &Crash{
+ Report: rep,
}
}
+ return nil
}
func reportReproError(err error) {
@@ -560,10 +464,9 @@ func reportReproError(err error) {
log.Errorf("repro failed: %v", err)
}
-func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(...int)) *ReproResult {
- res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.vmPool, vmIndexes)
+func (mgr *Manager) runRepro(crash *Crash) *ReproResult {
+ res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool)
ret := &ReproResult{
- instances: vmIndexes,
report0: crash.Report,
repro: res,
stats: stats,
@@ -573,13 +476,9 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(..
originalTitle: crash.Title,
}
if err == nil && res != nil && mgr.cfg.StraceBin != "" {
- // We need only one instance to get strace output, release the rest.
- putInstances(vmIndexes[1:]...)
- defer putInstances(vmIndexes[0])
-
const straceAttempts = 2
for i := 1; i <= straceAttempts; i++ {
- strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.vmPool, vmIndexes[0])
+ strace := repro.RunStrace(res, mgr.cfg, mgr.reporter, mgr.pool)
sameBug := strace.IsSameBug(res)
log.Logf(0, "strace run attempt %d/%d for '%s': same bug %v, error %v",
i, straceAttempts, res.Report.Title, sameBug, strace.Error)
@@ -590,72 +489,10 @@ func (mgr *Manager) runRepro(crash *Crash, vmIndexes []int, putInstances func(..
break
}
}
- } else {
- putInstances(vmIndexes...)
}
return ret
}
-type ResourcePool struct {
- ids []int
- mu sync.RWMutex
- Freed chan interface{}
-}
-
-func SequentialResourcePool(count int, delay time.Duration) *ResourcePool {
- ret := &ResourcePool{Freed: make(chan interface{}, 1)}
- go func() {
- for i := 0; i < count; i++ {
- ret.Put(i)
- time.Sleep(delay)
- }
- }()
- return ret
-}
-
-func (pool *ResourcePool) Put(ids ...int) {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- pool.ids = append(pool.ids, ids...)
- // Notify the listener.
- select {
- case pool.Freed <- true:
- default:
- }
-}
-
-func (pool *ResourcePool) Len() int {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
- return len(pool.ids)
-}
-
-func (pool *ResourcePool) Snapshot() []int {
- pool.mu.RLock()
- defer pool.mu.RUnlock()
- return append([]int{}, pool.ids...)
-}
-
-func (pool *ResourcePool) Take(cnt int) []int {
- pool.mu.Lock()
- defer pool.mu.Unlock()
- totalItems := len(pool.ids)
- if totalItems < cnt {
- return nil
- }
- ret := append([]int{}, pool.ids[totalItems-cnt:]...)
- pool.ids = pool.ids[:totalItems-cnt]
- return ret
-}
-
-func (pool *ResourcePool) TakeOne() *int {
- ret := pool.Take(1)
- if ret == nil {
- return nil
- }
- return &ret[0]
-}
-
func (mgr *Manager) preloadCorpus() {
corpusDB, err := db.Open(filepath.Join(mgr.cfg.Workdir, "corpus.db"), true)
if err != nil {
@@ -857,13 +694,13 @@ func containsDisabled(p *prog.Prog, enabled map[*prog.Syscall]bool) bool {
return false
}
-func (mgr *Manager) runInstance(index int) (*Crash, error) {
- mgr.checkUsedFiles()
+func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
+ index := inst.Index()
instanceName := fmt.Sprintf("vm-%d", index)
injectExec := make(chan bool, 10)
- mgr.serv.CreateInstance(instanceName, injectExec)
+ mgr.serv.CreateInstance(instanceName, injectExec, updInfo)
- rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectExec)
+ rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, instanceName, injectExec)
lastExec, machineInfo := mgr.serv.ShutdownInstance(instanceName, rep != nil)
if rep != nil {
prependExecuting(rep, lastExec)
@@ -872,48 +709,21 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) {
}
rep.MachineInfo = machineInfo
}
-
- // Error that is not a VM crash.
- if err != nil {
- return nil, err
- }
- // No crash.
- if rep == nil {
- return nil, nil
+ if err == nil && rep != nil {
+ mgr.crashes <- &Crash{
+ instanceName: instanceName,
+ Report: rep,
+ }
}
- crash := &Crash{
- instanceName: instanceName,
- Report: rep,
+ if err != nil {
+ log.Logf(1, "%s: failed with error: %v", instanceName, err)
}
- return crash, nil
}
-func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec <-chan bool) (
- *report.Report, []byte, error) {
+func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, instanceName string,
+ injectExec <-chan bool) (*report.Report, []byte, error) {
start := time.Now()
- inst, err := mgr.vmPool.Create(index)
- if err != nil {
- var bootErr vm.BootErrorer
- if errors.As(err, &bootErr) {
- title, output := bootErr.BootError()
- rep := mgr.reporter.Parse(output)
- if rep != nil && rep.Type == crash_pkg.UnexpectedReboot {
- // Avoid detecting any boot crash as "unexpected kernel reboot".
- rep = mgr.reporter.ParseFrom(output, rep.SkipPos)
- }
- if rep == nil {
- rep = &report.Report{
- Title: title,
- Output: output,
- }
- }
- return rep, nil, nil
- }
- return nil, nil, fmt.Errorf("failed to create instance: %w", err)
- }
- defer inst.Close()
-
fwdAddr, err := inst.Forward(mgr.serv.Port)
if err != nil {
return nil, nil, fmt.Errorf("failed to setup port forwarding: %w", err)
@@ -929,10 +739,6 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec
}
}
- if *flagDebug {
- mgr.cfg.Procs = 1
- }
-
// Run the fuzzer binary.
mgr.bootTime.Save(time.Since(start))
start = time.Now()
@@ -940,7 +746,7 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string, injectExec
addrPort := strings.Split(fwdAddr, ":")
cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, instanceName, addrPort[0], addrPort[1])
_, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd,
- vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectExecuting(injectExec),
+ vm.ExitTimeout, vm.StopContext(ctx), vm.InjectExecuting(injectExec),
vm.EarlyFinishCb(func() {
// Depending on the crash type and kernel config, fuzzing may continue
// running for several seconds even after kernel has printed a crash report.
@@ -1314,6 +1120,10 @@ func (mgr *Manager) saveRepro(res *ReproResult) {
}
}
+func (mgr *Manager) resizeReproPool(size int) {
+ mgr.pool.ReserveForRun(size)
+}
+
func (mgr *Manager) uploadReproAssets(repro *repro.Result) []dashapi.NewAsset {
if mgr.assetStorage == nil {
return nil
@@ -1659,7 +1469,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
if mgr.cfg.Cover {
// Distribute new max signal over all instances.
newSignal := fuzzer.Cover.GrabSignalDelta()
- log.Logf(2, "distributing %d new signal", len(newSignal))
+ log.Logf(3, "distributing %d new signal", len(newSignal))
if len(newSignal) != 0 {
mgr.serv.DistributeSignalDelta(newSignal)
}
@@ -1680,9 +1490,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey))
} else {
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
}
} else if mgr.phase == phaseQueriedHub {
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
}
mgr.mu.Unlock()
}
@@ -1695,6 +1507,7 @@ func (mgr *Manager) hubIsUnreachable() {
if mgr.phase == phaseTriagedCorpus {
dash = mgr.dash
mgr.phase = phaseTriagedHub
+ mgr.reproMgr.StartReproduction()
log.Errorf("did not manage to connect to syz-hub; moving forward")
}
mgr.mu.Unlock()
@@ -1730,17 +1543,19 @@ func (mgr *Manager) collectUsedFiles() {
}
func (mgr *Manager) checkUsedFiles() {
- for f, mod := range mgr.usedFiles {
- stat, err := os.Stat(f)
- if err != nil {
- log.Fatalf("failed to stat %v: %v", f, err)
- }
- if mod != stat.ModTime() {
- log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+
- "this can lead to arbitrary syz-manager misbehavior\n"+
- "modification time has changed: %v -> %v\n"+
- "don't modify files that syz-manager uses. exiting to prevent harm",
- f, mod, stat.ModTime())
+ for range time.NewTicker(30 * time.Second).C {
+ for f, mod := range mgr.usedFiles {
+ stat, err := os.Stat(f)
+ if err != nil {
+ log.Fatalf("failed to stat %v: %v", f, err)
+ }
+ if mod != stat.ModTime() {
+ log.Fatalf("file %v that syz-manager uses has been modified by an external program\n"+
+ "this can lead to arbitrary syz-manager misbehavior\n"+
+ "modification time has changed: %v -> %v\n"+
+ "don't modify files that syz-manager uses. exiting to prevent harm",
+ f, mod, stat.ModTime())
+ }
}
}
}
@@ -1786,10 +1601,9 @@ func (mgr *Manager) dashboardReporter() {
}
func (mgr *Manager) dashboardReproTasks() {
+ seq := 0
for range time.NewTicker(20 * time.Minute).C {
- needReproReply := make(chan bool)
- mgr.needMoreRepros <- needReproReply
- if !<-needReproReply {
+ if !mgr.reproMgr.CanReproMore() {
// We don't need reproducers at the moment.
continue
}
@@ -1801,7 +1615,8 @@ func (mgr *Manager) dashboardReproTasks() {
if len(resp.CrashLog) > 0 {
title := resp.Title
if title == "" {
- title = "repro from the dashboard"
+ seq++
+ title = fmt.Sprintf("repro #%d from the dashboard", seq)
}
mgr.externalReproQueue <- &Crash{
fromDashboard: true,
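Note: convertBootError above recognizes boot failures via errors.As with an interface target, which matches any error in a wrapped chain that implements vm.BootErrorer. A self-contained sketch of the technique with a toy error type (not syzkaller's):

package main

import (
	"errors"
	"fmt"
)

// BootErrorer mirrors the vm.BootErrorer interface used by convertBootError;
// any error in the chain that implements it is treated as a boot failure.
type BootErrorer interface {
	BootError() (title string, output []byte)
}

// qemuBootError is a toy implementation; syzkaller's real boot errors
// come from the vm package.
type qemuBootError struct{ log []byte }

func (e *qemuBootError) Error() string { return "qemu failed to boot" }
func (e *qemuBootError) BootError() (string, []byte) {
	return "kernel boot failure", e.log
}

func main() {
	// By the time errors reach the manager they are usually wrapped.
	err := fmt.Errorf("failed to create instance: %w",
		&qemuBootError{log: []byte("panic: ...")})

	// errors.As with a pointer-to-interface target unwraps the chain
	// until it finds a value implementing BootErrorer.
	var bootErr BootErrorer
	if errors.As(err, &bootErr) {
		title, output := bootErr.BootError()
		fmt.Printf("boot crash %q, %d bytes of console output\n", title, len(output))
	} else {
		fmt.Println("not a boot error:", err)
	}
}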
diff --git a/syz-manager/repro.go b/syz-manager/repro.go
new file mode 100644
index 000000000..1f5712819
--- /dev/null
+++ b/syz-manager/repro.go
@@ -0,0 +1,209 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package main
+
+import (
+ "context"
+ "maps"
+ "slices"
+ "sync"
+
+ "github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/stats"
+)
+
+type reproManagerView interface {
+ runRepro(crash *Crash) *ReproResult // TODO: consider moving runRepro() to repro.go.
+ needRepro(crash *Crash) bool
+ resizeReproPool(size int)
+}
+
+type reproManager struct {
+ Done chan *ReproResult
+
+ statNumReproducing *stats.Val
+ statPending *stats.Val
+
+ onlyOnce bool
+ mgr reproManagerView
+ parallel chan struct{}
+ pingQueue chan struct{}
+ reproVMs int
+
+ mu sync.Mutex
+ queue []*Crash
+ reproducing map[string]bool
+}
+
+func newReproManager(mgr reproManagerView, reproVMs int, onlyOnce bool) *reproManager {
+ ret := &reproManager{
+ Done: make(chan *ReproResult, 10),
+
+ mgr: mgr,
+ onlyOnce: onlyOnce,
+ parallel: make(chan struct{}, reproVMs),
+ reproVMs: reproVMs,
+ reproducing: map[string]bool{},
+ pingQueue: make(chan struct{}, 1),
+ }
+ ret.statNumReproducing = stats.Create("reproducing", "Number of crashes being reproduced",
+ stats.Console, stats.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.reproducing)
+ })
+ ret.statPending = stats.Create("pending", "Number of pending repro tasks",
+ stats.Console, stats.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.queue)
+ })
+ return ret
+}
+
+// StartReproduction() is assumed to be called only once.
+// reproVMs (set via newReproManager) is the maximum number of VMs dedicated to bug reproduction.
+func (m *reproManager) StartReproduction() {
+ log.Logf(1, "starting reproductions (max %d VMs)", m.reproVMs)
+
+ for count := 1; m.calculateReproVMs(count) <= m.reproVMs; count++ {
+ m.parallel <- struct{}{}
+ }
+}
+
+func (m *reproManager) calculateReproVMs(repros int) int {
+ // Let's allocate 1.33 VMs per reproducer thread.
+ if m.reproVMs == 1 && repros == 1 {
+ // With one exception -- if we have only one VM, let's still do one repro.
+ return 1
+ }
+ return (repros*4 + 2) / 3
+}
+
+func (m *reproManager) CanReproMore() bool {
+ return len(m.parallel) != 0
+}
+
+func (m *reproManager) Reproducing() map[string]bool {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ return maps.Clone(m.reproducing)
+}
+
+func (m *reproManager) Enqueue(crash *Crash) {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ if m.onlyOnce {
+ // Try to reproduce each bug at most 1 time in this mode.
+ // Since we don't upload bugs/repros to dashboard, it likely won't have
+ // the reproducer even if we succeeded last time, and will repeatedly
+ // say it needs a repro.
+ for _, queued := range m.queue {
+ if queued.Title == crash.Title {
+ return
+ }
+ }
+ }
+ log.Logf(1, "scheduled a reproduction of '%v'", crash.Title)
+ m.queue = append(m.queue, crash)
+
+ // Ping the loop.
+ select {
+ case m.pingQueue <- struct{}{}:
+ default:
+ }
+}
+
+func (m *reproManager) popCrash() *Crash {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ for i, crash := range m.queue {
+ if m.reproducing[crash.Title] {
+ continue
+ }
+ m.queue = slices.Delete(m.queue, i, i+1)
+ return crash
+ }
+ return nil
+}
+
+func (m *reproManager) Loop(ctx context.Context) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ for {
+ crash := m.popCrash()
+ for crash == nil {
+ select {
+ case <-m.pingQueue:
+ crash = m.popCrash()
+ case <-ctx.Done():
+ return
+ }
+ }
+ if !m.mgr.needRepro(crash) {
+ continue
+ }
+
+ // Now wait until we can schedule another runner.
+ select {
+ case <-m.parallel:
+ case <-ctx.Done():
+ return
+ }
+
+ m.mu.Lock()
+ m.reproducing[crash.Title] = true
+ m.adjustPoolSizeLocked()
+ m.mu.Unlock()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ m.handle(crash)
+
+ m.mu.Lock()
+ delete(m.reproducing, crash.Title)
+ m.adjustPoolSizeLocked()
+ m.mu.Unlock()
+
+ m.parallel <- struct{}{}
+ m.pingQueue <- struct{}{}
+ }()
+ }
+}
+
+func (m *reproManager) handle(crash *Crash) {
+ log.Logf(0, "start reproducing '%v'", crash.Title)
+
+ res := m.mgr.runRepro(crash)
+
+ crepro := false
+ title := ""
+ if res.repro != nil {
+ crepro = res.repro.CRepro
+ title = res.repro.Report.Title
+ }
+ log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v",
+ res.report0.Title, res.repro != nil, crepro, title, res.fromHub, res.fromDashboard,
+ )
+ m.Done <- res
+}
+
+func (m *reproManager) adjustPoolSizeLocked() {
+ // Avoid the +-1 jitter by considering the repro queue size as well.
+
+ // We process same-titled crashes sequentially, so only count unique ones.
+ uniqueTitles := make(map[string]bool)
+ for _, crash := range m.queue {
+ uniqueTitles[crash.Title] = true
+ }
+
+ needRepros := len(m.reproducing) + len(uniqueTitles)
+ VMs := min(m.reproVMs, m.calculateReproVMs(needRepros))
+ m.mgr.resizeReproPool(VMs)
+}
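Note: (repros*4 + 2) / 3 in calculateReproVMs is integer arithmetic for ceil(repros * 4/3), i.e. the "1.33 VMs per reproducer thread" from the comment, and StartReproduction hands out one slot per reproduction for as long as the next one still fits into the VM budget. A standalone check of what the formula yields (same arithmetic, toy harness):

package main

import "fmt"

// calcReproVMs repeats the formula from reproManager.calculateReproVMs:
// ceil(repros*4/3), with the single-VM special case.
func calcReproVMs(reproVMs, repros int) int {
	if reproVMs == 1 && repros == 1 {
		return 1
	}
	return (repros*4 + 2) / 3
}

// parallelRepros repeats StartReproduction's token hand-out: count
// concurrent repros until one more would exceed the VM budget.
func parallelRepros(reproVMs int) int {
	count := 0
	for calcReproVMs(reproVMs, count+1) <= reproVMs {
		count++
	}
	return count
}

func main() {
	for _, vms := range []int{1, 2, 3, 4, 8} {
		fmt.Printf("reproVMs=%d -> %d parallel repros\n", vms, parallelRepros(vms))
	}
	// Prints:
	// reproVMs=1 -> 1 parallel repros
	// reproVMs=2 -> 1 parallel repros
	// reproVMs=3 -> 2 parallel repros
	// reproVMs=4 -> 3 parallel repros
	// reproVMs=8 -> 6 parallel repros
}

With reproVMs=3 this allows two parallel reproductions, which is exactly what TestReproManager below asserts: 2 VMs reserved while one repro runs, 3 once the second starts.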
diff --git a/syz-manager/repro_test.go b/syz-manager/repro_test.go
new file mode 100644
index 000000000..61c6dc4b4
--- /dev/null
+++ b/syz-manager/repro_test.go
@@ -0,0 +1,94 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package main
+
+import (
+ "context"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/google/syzkaller/pkg/report"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestReproManager(t *testing.T) {
+ mock := &reproMgrMock{
+ run: make(chan runCallback),
+ }
+ obj := newReproManager(mock, 3, false)
+
+ ctx, done := context.WithCancel(context.Background())
+ complete := make(chan struct{})
+ go func() {
+ obj.Loop(ctx)
+ close(complete)
+ }()
+
+ defer func() {
+ done()
+ <-complete
+ }()
+
+ // No reproductions until we've signaled to start.
+ assert.False(t, obj.CanReproMore())
+ obj.StartReproduction()
+
+ // No reproducers -- we can definitely take more.
+ assert.True(t, obj.CanReproMore())
+ obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
+ called := <-mock.run
+ assert.Equal(t, "A", called.crash.Title)
+
+ // One reproducer is running -- we can take one more.
+ assert.True(t, obj.CanReproMore())
+ assert.EqualValues(t, 2, mock.reserved.Load())
+ obj.Enqueue(&Crash{Report: &report.Report{Title: "B"}})
+ called2 := <-mock.run
+ assert.Equal(t, "B", called2.crash.Title)
+
+ assert.False(t, obj.CanReproMore())
+ assert.Len(t, obj.Reproducing(), 2)
+ assert.EqualValues(t, 3, mock.reserved.Load())
+
+ // Pretend that reproducers have finished.
+ called.ret <- &ReproResult{report0: &report.Report{}}
+ called2.ret <- &ReproResult{report0: &report.Report{}}
+
+ // Wait until the number of reserved VMs goes to 0.
+ for i := 0; i < 100; i++ {
+ if mock.reserved.Load() == 0 {
+ assert.True(t, obj.CanReproMore())
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ t.Fatal("reserved VMs must have dropped to 0")
+}
+
+type reproMgrMock struct {
+ reserved atomic.Int64
+ run chan runCallback
+}
+
+type runCallback struct {
+ crash *Crash
+ ret chan *ReproResult
+}
+
+func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult {
+ retCh := make(chan *ReproResult)
+ m.run <- runCallback{crash: crash, ret: retCh}
+ ret := <-retCh
+ close(retCh)
+ return ret
+}
+
+func (m *reproMgrMock) needRepro(crash *Crash) bool {
+ return true
+}
+
+func (m *reproMgrMock) resizeReproPool(VMs int) {
+ m.reserved.Store(int64(VMs))
+}
diff --git a/syz-manager/stats.go b/syz-manager/stats.go
index cffa7e4c1..dced8fc87 100644
--- a/syz-manager/stats.go
+++ b/syz-manager/stats.go
@@ -13,19 +13,16 @@ import (
)
type Stats struct {
- statNumReproducing *stats.Val
- statCrashes *stats.Val
- statCrashTypes *stats.Val
- statSuppressed *stats.Val
- statUptime *stats.Val
- statFuzzingTime *stats.Val
- statAvgBootTime *stats.Val
- statCoverFiltered *stats.Val
+ statCrashes *stats.Val
+ statCrashTypes *stats.Val
+ statSuppressed *stats.Val
+ statUptime *stats.Val
+ statFuzzingTime *stats.Val
+ statAvgBootTime *stats.Val
+ statCoverFiltered *stats.Val
}
func (mgr *Manager) initStats() {
- mgr.statNumReproducing = stats.Create("reproducing", "Number of crashes being reproduced",
- stats.Console, stats.NoGraph)
mgr.statCrashes = stats.Create("crashes", "Total number of VM crashes",
stats.Simple, stats.Prometheus("syz_crash_total"))
mgr.statCrashTypes = stats.Create("crash types", "Number of unique crashes types",