diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-05-02 08:12:13 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-21 09:03:00 +0000 |
| commit | 98aa8d464020910064d862eff99440bfdf0dc03c (patch) | |
| tree | 2f4ef2ad668b2e3c0f3c460ce7e8b66517506290 | |
| parent | 4c0d3ee3f6ea306acf4d7ce817ef4e279ce73b65 (diff) | |
syz-manager, syz-fuzzer: switch communication to flatrpc
Switch to flatrpc connection between manager and fuzzer.
With flatrpc we have a goroutine per connection instead of async RPC,
which makes things a bit simpler. Now don't reordered messages
(in particular start executing and finish executing for programs),
race on the program during printing is no longer possible
since we finish handlign start executing request before we even
receive finish executing.
We also don't need to lookup Runner for every RPC since it's
now local to the handling goroutine.
We also don't need to protect requests map since only single
goroutine accesses it.
We also send new programs to the fuzzer as soon as we receive
start executing message, which provides better buffering.
We also don't batch new requests and finish executing requests
in a single RPC, which makes things a bit simpler.
In my local run this reduces syz-manager heap size from 1.3GB to 1.1GB.
Update #1541
| -rw-r--r-- | syz-fuzzer/fuzzer.go | 218 | ||||
| -rw-r--r-- | syz-fuzzer/fuzzer_test.go | 14 | ||||
| -rw-r--r-- | syz-fuzzer/proc.go | 132 | ||||
| -rw-r--r-- | syz-manager/manager.go | 27 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 723 | ||||
| -rw-r--r-- | syz-manager/stats.go | 4 |
6 files changed, 558 insertions, 560 deletions
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index 6870583df..5feae7585 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -12,6 +12,7 @@ import ( "path/filepath" "runtime" "runtime/debug" + "slices" "sync" "sync/atomic" "time" @@ -22,7 +23,6 @@ import ( "github.com/google/syzkaller/pkg/ipc/ipcconfig" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/signal" "github.com/google/syzkaller/pkg/tool" "github.com/google/syzkaller/prog" @@ -31,35 +31,19 @@ import ( ) type FuzzerTool struct { - name string + conn *flatrpc.Conn executor string gate *ipc.Gate - manager *rpctype.RPCClient // TODO: repair triagedCandidates logic, it's broken now. triagedCandidates uint32 timeouts targets.Timeouts leakFrames []string - noExecRequests atomic.Uint64 - noExecDuration atomic.Uint64 - - requests chan rpctype.ExecutionRequest - results chan executionResult + requests chan *flatrpc.ExecRequest signalMu sync.RWMutex maxSignal signal.Signal } -// executionResult offloads some computations from the proc loop -// to the communication thread. -type executionResult struct { - rpctype.ExecutionRequest - procID int - try int - info *ipc.ProgInfo - output []byte - err string -} - // Gate size controls how deep in the log the last executed by every proc // program may be. The intent is to make sure that, given the output log, // we always understand what was happening. @@ -121,69 +105,75 @@ func main() { return } + executorArch, executorSyzRevision, executorGitRevision, err := executorVersion(executor) + if err != nil { + log.SyzFatalf("failed to run executor version: %v ", err) + } + log.Logf(0, "dialing manager at %v", *flagManager) - manager, err := rpctype.NewRPCClient(*flagManager) + conn, err := flatrpc.Dial(*flagManager, timeouts.Scale) if err != nil { - log.SyzFatalf("failed to create an RPC client: %v ", err) + log.SyzFatalf("failed to connect to host: %v ", err) } log.Logf(1, "connecting to manager...") - a := &rpctype.ConnectArgs{ + connectReq := &flatrpc.ConnectRequest{ Name: *flagName, - GitRevision: prog.GitRevision, - SyzRevision: target.Revision, + Arch: executorArch, + GitRevision: executorGitRevision, + SyzRevision: executorSyzRevision, } - a.ExecutorArch, a.ExecutorSyzRevision, a.ExecutorGitRevision, err = executorVersion(executor) - if err != nil { - log.SyzFatalf("failed to run executor version: %v ", err) + if err := flatrpc.Send(conn, connectReq); err != nil { + log.SyzFatal(err) } - r := &rpctype.ConnectRes{} - if err := manager.Call("Manager.Connect", a, r); err != nil { - log.SyzFatalf("failed to call Manager.Connect(): %v ", err) + connectReplyRaw, err := flatrpc.Recv[flatrpc.ConnectReplyRaw](conn) + if err != nil { + log.SyzFatal(err) } - checkReq := &rpctype.CheckArgs{ - Name: *flagName, - Files: host.ReadFiles(r.ReadFiles), - Globs: make(map[string][]string), + connectReply := connectReplyRaw.UnPack() + + infoReq := &flatrpc.InfoRequest{ + Files: host.ReadFiles(connectReply.Files), } - features, err := host.SetupFeatures(target, executor, r.Features, nil) + features, err := host.SetupFeatures(target, executor, connectReply.Features, nil) if err != nil { - log.SyzFatalf("failed to setup features: %v ", err) + infoReq.Error = fmt.Sprintf("failed to setup features: %v ", err) } - checkReq.Features = features - for _, glob := range r.ReadGlobs { + infoReq.Features = features + for _, glob := range connectReply.Globs { files, err := filepath.Glob(filepath.FromSlash(glob)) - if err != nil && checkReq.Error == "" { - checkReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err) + if err != nil && infoReq.Error == "" { + infoReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err) } - checkReq.Globs[glob] = files + infoReq.Globs = append(infoReq.Globs, &flatrpc.GlobInfo{ + Name: glob, + Files: files, + }) } - checkRes := new(rpctype.CheckRes) - if err := manager.Call("Manager.Check", checkReq, checkRes); err != nil { - log.SyzFatalf("Manager.Check call failed: %v", err) + if err := flatrpc.Send(conn, infoReq); err != nil { + log.SyzFatal(err) } - if checkReq.Error != "" { - log.SyzFatalf("%v", checkReq.Error) + infoReplyRaw, err := flatrpc.Recv[flatrpc.InfoReplyRaw](conn) + if err != nil { + log.SyzFatal(err) } + infoReply := infoReplyRaw.UnPack() - if checkRes.CoverFilterBitmap != nil { - if err := osutil.WriteFile("syz-cover-bitmap", checkRes.CoverFilterBitmap); err != nil { + if len(infoReply.CoverFilter) != 0 { + if err := osutil.WriteFile("syz-cover-bitmap", infoReply.CoverFilter); err != nil { log.SyzFatalf("failed to write syz-cover-bitmap: %v", err) } } - inputsCount := *flagProcs * 2 fuzzerTool := &FuzzerTool{ - name: *flagName, + conn: conn, executor: executor, - manager: manager, timeouts: timeouts, - leakFrames: r.MemoryLeakFrames, + leakFrames: connectReply.LeakFrames, - requests: make(chan rpctype.ExecutionRequest, 2*inputsCount), - results: make(chan executionResult, 2*inputsCount), + requests: make(chan *flatrpc.ExecRequest, *flagProcs*4), } - fuzzerTool.filterDataRaceFrames(r.DataRaceFrames) + fuzzerTool.filterDataRaceFrames(connectReply.RaceFrames) var gateCallback func() for _, feat := range features { if feat.Id == flatrpc.FeatureLeak && feat.Reason == "" { @@ -197,10 +187,7 @@ func main() { startProc(fuzzerTool, pid, config) } - // Query enough inputs at the beginning. - fuzzerTool.exchangeDataCall(nil, 0) - go fuzzerTool.exchangeDataWorker() - fuzzerTool.exchangeDataWorker() + fuzzerTool.handleConn() } func (tool *FuzzerTool) leakGateCallback() { @@ -241,101 +228,46 @@ func (tool *FuzzerTool) filterDataRaceFrames(frames []string) { log.Logf(0, "%s", output) } -func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int) { - tool.manager.AsyncCall("Manager.StartExecuting", &rpctype.ExecutingRequest{ - Name: tool.name, - ID: progID, - ProcID: pid, - Try: try, - }) -} - -func (tool *FuzzerTool) exchangeDataCall(results []rpctype.ExecutionResult, latency time.Duration) time.Duration { - needProgs := max(0, cap(tool.requests)/2-len(tool.requests)) - a := &rpctype.ExchangeInfoRequest{ - Name: tool.name, - NeedProgs: needProgs, - Results: results, - StatsDelta: tool.grabStats(), - Latency: latency, - } - r := &rpctype.ExchangeInfoReply{} - start := osutil.MonotonicNano() - if err := tool.manager.Call("Manager.ExchangeInfo", a, r); err != nil { - log.SyzFatalf("Manager.ExchangeInfo call failed: %v", err) - } - latency = osutil.MonotonicNano() - start - tool.updateMaxSignal(r.NewMaxSignal, r.DropMaxSignal) - if len(r.Requests) == 0 { - // This is possible during initial checking stage, backoff a bit. - time.Sleep(100 * time.Millisecond) - } - for _, req := range r.Requests { - tool.requests <- req +func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int, wait time.Duration) { + msg := &flatrpc.ExecutorMessage{ + Msg: &flatrpc.ExecutorMessages{ + Type: flatrpc.ExecutorMessagesRawExecuting, + Value: &flatrpc.ExecutingMessage{ + Id: progID, + ProcId: int32(pid), + Try: int32(try), + WaitDuration: int64(wait), + }, + }, + } + if err := flatrpc.Send(tool.conn, msg); err != nil { + log.SyzFatal(err) } - return latency } -func (tool *FuzzerTool) exchangeDataWorker() { - var latency time.Duration - ticker := time.NewTicker(3 * time.Second * tool.timeouts.Scale).C +func (tool *FuzzerTool) handleConn() { for { - var results []rpctype.ExecutionResult - select { - case res := <-tool.results: - results = append(results, tool.convertExecutionResult(res)) - case <-ticker: - // This is not expected to happen a lot, - // but this is required to resolve potential deadlock - // during initial checking stage when we may get - // no test requests from the host in some requests. + raw, err := flatrpc.Recv[flatrpc.HostMessageRaw](tool.conn) + if err != nil { + log.SyzFatal(err) } - // Grab other finished calls, just in case there are any. - loop: - for { - select { - case res := <-tool.results: - results = append(results, tool.convertExecutionResult(res)) - default: - break loop - } + switch msg := raw.UnPack().Msg.Value.(type) { + case *flatrpc.ExecRequest: + msg.ProgData = slices.Clone(msg.ProgData) + tool.requests <- msg + case *flatrpc.SignalUpdate: + tool.handleSignalUpdate(msg) } - // Replenish exactly the finished requests. - latency = tool.exchangeDataCall(results, latency) - } -} - -func (tool *FuzzerTool) convertExecutionResult(res executionResult) rpctype.ExecutionResult { - ret := rpctype.ExecutionResult{ - ID: res.ID, - ProcID: res.procID, - Try: res.try, - Output: res.output, - Error: res.err, - } - if res.info != nil { - if res.NewSignal { - tool.diffMaxSignal(res.info, res.SignalFilter, res.SignalFilterCall) - } - ret.Info = *res.info - } - return ret -} - -func (tool *FuzzerTool) grabStats() map[string]uint64 { - return map[string]uint64{ - "no exec requests": tool.noExecRequests.Swap(0), - "no exec duration": tool.noExecDuration.Swap(0), } } -func (tool *FuzzerTool) diffMaxSignal(info *ipc.ProgInfo, mask signal.Signal, maskCall int) { +func (tool *FuzzerTool) diffMaxSignal(info *flatrpc.ProgInfo, mask signal.Signal, maskCall int) { tool.signalMu.RLock() defer tool.signalMu.RUnlock() diffMaxSignal(info, tool.maxSignal, mask, maskCall) } -func diffMaxSignal(info *ipc.ProgInfo, max, mask signal.Signal, maskCall int) { +func diffMaxSignal(info *flatrpc.ProgInfo, max, mask signal.Signal, maskCall int) { info.Extra.Signal = diffCallSignal(info.Extra.Signal, max, mask, -1, maskCall) for i := 0; i < len(info.Calls); i++ { info.Calls[i].Signal = diffCallSignal(info.Calls[i].Signal, max, mask, i, maskCall) @@ -349,11 +281,11 @@ func diffCallSignal(raw []uint32, max, mask signal.Signal, call, maskCall int) [ return max.DiffFromRaw(raw) } -func (tool *FuzzerTool) updateMaxSignal(add, drop []uint32) { +func (tool *FuzzerTool) handleSignalUpdate(msg *flatrpc.SignalUpdate) { tool.signalMu.Lock() defer tool.signalMu.Unlock() - tool.maxSignal.Subtract(signal.FromRaw(drop, 0)) - tool.maxSignal.Merge(signal.FromRaw(add, 0)) + tool.maxSignal.Subtract(signal.FromRaw(msg.DropMax, 0)) + tool.maxSignal.Merge(signal.FromRaw(msg.NewMax, 0)) } func setupPprofHandler(port int) { diff --git a/syz-fuzzer/fuzzer_test.go b/syz-fuzzer/fuzzer_test.go index d43b7bc74..2acc70ab9 100644 --- a/syz-fuzzer/fuzzer_test.go +++ b/syz-fuzzer/fuzzer_test.go @@ -6,7 +6,7 @@ package main import ( "testing" - "github.com/google/syzkaller/pkg/ipc" + "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/signal" "github.com/stretchr/testify/assert" ) @@ -14,8 +14,8 @@ import ( func TestFilterProgInfo(t *testing.T) { max := signal.FromRaw([]uint32{5, 6, 7}, 0) mask := signal.FromRaw([]uint32{2, 4, 6, 8}, 0) - info := ipc.ProgInfo{ - Calls: []ipc.CallInfo{ + info := flatrpc.ProgInfo{ + Calls: []*flatrpc.CallInfo{ { Signal: []uint32{1, 2, 3, 5, 6}, Cover: []uint32{1, 2, 3}, @@ -25,14 +25,14 @@ func TestFilterProgInfo(t *testing.T) { Cover: []uint32{2, 3, 4}, }, }, - Extra: ipc.CallInfo{ + Extra: &flatrpc.CallInfo{ Signal: []uint32{3, 4, 5}, Cover: []uint32{3, 4, 5}, }, } diffMaxSignal(&info, max, mask, 1) - assert.Equal(t, ipc.ProgInfo{ - Calls: []ipc.CallInfo{ + assert.Equal(t, flatrpc.ProgInfo{ + Calls: []*flatrpc.CallInfo{ { Signal: []uint32{1, 2, 3}, Cover: []uint32{1, 2, 3}, @@ -42,7 +42,7 @@ func TestFilterProgInfo(t *testing.T) { Cover: []uint32{2, 3, 4}, }, }, - Extra: ipc.CallInfo{ + Extra: &flatrpc.CallInfo{ Signal: []uint32{3, 4}, Cover: []uint32{3, 4, 5}, }, diff --git a/syz-fuzzer/proc.go b/syz-fuzzer/proc.go index 6542225da..acc333b9b 100644 --- a/syz-fuzzer/proc.go +++ b/syz-fuzzer/proc.go @@ -15,7 +15,7 @@ import ( "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpctype" + "github.com/google/syzkaller/pkg/signal" ) // Proc represents a single fuzzing process (executor). @@ -41,66 +41,74 @@ func startProc(tool *FuzzerTool, pid int, config *ipc.Config) { func (proc *Proc) loop() { rnd := rand.New(rand.NewSource(time.Now().UnixNano() + int64(proc.pid))) for { - req := proc.nextRequest() + req, wait := proc.nextRequest() // Do not let too much state accumulate. const restartIn = 600 resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps - if (req.ExecOpts.ExecFlags&resetFlags != 0 && - rnd.Intn(restartIn) == 0) || req.ResetState { + if (req.ExecFlags&resetFlags != 0 && + rnd.Intn(restartIn) == 0) || req.Flags&flatrpc.RequestFlagResetState != 0 { proc.env.ForceRestart() } - info, output, err, try := proc.execute(req) - res := executionResult{ - ExecutionRequest: req, - procID: proc.pid, - try: try, - info: info, - output: output, - err: err, + info, output, err := proc.execute(req, wait) + res := &flatrpc.ExecResult{ + Id: req.Id, + Info: convertProgInfo(info), + Output: output, + Error: err, } - for i := 1; i < req.Repeat && res.err == "" && !req.IsBinary; i++ { + for i := 1; i < int(req.Repeat) && res.Error == "" && req.Flags&flatrpc.RequestFlagIsBinary == 0; i++ { // Recreate Env every few iterations, this allows to cover more paths. if i%2 == 0 { proc.env.ForceRestart() } - info, output, err, _ := proc.execute(req) - if res.info == nil { - res.info = info - } else { - res.info.Calls = append(res.info.Calls, info.Calls...) + info, output, err := proc.execute(req, 0) + if res.Info == nil { + res.Info = convertProgInfo(info) + } else if info != nil { + res.Info.Calls = append(res.Info.Calls, convertCalls(info)...) } - res.output = append(res.output, output...) - res.err = err + res.Output = append(res.Output, output...) + res.Error = err + } + if res.Info != nil && req.Flags&flatrpc.RequestFlagNewSignal != 0 { + filter := signal.FromRaw(req.SignalFilter, 0) + proc.tool.diffMaxSignal(res.Info, filter, int(req.SignalFilterCall)) + } + msg := &flatrpc.ExecutorMessage{ + Msg: &flatrpc.ExecutorMessages{ + Type: flatrpc.ExecutorMessagesRawExecResult, + Value: res, + }, + } + if err := flatrpc.Send(proc.tool.conn, msg); err != nil { + log.SyzFatal(err) } - // Let's perform signal filtering in a separate thread to get the most - // exec/sec out of a syz-executor instance. - proc.tool.results <- res } } -func (proc *Proc) nextRequest() rpctype.ExecutionRequest { +func (proc *Proc) nextRequest() (*flatrpc.ExecRequest, time.Duration) { select { case req := <-proc.tool.requests: - return req + return req, 0 default: } // Not having enough inputs to execute is a sign of RPC communication problems. // Let's count and report such situations. start := osutil.MonotonicNano() req := <-proc.tool.requests - proc.tool.noExecDuration.Add(uint64(osutil.MonotonicNano() - start)) - proc.tool.noExecRequests.Add(1) - return req + wait := osutil.MonotonicNano() - start + return req, wait } -func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, output []byte, errStr string, try int) { +func (proc *Proc) execute(req *flatrpc.ExecRequest, wait time.Duration) ( + info *ipc.ProgInfo, output []byte, errStr string) { var err error - if req.IsBinary { + if req.Flags&flatrpc.RequestFlagIsBinary != 0 { output, err = executeBinary(req) } else { - info, output, try, err = proc.executeProgram(req) + info, output, err = proc.executeProgram(req, wait) } - if !req.ReturnOutput { + if req.Flags&flatrpc.RequestFlagReturnOutput == 0 { output = nil } if err != nil { @@ -109,30 +117,36 @@ func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, out return } -func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, []byte, int, error) { +func (proc *Proc) executeProgram(req *flatrpc.ExecRequest, wait time.Duration) (*ipc.ProgInfo, []byte, error) { + returnError := req.Flags&flatrpc.RequestFlagReturnError != 0 + execOpts := &ipc.ExecOpts{ + EnvFlags: req.ExecEnv, + ExecFlags: req.ExecFlags, + SandboxArg: int(req.SandboxArg), + } for try := 0; ; try++ { var output []byte var info *ipc.ProgInfo var hanged bool // On a heavily loaded VM, syz-executor may take significant time to start. // Let's do it outside of the gate ticket. - err := proc.env.RestartIfNeeded(&req.ExecOpts) + err := proc.env.RestartIfNeeded(execOpts) if err == nil { // Limit concurrency. ticket := proc.tool.gate.Enter() - proc.tool.startExecutingCall(req.ID, proc.pid, try) - output, info, hanged, err = proc.env.ExecProg(&req.ExecOpts, req.ProgData) + proc.tool.startExecutingCall(req.Id, proc.pid, try, wait) + output, info, hanged, err = proc.env.ExecProg(execOpts, req.ProgData) proc.tool.gate.Leave(ticket) // Don't print output if returning error b/c it may contain SYZFAIL. - if !req.ReturnError { + if !returnError { log.Logf(2, "result hanged=%v err=%v: %s", hanged, err, output) } - if hanged && err == nil && req.ReturnError { + if hanged && err == nil && returnError { err = errors.New("hanged") } } - if err == nil || req.ReturnError { - return info, output, try, err + if err == nil || returnError { + return info, output, err } log.Logf(4, "fuzzer detected executor failure='%v', retrying #%d", err, try+1) if try > 10 { @@ -143,7 +157,7 @@ func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, [ } } -func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) { +func executeBinary(req *flatrpc.ExecRequest) ([]byte, error) { tmp, err := os.MkdirTemp("", "syz-runtest") if err != nil { return nil, fmt.Errorf("failed to create temp dir: %w", err) @@ -166,3 +180,39 @@ func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) { } return output, err } + +func convertProgInfo(info *ipc.ProgInfo) *flatrpc.ProgInfo { + if info == nil { + return nil + } + return &flatrpc.ProgInfo{ + Elapsed: uint64(info.Elapsed), + Freshness: uint64(info.Freshness), + Extra: convertCallInfo(info.Extra), + Calls: convertCalls(info), + } +} + +func convertCalls(info *ipc.ProgInfo) []*flatrpc.CallInfo { + var calls []*flatrpc.CallInfo + for _, call := range info.Calls { + calls = append(calls, convertCallInfo(call)) + } + return calls +} + +func convertCallInfo(info ipc.CallInfo) *flatrpc.CallInfo { + var comps []*flatrpc.Comparison + for op1, ops := range info.Comps { + for op2 := range ops { + comps = append(comps, &flatrpc.Comparison{Op1: op1, Op2: op2}) + } + } + return &flatrpc.CallInfo{ + Flags: flatrpc.CallFlag(info.Flags), + Error: int32(info.Errno), + Cover: info.Cover, + Signal: info.Signal, + Comps: comps, + } +} diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 460c65ba7..c85d59d36 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -724,14 +724,10 @@ func parseProgram(target *prog.Target, enabled map[*prog.Syscall]bool, data []by func (mgr *Manager) runInstance(index int) (*Crash, error) { mgr.checkUsedFiles() - var maxSignal signal.Signal - if fuzzer := mgr.fuzzer.Load(); fuzzer != nil { - maxSignal = fuzzer.Cover.CopyMaxSignal() - } // Use unique instance names to prevent name collisions in case of untimely RPC messages. instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1)) injectLog := make(chan []byte, 10) - mgr.serv.createInstance(instanceName, maxSignal, injectLog) + mgr.serv.createInstance(instanceName, injectLog) rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectLog) machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil) @@ -1400,6 +1396,13 @@ func (mgr *Manager) corpusMinimization() { } } +func (mgr *Manager) maxSignal() signal.Signal { + if fuzzer := mgr.fuzzer.Load(); fuzzer != nil { + return fuzzer.Cover.CopyMaxSignal() + } + return nil +} + func (mgr *Manager) fuzzerSignalRotation() { const ( rotateSignals = 1000 @@ -1426,12 +1429,14 @@ func (mgr *Manager) fuzzerSignalRotation() { func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { for ; ; time.Sleep(time.Second / 2) { - // Distribute new max signal over all instances. - newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta() - log.Logf(2, "distributing %d new signal, %d dropped signal", - len(newSignal), len(dropSignal)) - if len(newSignal)+len(dropSignal) != 0 { - mgr.serv.distributeSignalDelta(newSignal, dropSignal) + if mgr.cfg.Cover { + // Distribute new max signal over all instances. + newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta() + log.Logf(2, "distributing %d new signal, %d dropped signal", + len(newSignal), len(dropSignal)) + if len(newSignal)+len(dropSignal) != 0 { + mgr.serv.distributeSignalDelta(newSignal, dropSignal) + } } // Update the state machine. diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 86b2a375d..2f26ac4c2 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -5,8 +5,9 @@ package main import ( "bytes" + "errors" "fmt" - "net" + "slices" "sort" "strings" "sync" @@ -19,7 +20,6 @@ import ( "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/mgrconfig" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/signal" "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/pkg/vminfo" @@ -30,11 +30,10 @@ type RPCServer struct { mgr RPCManagerView cfg *mgrconfig.Config target *prog.Target - server *rpctype.RPCServer checker *vminfo.Checker port int - infoDone bool + infoOnce sync.Once checkDone atomic.Bool checkFailures int baseSource *queue.DynamicSourceCtl @@ -46,43 +45,31 @@ type RPCServer struct { coverFilter map[uint32]uint32 mu sync.Mutex - runners sync.Map // Instead of map[string]*Runner. + runners map[string]*Runner execSource queue.Source - statExecs *stats.Val - statExecRetries *stats.Val - statExecutorRestarts *stats.Val - statExecBufferTooSmall *stats.Val - statVMRestarts *stats.Val - statExchangeCalls *stats.Val - statExchangeProgs *stats.Val - statExchangeServerLatency *stats.Val - statExchangeClientLatency *stats.Val - statCoverFiltered *stats.Val + statExecs *stats.Val + statExecRetries *stats.Val + statExecutorRestarts *stats.Val + statExecBufferTooSmall *stats.Val + statVMRestarts *stats.Val + statNoExecRequests *stats.Val + statNoExecDuration *stats.Val + statCoverFiltered *stats.Val } type Runner struct { - name string - injectLog chan<- []byte - injectStop chan bool - stopFuzzing atomic.Bool - - machineInfo []byte - instModules *cover.CanonicalizerInstance - - // The mutex protects newMaxSignal, dropMaxSignal, and requests. - mu sync.Mutex - newMaxSignal signal.Signal - dropMaxSignal signal.Signal + stopped bool + shutdown chan bool + injectLog chan<- []byte + injectStop chan bool + injectBuf bytes.Buffer + conn *flatrpc.Conn + machineInfo []byte + canonicalizer *cover.CanonicalizerInstance nextRequestID int64 - requests map[int64]Request -} - -type Request struct { - req *queue.Request - serialized []byte - try int - procID int + requests map[int64]*queue.Request + executing map[int64]bool } type BugFrames struct { @@ -93,6 +80,7 @@ type BugFrames struct { // RPCManagerView restricts interface between RPCServer and Manager. type RPCManagerView interface { currentBugFrames() BugFrames + maxSignal() signal.Signal machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool, opts ipc.ExecOpts) queue.Source } @@ -104,6 +92,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { mgr: mgr, cfg: mgr.cfg, target: mgr.target, + runners: make(map[string]*Runner), checker: checker, baseSource: baseSource, execSource: queue.Retry(baseSource), @@ -117,141 +106,320 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { "Program serialization overflowed exec buffer", stats.NoGraph), statVMRestarts: stats.Create("vm restarts", "Total number of VM starts", stats.Rate{}, stats.NoGraph), - statExchangeCalls: stats.Create("exchange calls", "Number of RPC Exchange calls", - stats.Rate{}), - statExchangeProgs: stats.Create("exchange progs", "Test programs exchanged per RPC call", - stats.Distribution{}), - statExchangeServerLatency: stats.Create("exchange manager latency", - "Manager RPC Exchange call latency (us)", stats.Distribution{}), - statExchangeClientLatency: stats.Create("exchange fuzzer latency", - "End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}), + statNoExecRequests: stats.Create("no exec requests", + "Number of times fuzzer was stalled with no exec requests", stats.Rate{}), + statNoExecDuration: stats.Create("no exec duration", + "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}), statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph), } - s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) + s, err := flatrpc.ListenAndServe(mgr.cfg.RPC, serv.handleConn) if err != nil { return nil, err } baseSource.Store(serv.checker) - log.Logf(0, "serving rpc on tcp://%v", s.Addr()) - serv.port = s.Addr().(*net.TCPAddr).Port - serv.server = s - go s.Serve() + log.Logf(0, "serving rpc on tcp://%v", s.Addr) + serv.port = s.Addr.Port return serv, nil } -func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) error { - log.Logf(1, "fuzzer %v connected", a.Name) - checkRevisions(a, serv.cfg.Target) - serv.statVMRestarts.Add(1) - - bugFrames := serv.mgr.currentBugFrames() - r.MemoryLeakFrames = bugFrames.memoryLeaks - r.DataRaceFrames = bugFrames.dataRaces +func (serv *RPCServer) handleConn(conn *flatrpc.Conn) { + name, machineInfo, canonicalizer, err := serv.handshake(conn) + if err != nil { + log.Logf(1, "%v", err) + return + } serv.mu.Lock() - defer serv.mu.Unlock() - r.ReadFiles = serv.checker.RequiredFiles() - if serv.checkDone.Load() { - r.Features = serv.setupFeatures - } else { - r.ReadFiles = append(r.ReadFiles, serv.checker.CheckFiles()...) - r.ReadGlobs = serv.target.RequiredGlobs() - r.Features = flatrpc.AllFeatures + runner := serv.runners[name] + if runner == nil || runner.stopped { + serv.mu.Unlock() + log.Logf(2, "VM %v shut down before connect", name) + return } - return nil -} + runner.conn = conn + runner.machineInfo = machineInfo + runner.canonicalizer = canonicalizer + serv.mu.Unlock() + + err = serv.connectionLoop(runner) + log.Logf(2, "runner %v: %v", name, err) -func checkRevisions(a *rpctype.ConnectArgs, target *prog.Target) { - if target.Arch != a.ExecutorArch { - log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.ExecutorArch) + crashed := <-runner.shutdown + for id, req := range runner.requests { + status := queue.Restarted + if crashed && runner.executing[id] { + status = queue.Crashed + } + req.Done(&queue.Result{Status: status}) } - if prog.GitRevision != a.GitRevision { - log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v", - prog.GitRevision, a.GitRevision) +} + +func (serv *RPCServer) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) { + connectReqRaw, err := flatrpc.Recv[flatrpc.ConnectRequestRaw](conn) + if err != nil { + return "", nil, nil, err } - if prog.GitRevision != a.ExecutorGitRevision { - log.Fatalf("mismatching manager/executor git revisions: %v vs %v", - prog.GitRevision, a.ExecutorGitRevision) + connectReq := connectReqRaw.UnPack() + log.Logf(1, "fuzzer %v connected", connectReq.Name) + checkRevisions(connectReq, serv.cfg.Target) + serv.statVMRestarts.Add(1) + + bugFrames := serv.mgr.currentBugFrames() + connectReply := &flatrpc.ConnectReply{ + LeakFrames: bugFrames.memoryLeaks, + RaceFrames: bugFrames.dataRaces, } - if target.Revision != a.SyzRevision { - log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v", - target.Revision, a.SyzRevision) + connectReply.Files = serv.checker.RequiredFiles() + if serv.checkDone.Load() { + connectReply.Features = serv.setupFeatures + } else { + connectReply.Files = append(connectReply.Files, serv.checker.CheckFiles()...) + connectReply.Globs = serv.target.RequiredGlobs() + connectReply.Features = flatrpc.AllFeatures } - if target.Revision != a.ExecutorSyzRevision { - log.Fatalf("mismatching manager/executor system call descriptions: %v vs %v", - target.Revision, a.ExecutorSyzRevision) + if err := flatrpc.Send(conn, connectReply); err != nil { + return "", nil, nil, err } -} - -func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error { - serv.mu.Lock() - defer serv.mu.Unlock() - modules, machineInfo, err := serv.checker.MachineInfo(a.Files) + infoReqRaw, err := flatrpc.Recv[flatrpc.InfoRequestRaw](conn) + if err != nil { + return "", nil, nil, err + } + infoReq := infoReqRaw.UnPack() + modules, machineInfo, err := serv.checker.MachineInfo(infoReq.Files) if err != nil { log.Logf(0, "parsing of machine info failed: %v", err) - if a.Error == "" { - a.Error = err.Error() + if infoReq.Error == "" { + infoReq.Error = err.Error() } } - - if a.Error != "" { - log.Logf(0, "machine check failed: %v", a.Error) + if infoReq.Error != "" { + log.Logf(0, "machine check failed: %v", infoReq.Error) serv.checkFailures++ if serv.checkFailures == 10 { log.Fatalf("machine check failing") } - return fmt.Errorf("machine check failed: %v", a.Error) + return "", nil, nil, errors.New("machine check failed") } - if !serv.infoDone { - serv.infoDone = true - - // Now execute check programs. - go serv.runCheck(a.Files, a.Features) - + serv.infoOnce.Do(func() { serv.modules = modules - serv.target.UpdateGlobs(a.Globs) serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover) var err error serv.execCoverFilter, serv.coverFilter, err = createCoverageFilter(serv.cfg, modules) if err != nil { log.Fatalf("failed to init coverage filter: %v", err) } + globs := make(map[string][]string) + for _, glob := range infoReq.Globs { + globs[glob.Name] = glob.Files + } + serv.target.UpdateGlobs(globs) + // Flatbuffers don't do deep copy of byte slices, + // so clone manually since we pass it a goroutine. + for _, file := range infoReq.Files { + file.Data = slices.Clone(file.Data) + } + // Now execute check programs. + go func() { + if err := serv.runCheck(infoReq.Files, infoReq.Features); err != nil { + log.Fatalf("check failed: %v", err) + } + }() + }) + + canonicalizer := serv.canonicalModules.NewInstance(modules) + instCoverFilter := canonicalizer.DecanonicalizeFilter(serv.execCoverFilter) + infoReply := &flatrpc.InfoReply{ + CoverFilter: createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter), + } + if err := flatrpc.Send(conn, infoReply); err != nil { + return "", nil, nil, err } + return connectReq.Name, machineInfo, canonicalizer, nil +} - runner := serv.findRunner(a.Name) - if runner == nil { - // There may be a parallel shutdownInstance() call that removes the runner. - return fmt.Errorf("unknown runner %s", a.Name) +func (serv *RPCServer) connectionLoop(runner *Runner) error { + if serv.cfg.Cover { + maxSignal := serv.mgr.maxSignal().ToRaw() + for len(maxSignal) != 0 { + // Split coverage into batches to not grow the connection serialization + // buffer too much (we don't want to grow it larger than what will be needed + // to send programs). + n := min(len(maxSignal), 50000) + if err := runner.sendSignalUpdate(maxSignal[:n], nil); err != nil { + return err + } + maxSignal = maxSignal[n:] + } + } + + for { + for len(runner.requests)-len(runner.executing) < 2*serv.cfg.Procs { + req := serv.execSource.Next() + if req == nil { + break + } + if err := serv.sendRequest(runner, req); err != nil { + return err + } + } + if len(runner.requests) == 0 { + // The runner has not requests at all, so don't wait to receive anything from it. + // This is only possible during the initial checking. + time.Sleep(10 * time.Millisecond) + continue + } + raw, err := flatrpc.Recv[flatrpc.ExecutorMessageRaw](runner.conn) + if err != nil { + return err + } + switch msg := raw.UnPack().Msg.Value.(type) { + case *flatrpc.ExecutingMessage: + err = serv.handleExecutingMessage(runner, msg) + case *flatrpc.ExecResult: + err = serv.handleExecResult(runner, msg) + default: + panic(fmt.Sprintf("unknown message %T", msg)) + } + if err != nil { + return err + } } +} - runner.mu.Lock() - defer runner.mu.Unlock() - if runner.machineInfo != nil { - return fmt.Errorf("duplicate connection from %s", a.Name) +func (serv *RPCServer) sendRequest(runner *Runner, req *queue.Request) error { + if err := validateRequest(req); err != nil { + panic(err) } - runner.machineInfo = machineInfo - runner.instModules = serv.canonicalModules.NewInstance(modules) - instCoverFilter := runner.instModules.DecanonicalizeFilter(serv.execCoverFilter) - r.CoverFilterBitmap = createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter) + progData, err := req.Prog.SerializeForExec() + if err != nil { + // It's bad if we systematically fail to serialize programs, + // but so far we don't have a better handling than counting this. + // This error is observed a lot on the seeded syz_mount_image calls. + serv.statExecBufferTooSmall.Add(1) + req.Done(&queue.Result{Status: queue.ExecFailure}) + return nil + } + runner.nextRequestID++ + id := runner.nextRequestID + var flags flatrpc.RequestFlag + if !req.ReturnAllSignal { + flags |= flatrpc.RequestFlagNewSignal + } + if req.ReturnOutput { + flags |= flatrpc.RequestFlagReturnOutput + } + if req.ReturnError { + flags |= flatrpc.RequestFlagReturnError + } + if serv.cfg.Experimental.ResetAccState { + flags |= flatrpc.RequestFlagResetState + } + signalFilter := runner.canonicalizer.Decanonicalize(req.SignalFilter.ToRaw()) + msg := &flatrpc.HostMessage{ + Msg: &flatrpc.HostMessages{ + Type: flatrpc.HostMessagesRawExecRequest, + Value: &flatrpc.ExecRequest{ + Id: id, + ProgData: progData, + Flags: flags, + ExecEnv: req.ExecOpts.EnvFlags, + ExecFlags: req.ExecOpts.ExecFlags, + SandboxArg: int64(req.ExecOpts.SandboxArg), + SignalFilter: signalFilter, + SignalFilterCall: int32(req.SignalFilterCall), + }, + }, + } + runner.requests[id] = req + return flatrpc.Send(runner.conn, msg) +} + +func (serv *RPCServer) handleExecutingMessage(runner *Runner, msg *flatrpc.ExecutingMessage) error { + req := runner.requests[msg.Id] + if req == nil { + return fmt.Errorf("can't find executing request %v", msg.Id) + } + serv.statExecs.Add(1) + if msg.Try == 0 { + if msg.WaitDuration != 0 { + serv.statNoExecRequests.Add(1) + // Cap wait duration to 1 second to avoid extreme peaks on the graph + // which make it impossible to see real data (the rest becomes a flat line). + serv.statNoExecDuration.Add(int(min(msg.WaitDuration, 1e9))) + } + } else { + serv.statExecRetries.Add(1) + } + fmt.Fprintf(&runner.injectBuf, "executing program %v:\n%s\n", msg.ProcId, req.Prog.Serialize()) + select { + case runner.injectLog <- runner.injectBuf.Bytes(): + case <-runner.injectStop: + } + runner.injectBuf.Reset() + runner.executing[msg.Id] = true return nil } -func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) { - if err := serv.finishCheck(checkFilesInfo, checkFeatureInfo); err != nil { - log.Fatalf("check failed: %v", err) +func (serv *RPCServer) handleExecResult(runner *Runner, msg *flatrpc.ExecResult) error { + req := runner.requests[msg.Id] + if req == nil { + return fmt.Errorf("can't find executed request %v", msg.Id) } - serv.checkDone.Store(true) + delete(runner.requests, msg.Id) + delete(runner.executing, msg.Id) + if msg.Info != nil { + if msg.Info.Freshness == 0 { + serv.statExecutorRestarts.Add(1) + } + if !serv.cfg.Cover && req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal != 0 { + // Coverage collection is disabled, but signal was requested => use a substitute signal. + addFallbackSignal(req.Prog, msg.Info) + } + for i := 0; i < len(msg.Info.Calls); i++ { + call := msg.Info.Calls[i] + call.Cover = runner.canonicalizer.Canonicalize(call.Cover) + call.Signal = runner.canonicalizer.Canonicalize(call.Signal) + } + msg.Info.Extra.Cover = runner.canonicalizer.Canonicalize(msg.Info.Extra.Cover) + msg.Info.Extra.Signal = runner.canonicalizer.Canonicalize(msg.Info.Extra.Signal) + } + status := queue.Success + var resErr error + if msg.Error != "" { + status = queue.ExecFailure + resErr = errors.New(msg.Error) + } + req.Done(&queue.Result{ + Status: status, + Info: convertProgInfo(msg.Info), + Output: slices.Clone(msg.Output), + Err: resErr, + }) + return nil } -func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error { - // Note: need to print disbled syscalls before failing due to an error. - // This helps to debug "all system calls are disabled". +func checkRevisions(a *flatrpc.ConnectRequest, target *prog.Target) { + if target.Arch != a.Arch { + log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.Arch) + } + if prog.GitRevision != a.GitRevision { + log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v", + prog.GitRevision, a.GitRevision) + } + if target.Revision != a.SyzRevision { + log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v", + target.Revision, a.SyzRevision) + } +} +func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error { enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo) enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls) + // Note: need to print disbled syscalls before failing due to an error. + // This helps to debug "all system calls are disabled". buf := new(bytes.Buffer) if len(serv.cfg.EnabledSyscalls) != 0 || log.V(1) { if len(disabledCalls) != 0 { @@ -308,94 +476,7 @@ func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeat serv.setupFeatures = features.NeedSetup() newSource := serv.mgr.machineChecked(serv.enabledFeatures, enabledCalls, serv.execOpts()) serv.baseSource.Store(newSource) - return nil -} - -func (serv *RPCServer) StartExecuting(a *rpctype.ExecutingRequest, r *int) error { - serv.statExecs.Add(1) - if a.Try != 0 { - serv.statExecRetries.Add(1) - } - runner := serv.findRunner(a.Name) - if runner == nil { - return nil - } - runner.mu.Lock() - req, ok := runner.requests[a.ID] - if !ok { - runner.mu.Unlock() - return nil - } - // RPC handlers are invoked in separate goroutines, so start executing notifications - // can outrun each other and completion notification. - if req.try < a.Try { - req.try = a.Try - req.procID = a.ProcID - } - runner.requests[a.ID] = req - runner.mu.Unlock() - runner.logProgram(a.ProcID, req.serialized) - return nil -} - -func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error { - start := time.Now() - runner := serv.findRunner(a.Name) - if runner == nil { - return nil - } - // First query new inputs and only then post results. - // It should foster a more even distribution of executions - // across all VMs. - for len(r.Requests) < a.NeedProgs { - inp := serv.execSource.Next() - if inp == nil { - // It's unlikely that subsequent Next() calls will yield something. - break - } - if err := validateRequest(inp); err != nil { - panic(fmt.Sprintf("invalid request: %v, req: %#v", err, inp)) - } - if req, ok := serv.newRequest(runner, inp); ok { - r.Requests = append(r.Requests, req) - if inp.Risky() { - // We give crashed inputs only one more chance, so if we resend many of them at once, - // we'll never figure out the actual problematic input. - break - } - } else { - // It's bad if we systematically fail to serialize programs, - // but so far we don't have a better handling than counting this. - // This error is observed a lot on the seeded syz_mount_image calls. - serv.statExecBufferTooSmall.Add(1) - inp.Done(&queue.Result{Status: queue.ExecFailure}) - } - } - - for _, result := range a.Results { - serv.doneRequest(runner, result) - } - - stats.Import(a.StatsDelta) - - runner.mu.Lock() - // Let's transfer new max signal in portions. - - const transferMaxSignal = 500000 - newSignal := runner.newMaxSignal.Split(transferMaxSignal) - dropSignal := runner.dropMaxSignal.Split(transferMaxSignal) - runner.mu.Unlock() - - r.NewMaxSignal = runner.instModules.Decanonicalize(newSignal.ToRaw()) - r.DropMaxSignal = runner.instModules.Decanonicalize(dropSignal.ToRaw()) - - log.Logf(2, "exchange with %s: %d done, %d new requests, %d new max signal, %d drop signal", - a.Name, len(a.Results), len(r.Requests), len(r.NewMaxSignal), len(r.DropMaxSignal)) - - serv.statExchangeCalls.Add(1) - serv.statExchangeProgs.Add(a.NeedProgs) - serv.statExchangeClientLatency.Add(int(a.Latency.Microseconds())) - serv.statExchangeServerLatency.Add(int(time.Since(start).Microseconds())) + serv.checkDone.Store(true) return nil } @@ -411,77 +492,68 @@ func validateRequest(req *queue.Request) error { return nil } -func (serv *RPCServer) findRunner(name string) *Runner { - if val, _ := serv.runners.Load(name); val != nil { - runner := val.(*Runner) - if runner.stopFuzzing.Load() { - return nil - } - return runner - } - // There might be a parallel shutdownInstance(). - // Ignore requests then. - return nil -} - -func (serv *RPCServer) createInstance(name string, maxSignal signal.Signal, injectLog chan<- []byte) { +func (serv *RPCServer) createInstance(name string, injectLog chan<- []byte) { runner := &Runner{ - name: name, - requests: make(map[int64]Request), - newMaxSignal: maxSignal, - injectLog: injectLog, - injectStop: make(chan bool), + injectLog: injectLog, + injectStop: make(chan bool), + shutdown: make(chan bool, 1), + requests: make(map[int64]*queue.Request), + executing: make(map[int64]bool), } - if _, loaded := serv.runners.LoadOrStore(name, runner); loaded { + serv.mu.Lock() + if serv.runners[name] != nil { panic(fmt.Sprintf("duplicate instance %s", name)) } + serv.runners[name] = runner + serv.mu.Unlock() } // stopInstance prevents further request exchange requests. // To make RPCServer fully forget an instance, shutdownInstance() must be called. func (serv *RPCServer) stopFuzzing(name string) { - runner := serv.findRunner(name) - if runner == nil { - return + serv.mu.Lock() + runner := serv.runners[name] + runner.stopped = true + conn := runner.conn + serv.mu.Unlock() + close(runner.injectStop) + if conn != nil { + conn.Close() } - runner.stopFuzzing.Store(true) } func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { - runnerPtr, _ := serv.runners.LoadAndDelete(name) - runner := runnerPtr.(*Runner) - runner.mu.Lock() - if runner.requests == nil { - // We are supposed to invoke this code only once. - panic("Runner.requests is already nil") - } - oldRequests := runner.requests - runner.requests = nil - runner.mu.Unlock() - - close(runner.injectStop) + serv.mu.Lock() + runner := serv.runners[name] + delete(serv.runners, name) + serv.mu.Unlock() + runner.shutdown <- crashed + return runner.machineInfo +} +func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) { + plusRaw := plus.ToRaw() + minusRaw := minus.ToRaw() serv.mu.Lock() defer serv.mu.Unlock() - for _, req := range oldRequests { - if crashed && req.try >= 0 { - req.req.Done(&queue.Result{Status: queue.Crashed}) - } else { - req.req.Done(&queue.Result{Status: queue.Restarted}) + for _, runner := range serv.runners { + if runner.conn != nil { + runner.sendSignalUpdate(plusRaw, minusRaw) } } - return runner.machineInfo } -func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) { - serv.runners.Range(func(key, value any) bool { - runner := value.(*Runner) - runner.mu.Lock() - defer runner.mu.Unlock() - runner.newMaxSignal.Merge(plus) - runner.dropMaxSignal.Merge(minus) - return true - }) +func (runner *Runner) sendSignalUpdate(plus, minus []uint32) error { + msg := &flatrpc.HostMessage{ + Msg: &flatrpc.HostMessages{ + Type: flatrpc.HostMessagesRawSignalUpdate, + Value: &flatrpc.SignalUpdate{ + NewMax: runner.canonicalizer.Decanonicalize(plus), + DropMax: runner.canonicalizer.Decanonicalize(minus), + }, + }, + } + return flatrpc.Send(runner.conn, msg) } func (serv *RPCServer) updateCoverFilter(newCover []uint32) { @@ -501,88 +573,6 @@ func (serv *RPCServer) updateCoverFilter(newCover []uint32) { serv.statCoverFiltered.Add(filtered) } -func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult) { - info := &resp.Info - if info.Freshness == 0 { - serv.statExecutorRestarts.Add(1) - } - runner.mu.Lock() - req, ok := runner.requests[resp.ID] - if ok { - delete(runner.requests, resp.ID) - } - runner.mu.Unlock() - if !ok { - // There may be a concurrent shutdownInstance() call. - return - } - // RPC handlers are invoked in separate goroutines, so log the program here - // if completion notification outrun start executing notification. - if req.try < resp.Try { - runner.logProgram(resp.ProcID, req.serialized) - } - for i := 0; i < len(info.Calls); i++ { - call := &info.Calls[i] - call.Cover = runner.instModules.Canonicalize(call.Cover) - call.Signal = runner.instModules.Canonicalize(call.Signal) - } - info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) - info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) - - result := &queue.Result{ - Status: queue.Success, - Info: info, - Output: resp.Output, - } - if resp.Error != "" { - result.Status = queue.ExecFailure - result.Err = fmt.Errorf("%s", resp.Error) - } else if !serv.cfg.Cover && req.req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal > 0 { - // Coverage collection is disabled, but signal was requested => use a substitute signal. - addFallbackSignal(req.req.Prog, info) - } - req.req.Done(result) -} - -func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.ExecutionRequest, bool) { - progData, err := req.Prog.SerializeForExec() - if err != nil { - return rpctype.ExecutionRequest{}, false - } - - // logProgram() may race with Done(), so let's serialize the program right now. - serialized := req.Prog.Serialize() - - var signalFilter signal.Signal - if req.SignalFilter != nil { - newRawSignal := runner.instModules.Decanonicalize(req.SignalFilter.ToRaw()) - // We don't care about specific priorities here. - signalFilter = signal.FromRaw(newRawSignal, 0) - } - runner.mu.Lock() - runner.nextRequestID++ - id := runner.nextRequestID - if runner.requests != nil { - runner.requests[id] = Request{ - req: req, - try: -1, - serialized: serialized, - } - } - runner.mu.Unlock() - return rpctype.ExecutionRequest{ - ID: id, - ProgData: progData, - ExecOpts: req.ExecOpts, - NewSignal: !req.ReturnAllSignal, - SignalFilter: signalFilter, - SignalFilterCall: req.SignalFilterCall, - ResetState: serv.cfg.Experimental.ResetAccState, - ReturnError: req.ReturnError, - ReturnOutput: req.ReturnOutput, - }, true -} - func (serv *RPCServer) execOpts() ipc.ExecOpts { env := ipc.FeaturesToFlags(serv.enabledFeatures, nil) if *flagDebug { @@ -611,34 +601,59 @@ func (serv *RPCServer) execOpts() ipc.ExecOpts { } } -func (runner *Runner) logProgram(procID int, serialized []byte) { - buf := new(bytes.Buffer) - fmt.Fprintf(buf, "executing program %v:\n%s\n", procID, serialized) - select { - case runner.injectLog <- buf.Bytes(): - case <-runner.injectStop: - } -} - // addFallbackSignal computes simple fallback signal in cases we don't have real coverage signal. // We use syscall number or-ed with returned errno value as signal. // At least this gives us all combinations of syscall+errno. -func addFallbackSignal(p *prog.Prog, info *ipc.ProgInfo) { +func addFallbackSignal(p *prog.Prog, info *flatrpc.ProgInfo) { callInfos := make([]prog.CallInfo, len(info.Calls)) for i, inf := range info.Calls { - if inf.Flags&ipc.CallExecuted != 0 { + if inf.Flags&flatrpc.CallFlagExecuted != 0 { callInfos[i].Flags |= prog.CallExecuted } - if inf.Flags&ipc.CallFinished != 0 { + if inf.Flags&flatrpc.CallFlagFinished != 0 { callInfos[i].Flags |= prog.CallFinished } - if inf.Flags&ipc.CallBlocked != 0 { + if inf.Flags&flatrpc.CallFlagBlocked != 0 { callInfos[i].Flags |= prog.CallBlocked } - callInfos[i].Errno = inf.Errno + callInfos[i].Errno = int(inf.Error) } p.FallbackSignal(callInfos) for i, inf := range callInfos { info.Calls[i].Signal = inf.Signal } } + +func convertProgInfo(info *flatrpc.ProgInfo) *ipc.ProgInfo { + if info == nil { + return nil + } + return &ipc.ProgInfo{ + Elapsed: time.Duration(info.Elapsed), + Freshness: int(info.Freshness), + Extra: convertCallInfo(info.Extra), + Calls: convertCalls(info), + } +} + +func convertCalls(info *flatrpc.ProgInfo) []ipc.CallInfo { + var calls []ipc.CallInfo + for _, call := range info.Calls { + calls = append(calls, convertCallInfo(call)) + } + return calls +} + +func convertCallInfo(info *flatrpc.CallInfo) ipc.CallInfo { + comps := make(prog.CompMap) + for _, comp := range info.Comps { + comps.AddComp(comp.Op1, comp.Op2) + } + return ipc.CallInfo{ + Flags: ipc.CallFlags(info.Flags), + Errno: int(info.Error), + Cover: info.Cover, + Signal: info.Signal, + Comps: comps, + } +} diff --git a/syz-manager/stats.go b/syz-manager/stats.go index d0d1eaa8f..77982434f 100644 --- a/syz-manager/stats.go +++ b/syz-manager/stats.go @@ -73,8 +73,4 @@ func (mgr *Manager) initStats() { }, func(v int, period time.Duration) string { return fmt.Sprintf("%v MB", v>>20) }) - - // Stats imported from the fuzzer (names must match the the fuzzer names). - stats.Create("no exec requests", "Number of times fuzzer was stalled with no exec requests", stats.Rate{}) - stats.Create("no exec duration", "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}) } |
