-rw-r--r--  syz-fuzzer/fuzzer.go      | 218
-rw-r--r--  syz-fuzzer/fuzzer_test.go |  14
-rw-r--r--  syz-fuzzer/proc.go        | 132
-rw-r--r--  syz-manager/manager.go    |  27
-rw-r--r--  syz-manager/rpc.go        | 723
-rw-r--r--  syz-manager/stats.go      |   4
6 files changed, 558 insertions(+), 560 deletions(-)
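The patch replaces the net/rpc-style manager/fuzzer protocol from pkg/rpctype with a single persistent flatbuffers connection from pkg/flatrpc: the fuzzer dials once, performs a ConnectRequest/ConnectReply and InfoRequest/InfoReply handshake, and from then on the manager pushes ExecRequest and SignalUpdate messages while the fuzzer streams back ExecutingMessage and ExecResult, eliminating the periodic Manager.ExchangeInfo calls. The flatrpc wire format itself is not shown in this diff; the sketch below only illustrates the kind of length-prefixed framing such a streaming transport needs, and all names in it are illustrative rather than the real flatrpc API.

package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net"
)

// send writes one length-prefixed message to the connection.
func send(conn net.Conn, payload []byte) error {
	var hdr [4]byte
	binary.LittleEndian.PutUint32(hdr[:], uint32(len(payload)))
	if _, err := conn.Write(hdr[:]); err != nil {
		return err
	}
	_, err := conn.Write(payload)
	return err
}

// recv reads one length-prefixed message.
func recv(conn net.Conn) ([]byte, error) {
	var hdr [4]byte
	if _, err := io.ReadFull(conn, hdr[:]); err != nil {
		return nil, err
	}
	payload := make([]byte, binary.LittleEndian.Uint32(hdr[:]))
	_, err := io.ReadFull(conn, payload)
	return payload, err
}

func main() {
	client, server := net.Pipe()
	go func() { _ = send(client, []byte("hello")) }()
	msg, _ := recv(server)
	fmt.Printf("%s\n", msg) // hello
}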
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go
index 6870583df..5feae7585 100644
--- a/syz-fuzzer/fuzzer.go
+++ b/syz-fuzzer/fuzzer.go
@@ -12,6 +12,7 @@ import (
"path/filepath"
"runtime"
"runtime/debug"
+ "slices"
"sync"
"sync/atomic"
"time"
@@ -22,7 +23,6 @@ import (
"github.com/google/syzkaller/pkg/ipc/ipcconfig"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/osutil"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/tool"
"github.com/google/syzkaller/prog"
@@ -31,35 +31,19 @@ import (
)
type FuzzerTool struct {
- name string
+ conn *flatrpc.Conn
executor string
gate *ipc.Gate
- manager *rpctype.RPCClient
// TODO: repair triagedCandidates logic, it's broken now.
triagedCandidates uint32
timeouts targets.Timeouts
leakFrames []string
- noExecRequests atomic.Uint64
- noExecDuration atomic.Uint64
-
- requests chan rpctype.ExecutionRequest
- results chan executionResult
+ requests chan *flatrpc.ExecRequest
signalMu sync.RWMutex
maxSignal signal.Signal
}
-// executionResult offloads some computations from the proc loop
-// to the communication thread.
-type executionResult struct {
- rpctype.ExecutionRequest
- procID int
- try int
- info *ipc.ProgInfo
- output []byte
- err string
-}
-
// Gate size controls how deep in the log the last executed by every proc
// program may be. The intent is to make sure that, given the output log,
// we always understand what was happening.
@@ -121,69 +105,75 @@ func main() {
return
}
+ executorArch, executorSyzRevision, executorGitRevision, err := executorVersion(executor)
+ if err != nil {
+ log.SyzFatalf("failed to run executor version: %v ", err)
+ }
+
log.Logf(0, "dialing manager at %v", *flagManager)
- manager, err := rpctype.NewRPCClient(*flagManager)
+ conn, err := flatrpc.Dial(*flagManager, timeouts.Scale)
if err != nil {
- log.SyzFatalf("failed to create an RPC client: %v ", err)
+ log.SyzFatalf("failed to connect to host: %v ", err)
}
log.Logf(1, "connecting to manager...")
- a := &rpctype.ConnectArgs{
+ connectReq := &flatrpc.ConnectRequest{
Name: *flagName,
- GitRevision: prog.GitRevision,
- SyzRevision: target.Revision,
+ Arch: executorArch,
+ GitRevision: executorGitRevision,
+ SyzRevision: executorSyzRevision,
}
- a.ExecutorArch, a.ExecutorSyzRevision, a.ExecutorGitRevision, err = executorVersion(executor)
- if err != nil {
- log.SyzFatalf("failed to run executor version: %v ", err)
+ if err := flatrpc.Send(conn, connectReq); err != nil {
+ log.SyzFatal(err)
}
- r := &rpctype.ConnectRes{}
- if err := manager.Call("Manager.Connect", a, r); err != nil {
- log.SyzFatalf("failed to call Manager.Connect(): %v ", err)
+ connectReplyRaw, err := flatrpc.Recv[flatrpc.ConnectReplyRaw](conn)
+ if err != nil {
+ log.SyzFatal(err)
}
- checkReq := &rpctype.CheckArgs{
- Name: *flagName,
- Files: host.ReadFiles(r.ReadFiles),
- Globs: make(map[string][]string),
+ connectReply := connectReplyRaw.UnPack()
+
+ infoReq := &flatrpc.InfoRequest{
+ Files: host.ReadFiles(connectReply.Files),
}
- features, err := host.SetupFeatures(target, executor, r.Features, nil)
+ features, err := host.SetupFeatures(target, executor, connectReply.Features, nil)
if err != nil {
- log.SyzFatalf("failed to setup features: %v ", err)
+ infoReq.Error = fmt.Sprintf("failed to setup features: %v ", err)
}
- checkReq.Features = features
- for _, glob := range r.ReadGlobs {
+ infoReq.Features = features
+ for _, glob := range connectReply.Globs {
files, err := filepath.Glob(filepath.FromSlash(glob))
- if err != nil && checkReq.Error == "" {
- checkReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err)
+ if err != nil && infoReq.Error == "" {
+ infoReq.Error = fmt.Sprintf("failed to read glob %q: %v", glob, err)
}
- checkReq.Globs[glob] = files
+ infoReq.Globs = append(infoReq.Globs, &flatrpc.GlobInfo{
+ Name: glob,
+ Files: files,
+ })
}
- checkRes := new(rpctype.CheckRes)
- if err := manager.Call("Manager.Check", checkReq, checkRes); err != nil {
- log.SyzFatalf("Manager.Check call failed: %v", err)
+ if err := flatrpc.Send(conn, infoReq); err != nil {
+ log.SyzFatal(err)
}
- if checkReq.Error != "" {
- log.SyzFatalf("%v", checkReq.Error)
+ infoReplyRaw, err := flatrpc.Recv[flatrpc.InfoReplyRaw](conn)
+ if err != nil {
+ log.SyzFatal(err)
}
+ infoReply := infoReplyRaw.UnPack()
- if checkRes.CoverFilterBitmap != nil {
- if err := osutil.WriteFile("syz-cover-bitmap", checkRes.CoverFilterBitmap); err != nil {
+ if len(infoReply.CoverFilter) != 0 {
+ if err := osutil.WriteFile("syz-cover-bitmap", infoReply.CoverFilter); err != nil {
log.SyzFatalf("failed to write syz-cover-bitmap: %v", err)
}
}
- inputsCount := *flagProcs * 2
fuzzerTool := &FuzzerTool{
- name: *flagName,
+ conn: conn,
executor: executor,
- manager: manager,
timeouts: timeouts,
- leakFrames: r.MemoryLeakFrames,
+ leakFrames: connectReply.LeakFrames,
- requests: make(chan rpctype.ExecutionRequest, 2*inputsCount),
- results: make(chan executionResult, 2*inputsCount),
+ requests: make(chan *flatrpc.ExecRequest, *flagProcs*4),
}
- fuzzerTool.filterDataRaceFrames(r.DataRaceFrames)
+ fuzzerTool.filterDataRaceFrames(connectReply.RaceFrames)
var gateCallback func()
for _, feat := range features {
if feat.Id == flatrpc.FeatureLeak && feat.Reason == "" {
@@ -197,10 +187,7 @@ func main() {
startProc(fuzzerTool, pid, config)
}
- // Query enough inputs at the beginning.
- fuzzerTool.exchangeDataCall(nil, 0)
- go fuzzerTool.exchangeDataWorker()
- fuzzerTool.exchangeDataWorker()
+ fuzzerTool.handleConn()
}
func (tool *FuzzerTool) leakGateCallback() {
@@ -241,101 +228,46 @@ func (tool *FuzzerTool) filterDataRaceFrames(frames []string) {
log.Logf(0, "%s", output)
}
-func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int) {
- tool.manager.AsyncCall("Manager.StartExecuting", &rpctype.ExecutingRequest{
- Name: tool.name,
- ID: progID,
- ProcID: pid,
- Try: try,
- })
-}
-
-func (tool *FuzzerTool) exchangeDataCall(results []rpctype.ExecutionResult, latency time.Duration) time.Duration {
- needProgs := max(0, cap(tool.requests)/2-len(tool.requests))
- a := &rpctype.ExchangeInfoRequest{
- Name: tool.name,
- NeedProgs: needProgs,
- Results: results,
- StatsDelta: tool.grabStats(),
- Latency: latency,
- }
- r := &rpctype.ExchangeInfoReply{}
- start := osutil.MonotonicNano()
- if err := tool.manager.Call("Manager.ExchangeInfo", a, r); err != nil {
- log.SyzFatalf("Manager.ExchangeInfo call failed: %v", err)
- }
- latency = osutil.MonotonicNano() - start
- tool.updateMaxSignal(r.NewMaxSignal, r.DropMaxSignal)
- if len(r.Requests) == 0 {
- // This is possible during initial checking stage, backoff a bit.
- time.Sleep(100 * time.Millisecond)
- }
- for _, req := range r.Requests {
- tool.requests <- req
+func (tool *FuzzerTool) startExecutingCall(progID int64, pid, try int, wait time.Duration) {
+ msg := &flatrpc.ExecutorMessage{
+ Msg: &flatrpc.ExecutorMessages{
+ Type: flatrpc.ExecutorMessagesRawExecuting,
+ Value: &flatrpc.ExecutingMessage{
+ Id: progID,
+ ProcId: int32(pid),
+ Try: int32(try),
+ WaitDuration: int64(wait),
+ },
+ },
+ }
+ if err := flatrpc.Send(tool.conn, msg); err != nil {
+ log.SyzFatal(err)
}
- return latency
}
-func (tool *FuzzerTool) exchangeDataWorker() {
- var latency time.Duration
- ticker := time.NewTicker(3 * time.Second * tool.timeouts.Scale).C
+func (tool *FuzzerTool) handleConn() {
for {
- var results []rpctype.ExecutionResult
- select {
- case res := <-tool.results:
- results = append(results, tool.convertExecutionResult(res))
- case <-ticker:
- // This is not expected to happen a lot,
- // but this is required to resolve potential deadlock
- // during initial checking stage when we may get
- // no test requests from the host in some requests.
+ raw, err := flatrpc.Recv[flatrpc.HostMessageRaw](tool.conn)
+ if err != nil {
+ log.SyzFatal(err)
}
- // Grab other finished calls, just in case there are any.
- loop:
- for {
- select {
- case res := <-tool.results:
- results = append(results, tool.convertExecutionResult(res))
- default:
- break loop
- }
+ switch msg := raw.UnPack().Msg.Value.(type) {
+ case *flatrpc.ExecRequest:
+ msg.ProgData = slices.Clone(msg.ProgData)
+ tool.requests <- msg
+ case *flatrpc.SignalUpdate:
+ tool.handleSignalUpdate(msg)
}
- // Replenish exactly the finished requests.
- latency = tool.exchangeDataCall(results, latency)
- }
-}
-
-func (tool *FuzzerTool) convertExecutionResult(res executionResult) rpctype.ExecutionResult {
- ret := rpctype.ExecutionResult{
- ID: res.ID,
- ProcID: res.procID,
- Try: res.try,
- Output: res.output,
- Error: res.err,
- }
- if res.info != nil {
- if res.NewSignal {
- tool.diffMaxSignal(res.info, res.SignalFilter, res.SignalFilterCall)
- }
- ret.Info = *res.info
- }
- return ret
-}
-
-func (tool *FuzzerTool) grabStats() map[string]uint64 {
- return map[string]uint64{
- "no exec requests": tool.noExecRequests.Swap(0),
- "no exec duration": tool.noExecDuration.Swap(0),
}
}
-func (tool *FuzzerTool) diffMaxSignal(info *ipc.ProgInfo, mask signal.Signal, maskCall int) {
+func (tool *FuzzerTool) diffMaxSignal(info *flatrpc.ProgInfo, mask signal.Signal, maskCall int) {
tool.signalMu.RLock()
defer tool.signalMu.RUnlock()
diffMaxSignal(info, tool.maxSignal, mask, maskCall)
}
-func diffMaxSignal(info *ipc.ProgInfo, max, mask signal.Signal, maskCall int) {
+func diffMaxSignal(info *flatrpc.ProgInfo, max, mask signal.Signal, maskCall int) {
info.Extra.Signal = diffCallSignal(info.Extra.Signal, max, mask, -1, maskCall)
for i := 0; i < len(info.Calls); i++ {
info.Calls[i].Signal = diffCallSignal(info.Calls[i].Signal, max, mask, i, maskCall)
@@ -349,11 +281,11 @@ func diffCallSignal(raw []uint32, max, mask signal.Signal, call, maskCall int) [
return max.DiffFromRaw(raw)
}
-func (tool *FuzzerTool) updateMaxSignal(add, drop []uint32) {
+func (tool *FuzzerTool) handleSignalUpdate(msg *flatrpc.SignalUpdate) {
tool.signalMu.Lock()
defer tool.signalMu.Unlock()
- tool.maxSignal.Subtract(signal.FromRaw(drop, 0))
- tool.maxSignal.Merge(signal.FromRaw(add, 0))
+ tool.maxSignal.Subtract(signal.FromRaw(msg.DropMax, 0))
+ tool.maxSignal.Merge(signal.FromRaw(msg.NewMax, 0))
}
func setupPprofHandler(port int) {
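With exchangeDataWorker gone, the fuzzer side reduces to the single receive loop in handleConn: ExecRequest messages are queued on a buffered channel drained by the proc loops, and SignalUpdate messages are applied inline. The slices.Clone(msg.ProgData) call matters because flatbuffers unpacking is shallow: the decoded message can alias the connection's receive buffer, which the next Recv may overwrite (an assumption consistent with the manager-side cloning comment later in this patch). A self-contained illustration of that hazard:

package main

import (
	"fmt"
	"slices"
)

func main() {
	buf := []byte("prog-1") // shared receive buffer
	queued := buf           // aliases the buffer: unsafe to keep around
	cloned := slices.Clone(buf)
	copy(buf, "prog-2")         // next message decoded into the same buffer
	fmt.Println(string(queued)) // "prog-2": the queued data was clobbered
	fmt.Println(string(cloned)) // "prog-1": the clone survived
}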
diff --git a/syz-fuzzer/fuzzer_test.go b/syz-fuzzer/fuzzer_test.go
index d43b7bc74..2acc70ab9 100644
--- a/syz-fuzzer/fuzzer_test.go
+++ b/syz-fuzzer/fuzzer_test.go
@@ -6,7 +6,7 @@ package main
import (
"testing"
- "github.com/google/syzkaller/pkg/ipc"
+ "github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/signal"
"github.com/stretchr/testify/assert"
)
@@ -14,8 +14,8 @@ import (
func TestFilterProgInfo(t *testing.T) {
max := signal.FromRaw([]uint32{5, 6, 7}, 0)
mask := signal.FromRaw([]uint32{2, 4, 6, 8}, 0)
- info := ipc.ProgInfo{
- Calls: []ipc.CallInfo{
+ info := flatrpc.ProgInfo{
+ Calls: []*flatrpc.CallInfo{
{
Signal: []uint32{1, 2, 3, 5, 6},
Cover: []uint32{1, 2, 3},
@@ -25,14 +25,14 @@ func TestFilterProgInfo(t *testing.T) {
Cover: []uint32{2, 3, 4},
},
},
- Extra: ipc.CallInfo{
+ Extra: &flatrpc.CallInfo{
Signal: []uint32{3, 4, 5},
Cover: []uint32{3, 4, 5},
},
}
diffMaxSignal(&info, max, mask, 1)
- assert.Equal(t, ipc.ProgInfo{
- Calls: []ipc.CallInfo{
+ assert.Equal(t, flatrpc.ProgInfo{
+ Calls: []*flatrpc.CallInfo{
{
Signal: []uint32{1, 2, 3},
Cover: []uint32{1, 2, 3},
@@ -42,7 +42,7 @@ func TestFilterProgInfo(t *testing.T) {
Cover: []uint32{2, 3, 4},
},
},
- Extra: ipc.CallInfo{
+ Extra: &flatrpc.CallInfo{
Signal: []uint32{3, 4},
Cover: []uint32{3, 4, 5},
},
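The updated test pins down diffMaxSignal's semantics: any signal a call produced that is already in the global max signal is stripped, unless the call is the designated maskCall and the value is explicitly protected by the mask (here call 1 keeps 6, while call 0 and Extra lose 5 and 6). A plausible sketch of the per-call filter the test exercises, assuming signal values are bare uint32 program counters; the real implementation works on signal.Signal sets rather than raw slices:

package main

import "fmt"

func diffCallSignalSketch(raw, max, mask []uint32, call, maskCall int) []uint32 {
	inMax := make(map[uint32]bool, len(max))
	for _, v := range max {
		inMax[v] = true
	}
	inMask := make(map[uint32]bool, len(mask))
	for _, v := range mask {
		inMask[v] = true
	}
	var out []uint32
	for _, v := range raw {
		// Keep signal not yet in max, plus masked values on the masked call.
		if !inMax[v] || (call == maskCall && inMask[v]) {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	max := []uint32{5, 6, 7}
	mask := []uint32{2, 4, 6, 8}
	fmt.Println(diffCallSignalSketch([]uint32{1, 2, 3, 5, 6}, max, mask, 0, 1)) // [1 2 3]
	fmt.Println(diffCallSignalSketch([]uint32{2, 4, 6, 8}, max, mask, 1, 1))    // [2 4 6 8]
	fmt.Println(diffCallSignalSketch([]uint32{3, 4, 5}, max, mask, -1, 1))      // [3 4]
}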
diff --git a/syz-fuzzer/proc.go b/syz-fuzzer/proc.go
index 6542225da..acc333b9b 100644
--- a/syz-fuzzer/proc.go
+++ b/syz-fuzzer/proc.go
@@ -15,7 +15,7 @@ import (
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/osutil"
- "github.com/google/syzkaller/pkg/rpctype"
+ "github.com/google/syzkaller/pkg/signal"
)
// Proc represents a single fuzzing process (executor).
@@ -41,66 +41,74 @@ func startProc(tool *FuzzerTool, pid int, config *ipc.Config) {
func (proc *Proc) loop() {
rnd := rand.New(rand.NewSource(time.Now().UnixNano() + int64(proc.pid)))
for {
- req := proc.nextRequest()
+ req, wait := proc.nextRequest()
// Do not let too much state accumulate.
const restartIn = 600
resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps
- if (req.ExecOpts.ExecFlags&resetFlags != 0 &&
- rnd.Intn(restartIn) == 0) || req.ResetState {
+ if (req.ExecFlags&resetFlags != 0 &&
+ rnd.Intn(restartIn) == 0) || req.Flags&flatrpc.RequestFlagResetState != 0 {
proc.env.ForceRestart()
}
- info, output, err, try := proc.execute(req)
- res := executionResult{
- ExecutionRequest: req,
- procID: proc.pid,
- try: try,
- info: info,
- output: output,
- err: err,
+ info, output, err := proc.execute(req, wait)
+ res := &flatrpc.ExecResult{
+ Id: req.Id,
+ Info: convertProgInfo(info),
+ Output: output,
+ Error: err,
}
- for i := 1; i < req.Repeat && res.err == "" && !req.IsBinary; i++ {
+ for i := 1; i < int(req.Repeat) && res.Error == "" && req.Flags&flatrpc.RequestFlagIsBinary == 0; i++ {
// Recreate Env every few iterations, this allows to cover more paths.
if i%2 == 0 {
proc.env.ForceRestart()
}
- info, output, err, _ := proc.execute(req)
- if res.info == nil {
- res.info = info
- } else {
- res.info.Calls = append(res.info.Calls, info.Calls...)
+ info, output, err := proc.execute(req, 0)
+ if res.Info == nil {
+ res.Info = convertProgInfo(info)
+ } else if info != nil {
+ res.Info.Calls = append(res.Info.Calls, convertCalls(info)...)
}
- res.output = append(res.output, output...)
- res.err = err
+ res.Output = append(res.Output, output...)
+ res.Error = err
+ }
+ if res.Info != nil && req.Flags&flatrpc.RequestFlagNewSignal != 0 {
+ filter := signal.FromRaw(req.SignalFilter, 0)
+ proc.tool.diffMaxSignal(res.Info, filter, int(req.SignalFilterCall))
+ }
+ msg := &flatrpc.ExecutorMessage{
+ Msg: &flatrpc.ExecutorMessages{
+ Type: flatrpc.ExecutorMessagesRawExecResult,
+ Value: res,
+ },
+ }
+ if err := flatrpc.Send(proc.tool.conn, msg); err != nil {
+ log.SyzFatal(err)
}
- // Let's perform signal filtering in a separate thread to get the most
- // exec/sec out of a syz-executor instance.
- proc.tool.results <- res
}
}
-func (proc *Proc) nextRequest() rpctype.ExecutionRequest {
+func (proc *Proc) nextRequest() (*flatrpc.ExecRequest, time.Duration) {
select {
case req := <-proc.tool.requests:
- return req
+ return req, 0
default:
}
// Not having enough inputs to execute is a sign of RPC communication problems.
// Let's count and report such situations.
start := osutil.MonotonicNano()
req := <-proc.tool.requests
- proc.tool.noExecDuration.Add(uint64(osutil.MonotonicNano() - start))
- proc.tool.noExecRequests.Add(1)
- return req
+ wait := osutil.MonotonicNano() - start
+ return req, wait
}
-func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, output []byte, errStr string, try int) {
+func (proc *Proc) execute(req *flatrpc.ExecRequest, wait time.Duration) (
+ info *ipc.ProgInfo, output []byte, errStr string) {
var err error
- if req.IsBinary {
+ if req.Flags&flatrpc.RequestFlagIsBinary != 0 {
output, err = executeBinary(req)
} else {
- info, output, try, err = proc.executeProgram(req)
+ info, output, err = proc.executeProgram(req, wait)
}
- if !req.ReturnOutput {
+ if req.Flags&flatrpc.RequestFlagReturnOutput == 0 {
output = nil
}
if err != nil {
@@ -109,30 +117,36 @@ func (proc *Proc) execute(req rpctype.ExecutionRequest) (info *ipc.ProgInfo, out
return
}
-func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, []byte, int, error) {
+func (proc *Proc) executeProgram(req *flatrpc.ExecRequest, wait time.Duration) (*ipc.ProgInfo, []byte, error) {
+ returnError := req.Flags&flatrpc.RequestFlagReturnError != 0
+ execOpts := &ipc.ExecOpts{
+ EnvFlags: req.ExecEnv,
+ ExecFlags: req.ExecFlags,
+ SandboxArg: int(req.SandboxArg),
+ }
for try := 0; ; try++ {
var output []byte
var info *ipc.ProgInfo
var hanged bool
// On a heavily loaded VM, syz-executor may take significant time to start.
// Let's do it outside of the gate ticket.
- err := proc.env.RestartIfNeeded(&req.ExecOpts)
+ err := proc.env.RestartIfNeeded(execOpts)
if err == nil {
// Limit concurrency.
ticket := proc.tool.gate.Enter()
- proc.tool.startExecutingCall(req.ID, proc.pid, try)
- output, info, hanged, err = proc.env.ExecProg(&req.ExecOpts, req.ProgData)
+ proc.tool.startExecutingCall(req.Id, proc.pid, try, wait)
+ output, info, hanged, err = proc.env.ExecProg(execOpts, req.ProgData)
proc.tool.gate.Leave(ticket)
// Don't print output if returning error b/c it may contain SYZFAIL.
- if !req.ReturnError {
+ if !returnError {
log.Logf(2, "result hanged=%v err=%v: %s", hanged, err, output)
}
- if hanged && err == nil && req.ReturnError {
+ if hanged && err == nil && returnError {
err = errors.New("hanged")
}
}
- if err == nil || req.ReturnError {
- return info, output, try, err
+ if err == nil || returnError {
+ return info, output, err
}
log.Logf(4, "fuzzer detected executor failure='%v', retrying #%d", err, try+1)
if try > 10 {
@@ -143,7 +157,7 @@ func (proc *Proc) executeProgram(req rpctype.ExecutionRequest) (*ipc.ProgInfo, [
}
}
-func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) {
+func executeBinary(req *flatrpc.ExecRequest) ([]byte, error) {
tmp, err := os.MkdirTemp("", "syz-runtest")
if err != nil {
return nil, fmt.Errorf("failed to create temp dir: %w", err)
@@ -166,3 +180,39 @@ func executeBinary(req rpctype.ExecutionRequest) ([]byte, error) {
}
return output, err
}
+
+func convertProgInfo(info *ipc.ProgInfo) *flatrpc.ProgInfo {
+ if info == nil {
+ return nil
+ }
+ return &flatrpc.ProgInfo{
+ Elapsed: uint64(info.Elapsed),
+ Freshness: uint64(info.Freshness),
+ Extra: convertCallInfo(info.Extra),
+ Calls: convertCalls(info),
+ }
+}
+
+func convertCalls(info *ipc.ProgInfo) []*flatrpc.CallInfo {
+ var calls []*flatrpc.CallInfo
+ for _, call := range info.Calls {
+ calls = append(calls, convertCallInfo(call))
+ }
+ return calls
+}
+
+func convertCallInfo(info ipc.CallInfo) *flatrpc.CallInfo {
+ var comps []*flatrpc.Comparison
+ for op1, ops := range info.Comps {
+ for op2 := range ops {
+ comps = append(comps, &flatrpc.Comparison{Op1: op1, Op2: op2})
+ }
+ }
+ return &flatrpc.CallInfo{
+ Flags: flatrpc.CallFlag(info.Flags),
+ Error: int32(info.Errno),
+ Cover: info.Cover,
+ Signal: info.Signal,
+ Comps: comps,
+ }
+}
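Note how convertCallInfo flattens the nested comparison map into a flat list of Comparison pairs for the wire; the manager side in rpc.go below performs the inverse via prog.CompMap.AddComp. A self-contained sketch of that round trip, using map[uint64]map[uint64]bool as a hypothetical stand-in for prog.CompMap:

package main

import "fmt"

type comparison struct{ Op1, Op2 uint64 }

// flatten turns the nested comparison map into a flat pair list,
// like convertCallInfo in proc.go.
func flatten(comps map[uint64]map[uint64]bool) []comparison {
	var out []comparison
	for op1, ops := range comps {
		for op2 := range ops {
			out = append(out, comparison{op1, op2})
		}
	}
	return out
}

// unflatten rebuilds the map form, like the manager-side conversion.
func unflatten(pairs []comparison) map[uint64]map[uint64]bool {
	comps := make(map[uint64]map[uint64]bool)
	for _, p := range pairs {
		if comps[p.Op1] == nil {
			comps[p.Op1] = make(map[uint64]bool)
		}
		comps[p.Op1][p.Op2] = true
	}
	return comps
}

func main() {
	in := map[uint64]map[uint64]bool{1: {2: true}, 3: {4: true, 5: true}}
	fmt.Println(unflatten(flatten(in))) // map[1:map[2:true] 3:map[4:true 5:true]]
}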
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 460c65ba7..c85d59d36 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -724,14 +724,10 @@ func parseProgram(target *prog.Target, enabled map[*prog.Syscall]bool, data []by
func (mgr *Manager) runInstance(index int) (*Crash, error) {
mgr.checkUsedFiles()
- var maxSignal signal.Signal
- if fuzzer := mgr.fuzzer.Load(); fuzzer != nil {
- maxSignal = fuzzer.Cover.CopyMaxSignal()
- }
// Use unique instance names to prevent name collisions in case of untimely RPC messages.
instanceName := fmt.Sprintf("vm-%d", mgr.nextInstanceID.Add(1))
injectLog := make(chan []byte, 10)
- mgr.serv.createInstance(instanceName, maxSignal, injectLog)
+ mgr.serv.createInstance(instanceName, injectLog)
rep, vmInfo, err := mgr.runInstanceInner(index, instanceName, injectLog)
machineInfo := mgr.serv.shutdownInstance(instanceName, rep != nil)
@@ -1400,6 +1396,13 @@ func (mgr *Manager) corpusMinimization() {
}
}
+func (mgr *Manager) maxSignal() signal.Signal {
+ if fuzzer := mgr.fuzzer.Load(); fuzzer != nil {
+ return fuzzer.Cover.CopyMaxSignal()
+ }
+ return nil
+}
+
func (mgr *Manager) fuzzerSignalRotation() {
const (
rotateSignals = 1000
@@ -1426,12 +1429,14 @@ func (mgr *Manager) fuzzerSignalRotation() {
func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
for ; ; time.Sleep(time.Second / 2) {
- // Distribute new max signal over all instances.
- newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta()
- log.Logf(2, "distributing %d new signal, %d dropped signal",
- len(newSignal), len(dropSignal))
- if len(newSignal)+len(dropSignal) != 0 {
- mgr.serv.distributeSignalDelta(newSignal, dropSignal)
+ if mgr.cfg.Cover {
+ // Distribute new max signal over all instances.
+ newSignal, dropSignal := fuzzer.Cover.GrabSignalDelta()
+ log.Logf(2, "distributing %d new signal, %d dropped signal",
+ len(newSignal), len(dropSignal))
+ if len(newSignal)+len(dropSignal) != 0 {
+ mgr.serv.distributeSignalDelta(newSignal, dropSignal)
+ }
}
// Update the state machine.
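fuzzerLoop now skips signal distribution entirely when coverage is disabled; when enabled, it still ships only the delta accumulated since the previous iteration rather than the whole max signal. A minimal sketch of that grab-delta pattern, a hypothetical stand-in for Cover.GrabSignalDelta, whose implementation this diff does not show:

package main

import (
	"fmt"
	"sync"
)

type coverDelta struct {
	mu      sync.Mutex
	added   map[uint32]struct{}
	dropped map[uint32]struct{}
}

func newCoverDelta() *coverDelta {
	return &coverDelta{
		added:   map[uint32]struct{}{},
		dropped: map[uint32]struct{}{},
	}
}

// Add records newly discovered max signal.
func (d *coverDelta) Add(pc uint32) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.dropped, pc)
	d.added[pc] = struct{}{}
}

// Drop records signal rotated out of the max set.
func (d *coverDelta) Drop(pc uint32) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.added, pc)
	d.dropped[pc] = struct{}{}
}

// Grab atomically returns and resets the accumulated delta, so each
// loop iteration broadcasts only what changed since the last one.
func (d *coverDelta) Grab() (added, dropped map[uint32]struct{}) {
	d.mu.Lock()
	defer d.mu.Unlock()
	added, dropped = d.added, d.dropped
	d.added = map[uint32]struct{}{}
	d.dropped = map[uint32]struct{}{}
	return
}

func main() {
	d := newCoverDelta()
	d.Add(0x1234)
	d.Drop(0x5678)
	added, dropped := d.Grab()
	fmt.Println(len(added), len(dropped)) // 1 1
}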
diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go
index 86b2a375d..2f26ac4c2 100644
--- a/syz-manager/rpc.go
+++ b/syz-manager/rpc.go
@@ -5,8 +5,9 @@ package main
import (
"bytes"
+ "errors"
"fmt"
- "net"
+ "slices"
"sort"
"strings"
"sync"
@@ -19,7 +20,6 @@ import (
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/pkg/vminfo"
@@ -30,11 +30,10 @@ type RPCServer struct {
mgr RPCManagerView
cfg *mgrconfig.Config
target *prog.Target
- server *rpctype.RPCServer
checker *vminfo.Checker
port int
- infoDone bool
+ infoOnce sync.Once
checkDone atomic.Bool
checkFailures int
baseSource *queue.DynamicSourceCtl
@@ -46,43 +45,31 @@ type RPCServer struct {
coverFilter map[uint32]uint32
mu sync.Mutex
- runners sync.Map // Instead of map[string]*Runner.
+ runners map[string]*Runner
execSource queue.Source
- statExecs *stats.Val
- statExecRetries *stats.Val
- statExecutorRestarts *stats.Val
- statExecBufferTooSmall *stats.Val
- statVMRestarts *stats.Val
- statExchangeCalls *stats.Val
- statExchangeProgs *stats.Val
- statExchangeServerLatency *stats.Val
- statExchangeClientLatency *stats.Val
- statCoverFiltered *stats.Val
+ statExecs *stats.Val
+ statExecRetries *stats.Val
+ statExecutorRestarts *stats.Val
+ statExecBufferTooSmall *stats.Val
+ statVMRestarts *stats.Val
+ statNoExecRequests *stats.Val
+ statNoExecDuration *stats.Val
+ statCoverFiltered *stats.Val
}
type Runner struct {
- name string
- injectLog chan<- []byte
- injectStop chan bool
- stopFuzzing atomic.Bool
-
- machineInfo []byte
- instModules *cover.CanonicalizerInstance
-
- // The mutex protects newMaxSignal, dropMaxSignal, and requests.
- mu sync.Mutex
- newMaxSignal signal.Signal
- dropMaxSignal signal.Signal
+ stopped bool
+ shutdown chan bool
+ injectLog chan<- []byte
+ injectStop chan bool
+ injectBuf bytes.Buffer
+ conn *flatrpc.Conn
+ machineInfo []byte
+ canonicalizer *cover.CanonicalizerInstance
nextRequestID int64
- requests map[int64]Request
-}
-
-type Request struct {
- req *queue.Request
- serialized []byte
- try int
- procID int
+ requests map[int64]*queue.Request
+ executing map[int64]bool
}
type BugFrames struct {
@@ -93,6 +80,7 @@ type BugFrames struct {
// RPCManagerView restricts interface between RPCServer and Manager.
type RPCManagerView interface {
currentBugFrames() BugFrames
+ maxSignal() signal.Signal
machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool,
opts ipc.ExecOpts) queue.Source
}
@@ -104,6 +92,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) {
mgr: mgr,
cfg: mgr.cfg,
target: mgr.target,
+ runners: make(map[string]*Runner),
checker: checker,
baseSource: baseSource,
execSource: queue.Retry(baseSource),
@@ -117,141 +106,320 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) {
"Program serialization overflowed exec buffer", stats.NoGraph),
statVMRestarts: stats.Create("vm restarts", "Total number of VM starts",
stats.Rate{}, stats.NoGraph),
- statExchangeCalls: stats.Create("exchange calls", "Number of RPC Exchange calls",
- stats.Rate{}),
- statExchangeProgs: stats.Create("exchange progs", "Test programs exchanged per RPC call",
- stats.Distribution{}),
- statExchangeServerLatency: stats.Create("exchange manager latency",
- "Manager RPC Exchange call latency (us)", stats.Distribution{}),
- statExchangeClientLatency: stats.Create("exchange fuzzer latency",
- "End-to-end fuzzer RPC Exchange call latency (us)", stats.Distribution{}),
+ statNoExecRequests: stats.Create("no exec requests",
+ "Number of times fuzzer was stalled with no exec requests", stats.Rate{}),
+ statNoExecDuration: stats.Create("no exec duration",
+ "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}),
statCoverFiltered: stats.Create("filtered coverage", "", stats.NoGraph),
}
- s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv)
+ s, err := flatrpc.ListenAndServe(mgr.cfg.RPC, serv.handleConn)
if err != nil {
return nil, err
}
baseSource.Store(serv.checker)
- log.Logf(0, "serving rpc on tcp://%v", s.Addr())
- serv.port = s.Addr().(*net.TCPAddr).Port
- serv.server = s
- go s.Serve()
+ log.Logf(0, "serving rpc on tcp://%v", s.Addr)
+ serv.port = s.Addr.Port
return serv, nil
}
-func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) error {
- log.Logf(1, "fuzzer %v connected", a.Name)
- checkRevisions(a, serv.cfg.Target)
- serv.statVMRestarts.Add(1)
-
- bugFrames := serv.mgr.currentBugFrames()
- r.MemoryLeakFrames = bugFrames.memoryLeaks
- r.DataRaceFrames = bugFrames.dataRaces
+func (serv *RPCServer) handleConn(conn *flatrpc.Conn) {
+ name, machineInfo, canonicalizer, err := serv.handshake(conn)
+ if err != nil {
+ log.Logf(1, "%v", err)
+ return
+ }
serv.mu.Lock()
- defer serv.mu.Unlock()
- r.ReadFiles = serv.checker.RequiredFiles()
- if serv.checkDone.Load() {
- r.Features = serv.setupFeatures
- } else {
- r.ReadFiles = append(r.ReadFiles, serv.checker.CheckFiles()...)
- r.ReadGlobs = serv.target.RequiredGlobs()
- r.Features = flatrpc.AllFeatures
+ runner := serv.runners[name]
+ if runner == nil || runner.stopped {
+ serv.mu.Unlock()
+ log.Logf(2, "VM %v shut down before connect", name)
+ return
}
- return nil
-}
+ runner.conn = conn
+ runner.machineInfo = machineInfo
+ runner.canonicalizer = canonicalizer
+ serv.mu.Unlock()
+
+ err = serv.connectionLoop(runner)
+ log.Logf(2, "runner %v: %v", name, err)
-func checkRevisions(a *rpctype.ConnectArgs, target *prog.Target) {
- if target.Arch != a.ExecutorArch {
- log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.ExecutorArch)
+ crashed := <-runner.shutdown
+ for id, req := range runner.requests {
+ status := queue.Restarted
+ if crashed && runner.executing[id] {
+ status = queue.Crashed
+ }
+ req.Done(&queue.Result{Status: status})
}
- if prog.GitRevision != a.GitRevision {
- log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v",
- prog.GitRevision, a.GitRevision)
+}
+
+func (serv *RPCServer) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) {
+ connectReqRaw, err := flatrpc.Recv[flatrpc.ConnectRequestRaw](conn)
+ if err != nil {
+ return "", nil, nil, err
}
- if prog.GitRevision != a.ExecutorGitRevision {
- log.Fatalf("mismatching manager/executor git revisions: %v vs %v",
- prog.GitRevision, a.ExecutorGitRevision)
+ connectReq := connectReqRaw.UnPack()
+ log.Logf(1, "fuzzer %v connected", connectReq.Name)
+ checkRevisions(connectReq, serv.cfg.Target)
+ serv.statVMRestarts.Add(1)
+
+ bugFrames := serv.mgr.currentBugFrames()
+ connectReply := &flatrpc.ConnectReply{
+ LeakFrames: bugFrames.memoryLeaks,
+ RaceFrames: bugFrames.dataRaces,
}
- if target.Revision != a.SyzRevision {
- log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v",
- target.Revision, a.SyzRevision)
+ connectReply.Files = serv.checker.RequiredFiles()
+ if serv.checkDone.Load() {
+ connectReply.Features = serv.setupFeatures
+ } else {
+ connectReply.Files = append(connectReply.Files, serv.checker.CheckFiles()...)
+ connectReply.Globs = serv.target.RequiredGlobs()
+ connectReply.Features = flatrpc.AllFeatures
}
- if target.Revision != a.ExecutorSyzRevision {
- log.Fatalf("mismatching manager/executor system call descriptions: %v vs %v",
- target.Revision, a.ExecutorSyzRevision)
+ if err := flatrpc.Send(conn, connectReply); err != nil {
+ return "", nil, nil, err
}
-}
-
-func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error {
- serv.mu.Lock()
- defer serv.mu.Unlock()
- modules, machineInfo, err := serv.checker.MachineInfo(a.Files)
+ infoReqRaw, err := flatrpc.Recv[flatrpc.InfoRequestRaw](conn)
+ if err != nil {
+ return "", nil, nil, err
+ }
+ infoReq := infoReqRaw.UnPack()
+ modules, machineInfo, err := serv.checker.MachineInfo(infoReq.Files)
if err != nil {
log.Logf(0, "parsing of machine info failed: %v", err)
- if a.Error == "" {
- a.Error = err.Error()
+ if infoReq.Error == "" {
+ infoReq.Error = err.Error()
}
}
-
- if a.Error != "" {
- log.Logf(0, "machine check failed: %v", a.Error)
+ if infoReq.Error != "" {
+ log.Logf(0, "machine check failed: %v", infoReq.Error)
serv.checkFailures++
if serv.checkFailures == 10 {
log.Fatalf("machine check failing")
}
- return fmt.Errorf("machine check failed: %v", a.Error)
+ return "", nil, nil, errors.New("machine check failed")
}
- if !serv.infoDone {
- serv.infoDone = true
-
- // Now execute check programs.
- go serv.runCheck(a.Files, a.Features)
-
+ serv.infoOnce.Do(func() {
serv.modules = modules
- serv.target.UpdateGlobs(a.Globs)
serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover)
var err error
serv.execCoverFilter, serv.coverFilter, err = createCoverageFilter(serv.cfg, modules)
if err != nil {
log.Fatalf("failed to init coverage filter: %v", err)
}
+ globs := make(map[string][]string)
+ for _, glob := range infoReq.Globs {
+ globs[glob.Name] = glob.Files
+ }
+ serv.target.UpdateGlobs(globs)
+ // Flatbuffers don't do a deep copy of byte slices,
+ // so clone manually since we pass them to a goroutine.
+ for _, file := range infoReq.Files {
+ file.Data = slices.Clone(file.Data)
+ }
+ // Now execute check programs.
+ go func() {
+ if err := serv.runCheck(infoReq.Files, infoReq.Features); err != nil {
+ log.Fatalf("check failed: %v", err)
+ }
+ }()
+ })
+
+ canonicalizer := serv.canonicalModules.NewInstance(modules)
+ instCoverFilter := canonicalizer.DecanonicalizeFilter(serv.execCoverFilter)
+ infoReply := &flatrpc.InfoReply{
+ CoverFilter: createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter),
+ }
+ if err := flatrpc.Send(conn, infoReply); err != nil {
+ return "", nil, nil, err
}
+ return connectReq.Name, machineInfo, canonicalizer, nil
+}
- runner := serv.findRunner(a.Name)
- if runner == nil {
- // There may be a parallel shutdownInstance() call that removes the runner.
- return fmt.Errorf("unknown runner %s", a.Name)
+func (serv *RPCServer) connectionLoop(runner *Runner) error {
+ if serv.cfg.Cover {
+ maxSignal := serv.mgr.maxSignal().ToRaw()
+ for len(maxSignal) != 0 {
+ // Split coverage into batches to not grow the connection serialization
+ // buffer too much (we don't want to grow it larger than what will be needed
+ // to send programs).
+ n := min(len(maxSignal), 50000)
+ if err := runner.sendSignalUpdate(maxSignal[:n], nil); err != nil {
+ return err
+ }
+ maxSignal = maxSignal[n:]
+ }
+ }
+
+ for {
+ for len(runner.requests)-len(runner.executing) < 2*serv.cfg.Procs {
+ req := serv.execSource.Next()
+ if req == nil {
+ break
+ }
+ if err := serv.sendRequest(runner, req); err != nil {
+ return err
+ }
+ }
+ if len(runner.requests) == 0 {
+ // The runner has no requests at all, so don't wait to receive anything from it.
+ // This is only possible during the initial checking.
+ time.Sleep(10 * time.Millisecond)
+ continue
+ }
+ raw, err := flatrpc.Recv[flatrpc.ExecutorMessageRaw](runner.conn)
+ if err != nil {
+ return err
+ }
+ switch msg := raw.UnPack().Msg.Value.(type) {
+ case *flatrpc.ExecutingMessage:
+ err = serv.handleExecutingMessage(runner, msg)
+ case *flatrpc.ExecResult:
+ err = serv.handleExecResult(runner, msg)
+ default:
+ panic(fmt.Sprintf("unknown message %T", msg))
+ }
+ if err != nil {
+ return err
+ }
}
+}
- runner.mu.Lock()
- defer runner.mu.Unlock()
- if runner.machineInfo != nil {
- return fmt.Errorf("duplicate connection from %s", a.Name)
+func (serv *RPCServer) sendRequest(runner *Runner, req *queue.Request) error {
+ if err := validateRequest(req); err != nil {
+ panic(err)
}
- runner.machineInfo = machineInfo
- runner.instModules = serv.canonicalModules.NewInstance(modules)
- instCoverFilter := runner.instModules.DecanonicalizeFilter(serv.execCoverFilter)
- r.CoverFilterBitmap = createCoverageBitmap(serv.cfg.SysTarget, instCoverFilter)
+ progData, err := req.Prog.SerializeForExec()
+ if err != nil {
+ // It's bad if we systematically fail to serialize programs,
+ // but so far we have no better handling than counting it.
+ // This error is observed a lot on the seeded syz_mount_image calls.
+ serv.statExecBufferTooSmall.Add(1)
+ req.Done(&queue.Result{Status: queue.ExecFailure})
+ return nil
+ }
+ runner.nextRequestID++
+ id := runner.nextRequestID
+ var flags flatrpc.RequestFlag
+ if !req.ReturnAllSignal {
+ flags |= flatrpc.RequestFlagNewSignal
+ }
+ if req.ReturnOutput {
+ flags |= flatrpc.RequestFlagReturnOutput
+ }
+ if req.ReturnError {
+ flags |= flatrpc.RequestFlagReturnError
+ }
+ if serv.cfg.Experimental.ResetAccState {
+ flags |= flatrpc.RequestFlagResetState
+ }
+ signalFilter := runner.canonicalizer.Decanonicalize(req.SignalFilter.ToRaw())
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawExecRequest,
+ Value: &flatrpc.ExecRequest{
+ Id: id,
+ ProgData: progData,
+ Flags: flags,
+ ExecEnv: req.ExecOpts.EnvFlags,
+ ExecFlags: req.ExecOpts.ExecFlags,
+ SandboxArg: int64(req.ExecOpts.SandboxArg),
+ SignalFilter: signalFilter,
+ SignalFilterCall: int32(req.SignalFilterCall),
+ },
+ },
+ }
+ runner.requests[id] = req
+ return flatrpc.Send(runner.conn, msg)
+}
+
+func (serv *RPCServer) handleExecutingMessage(runner *Runner, msg *flatrpc.ExecutingMessage) error {
+ req := runner.requests[msg.Id]
+ if req == nil {
+ return fmt.Errorf("can't find executing request %v", msg.Id)
+ }
+ serv.statExecs.Add(1)
+ if msg.Try == 0 {
+ if msg.WaitDuration != 0 {
+ serv.statNoExecRequests.Add(1)
+ // Cap wait duration to 1 second to avoid extreme peaks on the graph
+ // which make it impossible to see real data (the rest becomes a flat line).
+ serv.statNoExecDuration.Add(int(min(msg.WaitDuration, 1e9)))
+ }
+ } else {
+ serv.statExecRetries.Add(1)
+ }
+ fmt.Fprintf(&runner.injectBuf, "executing program %v:\n%s\n", msg.ProcId, req.Prog.Serialize())
+ select {
+ case runner.injectLog <- runner.injectBuf.Bytes():
+ case <-runner.injectStop:
+ }
+ runner.injectBuf.Reset()
+ runner.executing[msg.Id] = true
return nil
}
-func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) {
- if err := serv.finishCheck(checkFilesInfo, checkFeatureInfo); err != nil {
- log.Fatalf("check failed: %v", err)
+func (serv *RPCServer) handleExecResult(runner *Runner, msg *flatrpc.ExecResult) error {
+ req := runner.requests[msg.Id]
+ if req == nil {
+ return fmt.Errorf("can't find executed request %v", msg.Id)
}
- serv.checkDone.Store(true)
+ delete(runner.requests, msg.Id)
+ delete(runner.executing, msg.Id)
+ if msg.Info != nil {
+ if msg.Info.Freshness == 0 {
+ serv.statExecutorRestarts.Add(1)
+ }
+ if !serv.cfg.Cover && req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal != 0 {
+ // Coverage collection is disabled, but signal was requested => use a substitute signal.
+ addFallbackSignal(req.Prog, msg.Info)
+ }
+ for i := 0; i < len(msg.Info.Calls); i++ {
+ call := msg.Info.Calls[i]
+ call.Cover = runner.canonicalizer.Canonicalize(call.Cover)
+ call.Signal = runner.canonicalizer.Canonicalize(call.Signal)
+ }
+ msg.Info.Extra.Cover = runner.canonicalizer.Canonicalize(msg.Info.Extra.Cover)
+ msg.Info.Extra.Signal = runner.canonicalizer.Canonicalize(msg.Info.Extra.Signal)
+ }
+ status := queue.Success
+ var resErr error
+ if msg.Error != "" {
+ status = queue.ExecFailure
+ resErr = errors.New(msg.Error)
+ }
+ req.Done(&queue.Result{
+ Status: status,
+ Info: convertProgInfo(msg.Info),
+ Output: slices.Clone(msg.Output),
+ Err: resErr,
+ })
+ return nil
}
-func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error {
- // Note: need to print disbled syscalls before failing due to an error.
- // This helps to debug "all system calls are disabled".
+func checkRevisions(a *flatrpc.ConnectRequest, target *prog.Target) {
+ if target.Arch != a.Arch {
+ log.Fatalf("mismatching target/executor arches: %v vs %v", target.Arch, a.Arch)
+ }
+ if prog.GitRevision != a.GitRevision {
+ log.Fatalf("mismatching manager/fuzzer git revisions: %v vs %v",
+ prog.GitRevision, a.GitRevision)
+ }
+ if target.Revision != a.SyzRevision {
+ log.Fatalf("mismatching manager/fuzzer system call descriptions: %v vs %v",
+ target.Revision, a.SyzRevision)
+ }
+}
+func (serv *RPCServer) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error {
enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo)
enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls)
+ // Note: need to print disabled syscalls before failing due to an error.
+ // This helps to debug "all system calls are disabled".
buf := new(bytes.Buffer)
if len(serv.cfg.EnabledSyscalls) != 0 || log.V(1) {
if len(disabledCalls) != 0 {
@@ -308,94 +476,7 @@ func (serv *RPCServer) finishCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeat
serv.setupFeatures = features.NeedSetup()
newSource := serv.mgr.machineChecked(serv.enabledFeatures, enabledCalls, serv.execOpts())
serv.baseSource.Store(newSource)
- return nil
-}
-
-func (serv *RPCServer) StartExecuting(a *rpctype.ExecutingRequest, r *int) error {
- serv.statExecs.Add(1)
- if a.Try != 0 {
- serv.statExecRetries.Add(1)
- }
- runner := serv.findRunner(a.Name)
- if runner == nil {
- return nil
- }
- runner.mu.Lock()
- req, ok := runner.requests[a.ID]
- if !ok {
- runner.mu.Unlock()
- return nil
- }
- // RPC handlers are invoked in separate goroutines, so start executing notifications
- // can outrun each other and completion notification.
- if req.try < a.Try {
- req.try = a.Try
- req.procID = a.ProcID
- }
- runner.requests[a.ID] = req
- runner.mu.Unlock()
- runner.logProgram(a.ProcID, req.serialized)
- return nil
-}
-
-func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error {
- start := time.Now()
- runner := serv.findRunner(a.Name)
- if runner == nil {
- return nil
- }
- // First query new inputs and only then post results.
- // It should foster a more even distribution of executions
- // across all VMs.
- for len(r.Requests) < a.NeedProgs {
- inp := serv.execSource.Next()
- if inp == nil {
- // It's unlikely that subsequent Next() calls will yield something.
- break
- }
- if err := validateRequest(inp); err != nil {
- panic(fmt.Sprintf("invalid request: %v, req: %#v", err, inp))
- }
- if req, ok := serv.newRequest(runner, inp); ok {
- r.Requests = append(r.Requests, req)
- if inp.Risky() {
- // We give crashed inputs only one more chance, so if we resend many of them at once,
- // we'll never figure out the actual problematic input.
- break
- }
- } else {
- // It's bad if we systematically fail to serialize programs,
- // but so far we don't have a better handling than counting this.
- // This error is observed a lot on the seeded syz_mount_image calls.
- serv.statExecBufferTooSmall.Add(1)
- inp.Done(&queue.Result{Status: queue.ExecFailure})
- }
- }
-
- for _, result := range a.Results {
- serv.doneRequest(runner, result)
- }
-
- stats.Import(a.StatsDelta)
-
- runner.mu.Lock()
- // Let's transfer new max signal in portions.
-
- const transferMaxSignal = 500000
- newSignal := runner.newMaxSignal.Split(transferMaxSignal)
- dropSignal := runner.dropMaxSignal.Split(transferMaxSignal)
- runner.mu.Unlock()
-
- r.NewMaxSignal = runner.instModules.Decanonicalize(newSignal.ToRaw())
- r.DropMaxSignal = runner.instModules.Decanonicalize(dropSignal.ToRaw())
-
- log.Logf(2, "exchange with %s: %d done, %d new requests, %d new max signal, %d drop signal",
- a.Name, len(a.Results), len(r.Requests), len(r.NewMaxSignal), len(r.DropMaxSignal))
-
- serv.statExchangeCalls.Add(1)
- serv.statExchangeProgs.Add(a.NeedProgs)
- serv.statExchangeClientLatency.Add(int(a.Latency.Microseconds()))
- serv.statExchangeServerLatency.Add(int(time.Since(start).Microseconds()))
+ serv.checkDone.Store(true)
return nil
}
@@ -411,77 +492,68 @@ func validateRequest(req *queue.Request) error {
return nil
}
-func (serv *RPCServer) findRunner(name string) *Runner {
- if val, _ := serv.runners.Load(name); val != nil {
- runner := val.(*Runner)
- if runner.stopFuzzing.Load() {
- return nil
- }
- return runner
- }
- // There might be a parallel shutdownInstance().
- // Ignore requests then.
- return nil
-}
-
-func (serv *RPCServer) createInstance(name string, maxSignal signal.Signal, injectLog chan<- []byte) {
+func (serv *RPCServer) createInstance(name string, injectLog chan<- []byte) {
runner := &Runner{
- name: name,
- requests: make(map[int64]Request),
- newMaxSignal: maxSignal,
- injectLog: injectLog,
- injectStop: make(chan bool),
+ injectLog: injectLog,
+ injectStop: make(chan bool),
+ shutdown: make(chan bool, 1),
+ requests: make(map[int64]*queue.Request),
+ executing: make(map[int64]bool),
}
- if _, loaded := serv.runners.LoadOrStore(name, runner); loaded {
+ serv.mu.Lock()
+ if serv.runners[name] != nil {
panic(fmt.Sprintf("duplicate instance %s", name))
}
+ serv.runners[name] = runner
+ serv.mu.Unlock()
}
// stopInstance prevents further request exchange requests.
// To make RPCServer fully forget an instance, shutdownInstance() must be called.
func (serv *RPCServer) stopFuzzing(name string) {
- runner := serv.findRunner(name)
- if runner == nil {
- return
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ runner.stopped = true
+ conn := runner.conn
+ serv.mu.Unlock()
+ close(runner.injectStop)
+ if conn != nil {
+ conn.Close()
}
- runner.stopFuzzing.Store(true)
}
func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte {
- runnerPtr, _ := serv.runners.LoadAndDelete(name)
- runner := runnerPtr.(*Runner)
- runner.mu.Lock()
- if runner.requests == nil {
- // We are supposed to invoke this code only once.
- panic("Runner.requests is already nil")
- }
- oldRequests := runner.requests
- runner.requests = nil
- runner.mu.Unlock()
-
- close(runner.injectStop)
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ delete(serv.runners, name)
+ serv.mu.Unlock()
+ runner.shutdown <- crashed
+ return runner.machineInfo
+}
+func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) {
+ plusRaw := plus.ToRaw()
+ minusRaw := minus.ToRaw()
serv.mu.Lock()
defer serv.mu.Unlock()
- for _, req := range oldRequests {
- if crashed && req.try >= 0 {
- req.req.Done(&queue.Result{Status: queue.Crashed})
- } else {
- req.req.Done(&queue.Result{Status: queue.Restarted})
+ for _, runner := range serv.runners {
+ if runner.conn != nil {
+ runner.sendSignalUpdate(plusRaw, minusRaw)
}
}
- return runner.machineInfo
}
-func (serv *RPCServer) distributeSignalDelta(plus, minus signal.Signal) {
- serv.runners.Range(func(key, value any) bool {
- runner := value.(*Runner)
- runner.mu.Lock()
- defer runner.mu.Unlock()
- runner.newMaxSignal.Merge(plus)
- runner.dropMaxSignal.Merge(minus)
- return true
- })
+func (runner *Runner) sendSignalUpdate(plus, minus []uint32) error {
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawSignalUpdate,
+ Value: &flatrpc.SignalUpdate{
+ NewMax: runner.canonicalizer.Decanonicalize(plus),
+ DropMax: runner.canonicalizer.Decanonicalize(minus),
+ },
+ },
+ }
+ return flatrpc.Send(runner.conn, msg)
}
func (serv *RPCServer) updateCoverFilter(newCover []uint32) {
@@ -501,88 +573,6 @@ func (serv *RPCServer) updateCoverFilter(newCover []uint32) {
serv.statCoverFiltered.Add(filtered)
}
-func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult) {
- info := &resp.Info
- if info.Freshness == 0 {
- serv.statExecutorRestarts.Add(1)
- }
- runner.mu.Lock()
- req, ok := runner.requests[resp.ID]
- if ok {
- delete(runner.requests, resp.ID)
- }
- runner.mu.Unlock()
- if !ok {
- // There may be a concurrent shutdownInstance() call.
- return
- }
- // RPC handlers are invoked in separate goroutines, so log the program here
- // if completion notification outrun start executing notification.
- if req.try < resp.Try {
- runner.logProgram(resp.ProcID, req.serialized)
- }
- for i := 0; i < len(info.Calls); i++ {
- call := &info.Calls[i]
- call.Cover = runner.instModules.Canonicalize(call.Cover)
- call.Signal = runner.instModules.Canonicalize(call.Signal)
- }
- info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover)
- info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal)
-
- result := &queue.Result{
- Status: queue.Success,
- Info: info,
- Output: resp.Output,
- }
- if resp.Error != "" {
- result.Status = queue.ExecFailure
- result.Err = fmt.Errorf("%s", resp.Error)
- } else if !serv.cfg.Cover && req.req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal > 0 {
- // Coverage collection is disabled, but signal was requested => use a substitute signal.
- addFallbackSignal(req.req.Prog, info)
- }
- req.req.Done(result)
-}
-
-func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.ExecutionRequest, bool) {
- progData, err := req.Prog.SerializeForExec()
- if err != nil {
- return rpctype.ExecutionRequest{}, false
- }
-
- // logProgram() may race with Done(), so let's serialize the program right now.
- serialized := req.Prog.Serialize()
-
- var signalFilter signal.Signal
- if req.SignalFilter != nil {
- newRawSignal := runner.instModules.Decanonicalize(req.SignalFilter.ToRaw())
- // We don't care about specific priorities here.
- signalFilter = signal.FromRaw(newRawSignal, 0)
- }
- runner.mu.Lock()
- runner.nextRequestID++
- id := runner.nextRequestID
- if runner.requests != nil {
- runner.requests[id] = Request{
- req: req,
- try: -1,
- serialized: serialized,
- }
- }
- runner.mu.Unlock()
- return rpctype.ExecutionRequest{
- ID: id,
- ProgData: progData,
- ExecOpts: req.ExecOpts,
- NewSignal: !req.ReturnAllSignal,
- SignalFilter: signalFilter,
- SignalFilterCall: req.SignalFilterCall,
- ResetState: serv.cfg.Experimental.ResetAccState,
- ReturnError: req.ReturnError,
- ReturnOutput: req.ReturnOutput,
- }, true
-}
-
func (serv *RPCServer) execOpts() ipc.ExecOpts {
env := ipc.FeaturesToFlags(serv.enabledFeatures, nil)
if *flagDebug {
@@ -611,34 +601,59 @@ func (serv *RPCServer) execOpts() ipc.ExecOpts {
}
}
-func (runner *Runner) logProgram(procID int, serialized []byte) {
- buf := new(bytes.Buffer)
- fmt.Fprintf(buf, "executing program %v:\n%s\n", procID, serialized)
- select {
- case runner.injectLog <- buf.Bytes():
- case <-runner.injectStop:
- }
-}
-
// addFallbackSignal computes simple fallback signal in cases we don't have real coverage signal.
// We use syscall number or-ed with returned errno value as signal.
// At least this gives us all combinations of syscall+errno.
-func addFallbackSignal(p *prog.Prog, info *ipc.ProgInfo) {
+func addFallbackSignal(p *prog.Prog, info *flatrpc.ProgInfo) {
callInfos := make([]prog.CallInfo, len(info.Calls))
for i, inf := range info.Calls {
- if inf.Flags&ipc.CallExecuted != 0 {
+ if inf.Flags&flatrpc.CallFlagExecuted != 0 {
callInfos[i].Flags |= prog.CallExecuted
}
- if inf.Flags&ipc.CallFinished != 0 {
+ if inf.Flags&flatrpc.CallFlagFinished != 0 {
callInfos[i].Flags |= prog.CallFinished
}
- if inf.Flags&ipc.CallBlocked != 0 {
+ if inf.Flags&flatrpc.CallFlagBlocked != 0 {
callInfos[i].Flags |= prog.CallBlocked
}
- callInfos[i].Errno = inf.Errno
+ callInfos[i].Errno = int(inf.Error)
}
p.FallbackSignal(callInfos)
for i, inf := range callInfos {
info.Calls[i].Signal = inf.Signal
}
}
+
+func convertProgInfo(info *flatrpc.ProgInfo) *ipc.ProgInfo {
+ if info == nil {
+ return nil
+ }
+ return &ipc.ProgInfo{
+ Elapsed: time.Duration(info.Elapsed),
+ Freshness: int(info.Freshness),
+ Extra: convertCallInfo(info.Extra),
+ Calls: convertCalls(info),
+ }
+}
+
+func convertCalls(info *flatrpc.ProgInfo) []ipc.CallInfo {
+ var calls []ipc.CallInfo
+ for _, call := range info.Calls {
+ calls = append(calls, convertCallInfo(call))
+ }
+ return calls
+}
+
+func convertCallInfo(info *flatrpc.CallInfo) ipc.CallInfo {
+ comps := make(prog.CompMap)
+ for _, comp := range info.Comps {
+ comps.AddComp(comp.Op1, comp.Op2)
+ }
+ return ipc.CallInfo{
+ Flags: ipc.CallFlags(info.Flags),
+ Errno: int(info.Error),
+ Cover: info.Cover,
+ Signal: info.Signal,
+ Comps: comps,
+ }
+}
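Two details of connectionLoop are worth calling out: on connect it pushes the entire max signal in batches of 50000 entries so that no single message balloons the connection's serialization buffer, and it then keeps roughly 2*procs requests in flight per runner so executors never starve. The batching idiom in isolation, with send standing in for runner.sendSignalUpdate:

package main

import "fmt"

// sendChunks feeds a large slice to send in fixed-size batches,
// mirroring the max-signal loop in connectionLoop.
func sendChunks(sig []uint32, chunk int, send func([]uint32) error) error {
	for len(sig) != 0 {
		n := min(len(sig), chunk) // min builtin, Go 1.21+
		if err := send(sig[:n]); err != nil {
			return err
		}
		sig = sig[n:]
	}
	return nil
}

func main() {
	sig := make([]uint32, 120000)
	_ = sendChunks(sig, 50000, func(batch []uint32) error {
		fmt.Println(len(batch)) // 50000, 50000, 20000
		return nil
	})
}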
diff --git a/syz-manager/stats.go b/syz-manager/stats.go
index d0d1eaa8f..77982434f 100644
--- a/syz-manager/stats.go
+++ b/syz-manager/stats.go
@@ -73,8 +73,4 @@ func (mgr *Manager) initStats() {
}, func(v int, period time.Duration) string {
return fmt.Sprintf("%v MB", v>>20)
})
-
- // Stats imported from the fuzzer (names must match the the fuzzer names).
- stats.Create("no exec requests", "Number of times fuzzer was stalled with no exec requests", stats.Rate{})
- stats.Create("no exec duration", "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{})
}