-rw-r--r--  syz-fuzzer/fuzzer.go       | 218
-rw-r--r--  syz-fuzzer/fuzzer_test.go  |  14
-rw-r--r--  syz-fuzzer/proc.go         | 132
-rw-r--r--  syz-manager/manager.go     |  27
-rw-r--r--  syz-manager/rpc.go         | 723
-rw-r--r--  syz-manager/stats.go       |   4
6 files changed, 558 insertions, 560 deletions
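
The change below replaces the net/rpc-based rpctype exchange between syz-fuzzer and syz-manager with a single persistent flatrpc connection per VM. A minimal sketch of the new fuzzer-side handshake, assuming the flatrpc helper signatures used in the diff (Dial, Send, the generic Recv, UnPack on the *Raw message types, Conn.Close); managerAddr, fuzzerName and timeoutScale are placeholder values and error handling is abbreviated:

package example

import (
	"time"

	"github.com/google/syzkaller/pkg/flatrpc"
)

// connectToManager performs the two-step handshake the diff introduces:
// send a ConnectRequest, then receive and unpack the manager's ConnectReply.
func connectToManager(managerAddr, fuzzerName string, timeoutScale time.Duration) (*flatrpc.Conn, *flatrpc.ConnectReply, error) {
	// One long-lived connection replaces the old per-call RPC client.
	conn, err := flatrpc.Dial(managerAddr, timeoutScale)
	if err != nil {
		return nil, nil, err
	}
	req := &flatrpc.ConnectRequest{Name: fuzzerName}
	if err := flatrpc.Send(conn, req); err != nil {
		conn.Close()
		return nil, nil, err
	}
	// Replies arrive as *Raw flatbuffers objects and are unpacked before use.
	raw, err := flatrpc.Recv[flatrpc.ConnectReplyRaw](conn)
	if err != nil {
		conn.Close()
		return nil, nil, err
	}
	return conn, raw.UnPack(), nil
}

The same Send/Recv pair is then reused for the InfoRequest/InfoReply step and, afterwards, for the streaming ExecRequest/ExecResult traffic shown in the diff.
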
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index 6870583df..5feae7585 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -12,6 +12,7 @@ import ( "path/filepath" "runtime" "runtime/debug" + "slices" "sync" "sync/atomic" "time" @@ -22,7 +23,6 @@ import ( "github.com/google/syzkaller/pkg/ipc/ipcconfig" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/signal" "github.com/google/syzkaller/pkg/tool" "github.com/google/syzkaller/prog" @@ -31,35 +31,19 @@ import ( ) type FuzzerTool struct { - name string + conn *flatrpc.Conn executor string gate *ipc.Gate - manager *rpctype.RPCClient // TODO: repair triagedCandidates logic, it's broken now. triagedCandidates uint32 timeouts targets.Timeouts leakFrames []string - noExecRequests atomic.Uint64 - noExecDuration atomic.Uint64 - - requests chan rpctype.ExecutionRequest - results chan executionResult + requests chan *flatrpc.ExecRequest signalMu sync.RWMutex maxSignal signal.Signal } -// executionResult offloads some computations from the proc loop -// to the communication thread. -type executionResult struct { - rpctype.ExecutionRequest - procID int - try int - info *ipc.ProgInfo - output []byte - err string -} - // Gate size controls how deep in the log the last executed by every proc // program may be. The intent is to make sure that, given the output log, // we always understand what was happening. @@ -121,69 +105,75 @@ func main() { return } + executorArch, executorSyzRevision, executorGitRevision, err := executorVersion(executor) + if err != nil { + log.SyzFatalf("failed to run executor version: %v ", err) + } + log.Logf(0, "dialing manager at %v", *flagManager) - manager, err := rpctype.NewRPCClient(*flagManager) + conn, err := flatrpc.Dial(*flagManager, timeouts.Scale) if err != nil { - log.SyzFatalf("failed to create an RPC client: %v ", err) + log.SyzFatalf("failed to connect to host: %v ", err) } log.Logf(1, "connecting to manager...") - a := &rpctype.ConnectArgs{ + connectReq := &flatrpc.ConnectRequest{ Name: *flagName, - GitRevision: prog.GitRevision, - SyzRevision: target.Revision, + Arch: executorArch, + GitRevision: executorGitRevision, + SyzRevision: executorSyzRevision, } - a.ExecutorArch, a.ExecutorSyzRevision, a.ExecutorGitRevision, err = executorVersion(executor) - if err != nil { - log.SyzFatalf("failed to run executor version: %v ", err) + if err := flatrpc.Send(conn, connectReq); err != nil { + log.SyzFatal(err) } - r := &rpctype.ConnectRes{} - if err := manager.Call("Manager.Connect", a, r); err != nil { - log.SyzFatalf("failed to call Manager.Connect(): %v ", err) + connectReplyRaw, err := flatrpc.Recv[flatrpc.ConnectReplyRaw](conn) + if err != nil { + log.SyzFatal(err) } - checkReq := &rpctype.CheckArgs{ - Name: *flagName, - Files: host.ReadFiles(r.ReadFiles), - Globs: make(map[string][]string), + connectReply := connectReplyRaw.UnPack() + + infoReq := &flatrpc.InfoRequest{ + Files: host.ReadFiles(connectReply.Files), } - features, err := host.SetupFeatures(target, executor, r.Features, nil) + features, err := host.SetupFeatures(target, executor, connectReply.Features, nil) if err != nil { - log.SyzFatalf("failed to setup features: %v ", err) + infoReq.Error = fmt.Sprintf("failed to setup features: %v ", err) } - checkReq.Features = features - for _, glob := range r.ReadGlobs { + infoReq.Features = features + for _, glob := range connectReply.Globs { files, err := 
filepath.Glob(filepath.FromSlash(glob)) - if err != nil && checkReq.Error == "" { - checkReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err) + if err != nil && infoReq.Error == "" { + infoReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err) } - checkReq.Globs[glob] = files + infoReq.Globs = append(infoReq.Globs, &flatrpc.GlobInfo{ + Name: glob, + Files: files, + }) } - checkRes := new(rpctype.CheckRes) - if err := manager.Call("Manager.Check", checkReq, checkRes); err != nil { - log.SyzFatalf("Manager.Check call failed: %v", err) + if err := flatrpc.Send(conn, infoReq); err != nil { + log.SyzFatal(err) } - if checkReq.Error != "" { - log.SyzFatalf("%v", checkReq.Error) + infoReplyRaw, err := flatrpc.Recv[flatrpc.InfoReplyRaw](conn) + if err != nil { + log.SyzFatal(err) } + infoReply := infoReplyRaw.UnPack() - if checkRes.CoverFilterBitmap != nil { - if err := osutil.WriteFile("syz-cover-bitmap", checkRes.CoverFilterBitmap); err != nil { + if len(infoReply.CoverFilter) != 0 { + if err := osutil.WriteFile("syz-cover-bitmap", infoReply.CoverFilter); err != nil { log.SyzFatalf("failed to write syz-cover-bitmap: %v", err) } } - inputsCount := *flagProcs * 2 fuzzerTool := &FuzzerTool{ - name: *flagName, + conn: conn, executor: executor, - manager: manager, timeouts: timeouts, - leakFrames: r.MemoryLeakFrames, + leakFrames: connectReply.LeakFrames, - requests: make(chan rpctype.ExecutionRequest, 2*inputsCount), - results: make(chan executionResult, 2*inputsCount), + requests: make(chan *flatrpc.ExecRequest, *flagProcs*4), } - fuzzerTool.filterDataRaceFrames(r.DataRaceFrames) + fuzzerTool.filterDataRaceFrames(connectReply.RaceFrames) var gateCallback func() for _, feat := range features { if feat.Id == flatrpc.FeatureLeak && feat.Reason == "" { @@ -197,10 +187,7 @@ func main() { startProc(fuzzerTool, pid, config) } - // Query enough inputs at the beginning. - fuzzerTool.exchangeDataCall(nil, 0) - go fuzzerTool.exchangeDataWorker() - fuzzerTool.exchangeDataWorker() + fuzzerTool.handleConn() } func (tool *FuzzerTool) leakGateCallback() { @@ -241,101 +228,46 @@ func (tool *FuzzerTool) filterDataRaceFrames(frames []string) { log.Logf(0, "%s", output) } -func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int) { - tool.manager.AsyncCall("Manager.StartExecuting", &rpctype.ExecutingRequest{ - Name: tool.name, - ID: progID, - ProcID: pid, - Try: try, - }) -} - -func (tool *FuzzerTool) exchangeDataCall(results []rpctype.ExecutionResult, latency time.Duration) time.Duration { - needProgs := max(0, cap(tool.requests)/2-len(tool.requests)) - a := &rpctype.ExchangeInfoRequest{ - Name: tool.name, - NeedProgs: needProgs, - Results: results, - StatsDelta: tool.grabStats(), - Latency: latency, - } - r := &rpctype.ExchangeInfoReply{} - start := osutil.MonotonicNano() - if err := tool.manager.Call("Manager.ExchangeInfo", a, r); err != nil { - log.SyzFatalf("Manager.ExchangeInfo call failed: %v", err) - } - latency = osutil.MonotonicNano() - start - tool.updateMaxSignal(r.NewMaxSignal, r.DropMaxSignal) - if len(r.Requests) == 0 { - // This is possible during initial checking stage, backoff a bit. 
- time.Sleep(100 * time.Millisecond) - } - for _, req := range r.Requests { - tool.requests <- req +func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int, wait time.Duration) { + msg := &flatrpc.ExecutorMessage{ + Msg: &flatrpc.ExecutorMessages{ + Type: flatrpc.ExecutorMessagesRawExecuting, + Value: &flatrpc.ExecutingMessage{ + Id: progID, + ProcId: int32(pid), + Try: int32(try), + WaitDuration: int64(wait), + }, + }, + } + if err := flatrpc.Send(tool.conn, msg); err != nil { + log.SyzFatal(err) } - return latency } -func (tool *FuzzerTool) exchangeDataWorker() { - var latency time.Duration - ticker := time.NewTicker(3 * time.Second * tool.timeouts.Scale).C +func (tool *FuzzerTool) handleConn() { for { - var results []rpctype.ExecutionResult - select { - case res := <-tool.results: - results = append(results, tool.convertExecutionResult(res)) - case <-ticker: - // This is not expected to happen a lot, - // but this is required to resolve potential deadlock - // during initial checking stage when we may get - // no test requests from the host in some requests. + raw, err := flatrpc.Recv[flatrpc.HostMessageRaw](tool.conn) + if err != nil { + log.SyzFatal(err) } - // Grab other finished calls, just in case there are any. - loop: - for { - select { - case res := <-tool.results: - results = append(results, tool.convertExecutionResult(res)) - default: - break loop - } + switch msg := raw.UnPack().Msg.Value.(type) { + case *flatrpc.ExecRequest: + msg.ProgData = slices.Clone(msg.ProgData) + tool.requests <- msg + case *flatrpc.SignalUpdate: + tool.handleSignalUpdate(msg) } - // Replenish exactly the finished requests. - latency = tool.exchangeDataCall(results, latency) - } -} - -func (tool *FuzzerTool) convertExecutionResult(res executionResult) rpctype.ExecutionResult { - ret := rpctype.ExecutionResult{ - ID: res.ID, - ProcID: res.procID, - Try: res.try, - Output: res.output, - Error: res.err, - } - if res.info != nil { - if res.NewSignal { - tool.diffMaxSignal(res.info, res.SignalFilter, res.SignalFilterCall) - } - ret.Info = *res.info - } - return ret -} - -func (tool *FuzzerTool) grabStats() map[string]uint64 { - return map[string]uint64{ - "no exec requests": tool.noExecRequests.Swap(0), - "no exec duration": tool.noExecDuration.Swap(0), } } -func (tool *FuzzerTool) diffMaxSignal(info *ipc.ProgInfo, mask signal.Signal, maskCall int) { +func (tool *FuzzerTool) diffMaxSignal(info *flatrpc.ProgInfo, mask signal.Signal, maskCall int) { tool.signalMu.RLock() defer tool.signalMu.RUnlock() diffMaxSignal(info, tool.maxSignal, mask, maskCall) } -func diffMaxSignal(info *ipc.ProgInfo, max, mask signal.Signal, maskCall int) { +func diffMaxSignal(info *flatrpc.ProgInfo, max, mask signal.Signal, maskCall int) { info.Extra.Signal = diffCallSignal(info.Extra.Signal, max, mask, -1, maskCall) for i := 0; i < len(info.Calls); i++ { info.Calls[i].Signal = diffCallSignal(info.Calls[i].Signal, max, mask, i, maskCall) @@ -349,11 +281,11 @@ func diffCallSignal(raw []uint32, max, mask signal.Signal, call, maskCall int) [ return max.DiffFromRaw(raw) } -func (tool *FuzzerTool) updateMaxSignal(add, drop []uint32) { +func (tool *FuzzerTool) handleSignalUpdate(msg *flatrpc.SignalUpdate) { tool.signalMu.Lock() defer tool.signalMu.Unlock() - tool.maxSignal.Subtract(signal.FromRaw(drop, 0)) - tool.maxSignal.Merge(signal.FromRaw(add, 0)) + tool.maxSignal.Subtract(signal.FromRaw(msg.DropMax, 0)) + tool.maxSignal.Merge(signal.FromRaw(msg.NewMax, 0)) } func setupPprofHandler(port int) { diff --git 
a/syz-fuzzer/fuzzer_test.go b/syz-fuzzer/fuzzer_test.go index d43b7bc74..2acc70ab9 100644 --- a/syz-fuzzer/fuzzer_test.go +++ b/syz-fuzzer/fuzzer_test.go @@ -6,7 +6,7 @@ package main import ( "testing" - "github.com/google/syzkaller/pkg/ipc" + "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/signal" "github.com/stretchr/testify/assert" ) @@ -14,8 +14,8 @@ import ( func TestFilterProgInfo(t *testing.T) { max := signal.FromRaw([]uint32{5, 6, 7}, 0) mask := signal.FromRaw([]uint32{2, 4, 6, 8}, 0) - info := ipc.ProgInfo{ - Calls: []ipc.CallInfo{ + info := flatrpc.ProgInfo{ + Calls: []*flatrpc.CallInfo{ { Signal: []uint32{1, 2, 3, 5, 6}, Cover: []uint32{1, 2, 3}, @@ -25,14 +25,14 @@ func TestFilterProgInfo(t *testing.T) { Cover: []uint32{2, 3, 4}, }, }, - Extra: ipc.CallInfo{ + Extra: &flatrpc.CallInfo{ Signal: []uint32{3, 4, 5}, Cover: []uint32{3, 4, 5}, }, } diffMaxSignal(&info, max, mask, 1) - assert.Equal(t, ipc.ProgInfo{ - Calls: []ipc.CallInfo{ + assert.Equal(t, flatrpc.ProgInfo{ + Calls: []*flatrpc.CallInfo{ { Signal: []uint32{1, 2, 3}, Cover: []uint32{1, 2, 3}, @@ -42,7 +42,7 @@ func TestFilterProgInfo(t *testing.T) { Cover: []uint32{2, 3, 4}, }, }, - Extra: ipc.CallInfo{ + Extra: &flatrpc.CallInfo{ Signal: []uint32{3, 4}, Cover: []uint32{3, 4, 5}, }, diff --git a/syz-fuzzer/proc.go b/syz-fuzzer/proc.go index 6542225da..acc333b9b 100644 --- a/syz-fuzzer/proc.go +++ b/syz-fuzzer/proc.go @@ -15,7 +15,7 @@ import ( "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpctype" + "github.com/google/syzkaller/pkg/signal" ) // Proc represents a single fuzzing process (executor). @@ -41,66 +41,74 @@ func startProc(tool *FuzzerTool, pid int, config *ipc.Config) { func (proc *Proc) loop() { rnd := rand.New(rand.NewSource(time.Now().UnixNano() + int64(proc.pid))) for { - req := proc.nextRequest() + req, wait := proc.nextRequest() // Do not let too much state accumulate. const restartIn = 600 resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps - if (req.ExecOpts.ExecFlags&resetFlags != 0 && - rnd.Intn(restartIn) == 0) || req.ResetState { + if (req.ExecFlags&resetFlags != 0 && + rnd.Intn(restartIn) == 0) || req.Flags&flatrpc.RequestFlagResetState != 0 { proc.env.ForceRestart() } - info, output, err, try := proc.execute(req) - res := executionResult{ - ExecutionRequest: req, - procID: proc.pid, - try: try, - info: info, - output: output, - err: err, + info, output, err := proc.execute(req, wait) + res := &flatrpc.ExecResult{ + Id: req.Id, + Info: convertProgInfo(info), + Output: output, + Error: err, } - for i := 1; i < req.Repeat && res.err == "" && !req.IsBinary; i++ { + for i := 1; i < int(req.Repeat) && res.Error == "" && req.Flags&flatrpc.RequestFlagIsBinary == 0; i++ { // Recreate Env every few iterations, this allows to cover more paths. if i%2 == 0 { proc.env.ForceRestart() } - info, output, err, _ := proc.execute(req) - if res.info == nil { - res.info = info - } else { - res.info.Calls = append(res.info.Calls, info.Calls...) + info, output, err := proc.execute(req, 0) + if res.Info == nil { + res.Info = convertProgInfo(info) + } else if info != nil { + res.Info.Calls = append(res.Info.Calls, convertCalls(info)...) } - res.output = append(res.output, output...) - res.err = err + res.Output = append(res.Output, output...) 
+ res.Error = err + } + if res.Info != nil && req.Flags&flatrpc.RequestFlagNewSignal != 0 { + filter := signal.FromRaw(req.SignalFilter, 0) + proc.tool.diffMaxSignal(res.Info, filter, int(req.SignalFilterCall)) + } + msg := &flatrpc.ExecutorMessage{ + Msg: &flatrpc.ExecutorMessages{ + Type: flatrpc.ExecutorMessagesRawExecResult, + Value: res, + }, + } + if err := flatrpc.Send(proc.tool.conn, msg); err != nil { + log.SyzFatal(err) } - // Let's perform signal filtering in a separate thread to get the most - // exec/sec out of a syz-executor instance. - proc.tool.results <- res } } -func (proc *Proc) nextRequest() rpctype.ExecutionRequest { +func (proc *Proc) nextRequest() (*flatrpc.ExecRequest, time.Duration) { select { case req := <-proc.tool.requests: - return req + return req, 0 default: } // Not having enough inputs to execute is a sign of RPC communication problems. // Let's count and report such situations. start := osutil.MonotonicNano() req := <-proc.tool.requests - proc.tool.noExecDuration.Add(uint64(osutil.MonotonicNano() - start)) - proc.tool.noExecRequests.Add(1) - return req + wait := osutil.MonotonicNano() - start + return req, wait } -func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, output []byte, errStr string, try int) { +func (proc *Proc) execute(req *flatrpc.ExecRequest, wait time.Duration) ( + info *ipc.ProgInfo, output []byte, errStr string) { var err error - if req.IsBinary { + if req.Flags&flatrpc.RequestFlagIsBinary != 0 { output, err = executeBinary(req) } else { - info, output, try, err = proc.executeProgram(req) + info, output, err = proc.executeProgram(req, wait) } - if !req.ReturnOutput { + if req.Flags&flatrpc.RequestFlagReturnOutput == 0 { output = nil } if err != nil { @@ -109,30 +117,36 @@ func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, out return } -func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, []byte, int, error) { +func (proc *Proc) executeProgram(req *flatrpc.ExecRequest, wait time.Duration) (*ipc.ProgInfo, []byte, error) { + returnError := req.Flags&flatrpc.RequestFlagReturnError != 0 + execOpts := &ipc.ExecOpts{ + EnvFlags: req.ExecEnv, + ExecFlags: req.ExecFlags, + SandboxArg: int(req.SandboxArg), + } for try := 0; ; try++ { var output []byte var info *ipc.ProgInfo var hanged bool // On a heavily loaded VM, syz-executor may take significant time to start. // Let's do it outside of the gate ticket. - err := proc.env.RestartIfNeeded(&req.ExecOpts) + err := proc.env.RestartIfNeeded(execOpts) if err == nil { // Limit concurrency. ticket := proc.tool.gate.Enter() - proc.tool.startExecutingCall(req.ID, proc.pid, try) - output, info, hanged, err = proc.env.ExecProg(&req.ExecOpts, req.ProgData) + proc.tool.startExecutingCall(req.Id, proc.pid, try, wait) + output, info, hanged, err = proc.env.ExecProg(execOpts, req.ProgData) proc.tool.gate.Leave(ticket) // Don't print output if returning error b/c it may contain SYZFAIL. 
- if !req.ReturnError { + if !returnError { log.Logf(2, "result hanged=%v err=%v: %s", hanged, err, output) } - if hanged && err == nil && req.ReturnError { + if hanged && err == nil && returnError { err = errors.New("hanged") } } - if err == nil || req.ReturnError { - return info, output, try, err + if err == nil || returnError { + return info, output, err } log.Logf(4, "fuzzer detected executor failure='%v', retrying #%d", err, try+1) if try > 10 { @@ -143,7 +157,7 @@ func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, [ } } -func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) { +func executeBinary(req *flatrpc.ExecRequest) ([]byte, error) { tmp, err := os.MkdirTemp("", "syz-runtest") if err != nil { return nil, fmt.Errorf("failed to create temp dir: %w", err) @@ -166,3 +180,39 @@ func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) { } return output, err } + +func convertProgInfo(info *ipc.ProgInfo) *flatrpc.ProgInfo { + if info == nil { + return nil + } + return &flatrpc.ProgInfo{ + Elapsed: uint64(info.Elapsed), + Freshness: uint64(info.Freshness), + Extra: convertCallInfo(info.Extra), + Calls: convertCalls(info), + } +} + +func convertCalls(info *ipc.ProgInfo) []*flatrpc.CallInfo { + var calls []*flatrpc.CallInfo + for _, call := range info.Calls { + calls = append(calls, convertCallInfo(call)) + } + return calls +} + +func convertCallInfo(info ipc.CallInfo) *flatrpc.CallInfo { + var comps []*flatrpc.Comparison + for op1, ops := range info.Comps { + for op2 := range ops { + comps = append(comps, &flatrpc.Comparison{Op1: op1, Op2: op2}) + } + } + return &flatrpc.CallInfo{ + Flags: flatrpc.CallFlag(info.Flags), + Error: int32(info.Errno), + Cover: info.Cover, + Signal: info.Signal, + Comps: comps, + } +} diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 460c65ba7..c85d59d36 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -724,14 +724,10 @@ func parseProgram(target *prog.Target, enabled map[*prog.Syscall]bool, data []by func (mgr *Manager) runInstance(index int) (*Crash, error) { mgr.checkUsedFiles() - var maxSignal signal.Signal - if fuzzer := mgr.fuzzer.Load(); fuzzer != nil { - maxSignal = fuzzer.Cover.CopyMaxSignal() - } // Use unique instance names to prevent name collisions in case of untimely RPC messages. instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1)) injectLog := make(chan []byte, 10) - mgr.serv.createInstance(instanceName, maxSignal, injectLog) + mgr.serv.createInstance(instanceName, injectLog) rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectLog) machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil) @@ -1400,6 +1396,13 @@ func (mgr *Manager) corpusMinimization() { } } +func (mgr *Manager) maxSignal() signal.Signal { + if fuzzer := mgr.fuzzer.Load(); fuzzer != nil { + return fuzzer.Cover.CopyMaxSignal() + } + return nil +} + func (mgr *Manager) fuzzerSignalRotation() { const ( rotateSignals = 1000 @@ -1426,12 +1429,14 @@ func (mgr *Manager) fuzzerSignalRotation() { func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { for ; ; time.Sleep(time.Second / 2) { - // Distribute new max signal over all instances. - newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta() - log.Logf(2, "distributing %d new signal, %d dropped signal", - len(newSignal), len(dropSignal)) - if len(newSignal)+len(dropSignal) != 0 { - mgr.serv.distributeSignalDelta(newSignal, dropSignal) + if mgr.cfg.Cover { + // Distribute new max signal over all instances. 
+ newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta() + log.Logf(2, "distributing %d new signal, %d dropped signal", + len(newSignal), len(dropSignal)) + if len(newSignal)+len(dropSignal) != 0 { + mgr.serv.distributeSignalDelta(newSignal, dropSignal) + } } // Update the state machine. diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 86b2a375d..2f26ac4c2 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -5,8 +5,9 @@ package main import ( "bytes" + "errors" "fmt" - "net" + "slices" "sort" "strings" "sync" @@ -19,7 +20,6 @@ import ( "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/mgrconfig" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/signal" "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/pkg/vminfo" @@ -30,11 +30,10 @@ type RPCServer struct { mgr RPCManagerView cfg *mgrconfig.Config target *prog.Target - server *rpctype.RPCServer checker *vminfo.Checker port int - infoDone bool + infoOnce sync.Once checkDone atomic.Bool checkFailures int baseSource *queue.DynamicSourceCtl @@ -46,43 +45,31 @@ type RPCServer struct { coverFilter map[uint32]uint32 mu sync.Mutex - runners sync.Map // Instead of map[string]*Runner. + runners map[string]*Runner execSource queue.Source - statExecs *stats.Val - statExecRetries *stats.Val - statExecutorRestarts *stats.Val - statExecBufferTooSmall *stats.Val - statVMRestarts *stats.Val - statExchangeCalls *stats.Val - statExchangeProgs *stats.Val - statExchangeServerLatency *stats.Val - statExchangeClientLatency *stats.Val - statCoverFiltered *stats.Val + statExecs *stats.Val + statExecRetries *stats.Val + statExecutorRestarts *stats.Val + statExecBufferTooSmall *stats.Val + statVMRestarts *stats.Val + statNoExecRequests *stats.Val + statNoExecDuration *stats.Val + statCoverFiltered *stats.Val } type Runner struct { - name string - injectLog chan<- []byte - injectStop chan bool - stopFuzzing atomic.Bool - - machineInfo []byte - instModules *cover.CanonicalizerInstance - - // The mutex protects newMaxSignal, dropMaxSignal, and requests. - mu sync.Mutex - newMaxSignal signal.Signal - dropMaxSignal signal.Signal + stopped bool + shutdown chan bool + injectLog chan<- []byte + injectStop chan bool + injectBuf bytes.Buffer + conn *flatrpc.Conn + machineInfo []byte + canonicalizer *cover.CanonicalizerInstance nextRequestID int64 - requests map[int64]Request -} - -type Request struct { - req *queue.Request - serialized []byte - try int - procID int + requests map[int64]*queue.Request + executing map[int64]bool } type BugFrames struct { @@ -93,6 +80,7 @@ type BugFrames struct { // RPCManagerView restricts interface between RPCServer and Manager. 
type RPCManagerView interface { currentBugFrames() BugFrames + maxSignal() signal.Signal machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool, opts ipc.ExecOpts) queue.Source } @@ -104,6 +92,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { mgr: mgr, cfg: mgr.cfg, target: mgr.target, + runners: make(map[string]*Runner), checker: checker, baseSource: baseSource, execSource: queue.Retry(baseSource), @@ -117,141 +106,320 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { "Program serialization overflowed exec buffer", stats.NoGraph), statVMRestarts: stats.Create("vm restarts", "Total number of VM starts", stats.Rate{}, stats.NoGraph), - statExchangeCalls: stats.Create("exchange calls", "Number of RPC Exchange calls", - stats.Rate{}), - statExchangeProgs: stats.Create("exchange progs", "Test programs exchanged per RPC call", - stats.Distribution{}), - statExchangeServerLatency: stats.Create("exchange manager latency", - "Manager RPC Exchange call latency (us)", stats.Distribution{}), - statExchangeClientLatency: stats.Create("exchange fuzzer latency", - "End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}), + statNoExecRequests: stats.Create("no exec requests", + "Number of times fuzzer was stalled with no exec requests", stats.Rate{}), + statNoExecDuration: stats.Create("no exec duration", + "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}), statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph), } - s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) + s, err := flatrpc.ListenAndServe(mgr.cfg.RPC, serv.handleConn) if err != nil { return nil, err } baseSource.Store(serv.checker) - log.Logf(0, "serving rpc on tcp://%v", s.Addr()) - serv.port = s.Addr().(*net.TCPAddr).Port - serv.server = s - go s.Serve() + log.Logf(0, "serving rpc on tcp://%v", s.Addr) + serv.port = s.Addr.Port return serv, nil } -func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) error { - log.Logf(1, "fuzzer %v connected", a.Name) - checkRevisions(a, serv.cfg.Target) - serv.statVMRestarts.Add(1) - - bugFrames := serv.mgr.currentBugFrames() - r.MemoryLeakFrames = bugFrames.memoryLeaks - r.DataRaceFrames = bugFrames.dataRaces +func (serv *RPCServer) handleConn(conn *flatrpc.Conn) { + name, machineInfo, canonicalizer, err := serv.handshake(conn) + if err != nil { + log.Logf(1, "%v", err) + return + } serv.mu.Lock() - defer serv.mu.Unlock() - r.ReadFiles = serv.checker.RequiredFiles() - if serv.checkDone.Load() { - r.Features = serv.setupFeatures - } else { - r.ReadFiles = append(r.ReadFiles, serv.checker.CheckFiles()...) 
- r.ReadGlobs = serv.target.RequiredGlobs() - r.Features = flatrpc.AllFeatures + runner := serv.runners[name] + if runner == nil || runner.stopped { + serv.mu.Unlock() + log.Logf(2, "VM %v shut down before connect", name) + return } - return nil -} + runner.conn = conn + runner.machineInfo = machineInfo + runner.canonicalizer = canonicalizer + serv.mu.Unlock() + + err = serv.connectionLoop(runner) + log.Logf(2, "runner %v: %v", name, err) -func checkRevisions(a *rpctype.ConnectArgs, target *prog.Target) { - if target.Arch != a.ExecutorArch { - log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.ExecutorArch) + crashed := <-runner.shutdown + for id, req := range runner.requests { + status := queue.Restarted + if crashed && runner.executing[id] { + status = queue.Crashed + } + req.Done(&queue.Result{Status: status}) } - if prog.GitRevision != a.GitRevision { - log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v", - prog.GitRevision, a.GitRevision) +} + +func (serv *RPCServer) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) { + connectReqRaw, err := flatrpc.Recv[flatrpc.ConnectRequestRaw](conn) + if err != nil { + return "", nil, nil, err } - if prog.GitRevision != a.ExecutorGitRevision { - log.Fatalf("mismatching manager/executor git revisions: %v vs %v", - prog.GitRevision, a.ExecutorGitRevision) + connectReq := connectReqRaw.UnPack() + log.Logf(1, "fuzzer %v connected", connectReq.Name) + checkRevisions(connectReq, serv.cfg.Target) + serv.statVMRestarts.Add(1) + + bugFrames := serv.mgr.currentBugFrames() + connectReply := &flatrpc.ConnectReply{ + LeakFrames: bugFrames.memoryLeaks, + RaceFrames: bugFrames.dataRaces, } - if target.Revision != a.SyzRevision { - log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v", - target.Revision, a.SyzRevision) + connectReply.Files = serv.checker.RequiredFiles() + if serv.checkDone.Load() { + connectReply.Features = serv.setupFeatures + } else { + connectReply.Files = append(connectReply.Files, serv.checker.CheckFiles()...) + connectReply.Globs = serv.target.RequiredGlobs() + connectReply.Features = flatrpc.AllFeatures } - if target.Revision != a.ExecutorSyzRevision { - log.Fatalf("mismatching manager/executor system call descriptions: %v vs %v", - target.Revision, a.ExecutorSyzRevision) + if err := flatrpc.Send(conn, connectReply); err != nil { + return "", nil, nil, err } -} - -func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error { - serv.mu.Lock() - defer serv.mu.Unlock() - modules, machineInfo, err := serv.checker.MachineInfo(a.Files) + infoReqRaw, err := flatrpc.Recv[flatrpc.InfoRequestRaw](conn) + if err != nil { + return "", nil, nil, err + } + infoReq := infoReqRaw.UnPack() + modules, machineInfo, err := serv.checker.MachineInfo(infoReq.Files) if err != nil { log.Logf(0, "parsing of machine info failed: %v", err) - if a.Error == "" { - a.Error = err.Error() + if infoReq.Error == "" { + infoReq.Error = err.Error() } } - - if a.Error != "" { - log.Logf(0, "machine check failed: %v", a.Error) + if infoReq.Error != "" { + log.Logf(0, "machine check failed: %v", infoReq.Error) serv.checkFailures++ if serv.checkFailures == 10 { log.Fatalf("machine check failing") } - return fmt.Errorf("machine check failed: %v", a.Error) + return "", nil, nil, errors.New("machine check failed") } - if !serv.infoDone { - serv.infoDone = true - - // Now execute check programs. 
- go serv.runCheck(a.Files, a.Features) - + serv.infoOnce.Do(func() { serv.modules = modules - serv.target.UpdateGlobs(a.Globs) serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover) var err error serv.execCoverFilter, serv.coverFilter, err = createCoverageFilter(serv.cfg, modules) if err != nil { log.Fatalf("failed to init coverage filter: %v", err) } + globs := make(map[string][]string) + for _, glob := range infoReq.Globs { + globs[glob.Name] = glob.Files + } + serv.target.UpdateGlobs(globs) + // Flatbuffers don't do deep copy of byte slices, + // so clone manually since we pass it a goroutine. + for _, file := range infoReq.Files { + file.Data = slices.Clone(file.Data) + } + // Now execute check programs. + go func() { + if err := serv.runCheck(infoReq.Files, infoReq.Features); err != nil { + log.Fatalf("check failed: %v", err) + } + }() + }) + + canonicalizer := serv.canonicalModules.NewInstance(modules) + instCoverFilter := canonicalizer.DecanonicalizeFilter(serv.execCoverFilter) + infoReply := &flatrpc.InfoReply{ + CoverFilter: createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter), + } + if err := flatrpc.Send(conn, infoReply); err != nil { + return "", nil, nil, err } + return connectReq.Name, machineInfo, canonicalizer, nil +} - runner := serv.findRunner(a.Name) - if runner == nil { - // There may be a parallel shutdownInstance() call that removes the runner. - return fmt.Errorf("unknown runner %s", a.Name) +func (serv *RPCServer) connectionLoop(runner *Runner) error { + if serv.cfg.Cover { + maxSignal := serv.mgr.maxSignal().ToRaw() + for len(maxSignal) != 0 { + // Split coverage into batches to not grow the connection serialization + // buffer too much (we don't want to grow it larger than what will be needed + // to send programs). + n := min(len(maxSignal), 50000) + if err := runner.sendSignalUpdate(maxSignal[:n], nil); err != nil { + return err + } + maxSignal = maxSignal[n:] + } + } + + for { + for len(runner.requests)-len(runner.executing) < 2*serv.cfg.Procs { + req := serv.execSource.Next() + if req == nil { + break + } + if err := serv.sendRequest(runner, req); err != nil { + return err + } + } + if len(runner.requests) == 0 { + // The runner has not requests at all, so don't wait to receive anything from it. + // This is only possible during the initial checking. + time.Sleep(10 * time.Millisecond) + continue + } + raw, err := flatrpc.Recv[flatrpc.ExecutorMessageRaw](runner.conn) + if err != nil { + return err + } + switch msg := raw.UnPack().Msg.Value.(type) { + case *flatrpc.ExecutingMessage: + err = serv.handleExecutingMessage(runner, msg) + case *flatrpc.ExecResult: + err = serv.handleExecResult(runner, msg) + default: + panic(fmt.Sprintf("unknown message %T", msg)) + } + if err != nil { + return err + } } +} - runner.mu.Lock() - defer runner.mu.Unlock() - if runner.machineInfo != nil { - return fmt.Errorf("duplicate connection from %s", a.Name) +func (serv *RPCServer) sendRequest(runner *Runner, req *queue.Request) error { + if err := validateRequest(req); err != nil { + panic(err) } - runner.machineInfo = machineInfo - runner.instModules = serv.canonicalModules.NewInstance(modules) - instCoverFilter := runner.instModules.DecanonicalizeFilter(serv.execCoverFilter) - r.CoverFilterBitmap = createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter) + progData, err := req.Prog.SerializeForExec() + if err != nil { + // It's bad if we systematically fail to serialize programs, + // but so far we don't have a better handling than counting this. 
+ // This error is observed a lot on the seeded syz_mount_image calls. + serv.statExecBufferTooSmall.Add(1) + req.Done(&queue.Result{Status: queue.ExecFailure}) + return nil + } + runner.nextRequestID++ + id := runner.nextRequestID + var flags flatrpc.RequestFlag + if !req.ReturnAllSignal { + flags |= flatrpc.RequestFlagNewSignal + } + if req.ReturnOutput { + flags |= flatrpc.RequestFlagReturnOutput + } + if req.ReturnError { + flags |= flatrpc.RequestFlagReturnError + } + if serv.cfg.Experimental.ResetAccState { + flags |= flatrpc.RequestFlagResetState + } + signalFilter := runner.canonicalizer.Decanonicalize(req.SignalFilter.ToRaw()) + msg := &flatrpc.HostMessage{ + Msg: &flatrpc.HostMessages{ + Type: flatrpc.HostMessagesRawExecRequest, + Value: &flatrpc.ExecRequest{ + Id: id, + ProgData: progData, + Flags: flags, + ExecEnv: req.ExecOpts.EnvFlags, + ExecFlags: req.ExecOpts.ExecFlags, + SandboxArg: int64(req.ExecOpts.SandboxArg), + SignalFilter: signalFilter, + SignalFilterCall: int32(req.SignalFilterCall), + }, + }, + } + runner.requests[id] = req + return flatrpc.Send(runner.conn, msg) +} + +func (serv *RPCServer) handleExecutingMessage(runner *Runner, msg *flatrpc.ExecutingMessage) error { + req := runner.requests[msg.Id] + if req == nil { + return fmt.Errorf("can't find executing request %v", msg.Id) + } + serv.statExecs.Add(1) + if msg.Try == 0 { + if msg.WaitDuration != 0 { + serv.statNoExecRequests.Add(1) + // Cap wait duration to 1 second to avoid extreme peaks on the graph + // which make it impossible to see real data (the rest becomes a flat line). + serv.statNoExecDuration.Add(int(min(msg.WaitDuration, 1e9))) + } + } else { + serv.statExecRetries.Add(1) + } + fmt.Fprintf(&runner.injectBuf, "executing program %v:\n%s\n", msg.ProcId, req.Prog.Serialize()) + select { + case runner.injectLog <- runner.injectBuf.Bytes(): + case <-runner.injectStop: + } + runner.injectBuf.Reset() + runner.executing[msg.Id] = true return nil } -func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) { - if err := serv.finishCheck(checkFilesInfo, checkFeatureInfo); err != nil { - log.Fatalf("check failed: %v", err) +func (serv *RPCServer) handleExecResult(runner *Runner, msg *flatrpc.ExecResult) error { + req := runner.requests[msg.Id] + if req == nil { + return fmt.Errorf("can't find executed request %v", msg.Id) } - serv.checkDone.Store(true) + delete(runner.requests, msg.Id) + delete(runner.executing, msg.Id) + if msg.Info != nil { + if msg.Info.Freshness == 0 { + serv.statExecutorRestarts.Add(1) + } + if !serv.cfg.Cover && req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal != 0 { + // Coverage collection is disabled, but signal was requested => use a substitute signal. 
+ addFallbackSignal(req.Prog, msg.Info) + } + for i := 0; i < len(msg.Info.Calls); i++ { + call := msg.Info.Calls[i] + call.Cover = runner.canonicalizer.Canonicalize(call.Cover) + call.Signal = runner.canonicalizer.Canonicalize(call.Signal) + } + msg.Info.Extra.Cover = runner.canonicalizer.Canonicalize(msg.Info.Extra.Cover) + msg.Info.Extra.Signal = runner.canonicalizer.Canonicalize(msg.Info.Extra.Signal) + } + status := queue.Success + var resErr error + if msg.Error != "" { + status = queue.ExecFailure + resErr = errors.New(msg.Error) + } + req.Done(&queue.Result{ + Status: status, + Info: convertProgInfo(msg.Info), + Output: slices.Clone(msg.Output), + Err: resErr, + }) + return nil } -func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error { - // Note: need to print disbled syscalls before failing due to an error. - // This helps to debug "all system calls are disabled". +func checkRevisions(a *flatrpc.ConnectRequest, target *prog.Target) { + if target.Arch != a.Arch { + log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.Arch) + } + if prog.GitRevision != a.GitRevision { + log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v", + prog.GitRevision, a.GitRevision) + } + if target.Revision != a.SyzRevision { + log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v", + target.Revision, a.SyzRevision) + } +} +func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error { enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo) enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls) + // Note: need to print disbled syscalls before failing due to an error. + // This helps to debug "all system calls are disabled". buf := new(bytes.Buffer) if len(serv.cfg.EnabledSyscalls) != 0 || log.V(1) { if len(disabledCalls) != 0 { @@ -308,94 +476,7 @@ func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeat serv.setupFeatures = features.NeedSetup() newSource := serv.mgr.machineChecked(serv.enabledFeatures, enabledCalls, serv.execOpts()) serv.baseSource.Store(newSource) - return nil -} - -func (serv *RPCServer) StartExecuting(a *rpctype.ExecutingRequest, r *int) error { - serv.statExecs.Add(1) - if a.Try != 0 { - serv.statExecRetries.Add(1) - } - runner := serv.findRunner(a.Name) - if runner == nil { - return nil - } - runner.mu.Lock() - req, ok := runner.requests[a.ID] - if !ok { - runner.mu.Unlock() - return nil - } - // RPC handlers are invoked in separate goroutines, so start executing notifications - // can outrun each other and completion notification. - if req.try < a.Try { - req.try = a.Try - req.procID = a.ProcID - } - runner.requests[a.ID] = req - runner.mu.Unlock() - runner.logProgram(a.ProcID, req.serialized) - return nil -} - -func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error { - start := time.Now() - runner := serv.findRunner(a.Name) - if runner == nil { - return nil - } - // First query new inputs and only then post results. - // It should foster a more even distribution of executions - // across all VMs. - for len(r.Requests) < a.NeedProgs { - inp := serv.execSource.Next() - if inp == nil { - // It's unlikely that subsequent Next() calls will yield something. 
- break - } - if err := validateRequest(inp); err != nil { - panic(fmt.Sprintf("invalid request: %v, req: %#v", err, inp)) - } - if req, ok := serv.newRequest(runner, inp); ok { - r.Requests = append(r.Requests, req) - if inp.Risky() { - // We give crashed inputs only one more chance, so if we resend many of them at once, - // we'll never figure out the actual problematic input. - break - } - } else { - // It's bad if we systematically fail to serialize programs, - // but so far we don't have a better handling than counting this. - // This error is observed a lot on the seeded syz_mount_image calls. - serv.statExecBufferTooSmall.Add(1) - inp.Done(&queue.Result{Status: queue.ExecFailure}) - } - } - - for _, result := range a.Results { - serv.doneRequest(runner, result) - } - - stats.Import(a.StatsDelta) - - runner.mu.Lock() - // Let's transfer new max signal in portions. - - const transferMaxSignal = 500000 - newSignal := runner.newMaxSignal.Split(transferMaxSignal) - dropSignal := runner.dropMaxSignal.Split(transferMaxSignal) - runner.mu.Unlock() - - r.NewMaxSignal = runner.instModules.Decanonicalize(newSignal.ToRaw()) - r.DropMaxSignal = runner.instModules.Decanonicalize(dropSignal.ToRaw()) - - log.Logf(2, "exchange with %s: %d done, %d new requests, %d new max signal, %d drop signal", - a.Name, len(a.Results), len(r.Requests), len(r.NewMaxSignal), len(r.DropMaxSignal)) - - serv.statExchangeCalls.Add(1) - serv.statExchangeProgs.Add(a.NeedProgs) - serv.statExchangeClientLatency.Add(int(a.Latency.Microseconds())) - serv.statExchangeServerLatency.Add(int(time.Since(start).Microseconds())) + serv.checkDone.Store(true) return nil } @@ -411,77 +492,68 @@ func validateRequest(req *queue.Request) error { return nil } -func (serv *RPCServer) findRunner(name string) *Runner { - if val, _ := serv.runners.Load(name); val != nil { - runner := val.(*Runner) - if runner.stopFuzzing.Load() { - return nil - } - return runner - } - // There might be a parallel shutdownInstance(). - // Ignore requests then. - return nil -} - -func (serv *RPCServer) createInstance(name string, maxSignal signal.Signal, injectLog chan<- []byte) { +func (serv *RPCServer) createInstance(name string, injectLog chan<- []byte) { runner := &Runner{ - name: name, - requests: make(map[int64]Request), - newMaxSignal: maxSignal, - injectLog: injectLog, - injectStop: make(chan bool), + injectLog: injectLog, + injectStop: make(chan bool), + shutdown: make(chan bool, 1), + requests: make(map[int64]*queue.Request), + executing: make(map[int64]bool), } - if _, loaded := serv.runners.LoadOrStore(name, runner); loaded { + serv.mu.Lock() + if serv.runners[name] != nil { panic(fmt.Sprintf("duplicate instance %s", name)) } + serv.runners[name] = runner + serv.mu.Unlock() } // stopInstance prevents further request exchange requests. // To make RPCServer fully forget an instance, shutdownInstance() must be called. func (serv *RPCServer) stopFuzzing(name string) { - runner := serv.findRunner(name) - if runner == nil { - return + serv.mu.Lock() + runner := serv.runners[name] + runner.stopped = true + conn := runner.conn + serv.mu.Unlock() + close(runner.injectStop) + if conn != nil { + conn.Close() } - runner.stopFuzzing.Store(true) } func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { - runnerPtr, _ := serv.runners.LoadAndDelete(name) - runner := runnerPtr.(*Runner) - runner.mu.Lock() - if runner.requests == nil { - // We are supposed to invoke this code only once. 
- panic("Runner.requests is already nil") - } - oldRequests := runner.requests - runner.requests = nil - runner.mu.Unlock() - - close(runner.injectStop) + serv.mu.Lock() + runner := serv.runners[name] + delete(serv.runners, name) + serv.mu.Unlock() + runner.shutdown <- crashed + return runner.machineInfo +} +func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) { + plusRaw := plus.ToRaw() + minusRaw := minus.ToRaw() serv.mu.Lock() defer serv.mu.Unlock() - for _, req := range oldRequests { - if crashed && req.try >= 0 { - req.req.Done(&queue.Result{Status: queue.Crashed}) - } else { - req.req.Done(&queue.Result{Status: queue.Restarted}) + for _, runner := range serv.runners { + if runner.conn != nil { + runner.sendSignalUpdate(plusRaw, minusRaw) } } - return runner.machineInfo } -func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) { - serv.runners.Range(func(key, value any) bool { - runner := value.(*Runner) - runner.mu.Lock() - defer runner.mu.Unlock() - runner.newMaxSignal.Merge(plus) - runner.dropMaxSignal.Merge(minus) - return true - }) +func (runner *Runner) sendSignalUpdate(plus, minus []uint32) error { + msg := &flatrpc.HostMessage{ + Msg: &flatrpc.HostMessages{ + Type: flatrpc.HostMessagesRawSignalUpdate, + Value: &flatrpc.SignalUpdate{ + NewMax: runner.canonicalizer.Decanonicalize(plus), + DropMax: runner.canonicalizer.Decanonicalize(minus), + }, + }, + } + return flatrpc.Send(runner.conn, msg) } func (serv *RPCServer) updateCoverFilter(newCover []uint32) { @@ -501,88 +573,6 @@ func (serv *RPCServer) updateCoverFilter(newCover []uint32) { serv.statCoverFiltered.Add(filtered) } -func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult) { - info := &resp.Info - if info.Freshness == 0 { - serv.statExecutorRestarts.Add(1) - } - runner.mu.Lock() - req, ok := runner.requests[resp.ID] - if ok { - delete(runner.requests, resp.ID) - } - runner.mu.Unlock() - if !ok { - // There may be a concurrent shutdownInstance() call. - return - } - // RPC handlers are invoked in separate goroutines, so log the program here - // if completion notification outrun start executing notification. - if req.try < resp.Try { - runner.logProgram(resp.ProcID, req.serialized) - } - for i := 0; i < len(info.Calls); i++ { - call := &info.Calls[i] - call.Cover = runner.instModules.Canonicalize(call.Cover) - call.Signal = runner.instModules.Canonicalize(call.Signal) - } - info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) - info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) - - result := &queue.Result{ - Status: queue.Success, - Info: info, - Output: resp.Output, - } - if resp.Error != "" { - result.Status = queue.ExecFailure - result.Err = fmt.Errorf("%s", resp.Error) - } else if !serv.cfg.Cover && req.req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal > 0 { - // Coverage collection is disabled, but signal was requested => use a substitute signal. - addFallbackSignal(req.req.Prog, info) - } - req.req.Done(result) -} - -func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.ExecutionRequest, bool) { - progData, err := req.Prog.SerializeForExec() - if err != nil { - return rpctype.ExecutionRequest{}, false - } - - // logProgram() may race with Done(), so let's serialize the program right now. 
- serialized := req.Prog.Serialize() - - var signalFilter signal.Signal - if req.SignalFilter != nil { - newRawSignal := runner.instModules.Decanonicalize(req.SignalFilter.ToRaw()) - // We don't care about specific priorities here. - signalFilter = signal.FromRaw(newRawSignal, 0) - } - runner.mu.Lock() - runner.nextRequestID++ - id := runner.nextRequestID - if runner.requests != nil { - runner.requests[id] = Request{ - req: req, - try: -1, - serialized: serialized, - } - } - runner.mu.Unlock() - return rpctype.ExecutionRequest{ - ID: id, - ProgData: progData, - ExecOpts: req.ExecOpts, - NewSignal: !req.ReturnAllSignal, - SignalFilter: signalFilter, - SignalFilterCall: req.SignalFilterCall, - ResetState: serv.cfg.Experimental.ResetAccState, - ReturnError: req.ReturnError, - ReturnOutput: req.ReturnOutput, - }, true -} - func (serv *RPCServer) execOpts() ipc.ExecOpts { env := ipc.FeaturesToFlags(serv.enabledFeatures, nil) if *flagDebug { @@ -611,34 +601,59 @@ func (serv *RPCServer) execOpts() ipc.ExecOpts { } } -func (runner *Runner) logProgram(procID int, serialized []byte) { - buf := new(bytes.Buffer) - fmt.Fprintf(buf, "executing program %v:\n%s\n", procID, serialized) - select { - case runner.injectLog <- buf.Bytes(): - case <-runner.injectStop: - } -} - // addFallbackSignal computes simple fallback signal in cases we don't have real coverage signal. // We use syscall number or-ed with returned errno value as signal. // At least this gives us all combinations of syscall+errno. -func addFallbackSignal(p *prog.Prog, info *ipc.ProgInfo) { +func addFallbackSignal(p *prog.Prog, info *flatrpc.ProgInfo) { callInfos := make([]prog.CallInfo, len(info.Calls)) for i, inf := range info.Calls { - if inf.Flags&ipc.CallExecuted != 0 { + if inf.Flags&flatrpc.CallFlagExecuted != 0 { callInfos[i].Flags |= prog.CallExecuted } - if inf.Flags&ipc.CallFinished != 0 { + if inf.Flags&flatrpc.CallFlagFinished != 0 { callInfos[i].Flags |= prog.CallFinished } - if inf.Flags&ipc.CallBlocked != 0 { + if inf.Flags&flatrpc.CallFlagBlocked != 0 { callInfos[i].Flags |= prog.CallBlocked } - callInfos[i].Errno = inf.Errno + callInfos[i].Errno = int(inf.Error) } p.FallbackSignal(callInfos) for i, inf := range callInfos { info.Calls[i].Signal = inf.Signal } } + +func convertProgInfo(info *flatrpc.ProgInfo) *ipc.ProgInfo { + if info == nil { + return nil + } + return &ipc.ProgInfo{ + Elapsed: time.Duration(info.Elapsed), + Freshness: int(info.Freshness), + Extra: convertCallInfo(info.Extra), + Calls: convertCalls(info), + } +} + +func convertCalls(info *flatrpc.ProgInfo) []ipc.CallInfo { + var calls []ipc.CallInfo + for _, call := range info.Calls { + calls = append(calls, convertCallInfo(call)) + } + return calls +} + +func convertCallInfo(info *flatrpc.CallInfo) ipc.CallInfo { + comps := make(prog.CompMap) + for _, comp := range info.Comps { + comps.AddComp(comp.Op1, comp.Op2) + } + return ipc.CallInfo{ + Flags: ipc.CallFlags(info.Flags), + Errno: int(info.Error), + Cover: info.Cover, + Signal: info.Signal, + Comps: comps, + } +} diff --git a/syz-manager/stats.go b/syz-manager/stats.go index d0d1eaa8f..77982434f 100644 --- a/syz-manager/stats.go +++ b/syz-manager/stats.go @@ -73,8 +73,4 @@ func (mgr *Manager) initStats() { }, func(v int, period time.Duration) string { return fmt.Sprintf("%v MB", v>>20) }) - - // Stats imported from the fuzzer (names must match the the fuzzer names). 
- stats.Create("no exec requests", "Number of times fuzzer was stalled with no exec requests", stats.Rate{}) - stats.Create("no exec duration", "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}) } |
