From 7e9780e93983e03547322aab489429ae4a7d2fa3 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 15 Apr 2024 14:54:59 +0200 Subject: syz-manager, syz-fuzzer: send RPC notifications about executing programs Instead of printing full program from the fuzzer, send a short notification with program ID to the manager and let manager emit the program into the log. This significnatly reduces amount of communication and makes it possible to not send text programs to the fuzzer at all. --- pkg/rpctype/rpc.go | 4 ++ pkg/rpctype/rpctype.go | 17 +++++- syz-fuzzer/fuzzer.go | 24 +++++--- syz-fuzzer/proc.go | 25 +++------ syz-manager/manager.go | 10 +++- syz-manager/rpc.go | 147 ++++++++++++++++++++++++++++++++++++------------- syz-manager/stats.go | 3 - 7 files changed, 159 insertions(+), 71 deletions(-) diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index 39a299653..73e4512c4 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -110,6 +110,10 @@ func (cli *RPCClient) Call(method string, args, reply interface{}) error { return cli.c.Call(method, args, reply) } +func (cli *RPCClient) AsyncCall(method string, args interface{}) { + cli.c.Go(method, args, nil, nil) +} + func (cli *RPCClient) Close() { cli.c.Close() } diff --git a/pkg/rpctype/rpctype.go b/pkg/rpctype/rpctype.go index 7e15ba69c..07a217e05 100644 --- a/pkg/rpctype/rpctype.go +++ b/pkg/rpctype/rpctype.go @@ -37,8 +37,10 @@ type ExecutionRequest struct { // ExecutionResult is sent after ExecutionRequest is completed. type ExecutionResult struct { - ID int64 - Info ipc.ProgInfo + ID int64 + ProcID int + Try int + Info ipc.ProgInfo } // ExchangeInfoRequest is periodically sent by syz-fuzzer to syz-manager. @@ -57,6 +59,17 @@ type ExchangeInfoReply struct { DropMaxSignal []uint32 } +// ExecutingRequest is notification from the fuzzer that it started executing +// the program ProgID. We want this request to be as small and as fast as possible +// b/c we want it to reach manager (or at least leave the VM) before it crashes +// executing this program. +type ExecutingRequest struct { + Name string + ID int64 + ProcID int + Try int +} + // TODO: merge ExecutionRequest and ExecTask. type ExecTask struct { Prog []byte diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index eb44ff2ae..7b4fa4217 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -39,10 +39,7 @@ type FuzzerTool struct { triagedCandidates uint32 timeouts targets.Timeouts - logMu sync.Mutex - bufferTooSmall atomic.Uint64 - execRetries atomic.Uint64 noExecRequests atomic.Uint64 noExecDuration atomic.Uint64 resetAccState bool @@ -57,7 +54,9 @@ type FuzzerTool struct { // to the communication thread. type executionResult struct { rpctype.ExecutionRequest - info *ipc.ProgInfo + procID int + try int + info *ipc.ProgInfo } // executionRequest offloads prog deseralization to another thread. @@ -322,6 +321,15 @@ func (tool *FuzzerTool) filterDataRaceFrames(frames []string) { log.Logf(0, "%s", output) } +func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int) { + tool.manager.AsyncCall("Manager.StartExecuting", &rpctype.ExecutingRequest{ + Name: tool.name, + ID: progID, + ProcID: pid, + Try: try, + }) +} + func (tool *FuzzerTool) exchangeDataCall(needProgs int, results []executionResult, latency time.Duration) time.Duration { a := &rpctype.ExchangeInfoRequest{ @@ -378,7 +386,11 @@ func (tool *FuzzerTool) exchangeDataWorker() { } func (tool *FuzzerTool) convertExecutionResult(res executionResult) rpctype.ExecutionResult { - ret := rpctype.ExecutionResult{ID: res.ID} + ret := rpctype.ExecutionResult{ + ID: res.ID, + ProcID: res.procID, + Try: res.try, + } if res.info != nil { if res.NeedSignal == rpctype.NewSignal { tool.diffMaxSignal(res.info, res.SignalFilter, res.SignalFilterCall) @@ -391,11 +403,9 @@ func (tool *FuzzerTool) convertExecutionResult(res executionResult) rpctype.Exec func (tool *FuzzerTool) grabStats() map[string]uint64 { stats := map[string]uint64{} for _, proc := range tool.procs { - stats["exec total"] += atomic.SwapUint64(&proc.env.StatExecs, 0) stats["executor restarts"] += atomic.SwapUint64(&proc.env.StatRestarts, 0) } stats["buffer too small"] = tool.bufferTooSmall.Swap(0) - stats["exec retries"] = tool.execRetries.Swap(0) stats["no exec requests"] = tool.noExecRequests.Swap(0) stats["no exec duration"] = tool.noExecDuration.Swap(0) return stats diff --git a/syz-fuzzer/proc.go b/syz-fuzzer/proc.go index b47c7ca3e..86f798792 100644 --- a/syz-fuzzer/proc.go +++ b/syz-fuzzer/proc.go @@ -4,7 +4,6 @@ package main import ( - "fmt" "math/rand" "time" @@ -61,11 +60,13 @@ func (proc *Proc) loop() { (req.NeedCover || req.NeedSignal != rpctype.NoSignal || req.NeedHints) { proc.env.ForceRestart() } - info := proc.executeRaw(&opts, req.prog) + info, try := proc.executeRaw(&opts, req.ID, req.prog) // Let's perform signal filtering in a separate thread to get the most // exec/sec out of a syz-executor instance. proc.tool.results <- executionResult{ ExecutionRequest: req.ExecutionRequest, + procID: proc.pid, + try: try, info: info, } } @@ -86,7 +87,7 @@ func (proc *Proc) nextRequest() executionRequest { return req } -func (proc *Proc) executeRaw(opts *ipc.ExecOpts, p *prog.Prog) *ipc.ProgInfo { +func (proc *Proc) executeRaw(opts *ipc.ExecOpts, progID int64, p *prog.Prog) (*ipc.ProgInfo, int) { for try := 0; ; try++ { var output []byte var info *ipc.ProgInfo @@ -97,7 +98,7 @@ func (proc *Proc) executeRaw(opts *ipc.ExecOpts, p *prog.Prog) *ipc.ProgInfo { if err == nil { // Limit concurrency. ticket := proc.tool.gate.Enter() - proc.logProgram(p) + proc.tool.startExecutingCall(progID, proc.pid, try) output, info, hanged, err = proc.env.Exec(opts, p) proc.tool.gate.Leave(ticket) } @@ -107,9 +108,8 @@ func (proc *Proc) executeRaw(opts *ipc.ExecOpts, p *prog.Prog) *ipc.ProgInfo { // but so far we don't have a better handling than counting this. // This error is observed a lot on the seeded syz_mount_image calls. proc.tool.bufferTooSmall.Add(1) - return nil + return nil, try } - proc.tool.execRetries.Add(1) if try > 10 { log.SyzFatalf("executor %v failed %v times: %v\n%s", proc.pid, try, err, output) } @@ -120,17 +120,6 @@ func (proc *Proc) executeRaw(opts *ipc.ExecOpts, p *prog.Prog) *ipc.ProgInfo { continue } log.Logf(2, "result hanged=%v: %s", hanged, output) - return info + return info, try } } - -func (proc *Proc) logProgram(p *prog.Prog) { - // The following output helps to understand what program crashed kernel. - // It must not be intermixed. - now := time.Now() - data := p.Serialize() - proc.tool.logMu.Lock() - fmt.Printf("%02v:%02v:%02v executing program %v:\n%s\n", - now.Hour(), now.Minute(), now.Second(), proc.pid, data) - proc.tool.logMu.Unlock() -} diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 9295be7f6..523e47b2c 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -736,8 +736,10 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) { mgr.checkUsedFiles() // Use unique instance names to keep name collisions in case of untimely RPC messages. instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1)) + injectLog := make(chan []byte, 10) + mgr.serv.createInstance(instanceName, injectLog) - rep, vmInfo, err := mgr.runInstanceInner(index, instanceName) + rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectLog) machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil) if len(vmInfo) != 0 { @@ -760,7 +762,8 @@ func (mgr *Manager) runInstance(index int) (*Crash, error) { return crash, nil } -func (mgr *Manager) runInstanceInner(index int, instanceName string) (*report.Report, []byte, error) { +func (mgr *Manager) runInstanceInner(index int, instanceName string, injectLog <-chan []byte) ( + *report.Report, []byte, error) { start := time.Now() inst, err := mgr.vmPool.Create(index) @@ -826,7 +829,8 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string) (*report.Re }, } cmd := instance.FuzzerCmd(args) - _, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd, vm.ExitTimeout, vm.StopChan(mgr.vmStop)) + _, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd, + vm.ExitTimeout, vm.StopChan(mgr.vmStop), vm.InjectOutput(injectLog)) if err != nil { return nil, nil, fmt.Errorf("failed to run fuzzer: %w", err) } diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 3603d3b42..cd151fd78 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -4,6 +4,7 @@ package main import ( + "bytes" "fmt" "net" "sync" @@ -41,6 +42,8 @@ type RPCServer struct { // They will be eventually given to other VMs. rescuedInputs []*fuzzer.Request + statExecs *stats.Val + statExecRetries *stats.Val statVMRestarts *stats.Val statExchangeCalls *stats.Val statExchangeProgs *stats.Val @@ -50,7 +53,9 @@ type RPCServer struct { } type Runner struct { - name string + name string + injectLog chan<- []byte + injectStop chan bool machineInfo []byte instModules *cover.CanonicalizerInstance @@ -60,7 +65,13 @@ type Runner struct { newMaxSignal signal.Signal dropMaxSignal signal.Signal nextRequestID atomic.Int64 - requests map[int64]*fuzzer.Request + requests map[int64]Request +} + +type Request struct { + req *fuzzer.Request + try int + procID int } type BugFrames struct { @@ -77,8 +88,12 @@ type RPCManagerView interface { func startRPCServer(mgr *Manager) (*RPCServer, error) { serv := &RPCServer{ - mgr: mgr, - cfg: mgr.cfg, + mgr: mgr, + cfg: mgr.cfg, + statExecs: mgr.statExecs, + statExecRetries: stats.Create("exec retries", + "Number of times a test program was restarted because the first run failed", + stats.Rate{}, stats.Graph("executor")), statVMRestarts: stats.Create("vm restarts", "Total number of VM starts", stats.Rate{}, stats.NoGraph), statExchangeCalls: stats.Create("exchange calls", "Number of RPC Exchange calls", @@ -123,15 +138,15 @@ func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) er serv.coverFilter = coverFilter - runner := &Runner{ - name: a.Name, - machineInfo: a.MachineInfo, - instModules: serv.canonicalModules.NewInstance(a.Modules), - requests: make(map[int64]*fuzzer.Request), - } - if _, loaded := serv.runners.LoadOrStore(a.Name, runner); loaded { + runner := serv.findRunner(a.Name) + runner.mu.Lock() + defer runner.mu.Unlock() + if runner.machineInfo != nil { return fmt.Errorf("duplicate connection from %s", a.Name) } + runner.machineInfo = a.MachineInfo + runner.instModules = serv.canonicalModules.NewInstance(a.Modules) + r.MemoryLeakFrames = bugFrames.memoryLeaks r.DataRaceFrames = bugFrames.dataRaces @@ -193,14 +208,37 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *int) error { return nil } +func (serv *RPCServer) StartExecuting(a *rpctype.ExecutingRequest, r *int) error { + serv.statExecs.Add(1) + if a.Try != 0 { + serv.statExecRetries.Add(1) + } + runner := serv.findRunner(a.Name) + if runner == nil { + return nil + } + runner.mu.Lock() + req, ok := runner.requests[a.ID] + if !ok { + runner.mu.Unlock() + return nil + } + // RPC handlers are invoked in separate goroutines, so start executing notifications + // can outrun each other and completion notification. + if req.try < a.Try { + req.try = a.Try + req.procID = a.ProcID + } + runner.requests[a.ID] = req + runner.mu.Unlock() + runner.logProgram(a.ProcID, req.req.Prog) + return nil +} + func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error { start := time.Now() - var runner *Runner - if val, _ := serv.runners.Load(a.Name); val != nil { - runner = val.(*Runner) - } else { - // There might be a parallel shutdownInstance(). - // Ignore the request then. + runner := serv.findRunner(a.Name) + if runner == nil { return nil } @@ -257,6 +295,15 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E return nil } +func (serv *RPCServer) findRunner(name string) *Runner { + if val, _ := serv.runners.Load(name); val != nil { + return val.(*Runner) + } + // There might be a parallel shutdownInstance(). + // Ignore requests then. + return nil +} + func (serv *RPCServer) updateFilteredCover(pcs []uint32) error { if len(pcs) == 0 || serv.coverFilter == nil { return nil @@ -276,14 +323,21 @@ func (serv *RPCServer) updateFilteredCover(pcs []uint32) error { return nil } -func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { - var runner *Runner - if val, _ := serv.runners.LoadAndDelete(name); val != nil { - runner = val.(*Runner) - } else { - return nil +func (serv *RPCServer) createInstance(name string, injectLog chan<- []byte) { + runner := &Runner{ + name: name, + requests: make(map[int64]Request), + injectLog: injectLog, + injectStop: make(chan bool), + } + if _, loaded := serv.runners.LoadOrStore(name, runner); loaded { + panic(fmt.Sprintf("duplicate instance %s", name)) } +} +func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { + runnerPtr, _ := serv.runners.LoadAndDelete(name) + runner := runnerPtr.(*Runner) runner.mu.Lock() if runner.requests == nil { // We are supposed to invoke this code only once. @@ -293,20 +347,20 @@ func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { runner.requests = nil runner.mu.Unlock() - if crashed { - // The VM likely crashed, so let's tell pkg/fuzzer to abort the affected jobs. - // fuzzerObj may be null, but in that case oldRequests would be empty as well. - fuzzerObj := serv.mgr.getFuzzer() - for _, req := range oldRequests { - fuzzerObj.Done(req, &fuzzer.Result{Stop: true}) - } - } else { - // We will resend these inputs to another VM. - serv.mu.Lock() - for _, req := range oldRequests { - serv.rescuedInputs = append(serv.rescuedInputs, req) + close(runner.injectStop) + + // The VM likely crashed, so let's tell pkg/fuzzer to abort the affected jobs. + // fuzzerObj may be null, but in that case oldRequests would be empty as well. + serv.mu.Lock() + defer serv.mu.Unlock() + fuzzerObj := serv.mgr.getFuzzer() + for _, req := range oldRequests { + if crashed && req.try >= 0 { + fuzzerObj.Done(req.req, &fuzzer.Result{Stop: true}) + } else { + // We will resend these inputs to another VM. + serv.rescuedInputs = append(serv.rescuedInputs, req.req) } - serv.mu.Unlock() } return runner.machineInfo } @@ -333,6 +387,11 @@ func (runner *Runner) doneRequest(resp rpctype.ExecutionResult, fuzzerObj *fuzze // There may be a concurrent shutdownInstance() call. return } + // RPC handlers are invoked in separate goroutines, so log the program here + // if completion notification outrun start executing notification. + if req.try < resp.Try { + runner.logProgram(resp.ProcID, req.req.Prog) + } info := &resp.Info for i := 0; i < len(info.Calls); i++ { call := &info.Calls[i] @@ -341,7 +400,7 @@ func (runner *Runner) doneRequest(resp rpctype.ExecutionResult, fuzzerObj *fuzze } info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) - fuzzerObj.Done(req, &fuzzer.Result{Info: info}) + fuzzerObj.Done(req.req, &fuzzer.Result{Info: info}) } func (runner *Runner) newRequest(req *fuzzer.Request) rpctype.ExecutionRequest { @@ -354,7 +413,10 @@ func (runner *Runner) newRequest(req *fuzzer.Request) rpctype.ExecutionRequest { id := runner.nextRequestID.Add(1) runner.mu.Lock() if runner.requests != nil { - runner.requests[id] = req + runner.requests[id] = Request{ + req: req, + try: -1, + } } runner.mu.Unlock() return rpctype.ExecutionRequest{ @@ -367,3 +429,12 @@ func (runner *Runner) newRequest(req *fuzzer.Request) rpctype.ExecutionRequest { NeedHints: req.NeedHints, } } + +func (runner *Runner) logProgram(procID int, p *prog.Prog) { + buf := new(bytes.Buffer) + fmt.Fprintf(buf, "executing program %v:\n%s\n", procID, p.Serialize()) + select { + case runner.injectLog <- buf.Bytes(): + case <-runner.injectStop: + } +} diff --git a/syz-manager/stats.go b/syz-manager/stats.go index b8e53e8f7..53373cf7e 100644 --- a/syz-manager/stats.go +++ b/syz-manager/stats.go @@ -78,9 +78,6 @@ func (mgr *Manager) initStats() { // Stats imported from the fuzzer (names must match the the fuzzer names). stats.Create("executor restarts", "Number of times executor process was restarted", stats.Rate{}, stats.Graph("executor")) - stats.Create("exec retries", - "Number of times a test program was restarted because the first run failed", - stats.Rate{}, stats.Graph("executor")) stats.Create("buffer too small", "Program serialization overflowed exec buffer", stats.NoGraph) stats.Create("no exec requests", "Number of times fuzzer was stalled with no exec requests", stats.Rate{}) stats.Create("no exec duration", "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}) -- cgit mrf-deployment