aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/rpcserver
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-06-04 12:55:41 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-06-24 09:57:34 +0000
commite16e2c9a4cb6937323e861b646792a6c4c978a3c (patch)
tree6c513e98e5f465b44a98546d8984485d2c128582 /pkg/rpcserver
parent90d67044dab68568e8f35bc14b68055dbd166eff (diff)
executor: add runner mode
Move all syz-fuzzer logic into syz-executor and remove syz-fuzzer. Also restore syz-runtest functionality in the manager. Update #4917 (sets most signal handlers to SIG_IGN)
Diffstat (limited to 'pkg/rpcserver')
-rw-r--r--pkg/rpcserver/last_executing.go68
-rw-r--r--pkg/rpcserver/last_executing_test.go56
-rw-r--r--pkg/rpcserver/local.go138
-rw-r--r--pkg/rpcserver/rpcserver.go796
4 files changed, 1058 insertions, 0 deletions
diff --git a/pkg/rpcserver/last_executing.go b/pkg/rpcserver/last_executing.go
new file mode 100644
index 000000000..341ae6534
--- /dev/null
+++ b/pkg/rpcserver/last_executing.go
@@ -0,0 +1,68 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+ "sort"
+ "time"
+)
+
+// LastExecuting keeps the given number of last executed programs
+// for each proc in a VM, and allows querying this set after a crash.
+type LastExecuting struct {
+ count int
+ procs []ExecRecord
+ positions []int
+}
+
+type ExecRecord struct {
+ ID int
+ Proc int
+ Prog []byte
+ Time time.Duration
+}
+
+func MakeLastExecuting(procs, count int) *LastExecuting {
+ return &LastExecuting{
+ count: count,
+ procs: make([]ExecRecord, procs*count),
+ positions: make([]int, procs),
+ }
+}
+
+// Note execution of the 'prog' on 'proc' at time 'now'.
+func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) {
+ pos := &last.positions[proc]
+ last.procs[proc*last.count+*pos] = ExecRecord{
+ ID: id,
+ Proc: proc,
+ Prog: prog,
+ Time: now,
+ }
+ *pos++
+ if *pos == last.count {
+ *pos = 0
+ }
+}
+
+// Returns a sorted set of last executing programs.
+// The records are sorted by time in ascending order.
+// ExecRecord.Time is the difference in start executing time between this
+// program and the program that started executing last.
+func (last *LastExecuting) Collect() []ExecRecord {
+ procs := last.procs
+ last.procs = nil // The type must not be used after this.
+ sort.Slice(procs, func(i, j int) bool {
+ return procs[i].Time < procs[j].Time
+ })
+ max := procs[len(procs)-1].Time
+ for i := len(procs) - 1; i >= 0; i-- {
+ if procs[i].Time == 0 {
+ procs = procs[i+1:]
+ break
+ }
+ procs[i].Time = max - procs[i].Time
+ }
+ return procs
+}
diff --git a/pkg/rpcserver/last_executing_test.go b/pkg/rpcserver/last_executing_test.go
new file mode 100644
index 000000000..c9f3cc2bf
--- /dev/null
+++ b/pkg/rpcserver/last_executing_test.go
@@ -0,0 +1,56 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestLastExecutingEmpty(t *testing.T) {
+ last := MakeLastExecuting(10, 10)
+ assert.Empty(t, last.Collect())
+}
+
+func TestLastExecuting(t *testing.T) {
+ last := MakeLastExecuting(10, 3)
+ last.Note(1, 0, []byte("prog1"), 1)
+
+ last.Note(2, 1, []byte("prog2"), 2)
+ last.Note(3, 1, []byte("prog3"), 3)
+
+ last.Note(4, 3, []byte("prog4"), 4)
+ last.Note(5, 3, []byte("prog5"), 5)
+ last.Note(6, 3, []byte("prog6"), 6)
+
+ last.Note(7, 7, []byte("prog7"), 7)
+ last.Note(8, 7, []byte("prog8"), 8)
+ last.Note(9, 7, []byte("prog9"), 9)
+ last.Note(10, 7, []byte("prog10"), 10)
+ last.Note(11, 7, []byte("prog11"), 11)
+
+ last.Note(12, 9, []byte("prog12"), 12)
+
+ last.Note(13, 8, []byte("prog13"), 13)
+
+ assert.Equal(t, last.Collect(), []ExecRecord{
+ {ID: 1, Proc: 0, Prog: []byte("prog1"), Time: 12},
+
+ {ID: 2, Proc: 1, Prog: []byte("prog2"), Time: 11},
+ {ID: 3, Proc: 1, Prog: []byte("prog3"), Time: 10},
+
+ {ID: 4, Proc: 3, Prog: []byte("prog4"), Time: 9},
+ {ID: 5, Proc: 3, Prog: []byte("prog5"), Time: 8},
+ {ID: 6, Proc: 3, Prog: []byte("prog6"), Time: 7},
+
+ {ID: 9, Proc: 7, Prog: []byte("prog9"), Time: 4},
+ {ID: 10, Proc: 7, Prog: []byte("prog10"), Time: 3},
+ {ID: 11, Proc: 7, Prog: []byte("prog11"), Time: 2},
+
+ {ID: 12, Proc: 9, Prog: []byte("prog12"), Time: 1},
+
+ {ID: 13, Proc: 8, Prog: []byte("prog13"), Time: 0},
+ })
+}
diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go
new file mode 100644
index 000000000..da1de1fc0
--- /dev/null
+++ b/pkg/rpcserver/local.go
@@ -0,0 +1,138 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "time"
+
+ "github.com/google/syzkaller/pkg/cover"
+ "github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
+ "github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/osutil"
+ "github.com/google/syzkaller/pkg/signal"
+ "github.com/google/syzkaller/prog"
+)
+
+type LocalConfig struct {
+ Config
+ // syz-executor binary.
+ Executor string
+ // Temp dir where to run executor process, it's up to the caller to clean it up if necessary.
+ Dir string
+ // Handle ctrl+C and exit.
+ HandleInterrupts bool
+ // Run executor under gdb.
+ GDB bool
+ // RunLocal exits when the context is cancelled.
+ Context context.Context
+ MachineChecked func(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source
+}
+
+func RunLocal(cfg *LocalConfig) error {
+ cfg.RPC = ":0"
+ cfg.VMLess = true
+ cfg.PrintMachineCheck = log.V(1)
+ ctx := &local{
+ cfg: cfg,
+ setupDone: make(chan bool),
+ }
+ serv, err := newImpl(&cfg.Config, ctx)
+ if err != nil {
+ return err
+ }
+ defer serv.Close()
+ ctx.serv = serv
+ // setupDone synchronizes assignment to ctx.serv and read of ctx.serv in MachineChecked
+ // for the race detector b/c it does not understand the synchronization via TCP socket connect/accept.
+ close(ctx.setupDone)
+
+ bin := cfg.Executor
+ args := []string{"runner", "local", "localhost", fmt.Sprint(serv.Port)}
+ if cfg.GDB {
+ bin = "gdb"
+ args = append([]string{
+ "--return-child-result",
+ "--ex=handle SIGPIPE nostop",
+ "--args",
+ cfg.Executor,
+ }, args...)
+ }
+ cmd := exec.Command(bin, args...)
+ cmd.Dir = cfg.Dir
+ if cfg.Debug || cfg.GDB {
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ }
+ if cfg.GDB {
+ cmd.Stdin = os.Stdin
+ }
+ if err := cmd.Start(); err != nil {
+ return fmt.Errorf("failed to start executor: %w", err)
+ }
+ res := make(chan error, 1)
+ go func() { res <- cmd.Wait() }()
+ shutdown := make(chan struct{})
+ if cfg.HandleInterrupts {
+ osutil.HandleInterrupts(shutdown)
+ }
+ var cmdErr error
+ select {
+ case <-shutdown:
+ case <-cfg.Context.Done():
+ case err := <-res:
+ cmdErr = fmt.Errorf("executor process exited: %w", err)
+ }
+ if cmdErr == nil {
+ cmd.Process.Kill()
+ <-res
+ }
+ if !cfg.HandleInterrupts {
+ // If the executor has crashed early, reply to all remaining requests to unblock tests.
+ loop:
+ for {
+ req := serv.execSource.Next()
+ if req == nil {
+ select {
+ case <-cfg.Context.Done():
+ break loop
+ default:
+ time.Sleep(time.Millisecond)
+ continue loop
+ }
+ }
+ req.Done(&queue.Result{Status: queue.ExecFailure, Err: errors.New("executor crashed")})
+ }
+ }
+ return cmdErr
+}
+
+type local struct {
+ cfg *LocalConfig
+ serv *Server
+ setupDone chan bool
+}
+
+func (ctx *local) MachineChecked(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source {
+ <-ctx.setupDone
+ ctx.serv.TriagedCorpus()
+ return ctx.cfg.MachineChecked(features, syscalls)
+}
+
+func (ctx *local) BugFrames() ([]string, []string) {
+ return nil, nil
+}
+
+func (ctx *local) MaxSignal() signal.Signal {
+ return nil
+}
+
+func (ctx *local) CoverageFilter(modules []*cover.KernelModule) []uint64 {
+ return nil
+}
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
new file mode 100644
index 000000000..1b090126b
--- /dev/null
+++ b/pkg/rpcserver/rpcserver.go
@@ -0,0 +1,796 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "maps"
+ "math/rand"
+ "os"
+ "slices"
+ "sort"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/google/syzkaller/pkg/cover"
+ "github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
+ "github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/mgrconfig"
+ "github.com/google/syzkaller/pkg/osutil"
+ "github.com/google/syzkaller/pkg/signal"
+ "github.com/google/syzkaller/pkg/stats"
+ "github.com/google/syzkaller/pkg/vminfo"
+ "github.com/google/syzkaller/prog"
+ "github.com/google/syzkaller/sys/targets"
+)
+
+type Config struct {
+ vminfo.Config
+ RPC string
+ VMLess bool
+ PrintMachineCheck bool
+ Procs int
+ Slowdown int
+}
+
+type Manager interface {
+ MaxSignal() signal.Signal
+ BugFrames() (leaks []string, races []string)
+ MachineChecked(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source
+ CoverageFilter(modules []*cover.KernelModule) []uint64
+}
+
+type Server struct {
+ Port int
+ StatExecs *stats.Val
+ StatNumFuzzing *stats.Val
+
+ cfg *Config
+ mgr Manager
+ serv *flatrpc.Serv
+ target *prog.Target
+ timeouts targets.Timeouts
+ checker *vminfo.Checker
+
+ infoOnce sync.Once
+ checkDone atomic.Bool
+ checkFailures int
+ baseSource *queue.DynamicSourceCtl
+ enabledFeatures flatrpc.Feature
+ setupFeatures flatrpc.Feature
+ modules []*cover.KernelModule
+ canonicalModules *cover.Canonicalizer
+ coverFilter []uint64
+
+ mu sync.Mutex
+ runners map[string]*Runner
+ info map[string]VMState
+ execSource queue.Source
+ triagedCorpus atomic.Bool
+
+ statExecRetries *stats.Val
+ statExecutorRestarts *stats.Val
+ statExecBufferTooSmall *stats.Val
+ statVMRestarts *stats.Val
+ statNoExecRequests *stats.Val
+ statNoExecDuration *stats.Val
+}
+
+type Runner struct {
+ stopped bool
+ finished chan bool
+ injectExec chan<- bool
+ infoc chan chan []byte
+ conn *flatrpc.Conn
+ machineInfo []byte
+ canonicalizer *cover.CanonicalizerInstance
+ nextRequestID int64
+ requests map[int64]*queue.Request
+ executing map[int64]bool
+ lastExec *LastExecuting
+ rnd *rand.Rand
+}
+
+func New(cfg *mgrconfig.Config, mgr Manager, debug bool) (*Server, error) {
+ sandbox, err := flatrpc.SandboxToFlags(cfg.Sandbox)
+ if err != nil {
+ return nil, err
+ }
+ return newImpl(&Config{
+ Config: vminfo.Config{
+ Target: cfg.Target,
+ Features: flatrpc.AllFeatures,
+ Syscalls: cfg.Syscalls,
+ Debug: debug,
+ Cover: cfg.Cover,
+ Sandbox: sandbox,
+ SandboxArg: cfg.SandboxArg,
+ },
+ RPC: cfg.RPC,
+ VMLess: cfg.VMLess,
+ PrintMachineCheck: true,
+ Procs: cfg.Procs,
+ Slowdown: cfg.Timeouts.Slowdown,
+ }, mgr)
+}
+
+func newImpl(cfg *Config, mgr Manager) (*Server, error) {
+ cfg.Procs = min(cfg.Procs, prog.MaxPids)
+ checker := vminfo.New(&cfg.Config)
+ baseSource := queue.DynamicSource(checker)
+ serv := &Server{
+ cfg: cfg,
+ mgr: mgr,
+ target: cfg.Target,
+ timeouts: targets.Get(cfg.Target.OS, cfg.Target.Arch).Timeouts(cfg.Slowdown),
+ runners: make(map[string]*Runner),
+ info: make(map[string]VMState),
+ checker: checker,
+ baseSource: baseSource,
+ execSource: queue.Retry(baseSource),
+
+ StatExecs: stats.Create("exec total", "Total test program executions",
+ stats.Console, stats.Rate{}, stats.Prometheus("syz_exec_total")),
+ StatNumFuzzing: stats.Create("fuzzing VMs", "Number of VMs that are currently fuzzing",
+ stats.Console, stats.Link("/vms")),
+ statExecRetries: stats.Create("exec retries",
+ "Number of times a test program was restarted because the first run failed",
+ stats.Rate{}, stats.Graph("executor")),
+ statExecutorRestarts: stats.Create("executor restarts",
+ "Number of times executor process was restarted", stats.Rate{}, stats.Graph("executor")),
+ statExecBufferTooSmall: stats.Create("buffer too small",
+ "Program serialization overflowed exec buffer", stats.NoGraph),
+ statVMRestarts: stats.Create("vm restarts", "Total number of VM starts",
+ stats.Rate{}, stats.NoGraph),
+ statNoExecRequests: stats.Create("no exec requests",
+ "Number of times fuzzer was stalled with no exec requests", stats.Rate{}),
+ statNoExecDuration: stats.Create("no exec duration",
+ "Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}),
+ }
+ s, err := flatrpc.ListenAndServe(cfg.RPC, serv.handleConn)
+ if err != nil {
+ return nil, err
+ }
+ serv.serv = s
+ serv.Port = s.Addr.Port
+ return serv, nil
+}
+
+func (serv *Server) Close() error {
+ return serv.serv.Close()
+}
+
+type VMState struct {
+ State int
+ Timestamp time.Time
+}
+
+const (
+ StateOffline = iota
+ StateBooting
+ StateFuzzing
+ StateStopping
+)
+
+func (serv *Server) VMState() map[string]VMState {
+ serv.mu.Lock()
+ defer serv.mu.Unlock()
+ return maps.Clone(serv.info)
+}
+
+func (serv *Server) MachineInfo(name string) []byte {
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ if runner != nil && (runner.conn == nil || runner.stopped) {
+ runner = nil
+ }
+ serv.mu.Unlock()
+ if runner == nil {
+ return []byte("VM is not alive")
+ }
+ return runner.machineInfo
+}
+
+func (serv *Server) RunnerStatus(name string) []byte {
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ if runner != nil && (runner.conn == nil || runner.stopped) {
+ runner = nil
+ }
+ serv.mu.Unlock()
+ if runner == nil {
+ return []byte("VM is not alive")
+ }
+ resc := make(chan []byte, 1)
+ timeout := time.After(time.Minute)
+ select {
+ case runner.infoc <- resc:
+ case <-timeout:
+ return []byte("VM loop is not responding")
+ }
+ select {
+ case res := <-resc:
+ return res
+ case <-timeout:
+ return []byte("VM is not responding")
+ }
+}
+
+func (serv *Server) handleConn(conn *flatrpc.Conn) {
+ name, machineInfo, canonicalizer, err := serv.handshake(conn)
+ if err != nil {
+ log.Logf(1, "%v", err)
+ return
+ }
+
+ if serv.cfg.VMLess {
+ // There is no VM loop, so mimic what it would do.
+ serv.CreateInstance(name, nil)
+ defer func() {
+ serv.StopFuzzing(name)
+ serv.ShutdownInstance(name, true)
+ }()
+ }
+
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ if runner == nil || runner.stopped {
+ serv.mu.Unlock()
+ log.Logf(2, "VM %v shut down before connect", name)
+ return
+ }
+ serv.info[name] = VMState{StateFuzzing, time.Now()}
+ runner.conn = conn
+ runner.machineInfo = machineInfo
+ runner.canonicalizer = canonicalizer
+ serv.mu.Unlock()
+ defer close(runner.finished)
+
+ if serv.triagedCorpus.Load() {
+ if err := runner.sendStartLeakChecks(); err != nil {
+ log.Logf(2, "%v", err)
+ return
+ }
+ }
+
+ err = serv.connectionLoop(runner)
+ log.Logf(2, "runner %v: %v", name, err)
+}
+
+func (serv *Server) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) {
+ connectReq, err := flatrpc.Recv[*flatrpc.ConnectRequestRaw](conn)
+ if err != nil {
+ return "", nil, nil, err
+ }
+ log.Logf(1, "runner %v connected", connectReq.Name)
+ if !serv.cfg.VMLess {
+ checkRevisions(connectReq, serv.cfg.Target)
+ }
+ serv.statVMRestarts.Add(1)
+
+ leaks, races := serv.mgr.BugFrames()
+ connectReply := &flatrpc.ConnectReply{
+ Debug: serv.cfg.Debug,
+ Cover: serv.cfg.Cover,
+ Procs: int32(serv.cfg.Procs),
+ Slowdown: int32(serv.timeouts.Slowdown),
+ SyscallTimeoutMs: int32(serv.timeouts.Syscall / time.Millisecond),
+ ProgramTimeoutMs: int32(serv.timeouts.Program / time.Millisecond),
+ LeakFrames: leaks,
+ RaceFrames: races,
+ }
+ connectReply.Files = serv.checker.RequiredFiles()
+ if serv.checkDone.Load() {
+ connectReply.Features = serv.setupFeatures
+ } else {
+ connectReply.Files = append(connectReply.Files, serv.checker.CheckFiles()...)
+ connectReply.Globs = serv.target.RequiredGlobs()
+ connectReply.Features = serv.cfg.Features
+ }
+ if err := flatrpc.Send(conn, connectReply); err != nil {
+ return "", nil, nil, err
+ }
+
+ infoReq, err := flatrpc.Recv[*flatrpc.InfoRequestRaw](conn)
+ if err != nil {
+ return "", nil, nil, err
+ }
+ modules, machineInfo, err := serv.checker.MachineInfo(infoReq.Files)
+ if err != nil {
+ log.Logf(0, "parsing of machine info failed: %v", err)
+ if infoReq.Error == "" {
+ infoReq.Error = err.Error()
+ }
+ }
+ if infoReq.Error != "" {
+ log.Logf(0, "machine check failed: %v", infoReq.Error)
+ serv.checkFailures++
+ if serv.checkFailures == 10 {
+ log.Fatalf("machine check failing")
+ }
+ return "", nil, nil, errors.New("machine check failed")
+ }
+
+ serv.infoOnce.Do(func() {
+ serv.modules = modules
+ serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover)
+ serv.coverFilter = serv.mgr.CoverageFilter(modules)
+ globs := make(map[string][]string)
+ for _, glob := range infoReq.Globs {
+ globs[glob.Name] = glob.Files
+ }
+ serv.target.UpdateGlobs(globs)
+ // Flatbuffers don't do deep copy of byte slices,
+ // so clone manually since we pass it to a goroutine.
+ for _, file := range infoReq.Files {
+ file.Data = slices.Clone(file.Data)
+ }
+ // Now execute check programs.
+ go func() {
+ if err := serv.runCheck(infoReq.Files, infoReq.Features); err != nil {
+ log.Fatalf("check failed: %v", err)
+ }
+ }()
+ })
+
+ canonicalizer := serv.canonicalModules.NewInstance(modules)
+ infoReply := &flatrpc.InfoReply{
+ CoverFilter: canonicalizer.Decanonicalize(serv.coverFilter),
+ }
+ if err := flatrpc.Send(conn, infoReply); err != nil {
+ return "", nil, nil, err
+ }
+ return connectReq.Name, machineInfo, canonicalizer, nil
+}
+
+func (serv *Server) connectionLoop(runner *Runner) error {
+ if serv.cfg.Cover {
+ maxSignal := serv.mgr.MaxSignal().ToRaw()
+ for len(maxSignal) != 0 {
+ // Split coverage into batches to not grow the connection serialization
+ // buffer too much (we don't want to grow it larger than what will be needed
+ // to send programs).
+ n := min(len(maxSignal), 50000)
+ if err := runner.sendSignalUpdate(maxSignal[:n], nil); err != nil {
+ return err
+ }
+ maxSignal = maxSignal[n:]
+ }
+ }
+
+ serv.StatNumFuzzing.Add(1)
+ defer serv.StatNumFuzzing.Add(-1)
+ var infoc chan []byte
+ defer func() {
+ if infoc != nil {
+ infoc <- []byte("VM has crashed")
+ }
+ }()
+ for {
+ if infoc == nil {
+ select {
+ case infoc = <-runner.infoc:
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawStateRequest,
+ Value: &flatrpc.StateRequest{},
+ },
+ }
+ if err := flatrpc.Send(runner.conn, msg); err != nil {
+ return err
+ }
+ default:
+ }
+ }
+ for len(runner.requests)-len(runner.executing) < 2*serv.cfg.Procs {
+ req := serv.execSource.Next()
+ if req == nil {
+ break
+ }
+ if err := serv.sendRequest(runner, req); err != nil {
+ return err
+ }
+ }
+ if len(runner.requests) == 0 {
+ // The runner has no requests at all, so don't wait to receive anything from it.
+ time.Sleep(10 * time.Millisecond)
+ continue
+ }
+ raw, err := flatrpc.Recv[*flatrpc.ExecutorMessageRaw](runner.conn)
+ if err != nil {
+ return err
+ }
+ if raw.Msg == nil || raw.Msg.Value == nil {
+ return errors.New("received no message")
+ }
+ switch msg := raw.Msg.Value.(type) {
+ case *flatrpc.ExecutingMessage:
+ err = serv.handleExecutingMessage(runner, msg)
+ case *flatrpc.ExecResult:
+ err = serv.handleExecResult(runner, msg)
+ case *flatrpc.StateResult:
+ if infoc != nil {
+ buf := new(bytes.Buffer)
+ fmt.Fprintf(buf, "pending requests on the VM:")
+ for id := range runner.requests {
+ fmt.Fprintf(buf, " %v", id)
+ }
+ fmt.Fprintf(buf, "\n\n")
+ infoc <- append(buf.Bytes(), msg.Data...)
+ infoc = nil
+ }
+ default:
+ return fmt.Errorf("received unknown message type %T", msg)
+ }
+ if err != nil {
+ return err
+ }
+ }
+}
+
+func (serv *Server) sendRequest(runner *Runner, req *queue.Request) error {
+ if err := req.Validate(); err != nil {
+ panic(err)
+ }
+ runner.nextRequestID++
+ id := runner.nextRequestID
+ var flags flatrpc.RequestFlag
+ if req.ReturnOutput {
+ flags |= flatrpc.RequestFlagReturnOutput
+ }
+ if req.ReturnError {
+ flags |= flatrpc.RequestFlagReturnError
+ }
+ allSignal := make([]int32, len(req.ReturnAllSignal))
+ for i, call := range req.ReturnAllSignal {
+ allSignal[i] = int32(call)
+ }
+ // Do not let too much state accumulate.
+ const restartIn = 600
+ resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps
+ opts := req.ExecOpts
+ if req.ExecOpts.ExecFlags&resetFlags != 0 && runner.rnd.Intn(restartIn) == 0 {
+ opts.EnvFlags |= flatrpc.ExecEnvResetState
+ }
+ if serv.cfg.Debug {
+ opts.EnvFlags |= flatrpc.ExecEnvDebug
+ }
+ var data []byte
+ if req.BinaryFile == "" {
+ progData, err := req.Prog.SerializeForExec()
+ if err != nil {
+ // It's bad if we systematically fail to serialize programs,
+ // but so far we don't have a better handling than counting this.
+ // This error is observed a lot on the seeded syz_mount_image calls.
+ serv.statExecBufferTooSmall.Add(1)
+ req.Done(&queue.Result{Status: queue.ExecFailure})
+ return nil
+ }
+ data = progData
+ } else {
+ flags |= flatrpc.RequestFlagIsBinary
+ fileData, err := os.ReadFile(req.BinaryFile)
+ if err != nil {
+ req.Done(&queue.Result{
+ Status: queue.ExecFailure,
+ Err: err,
+ })
+ return nil
+ }
+ data = fileData
+ }
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawExecRequest,
+ Value: &flatrpc.ExecRequest{
+ Id: id,
+ ProgData: data,
+ Flags: flags,
+ ExecOpts: &opts,
+ AllSignal: allSignal,
+ },
+ },
+ }
+ runner.requests[id] = req
+ return flatrpc.Send(runner.conn, msg)
+}
+
+func (serv *Server) handleExecutingMessage(runner *Runner, msg *flatrpc.ExecutingMessage) error {
+ req := runner.requests[msg.Id]
+ if req == nil {
+ return fmt.Errorf("can't find executing request %v", msg.Id)
+ }
+ proc := int(msg.ProcId)
+ if proc < 0 || proc >= serv.cfg.Procs {
+ return fmt.Errorf("got bad proc id %v", proc)
+ }
+ serv.StatExecs.Add(1)
+ if msg.Try == 0 {
+ if msg.WaitDuration != 0 {
+ serv.statNoExecRequests.Add(1)
+ // Cap wait duration to 1 second to avoid extreme peaks on the graph
+ // which make it impossible to see real data (the rest becomes a flat line).
+ serv.statNoExecDuration.Add(int(min(msg.WaitDuration, 1e9)))
+ }
+ } else {
+ serv.statExecRetries.Add(1)
+ }
+ runner.lastExec.Note(int(msg.Id), proc, req.Prog.Serialize(), osutil.MonotonicNano())
+ select {
+ case runner.injectExec <- true:
+ default:
+ }
+ runner.executing[msg.Id] = true
+ return nil
+}
+
+func (serv *Server) handleExecResult(runner *Runner, msg *flatrpc.ExecResult) error {
+ req := runner.requests[msg.Id]
+ if req == nil {
+ return fmt.Errorf("can't find executed request %v", msg.Id)
+ }
+ delete(runner.requests, msg.Id)
+ delete(runner.executing, msg.Id)
+ if msg.Info != nil {
+ for len(msg.Info.Calls) < len(req.Prog.Calls) {
+ msg.Info.Calls = append(msg.Info.Calls, &flatrpc.CallInfo{
+ Error: 999,
+ })
+ }
+ msg.Info.Calls = msg.Info.Calls[:len(req.Prog.Calls)]
+ if msg.Info.Freshness == 0 {
+ serv.statExecutorRestarts.Add(1)
+ }
+ if !serv.cfg.Cover && req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal != 0 {
+ // Coverage collection is disabled, but signal was requested => use a substitute signal.
+ addFallbackSignal(req.Prog, msg.Info)
+ }
+ for i := 0; i < len(msg.Info.Calls); i++ {
+ call := msg.Info.Calls[i]
+ call.Cover = runner.canonicalizer.Canonicalize(call.Cover)
+ call.Signal = runner.canonicalizer.Canonicalize(call.Signal)
+ }
+ if len(msg.Info.ExtraRaw) != 0 {
+ msg.Info.Extra = msg.Info.ExtraRaw[0]
+ for _, info := range msg.Info.ExtraRaw[1:] {
+ // All processing in the fuzzer later will convert signal/cover to maps and dedup,
+ // so there is little point in deduping here.
+ msg.Info.Extra.Cover = append(msg.Info.Extra.Cover, info.Cover...)
+ msg.Info.Extra.Signal = append(msg.Info.Extra.Signal, info.Signal...)
+ }
+ msg.Info.Extra.Cover = runner.canonicalizer.Canonicalize(msg.Info.Extra.Cover)
+ msg.Info.Extra.Signal = runner.canonicalizer.Canonicalize(msg.Info.Extra.Signal)
+ msg.Info.ExtraRaw = nil
+ }
+ }
+ status := queue.Success
+ var resErr error
+ if msg.Error != "" {
+ status = queue.ExecFailure
+ resErr = errors.New(msg.Error)
+ }
+ req.Done(&queue.Result{
+ Status: status,
+ Info: msg.Info,
+ Output: slices.Clone(msg.Output),
+ Err: resErr,
+ })
+ return nil
+}
+
+func checkRevisions(a *flatrpc.ConnectRequest, target *prog.Target) {
+ if target.Arch != a.Arch {
+ log.Fatalf("mismatching manager/executor arches: %v vs %v", target.Arch, a.Arch)
+ }
+ if prog.GitRevision != a.GitRevision {
+ log.Fatalf("mismatching manager/executor git revisions: %v vs %v",
+ prog.GitRevision, a.GitRevision)
+ }
+ if target.Revision != a.SyzRevision {
+ log.Fatalf("mismatching manager/executor system call descriptions: %v vs %v",
+ target.Revision, a.SyzRevision)
+ }
+}
+
+func (serv *Server) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error {
+ enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo)
+ enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls)
+ // Note: need to print disabled syscalls before failing due to an error.
+ // This helps to debug "all system calls are disabled".
+ if serv.cfg.PrintMachineCheck {
+ serv.printMachineCheck(checkFilesInfo, enabledCalls, disabledCalls, transitivelyDisabled, features)
+ }
+ if checkErr != nil {
+ return checkErr
+ }
+ serv.enabledFeatures = features.Enabled()
+ serv.setupFeatures = features.NeedSetup()
+ newSource := serv.mgr.MachineChecked(serv.enabledFeatures, enabledCalls)
+ serv.baseSource.Store(newSource)
+ serv.checkDone.Store(true)
+ return nil
+}
+
+func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enabledCalls map[*prog.Syscall]bool,
+ disabledCalls, transitivelyDisabled map[*prog.Syscall]string, features vminfo.Features) {
+ buf := new(bytes.Buffer)
+ if len(serv.cfg.Syscalls) != 0 || log.V(1) {
+ if len(disabledCalls) != 0 {
+ var lines []string
+ for call, reason := range disabledCalls {
+ lines = append(lines, fmt.Sprintf("%-44v: %v\n", call.Name, reason))
+ }
+ sort.Strings(lines)
+ fmt.Fprintf(buf, "disabled the following syscalls:\n%s\n", strings.Join(lines, ""))
+ }
+ if len(transitivelyDisabled) != 0 {
+ var lines []string
+ for call, reason := range transitivelyDisabled {
+ lines = append(lines, fmt.Sprintf("%-44v: %v\n", call.Name, reason))
+ }
+ sort.Strings(lines)
+ fmt.Fprintf(buf, "transitively disabled the following syscalls"+
+ " (missing resource [creating syscalls]):\n%s\n",
+ strings.Join(lines, ""))
+ }
+ }
+ hasFileErrors := false
+ for _, file := range checkFilesInfo {
+ if file.Error == "" {
+ continue
+ }
+ if !hasFileErrors {
+ fmt.Fprintf(buf, "failed to read the following files in the VM:\n")
+ }
+ fmt.Fprintf(buf, "%-44v: %v\n", file.Name, file.Error)
+ hasFileErrors = true
+ }
+ if hasFileErrors {
+ fmt.Fprintf(buf, "\n")
+ }
+ var lines []string
+ lines = append(lines, fmt.Sprintf("%-24v: %v/%v\n", "syscalls",
+ len(enabledCalls), len(serv.cfg.Target.Syscalls)))
+ for feat, info := range features {
+ lines = append(lines, fmt.Sprintf("%-24v: %v\n",
+ flatrpc.EnumNamesFeature[feat], info.Reason))
+ }
+ sort.Strings(lines)
+ buf.WriteString(strings.Join(lines, ""))
+ fmt.Fprintf(buf, "\n")
+ log.Logf(0, "machine check:\n%s", buf.Bytes())
+}
+
+func (serv *Server) CreateInstance(name string, injectExec chan<- bool) {
+ runner := &Runner{
+ injectExec: injectExec,
+ infoc: make(chan chan []byte),
+ finished: make(chan bool),
+ requests: make(map[int64]*queue.Request),
+ executing: make(map[int64]bool),
+ lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
+ rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
+ }
+ serv.mu.Lock()
+ if serv.runners[name] != nil {
+ panic(fmt.Sprintf("duplicate instance %s", name))
+ }
+ serv.runners[name] = runner
+ serv.info[name] = VMState{StateBooting, time.Now()}
+ serv.mu.Unlock()
+}
+
+// StopFuzzing prevents further exchange of requests with the instance.
+// To make the Server fully forget an instance, ShutdownInstance() must be called.
+func (serv *Server) StopFuzzing(name string) {
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ runner.stopped = true
+ conn := runner.conn
+ serv.info[name] = VMState{StateStopping, time.Now()}
+ serv.mu.Unlock()
+ if conn != nil {
+ conn.Close()
+ }
+}
+
+func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) {
+ serv.mu.Lock()
+ runner := serv.runners[name]
+ delete(serv.runners, name)
+ serv.info[name] = VMState{StateOffline, time.Now()}
+ serv.mu.Unlock()
+ if runner.conn != nil {
+ // Wait for the connection goroutine to finish and stop touching data.
+ // If conn is nil before we removed the runner, then it won't touch anything.
+ <-runner.finished
+ }
+ for id, req := range runner.requests {
+ status := queue.Restarted
+ if crashed && runner.executing[id] {
+ status = queue.Crashed
+ }
+ req.Done(&queue.Result{Status: status})
+ }
+ return runner.lastExec.Collect(), runner.machineInfo
+}
+
+func (serv *Server) DistributeSignalDelta(plus, minus signal.Signal) {
+ plusRaw := plus.ToRaw()
+ minusRaw := minus.ToRaw()
+ serv.foreachRunnerAsync(func(runner *Runner) {
+ runner.sendSignalUpdate(plusRaw, minusRaw)
+ })
+}
+
+func (runner *Runner) sendSignalUpdate(plus, minus []uint64) error {
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawSignalUpdate,
+ Value: &flatrpc.SignalUpdate{
+ NewMax: runner.canonicalizer.Decanonicalize(plus),
+ DropMax: runner.canonicalizer.Decanonicalize(minus),
+ },
+ },
+ }
+ return flatrpc.Send(runner.conn, msg)
+}
+
+func (serv *Server) TriagedCorpus() {
+ serv.triagedCorpus.Store(true)
+ serv.foreachRunnerAsync(func(runner *Runner) {
+ runner.sendStartLeakChecks()
+ })
+}
+
+func (runner *Runner) sendStartLeakChecks() error {
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawStartLeakChecks,
+ Value: &flatrpc.StartLeakChecks{},
+ },
+ }
+ return flatrpc.Send(runner.conn, msg)
+}
+
+// foreachRunnerAsync runs callback fn for each connected runner asynchronously.
+// If a VM has hanged w/o reading out the socket, we want to avoid blocking
+// important goroutines on the send operations.
+func (serv *Server) foreachRunnerAsync(fn func(runner *Runner)) {
+ serv.mu.Lock()
+ defer serv.mu.Unlock()
+ for _, runner := range serv.runners {
+ if runner.conn != nil {
+ go fn(runner)
+ }
+ }
+}
+
+// addFallbackSignal computes simple fallback signal in cases we don't have real coverage signal.
+// We use syscall number or-ed with returned errno value as signal.
+// At least this gives us all combinations of syscall+errno.
+func addFallbackSignal(p *prog.Prog, info *flatrpc.ProgInfo) {
+ callInfos := make([]prog.CallInfo, len(info.Calls))
+ for i, inf := range info.Calls {
+ if inf.Flags&flatrpc.CallFlagExecuted != 0 {
+ callInfos[i].Flags |= prog.CallExecuted
+ }
+ if inf.Flags&flatrpc.CallFlagFinished != 0 {
+ callInfos[i].Flags |= prog.CallFinished
+ }
+ if inf.Flags&flatrpc.CallFlagBlocked != 0 {
+ callInfos[i].Flags |= prog.CallBlocked
+ }
+ callInfos[i].Errno = int(inf.Error)
+ }
+ p.FallbackSignal(callInfos)
+ for i, inf := range callInfos {
+ info.Calls[i].Signal = inf.Signal
+ }
+}