diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-03-15 19:53:15 +0100 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-03-25 13:12:00 +0000 |
| commit | 409ee912f2c4f07e3064b4e6f4a83e1f812531d8 (patch) | |
| tree | ef7701845a10852597eb2fbf3c962d27f47ca161 /syz-manager | |
| parent | 5d5b1ae5147428cf089a616a3114af1add92068d (diff) | |
all: move fuzzer to the host
Instead of making fuzzing decisions in parallel inside the running VMs,
make all decisions in the host syz-manager process.
Instantiate and keep a fuzzer.Fuzzer object in syz-manager, and update
the RPC protocol between syz-manager and syz-fuzzer to exchange the exact
programs to execute along with their resulting signal and coverage.
To reduce network traffic, mostly exchange only the difference
between the known max signal and the newly detected signal.
Diffstat (limited to 'syz-manager')
| -rw-r--r-- | syz-manager/http.go | 4 | ||||
| -rw-r--r-- | syz-manager/hub.go | 12 | ||||
| -rw-r--r-- | syz-manager/manager.go | 189 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 309 | ||||
| -rw-r--r-- | syz-manager/stats.go | 14 |
5 files changed, 297 insertions, 231 deletions
diff --git a/syz-manager/http.go b/syz-manager/http.go index f38a7713b..bdac900f7 100644 --- a/syz-manager/http.go +++ b/syz-manager/http.go @@ -131,9 +131,10 @@ func (mgr *Manager) collectStats() []UIStat { {Name: "uptime", Value: fmt.Sprint(time.Since(mgr.startTime) / 1e9 * 1e9)}, {Name: "fuzzing", Value: fmt.Sprint(mgr.fuzzingTime / 60e9 * 60e9)}, {Name: "corpus", Value: fmt.Sprint(mgr.corpus.Stats().Progs), Link: "/corpus"}, - {Name: "triage queue", Value: fmt.Sprint(len(mgr.candidates))}, + {Name: "triage queue", Value: fmt.Sprint(mgr.stats.triageQueueLen.get())}, {Name: "signal", Value: fmt.Sprint(rawStats["signal"])}, {Name: "coverage", Value: fmt.Sprint(rawStats["coverage"]), Link: "/cover"}, + {Name: "fuzzer jobs", Value: fmt.Sprint(mgr.stats.fuzzerJobs.get())}, } if mgr.coverFilter != nil { stats = append(stats, UIStat{ @@ -147,6 +148,7 @@ func (mgr *Manager) collectStats() []UIStat { delete(rawStats, "signal") delete(rawStats, "coverage") delete(rawStats, "filtered coverage") + delete(rawStats, "fuzzer jobs") if mgr.checkResult != nil { stats = append(stats, UIStat{ Name: "syscalls", diff --git a/syz-manager/hub.go b/syz-manager/hub.go index d06a0cd0e..7a38310d5 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -9,6 +9,7 @@ import ( "time" "github.com/google/syzkaller/pkg/auth" + "github.com/google/syzkaller/pkg/fuzzer" "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/host" "github.com/google/syzkaller/pkg/log" @@ -73,7 +74,7 @@ type HubConnector struct { // HubManagerView restricts interface between HubConnector and Manager. 
type HubManagerView interface { getMinimizedCorpus() (corpus, repros [][]byte) - addNewCandidates(candidates []rpctype.Candidate) + addNewCandidates(candidates []fuzzer.Candidate) hubIsUnreachable() } @@ -213,9 +214,9 @@ func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error { } func (hc *HubConnector) processProgs(inputs []rpctype.HubInput) (minimized, smashed, dropped int) { - candidates := make([]rpctype.Candidate, 0, len(inputs)) + candidates := make([]fuzzer.Candidate, 0, len(inputs)) for _, inp := range inputs { - _, disabled, bad := parseProgram(hc.target, hc.enabledCalls, inp.Prog) + p, disabled, bad := parseProgram(hc.target, hc.enabledCalls, inp.Prog) if bad != nil || disabled { log.Logf(0, "rejecting program from hub (bad=%v, disabled=%v):\n%s", bad, disabled, inp) @@ -229,8 +230,8 @@ func (hc *HubConnector) processProgs(inputs []rpctype.HubInput) (minimized, smas if smash { smashed++ } - candidates = append(candidates, rpctype.Candidate{ - Prog: inp.Prog, + candidates = append(candidates, fuzzer.Candidate{ + Prog: p, Minimized: min, Smashed: smash, }) @@ -283,7 +284,6 @@ func (hc *HubConnector) processRepros(repros [][]byte) int { typ = crash.MemoryLeak } hc.hubReproQueue <- &Crash{ - vmIndex: -1, fromHub: true, Report: &report.Report{ Title: "external repro", diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 114e455d4..673deadc9 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -24,6 +24,7 @@ import ( "github.com/google/syzkaller/pkg/corpus" "github.com/google/syzkaller/pkg/csource" "github.com/google/syzkaller/pkg/db" + "github.com/google/syzkaller/pkg/fuzzer" "github.com/google/syzkaller/pkg/gce" "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/host" @@ -67,14 +68,15 @@ type Manager struct { fresh bool numFuzzing uint32 numReproducing uint32 + nextInstanceID atomic.Uint64 dash *dashapi.Dashboard mu sync.Mutex + fuzzer *fuzzer.Fuzzer phase int targetEnabledSyscalls 
map[*prog.Syscall]bool - candidates []rpctype.Candidate // untriaged inputs from corpus and hub disabledHashes map[string]struct{} seeds [][]byte newRepros [][]byte @@ -118,7 +120,7 @@ const ( const currentDBVersion = 4 type Crash struct { - vmIndex int + instanceName string fromHub bool // this crash was created based on a repro from syz-hub fromDashboard bool // .. or from dashboard *report.Report @@ -191,7 +193,7 @@ func RunManager(cfg *mgrconfig.Config) { mgr.initStats() // Initializes prometheus variables. mgr.initHTTP() // Creates HTTP server. mgr.collectUsedFiles() - go mgr.saveCorpus(corpusUpdates) + go mgr.corpusInputHandler(corpusUpdates) // Create RPC server for fuzzers. mgr.serv, err = startRPCServer(mgr) @@ -225,13 +227,13 @@ func RunManager(cfg *mgrconfig.Config) { continue } mgr.fuzzingTime += diff * time.Duration(atomic.LoadUint32(&mgr.numFuzzing)) + mgr.mu.Unlock() executed := mgr.stats.execTotal.get() crashes := mgr.stats.crashes.get() corpusCover := mgr.stats.corpusCover.get() corpusSignal := mgr.stats.corpusSignal.get() maxSignal := mgr.stats.maxSignal.get() - triageQLen := len(mgr.candidates) - mgr.mu.Unlock() + triageQLen := mgr.stats.triageQueueLen.get() numReproducing := atomic.LoadUint32(&mgr.numReproducing) numFuzzing := atomic.LoadUint32(&mgr.numFuzzing) @@ -279,7 +281,7 @@ func (mgr *Manager) initBench() { vals["corpus"] = uint64(stat.Progs) vals["uptime"] = uint64(time.Since(mgr.firstConnect)) / 1e9 vals["fuzzing"] = uint64(mgr.fuzzingTime) / 1e9 - vals["candidates"] = uint64(len(mgr.candidates)) + vals["candidates"] = uint64(mgr.fuzzer.Stats().Candidates) mgr.mu.Unlock() data, err := json.MarshalIndent(vals, "", " ") @@ -633,21 +635,29 @@ func (mgr *Manager) loadCorpus() { fallthrough case currentDBVersion: } + var candidates []fuzzer.Candidate broken := 0 for key, rec := range mgr.corpusDB.Records { - if !mgr.loadProg(rec.Val, minimized, smashed) { + drop, item := mgr.loadProg(rec.Val, minimized, smashed) + if drop { 
mgr.corpusDB.Delete(key) broken++ } + if item != nil { + candidates = append(candidates, *item) + } } mgr.fresh = len(mgr.corpusDB.Records) == 0 - corpusSize := len(mgr.candidates) + corpusSize := len(candidates) log.Logf(0, "%-24v: %v (deleted %v broken)", "corpus", corpusSize, broken) for _, seed := range mgr.seeds { - mgr.loadProg(seed, true, false) + _, item := mgr.loadProg(seed, true, false) + if item != nil { + candidates = append(candidates, *item) + } } - log.Logf(0, "%-24v: %v/%v", "seeds", len(mgr.candidates)-corpusSize, len(mgr.seeds)) + log.Logf(0, "%-24v: %v/%v", "seeds", len(candidates)-corpusSize, len(candidates)) mgr.seeds = nil // We duplicate all inputs in the corpus and shuffle the second part. @@ -655,8 +665,8 @@ func (mgr *Manager) loadCorpus() { // in such case it will also lost all cached candidates. Or, the input can be somewhat flaky // and doesn't give the coverage on first try. So we give each input the second chance. // Shuffling should alleviate deterministically losing the same inputs on fuzzer crashing. - mgr.candidates = append(mgr.candidates, mgr.candidates...) - shuffle := mgr.candidates[len(mgr.candidates)/2:] + candidates = append(candidates, candidates...) + shuffle := candidates[len(candidates)/2:] rand.Shuffle(len(shuffle), func(i, j int) { shuffle[i], shuffle[j] = shuffle[j], shuffle[i] }) @@ -664,12 +674,14 @@ func (mgr *Manager) loadCorpus() { panic(fmt.Sprintf("loadCorpus: bad phase %v", mgr.phase)) } mgr.phase = phaseLoadedCorpus + mgr.fuzzer.AddCandidates(candidates) } -func (mgr *Manager) loadProg(data []byte, minimized, smashed bool) bool { - _, disabled, bad := parseProgram(mgr.target, mgr.targetEnabledSyscalls, data) +// Returns (delete item from the corpus, a fuzzer.Candidate object). 
+func (mgr *Manager) loadProg(data []byte, minimized, smashed bool) (drop bool, candidate *fuzzer.Candidate) { + p, disabled, bad := parseProgram(mgr.target, mgr.targetEnabledSyscalls, data) if bad != nil { - return false + return true, nil } if disabled { if mgr.cfg.PreserveCorpus { @@ -682,25 +694,24 @@ func (mgr *Manager) loadProg(data []byte, minimized, smashed bool) bool { // minimize what remains from the prog. The original prog will be // deleted from the corpus. leftover := programLeftover(mgr.target, mgr.targetEnabledSyscalls, data) - if len(leftover) > 0 { - mgr.candidates = append(mgr.candidates, rpctype.Candidate{ + if leftover != nil { + candidate = &fuzzer.Candidate{ Prog: leftover, Minimized: false, Smashed: smashed, - }) + } } } - return true + return false, candidate } - mgr.candidates = append(mgr.candidates, rpctype.Candidate{ - Prog: data, + return false, &fuzzer.Candidate{ + Prog: p, Minimized: minimized, Smashed: smashed, - }) - return true + } } -func programLeftover(target *prog.Target, enabled map[*prog.Syscall]bool, data []byte) []byte { +func programLeftover(target *prog.Target, enabled map[*prog.Syscall]bool, data []byte) *prog.Prog { p, err := target.Deserialize(data, prog.NonStrict) if err != nil { panic(fmt.Sprintf("subsequent deserialization failed: %s", data)) @@ -713,7 +724,7 @@ func programLeftover(target *prog.Target, enabled map[*prog.Syscall]bool, data [ } i++ } - return p.Serialize() + return p } func parseProgram(target *prog.Target, enabled map[*prog.Syscall]bool, data []byte) ( @@ -741,7 +752,8 @@ func parseProgram(target *prog.Target, enabled map[*prog.Syscall]bool, data []by func (mgr *Manager) runInstance(index int) (*Crash, error) { mgr.checkUsedFiles() - instanceName := fmt.Sprintf("vm-%d", index) + // Use unique instance names to keep name collisions in case of untimely RPC messages. 
+ instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1)) rep, vmInfo, err := mgr.runInstanceInner(index, instanceName) @@ -759,9 +771,9 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) { return nil, nil } crash := &Crash{ - vmIndex: index, - Report: rep, - machineInfo: machineInfo, + instanceName: instanceName, + Report: rep, + machineInfo: machineInfo, } return crash, nil } @@ -884,7 +896,7 @@ func (mgr *Manager) saveCrash(crash *Crash) bool { if crash.Suppressed { flags += " [suppressed]" } - log.Logf(0, "vm-%v: crash: %v%v", crash.vmIndex, crash.Title, flags) + log.Logf(0, "%s: crash: %v%v", crash.instanceName, crash.Title, flags) if crash.Suppressed { // Collect all of them into a single bucket so that it's possible to control and assess them, @@ -1202,8 +1214,11 @@ func fullReproLog(stats *repro.Stats) []byte { stats.SimplifyProgTime, stats.ExtractCTime, stats.SimplifyCTime, stats.Log)) } -func (mgr *Manager) saveCorpus(updates <-chan corpus.NewItemEvent) { +func (mgr *Manager) corpusInputHandler(updates <-chan corpus.NewItemEvent) { for update := range updates { + mgr.stats.newInputs.inc() + mgr.serv.updateFilteredCover(update.NewCover) + if update.Exists { // We only save new progs into the corpus.db file. continue @@ -1231,16 +1246,16 @@ func (mgr *Manager) getMinimizedCorpus() (corpus, repros [][]byte) { return } -func (mgr *Manager) addNewCandidates(candidates []rpctype.Candidate) { +func (mgr *Manager) addNewCandidates(candidates []fuzzer.Candidate) { mgr.mu.Lock() defer mgr.mu.Unlock() + if mgr.cfg.Experimental.ResetAccState { // Don't accept new candidates -- the execution is already very slow, // syz-hub will just overwhelm us. return } - - mgr.candidates = append(mgr.candidates, candidates...) 
+ mgr.fuzzer.AddCandidates(candidates) if mgr.phase == phaseTriagedCorpus { mgr.phase = phaseQueriedHub } @@ -1332,16 +1347,11 @@ func (mgr *Manager) collectSyscallInfo() map[string]*corpus.CallCov { } func (mgr *Manager) fuzzerConnect(modules []host.KernelModule) ( - []rpctype.Input, BugFrames, map[uint32]uint32, map[uint32]uint32, error) { + BugFrames, map[uint32]uint32, map[uint32]uint32, error) { mgr.mu.Lock() defer mgr.mu.Unlock() mgr.minimizeCorpusUnlocked() - items := mgr.corpus.Items() - corpus := make([]rpctype.Input, 0, len(items)) - for _, inp := range items { - corpus = append(corpus, inp.RPCInputShort()) - } frames := BugFrames{ memoryLeaks: make([]string, 0, len(mgr.memoryLeakFrames)), dataRaces: make([]string, 0, len(mgr.dataRaceFrames)), @@ -1361,54 +1371,94 @@ func (mgr *Manager) fuzzerConnect(modules []host.KernelModule) ( } mgr.modulesInitialized = true } - return corpus, frames, mgr.coverFilter, mgr.execCoverFilter, nil + return frames, mgr.coverFilter, mgr.execCoverFilter, nil } func (mgr *Manager) machineChecked(a *rpctype.CheckArgs, enabledSyscalls map[*prog.Syscall]bool) { mgr.mu.Lock() defer mgr.mu.Unlock() + if mgr.checkResult != nil { + panic("machineChecked() called twice") + } + mgr.checkResult = a mgr.targetEnabledSyscalls = enabledSyscalls mgr.target.UpdateGlobs(a.GlobFiles) - mgr.loadCorpus() mgr.firstConnect = time.Now() + + rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + calls := make(map[*prog.Syscall]bool) + for _, id := range a.EnabledCalls[mgr.cfg.Sandbox] { + calls[mgr.target.Syscalls[id]] = true + } + mgr.fuzzer = fuzzer.NewFuzzer(context.Background(), &fuzzer.Config{ + Corpus: mgr.corpus, + Coverage: mgr.cfg.Cover, + FaultInjection: a.Features[host.FeatureFault].Enabled, + Comparisons: a.Features[host.FeatureComparisons].Enabled, + Collide: true, + EnabledCalls: calls, + NoMutateCalls: mgr.cfg.NoMutateCalls, + FetchRawCover: mgr.cfg.RawCover, + Logf: func(level int, msg string, args ...interface{}) { + if level != 0 
{ + return + } + log.Logf(level, msg, args...) + }, + NewInputFilter: func(input *corpus.NewInput) bool { + mgr.mu.Lock() + defer mgr.mu.Unlock() + return !mgr.saturatedCalls[input.StringCall()] + }, + }, rnd, mgr.target) + + mgr.loadCorpus() + go mgr.fuzzerLoop() } -func (mgr *Manager) newInput(inp corpus.NewInput) bool { +func (mgr *Manager) getFuzzer() *fuzzer.Fuzzer { mgr.mu.Lock() defer mgr.mu.Unlock() - if mgr.saturatedCalls[inp.StringCall()] { - // TODO: move this logic to pkg/corpus or pkg/fuzzer? - return false - } - mgr.corpus.Save(inp) - return true + return mgr.fuzzer } -func (mgr *Manager) candidateBatch(size int) []rpctype.Candidate { - mgr.mu.Lock() - defer mgr.mu.Unlock() - var res []rpctype.Candidate - for i := 0; i < size && len(mgr.candidates) > 0; i++ { - last := len(mgr.candidates) - 1 - res = append(res, mgr.candidates[last]) - mgr.candidates[last] = rpctype.Candidate{} - mgr.candidates = mgr.candidates[:last] - } - if len(mgr.candidates) == 0 { - mgr.candidates = nil - if mgr.phase == phaseLoadedCorpus { - if mgr.cfg.HubClient != "" { - mgr.phase = phaseTriagedCorpus - go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey)) - } else { +func (mgr *Manager) fuzzerLoop() { + for { + time.Sleep(time.Second / 2) + + // Distribute new max signal over all instances. + newSignal := mgr.fuzzer.Cover.GrabNewSignal() + log.Logf(2, "distributing %d new signal", len(newSignal)) + mgr.serv.distributeMaxSignal(newSignal) + + // Collect statistics. + fuzzerStats := mgr.fuzzer.Stats() + mgr.stats.setNamed(fuzzerStats.Named) + mgr.stats.corpusCover.set(fuzzerStats.Cover) + mgr.stats.corpusSignal.set(fuzzerStats.Signal) + mgr.stats.maxSignal.set(fuzzerStats.MaxSignal) + mgr.stats.triageQueueLen.set(fuzzerStats.Candidates) + mgr.stats.fuzzerJobs.set(fuzzerStats.RunningJobs) + mgr.stats.rpcTraffic.add(int(mgr.serv.server.TotalBytes.Swap(0))) + + // Update the state machine. 
+ if fuzzerStats.Candidates == 0 { + mgr.mu.Lock() + if mgr.phase == phaseLoadedCorpus { + mgr.fuzzer.EnableOutOfQueue() + if mgr.cfg.HubClient != "" { + mgr.phase = phaseTriagedCorpus + go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey)) + } else { + mgr.phase = phaseTriagedHub + } + } else if mgr.phase == phaseQueriedHub { mgr.phase = phaseTriagedHub } - } else if mgr.phase == phaseQueriedHub { - mgr.phase = phaseTriagedHub + mgr.mu.Unlock() } } - return res } func (mgr *Manager) hubIsUnreachable() { @@ -1535,7 +1585,6 @@ func (mgr *Manager) dashboardReproTasks() { } if len(resp.CrashLog) > 0 { mgr.externalReproQueue <- &Crash{ - vmIndex: -1, fromDashboard: true, Report: &report.Report{ Title: resp.Title, diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 700bc6da6..472c406d2 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -5,13 +5,12 @@ package main import ( "fmt" - "math/rand" "net" "sync" - "time" + "sync/atomic" - "github.com/google/syzkaller/pkg/corpus" "github.com/google/syzkaller/pkg/cover" + "github.com/google/syzkaller/pkg/fuzzer" "github.com/google/syzkaller/pkg/host" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/mgrconfig" @@ -23,34 +22,32 @@ import ( type RPCServer struct { mgr RPCManagerView cfg *mgrconfig.Config + server *rpctype.RPCServer modules []host.KernelModule port int targetEnabledSyscalls map[*prog.Syscall]bool coverFilter map[uint32]uint32 stats *Stats - batchSize int canonicalModules *cover.Canonicalizer mu sync.Mutex - fuzzers map[string]*Fuzzer + runners sync.Map // Instead of map[string]*Runner. checkResult *rpctype.CheckArgs - // TODO: we don't really need these anymore, but there's not much sense - // in rewriting the code that uses them -- most of that code will be dropped - // once we move pkg/fuzzer to the host. 
- maxSignal signal.Signal - corpusSignal signal.Signal - corpusCover cover.Cover - rnd *rand.Rand checkFailures int } -type Fuzzer struct { - name string - inputs []rpctype.Input - newMaxSignal signal.Signal - machineInfo []byte - instModules *cover.CanonicalizerInstance +type Runner struct { + name string + + machineInfo []byte + instModules *cover.CanonicalizerInstance + + // The mutex protects newMaxSignal and requests. + mu sync.Mutex + newMaxSignal signal.Signal + nextRequestID atomic.Int64 + requests map[int64]*fuzzer.Request } type BugFrames struct { @@ -60,24 +57,16 @@ type BugFrames struct { // RPCManagerView restricts interface between RPCServer and Manager. type RPCManagerView interface { - fuzzerConnect([]host.KernelModule) ( - []rpctype.Input, BugFrames, map[uint32]uint32, map[uint32]uint32, error) + fuzzerConnect([]host.KernelModule) (BugFrames, map[uint32]uint32, map[uint32]uint32, error) machineChecked(result *rpctype.CheckArgs, enabledSyscalls map[*prog.Syscall]bool) - newInput(inp corpus.NewInput) bool - candidateBatch(size int) []rpctype.Candidate + getFuzzer() *fuzzer.Fuzzer } func startRPCServer(mgr *Manager) (*RPCServer, error) { serv := &RPCServer{ - mgr: mgr, - cfg: mgr.cfg, - stats: mgr.stats, - fuzzers: make(map[string]*Fuzzer), - rnd: rand.New(rand.NewSource(time.Now().UnixNano())), - } - serv.batchSize = 5 - if serv.batchSize < mgr.cfg.Procs { - serv.batchSize = mgr.cfg.Procs + mgr: mgr, + cfg: mgr.cfg, + stats: mgr.stats, } s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) if err != nil { @@ -85,13 +74,8 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { } log.Logf(0, "serving rpc on tcp://%v", s.Addr()) serv.port = s.Addr().(*net.TCPAddr).Port + serv.server = s go s.Serve() - go func() { - for { - time.Sleep(time.Second) - mgr.stats.rpcTraffic.add(int(s.TotalBytes.Swap(0))) - } - }() return serv, nil } @@ -99,37 +83,47 @@ func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) er log.Logf(1, 
"fuzzer %v connected", a.Name) serv.stats.vmRestarts.inc() + serv.mu.Lock() if serv.canonicalModules == nil { serv.canonicalModules = cover.NewCanonicalizer(a.Modules, serv.cfg.Cover) serv.modules = a.Modules } - corpus, bugFrames, coverFilter, execCoverFilter, err := serv.mgr.fuzzerConnect(serv.modules) + serv.mu.Unlock() + + bugFrames, coverFilter, execCoverFilter, err := serv.mgr.fuzzerConnect(serv.modules) if err != nil { return err } - serv.coverFilter = coverFilter serv.mu.Lock() defer serv.mu.Unlock() - f := &Fuzzer{ + serv.coverFilter = coverFilter + + runner := &Runner{ name: a.Name, machineInfo: a.MachineInfo, instModules: serv.canonicalModules.NewInstance(a.Modules), + requests: make(map[int64]*fuzzer.Request), + } + if _, loaded := serv.runners.LoadOrStore(a.Name, runner); loaded { + return fmt.Errorf("duplicate connection from %s", a.Name) } - serv.fuzzers[a.Name] = f r.MemoryLeakFrames = bugFrames.memoryLeaks r.DataRaceFrames = bugFrames.dataRaces - instCoverFilter := f.instModules.DecanonicalizeFilter(execCoverFilter) + instCoverFilter := runner.instModules.DecanonicalizeFilter(execCoverFilter) r.CoverFilterBitmap = createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter) r.EnabledCalls = serv.cfg.Syscalls - r.NoMutateCalls = serv.cfg.NoMutateCalls r.GitRevision = prog.GitRevision r.TargetRevision = serv.cfg.Target.Revision r.CheckResult = serv.checkResult - f.inputs = corpus - f.newMaxSignal = serv.maxSignal.Copy() + + if fuzzer := serv.mgr.getFuzzer(); fuzzer != nil { + // A Fuzzer object is created after the first Check() call. + // If there was none, there would be no collected max signal either. 
+ runner.newMaxSignal = fuzzer.Cover.CopyMaxSignal() + } return nil } @@ -177,140 +171,147 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *int) error { return nil } -func (serv *RPCServer) NewInput(a *rpctype.NewInputArgs, r *int) error { - p, disabled, bad := parseProgram(serv.cfg.Target, serv.targetEnabledSyscalls, a.Input.Prog) - if bad != nil || disabled { - log.Errorf("rejecting program from fuzzer (bad=%v, disabled=%v):\n%s", bad, disabled, a.Input.Prog) +func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error { + var runner *Runner + if val, _ := serv.runners.Load(a.Name); val != nil { + runner = val.(*Runner) + } else { + // There might be a parallel shutdownInstance(). + // Ignore the request then. return nil } - serv.mu.Lock() - defer serv.mu.Unlock() - f := serv.fuzzers[a.Name] - // Note: f may be nil if we called shutdownInstance, - // but this request is already in-flight. - if f != nil { - a.Cover, a.Signal = f.instModules.Canonicalize(a.Cover, a.Signal) + fuzzer := serv.mgr.getFuzzer() + if fuzzer == nil { + // ExchangeInfo calls follow MachineCheck, so the fuzzer must have been initialized. + panic("exchange info call with nil fuzzer") } - inputSignal := a.Signal.Deserialize() - inp := corpus.NewInput{ - Prog: p, - Call: a.Call, - Signal: inputSignal, - Cover: a.Cover, + // First query new inputs and only then post results. + // It should foster a more even distribution of executions + // across all VMs. 
+ for i := 0; i < a.NeedProgs; i++ { + inp := fuzzer.NextInput() + r.Requests = append(r.Requests, runner.newRequest(inp)) } - log.Logf(4, "new input from %v for syscall %v (signal=%v, cover=%v)", - a.Name, inp.StringCall(), inputSignal.Len(), len(a.Cover)) - if serv.corpusSignal.Diff(inputSignal).Empty() { - return nil - } - if !serv.mgr.newInput(inp) { - return nil + for _, result := range a.Results { + runner.doneRequest(result, fuzzer) } - diff := serv.corpusCover.MergeDiff(a.Cover) - serv.stats.corpusCover.set(len(serv.corpusCover)) - if len(diff) != 0 && serv.coverFilter != nil { - // Note: ReportGenerator is already initialized if coverFilter is enabled. - rg, err := getReportGenerator(serv.cfg, serv.modules) - if err != nil { - return err - } - filtered := 0 - for _, pc := range diff { - if serv.coverFilter[uint32(rg.RestorePC(pc))] != 0 { - filtered++ - } - } - serv.stats.corpusCoverFiltered.add(filtered) - } - serv.stats.newInputs.inc() + serv.stats.mergeNamed(a.StatsDelta) - serv.corpusSignal.Merge(inputSignal) - serv.stats.corpusSignal.set(serv.corpusSignal.Len()) + runner.mu.Lock() + // Let's transfer new max signal in portions. + const transferMaxSignal = 500000 + maxSignalDiff := runner.newMaxSignal.Split(transferMaxSignal) + runner.mu.Unlock() - a.Input.Cover = nil // Don't send coverage back to all fuzzers. 
- a.Input.RawCover = nil - for _, other := range serv.fuzzers { - if other == f { - continue - } - other.inputs = append(other.inputs, a.Input) - } - return nil -} + r.NewMaxSignal = runner.instModules.Decanonicalize(maxSignalDiff.ToRaw()) -func (serv *RPCServer) Poll(a *rpctype.PollArgs, r *rpctype.PollRes) error { - serv.stats.mergeNamed(a.Stats) + log.Logf(2, "exchange with %s: %d done, %d new requests, %d new max signal", + a.Name, len(a.Results), len(r.Requests), len(r.NewMaxSignal)) - serv.mu.Lock() - defer serv.mu.Unlock() + return nil +} - f := serv.fuzzers[a.Name] - if f == nil { - // This is possible if we called shutdownInstance, - // but already have a pending request from this instance in-flight. - log.Logf(1, "poll: fuzzer %v is not connected", a.Name) +func (serv *RPCServer) updateFilteredCover(pcs []uint32) error { + if len(pcs) == 0 || serv.coverFilter == nil { return nil } - newMaxSignal := serv.maxSignal.Diff(a.MaxSignal.Deserialize()) - if !newMaxSignal.Empty() { - serv.maxSignal.Merge(newMaxSignal) - serv.stats.maxSignal.set(len(serv.maxSignal)) - for _, f1 := range serv.fuzzers { - if f1 == f { - continue - } - f1.newMaxSignal.Merge(newMaxSignal) - } - } - r.MaxSignal = f.newMaxSignal.Split(2000).Serialize() - if a.NeedCandidates { - r.Candidates = serv.mgr.candidateBatch(serv.batchSize) + // Note: ReportGenerator is already initialized if coverFilter is enabled. + rg, err := getReportGenerator(serv.cfg, serv.modules) + if err != nil { + return err } - if len(r.Candidates) == 0 { - batchSize := serv.batchSize - // When the fuzzer starts, it pumps the whole corpus. - // If we do it using the final batchSize, it can be very slow - // (batch of size 6 can take more than 10 mins for 50K corpus and slow kernel). - // So use a larger batch initially (we use no stats as approximation of initial pump). 
- const initialBatch = 50 - if len(a.Stats) == 0 && batchSize < initialBatch { - batchSize = initialBatch - } - for i := 0; i < batchSize && len(f.inputs) > 0; i++ { - last := len(f.inputs) - 1 - r.NewInputs = append(r.NewInputs, f.inputs[last]) - f.inputs[last] = rpctype.Input{} - f.inputs = f.inputs[:last] + filtered := 0 + for _, pc := range pcs { + if serv.coverFilter[uint32(rg.RestorePC(pc))] != 0 { + filtered++ } - if len(f.inputs) == 0 { - f.inputs = nil - } - } - for _, inp := range r.NewInputs { - inp.Cover, inp.Signal = f.instModules.Decanonicalize(inp.Cover, inp.Signal) } - log.Logf(4, "poll from %v: candidates=%v inputs=%v maxsignal=%v", - a.Name, len(r.Candidates), len(r.NewInputs), len(r.MaxSignal.Elems)) + serv.stats.corpusCoverFiltered.add(filtered) return nil } func (serv *RPCServer) shutdownInstance(name string) []byte { - serv.mu.Lock() - defer serv.mu.Unlock() - - fuzzer := serv.fuzzers[name] - if fuzzer == nil { + var runner *Runner + if val, _ := serv.runners.LoadAndDelete(name); val != nil { + runner = val.(*Runner) + } else { return nil } - delete(serv.fuzzers, name) - return fuzzer.machineInfo + + runner.mu.Lock() + if runner.requests == nil { + // We are supposed to invoke this code only once. + panic("Runner.requests is already nil") + } + oldRequests := runner.requests + runner.requests = nil + runner.mu.Unlock() + + // If the object does not exist, there would be no oldRequests either. + fuzzerObj := serv.mgr.getFuzzer() + for _, req := range oldRequests { + // The VM likely crashed, so let's tell pkg/fuzzer to abort the affected jobs. + // TODO: distinguish between real VM crashes and regular VM restarts? 
+ fuzzerObj.Done(req, &fuzzer.Result{Stop: true}) + } + return runner.machineInfo } -func (serv *RPCServer) LogMessage(m *rpctype.LogMessageReq, r *int) error { - log.Logf(m.Level, "%s: %s", m.Name, m.Message) - return nil +func (serv *RPCServer) distributeMaxSignal(delta signal.Signal) { + serv.runners.Range(func(key, value any) bool { + runner := value.(*Runner) + runner.mu.Lock() + defer runner.mu.Unlock() + runner.newMaxSignal.Merge(delta) + return true + }) +} + +func (runner *Runner) doneRequest(resp rpctype.ExecutionResult, fuzzerObj *fuzzer.Fuzzer) { + runner.mu.Lock() + req, ok := runner.requests[resp.ID] + if ok { + delete(runner.requests, resp.ID) + } + runner.mu.Unlock() + if !ok { + // There may be a concurrent shutdownInstance() call. + return + } + info := &resp.Info + for i := 0; i < len(info.Calls); i++ { + call := &info.Calls[i] + call.Cover = runner.instModules.Canonicalize(call.Cover) + call.Signal = runner.instModules.Canonicalize(call.Signal) + } + info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) + info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) + fuzzerObj.Done(req, &fuzzer.Result{Info: info}) +} + +func (runner *Runner) newRequest(req *fuzzer.Request) rpctype.ExecutionRequest { + var signalFilter signal.Signal + if req.SignalFilter != nil { + newRawSignal := runner.instModules.Decanonicalize(req.SignalFilter.ToRaw()) + // We don't care about specific priorities here. 
+ signalFilter = signal.FromRaw(newRawSignal, 0) + } + id := runner.nextRequestID.Add(1) + runner.mu.Lock() + if runner.requests != nil { + runner.requests[id] = req + } + runner.mu.Unlock() + return rpctype.ExecutionRequest{ + ID: id, + ProgData: req.Prog.Serialize(), + NeedCover: req.NeedCover, + NeedSignal: req.NeedSignal, + SignalFilter: signalFilter, + NeedHints: req.NeedHints, + } } diff --git a/syz-manager/stats.go b/syz-manager/stats.go index 35770c527..d85a1a78b 100644 --- a/syz-manager/stats.go +++ b/syz-manager/stats.go @@ -32,6 +32,8 @@ type Stats struct { corpusCoverFiltered Stat corpusSignal Stat maxSignal Stat + triageQueueLen Stat + fuzzerJobs Stat mu sync.Mutex namedStats map[string]uint64 @@ -73,6 +75,7 @@ func (stats *Stats) all() map[string]uint64 { "signal": stats.corpusSignal.get(), "max signal": stats.maxSignal.get(), "rpc traffic (MB)": stats.rpcTraffic.get() / 1e6, + "fuzzer jobs": stats.fuzzerJobs.get(), } if stats.haveHub { m["hub: send prog add"] = stats.hubSendProgAdd.get() @@ -107,6 +110,17 @@ func (stats *Stats) mergeNamed(named map[string]uint64) { } } +func (stats *Stats) setNamed(named map[string]uint64) { + stats.mu.Lock() + defer stats.mu.Unlock() + if stats.namedStats == nil { + stats.namedStats = make(map[string]uint64) + } + for k, v := range named { + stats.namedStats[k] = v + } +} + func (s *Stat) get() uint64 { return atomic.LoadUint64((*uint64)(s)) } |
