| author | Dmitry Vyukov <dvyukov@google.com> | 2024-06-04 12:55:41 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-06-24 09:57:34 +0000 |
| commit | e16e2c9a4cb6937323e861b646792a6c4c978a3c (patch) | |
| tree | 6c513e98e5f465b44a98546d8984485d2c128582 /pkg/rpcserver | |
| parent | 90d67044dab68568e8f35bc14b68055dbd166eff (diff) | |
executor: add runner mode
Move all syz-fuzzer logic into syz-executor and remove syz-fuzzer.
Also restore syz-runtest functionality in the manager.
Update #4917
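For orientation, below is a rough sketch (not part of the commit) of how a caller could drive the new local runner mode through the pkg/rpcserver API added by this change: RunLocal launches `syz-executor runner local localhost <port>` and proxies exec requests to it until the context is cancelled. The package name, executor path, working directory, Procs/Slowdown values, and the caller-supplied queue.Source are illustrative assumptions.

```go
// Sketch only: demonstrates the shape of rpcserver.RunLocal/LocalConfig from this diff.
package example

import (
	"context"

	"github.com/google/syzkaller/pkg/flatrpc"
	"github.com/google/syzkaller/pkg/fuzzer/queue"
	"github.com/google/syzkaller/pkg/rpcserver"
	"github.com/google/syzkaller/pkg/vminfo"
	"github.com/google/syzkaller/prog"
)

// runLocalExecutor runs syz-executor in the new "runner" mode on the local machine,
// feeding it requests from the given source. All concrete values are assumptions.
func runLocalExecutor(ctx context.Context, target *prog.Target, source queue.Source) error {
	cfg := &rpcserver.LocalConfig{
		Config: rpcserver.Config{
			Config: vminfo.Config{
				Target:   target,
				Features: flatrpc.AllFeatures,
				Cover:    true,
			},
			Procs:    4, // illustrative
			Slowdown: 1, // illustrative
		},
		Executor:         "./bin/linux_amd64/syz-executor", // assumed path
		Dir:              "/tmp/syz-local",                 // assumed temp dir
		HandleInterrupts: true,
		Context:          ctx,
		// Called once the machine check finishes; the returned source
		// supplies the exec requests that the server forwards to the executor.
		MachineChecked: func(flatrpc.Feature, map[*prog.Syscall]bool) queue.Source {
			return source
		},
	}
	// RunLocal binds an RPC port, starts "syz-executor runner local localhost <port>",
	// and serves requests until ctx is cancelled or the executor exits.
	return rpcserver.RunLocal(cfg)
}
```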
Diffstat (limited to 'pkg/rpcserver')
| -rw-r--r-- | pkg/rpcserver/last_executing.go | 68 |
| -rw-r--r-- | pkg/rpcserver/last_executing_test.go | 56 |
| -rw-r--r-- | pkg/rpcserver/local.go | 138 |
| -rw-r--r-- | pkg/rpcserver/rpcserver.go | 796 |
4 files changed, 1058 insertions, 0 deletions
diff --git a/pkg/rpcserver/last_executing.go b/pkg/rpcserver/last_executing.go
new file mode 100644
index 000000000..341ae6534
--- /dev/null
+++ b/pkg/rpcserver/last_executing.go
@@ -0,0 +1,68 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+	"sort"
+	"time"
+)
+
+// LastExecuting keeps the given number of last executed programs
+// for each proc in a VM, and allows querying this set after a crash.
+type LastExecuting struct {
+	count int
+	procs []ExecRecord
+	positions []int
+}
+
+type ExecRecord struct {
+	ID int
+	Proc int
+	Prog []byte
+	Time time.Duration
+}
+
+func MakeLastExecuting(procs, count int) *LastExecuting {
+	return &LastExecuting{
+		count: count,
+		procs: make([]ExecRecord, procs*count),
+		positions: make([]int, procs),
+	}
+}
+
+// Note execution of the 'prog' on 'proc' at time 'now'.
+func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) {
+	pos := &last.positions[proc]
+	last.procs[proc*last.count+*pos] = ExecRecord{
+		ID: id,
+		Proc: proc,
+		Prog: prog,
+		Time: now,
+	}
+	*pos++
+	if *pos == last.count {
+		*pos = 0
+	}
+}
+
+// Returns a sorted set of last executing programs.
+// The records are sorted by time in ascending order.
+// ExecRecord.Time is the difference in start executing time between this
+// program and the program that started executing last.
+func (last *LastExecuting) Collect() []ExecRecord {
+	procs := last.procs
+	last.procs = nil // The type must not be used after this.
+	sort.Slice(procs, func(i, j int) bool {
+		return procs[i].Time < procs[j].Time
+	})
+	max := procs[len(procs)-1].Time
+	for i := len(procs) - 1; i >= 0; i-- {
+		if procs[i].Time == 0 {
+			procs = procs[i+1:]
+			break
+		}
+		procs[i].Time = max - procs[i].Time
+	}
+	return procs
+}
diff --git a/pkg/rpcserver/last_executing_test.go b/pkg/rpcserver/last_executing_test.go
new file mode 100644
index 000000000..c9f3cc2bf
--- /dev/null
+++ b/pkg/rpcserver/last_executing_test.go
@@ -0,0 +1,56 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestLastExecutingEmpty(t *testing.T) {
+	last := MakeLastExecuting(10, 10)
+	assert.Empty(t, last.Collect())
+}
+
+func TestLastExecuting(t *testing.T) {
+	last := MakeLastExecuting(10, 3)
+	last.Note(1, 0, []byte("prog1"), 1)
+
+	last.Note(2, 1, []byte("prog2"), 2)
+	last.Note(3, 1, []byte("prog3"), 3)
+
+	last.Note(4, 3, []byte("prog4"), 4)
+	last.Note(5, 3, []byte("prog5"), 5)
+	last.Note(6, 3, []byte("prog6"), 6)
+
+	last.Note(7, 7, []byte("prog7"), 7)
+	last.Note(8, 7, []byte("prog8"), 8)
+	last.Note(9, 7, []byte("prog9"), 9)
+	last.Note(10, 7, []byte("prog10"), 10)
+	last.Note(11, 7, []byte("prog11"), 11)
+
+	last.Note(12, 9, []byte("prog12"), 12)
+
+	last.Note(13, 8, []byte("prog13"), 13)
+
+	assert.Equal(t, last.Collect(), []ExecRecord{
+		{ID: 1, Proc: 0, Prog: []byte("prog1"), Time: 12},
+
+		{ID: 2, Proc: 1, Prog: []byte("prog2"), Time: 11},
+		{ID: 3, Proc: 1, Prog: []byte("prog3"), Time: 10},
+
+		{ID: 4, Proc: 3, Prog: []byte("prog4"), Time: 9},
+		{ID: 5, Proc: 3, Prog: []byte("prog5"), Time: 8},
+		{ID: 6, Proc: 3, Prog: []byte("prog6"), Time: 7},
+
+		{ID: 9, Proc: 7, Prog: []byte("prog9"), Time: 4},
+		{ID: 10, Proc: 7, Prog: []byte("prog10"), Time: 3},
+		{ID: 11, Proc: 7, Prog: []byte("prog11"), Time: 2},
+
+		{ID: 12, Proc: 9, Prog: []byte("prog12"), Time: 1},
+
+		{ID: 13, Proc: 8, Prog: []byte("prog13"), Time: 0},
+	})
+}
diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go
new file mode 100644
index 000000000..da1de1fc0
--- /dev/null
+++ b/pkg/rpcserver/local.go
@@ -0,0 +1,138 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"os/exec"
+	"time"
+
+	"github.com/google/syzkaller/pkg/cover"
+	"github.com/google/syzkaller/pkg/flatrpc"
+	"github.com/google/syzkaller/pkg/fuzzer/queue"
+	"github.com/google/syzkaller/pkg/log"
+	"github.com/google/syzkaller/pkg/osutil"
+	"github.com/google/syzkaller/pkg/signal"
+	"github.com/google/syzkaller/prog"
+)
+
+type LocalConfig struct {
+	Config
+	// syz-executor binary.
+	Executor string
+	// Temp dir where to run the executor process; it's up to the caller to clean it up if necessary.
+	Dir string
+	// Handle ctrl+C and exit.
+	HandleInterrupts bool
+	// Run executor under gdb.
+	GDB bool
+	// RunLocal exits when the context is cancelled.
+	Context context.Context
+	MachineChecked func(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source
+}
+
+func RunLocal(cfg *LocalConfig) error {
+	cfg.RPC = ":0"
+	cfg.VMLess = true
+	cfg.PrintMachineCheck = log.V(1)
+	ctx := &local{
+		cfg: cfg,
+		setupDone: make(chan bool),
+	}
+	serv, err := newImpl(&cfg.Config, ctx)
+	if err != nil {
+		return err
+	}
+	defer serv.Close()
+	ctx.serv = serv
+	// setupDone synchronizes assignment to ctx.serv and read of ctx.serv in MachineChecked
+	// for the race detector b/c it does not understand the synchronization via TCP socket connect/accept.
+	close(ctx.setupDone)
+
+	bin := cfg.Executor
+	args := []string{"runner", "local", "localhost", fmt.Sprint(serv.Port)}
+	if cfg.GDB {
+		bin = "gdb"
+		args = append([]string{
+			"--return-child-result",
+			"--ex=handle SIGPIPE nostop",
+			"--args",
+			cfg.Executor,
+		}, args...)
+	}
+	cmd := exec.Command(bin, args...)
+	cmd.Dir = cfg.Dir
+	if cfg.Debug || cfg.GDB {
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+	}
+	if cfg.GDB {
+		cmd.Stdin = os.Stdin
+	}
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start executor: %w", err)
+	}
+	res := make(chan error, 1)
+	go func() { res <- cmd.Wait() }()
+	shutdown := make(chan struct{})
+	if cfg.HandleInterrupts {
+		osutil.HandleInterrupts(shutdown)
+	}
+	var cmdErr error
+	select {
+	case <-shutdown:
+	case <-cfg.Context.Done():
+	case err := <-res:
+		cmdErr = fmt.Errorf("executor process exited: %w", err)
+	}
+	if cmdErr == nil {
+		cmd.Process.Kill()
+		<-res
+	}
+	if !cfg.HandleInterrupts {
+		// If the executor has crashed early, reply to all remaining requests to unblock tests.
+	loop:
+		for {
+			req := serv.execSource.Next()
+			if req == nil {
+				select {
+				case <-cfg.Context.Done():
+					break loop
+				default:
+					time.Sleep(time.Millisecond)
+					continue loop
+				}
+			}
+			req.Done(&queue.Result{Status: queue.ExecFailure, Err: errors.New("executor crashed")})
+		}
+	}
+	return cmdErr
+}
+
+type local struct {
+	cfg *LocalConfig
+	serv *Server
+	setupDone chan bool
+}
+
+func (ctx *local) MachineChecked(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source {
+	<-ctx.setupDone
+	ctx.serv.TriagedCorpus()
+	return ctx.cfg.MachineChecked(features, syscalls)
+}
+
+func (ctx *local) BugFrames() ([]string, []string) {
+	return nil, nil
+}
+
+func (ctx *local) MaxSignal() signal.Signal {
+	return nil
+}
+
+func (ctx *local) CoverageFilter(modules []*cover.KernelModule) []uint64 {
+	return nil
+}
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
new file mode 100644
index 000000000..1b090126b
--- /dev/null
+++ b/pkg/rpcserver/rpcserver.go
@@ -0,0 +1,796 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpcserver
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"maps"
+	"math/rand"
+	"os"
+	"slices"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/google/syzkaller/pkg/cover"
+	"github.com/google/syzkaller/pkg/flatrpc"
+	"github.com/google/syzkaller/pkg/fuzzer/queue"
+	"github.com/google/syzkaller/pkg/log"
+	"github.com/google/syzkaller/pkg/mgrconfig"
+	"github.com/google/syzkaller/pkg/osutil"
+	"github.com/google/syzkaller/pkg/signal"
+	"github.com/google/syzkaller/pkg/stats"
+	"github.com/google/syzkaller/pkg/vminfo"
+	"github.com/google/syzkaller/prog"
+	"github.com/google/syzkaller/sys/targets"
+)
+
+type Config struct {
+	vminfo.Config
+	RPC string
+	VMLess bool
+	PrintMachineCheck bool
+	Procs int
+	Slowdown int
+}
+
+type Manager interface {
+	MaxSignal() signal.Signal
+	BugFrames() (leaks []string, races []string)
+	MachineChecked(features flatrpc.Feature, syscalls map[*prog.Syscall]bool) queue.Source
+	CoverageFilter(modules []*cover.KernelModule) []uint64
+}
+
+type Server struct {
+	Port int
+	StatExecs *stats.Val
+	StatNumFuzzing *stats.Val
+
+	cfg *Config
+	mgr Manager
+	serv *flatrpc.Serv
+	target *prog.Target
+	timeouts targets.Timeouts
+	checker *vminfo.Checker
+
+	infoOnce sync.Once
+	checkDone atomic.Bool
+	checkFailures int
+	baseSource *queue.DynamicSourceCtl
+	enabledFeatures flatrpc.Feature
+	setupFeatures flatrpc.Feature
+	modules []*cover.KernelModule
+	canonicalModules *cover.Canonicalizer
+	coverFilter []uint64
+
+	mu sync.Mutex
+	runners map[string]*Runner
+	info map[string]VMState
+	execSource queue.Source
+	triagedCorpus atomic.Bool
+
+	statExecRetries *stats.Val
+	statExecutorRestarts *stats.Val
+	statExecBufferTooSmall *stats.Val
+	statVMRestarts *stats.Val
+	statNoExecRequests *stats.Val
+	statNoExecDuration *stats.Val
+}
+
+type Runner struct {
+	stopped bool
+	finished chan bool
+	injectExec chan<- bool
+	infoc chan chan []byte
+	conn *flatrpc.Conn
+	machineInfo []byte
+	canonicalizer *cover.CanonicalizerInstance
+	nextRequestID int64
+	requests map[int64]*queue.Request
+	executing map[int64]bool
+	lastExec *LastExecuting
+	rnd *rand.Rand
+}
+
+func New(cfg *mgrconfig.Config, mgr Manager, debug bool) (*Server, error) {
+	sandbox, err := flatrpc.SandboxToFlags(cfg.Sandbox)
+	if err != nil {
+		return nil, err
+	}
+	return newImpl(&Config{
+		Config: vminfo.Config{
+			Target: cfg.Target,
+			Features: flatrpc.AllFeatures,
+			Syscalls: cfg.Syscalls,
+			Debug: debug,
+			Cover: cfg.Cover,
+			Sandbox: sandbox,
+			SandboxArg: cfg.SandboxArg,
+		},
+		RPC: cfg.RPC,
+		VMLess: cfg.VMLess,
+		PrintMachineCheck: true,
+		Procs: cfg.Procs,
+		Slowdown: cfg.Timeouts.Slowdown,
+	}, mgr)
+}
+
+func newImpl(cfg *Config, mgr Manager) (*Server, error) {
+	cfg.Procs = min(cfg.Procs, prog.MaxPids)
+	checker := vminfo.New(&cfg.Config)
+	baseSource := queue.DynamicSource(checker)
+	serv := &Server{
+		cfg: cfg,
+		mgr: mgr,
+		target: cfg.Target,
+		timeouts: targets.Get(cfg.Target.OS, cfg.Target.Arch).Timeouts(cfg.Slowdown),
+		runners: make(map[string]*Runner),
+		info: make(map[string]VMState),
+		checker: checker,
+		baseSource: baseSource,
+		execSource: queue.Retry(baseSource),
+
+		StatExecs: stats.Create("exec total", "Total test program executions",
+			stats.Console, stats.Rate{}, stats.Prometheus("syz_exec_total")),
+		StatNumFuzzing: stats.Create("fuzzing VMs", "Number of VMs that are currently fuzzing",
+			stats.Console, stats.Link("/vms")),
+		statExecRetries: stats.Create("exec retries",
+			"Number of times a test program was restarted because the first run failed",
+			stats.Rate{}, stats.Graph("executor")),
+		statExecutorRestarts: stats.Create("executor restarts",
+			"Number of times executor process was restarted", stats.Rate{}, stats.Graph("executor")),
+		statExecBufferTooSmall: stats.Create("buffer too small",
+			"Program serialization overflowed exec buffer", stats.NoGraph),
+		statVMRestarts: stats.Create("vm restarts", "Total number of VM starts",
+			stats.Rate{}, stats.NoGraph),
+		statNoExecRequests: stats.Create("no exec requests",
+			"Number of times fuzzer was stalled with no exec requests", stats.Rate{}),
+		statNoExecDuration: stats.Create("no exec duration",
+			"Total duration fuzzer was stalled with no exec requests (ns/sec)", stats.Rate{}),
+	}
+	s, err := flatrpc.ListenAndServe(cfg.RPC, serv.handleConn)
+	if err != nil {
+		return nil, err
+	}
+	serv.serv = s
+	serv.Port = s.Addr.Port
+	return serv, nil
+}
+
+func (serv *Server) Close() error {
+	return serv.serv.Close()
+}
+
+type VMState struct {
+	State int
+	Timestamp time.Time
+}
+
+const (
+	StateOffline = iota
+	StateBooting
+	StateFuzzing
+	StateStopping
+)
+
+func (serv *Server) VMState() map[string]VMState {
+	serv.mu.Lock()
+	defer serv.mu.Unlock()
+	return maps.Clone(serv.info)
+}
+
+func (serv *Server) MachineInfo(name string) []byte {
+	serv.mu.Lock()
+	runner := serv.runners[name]
+	if runner != nil && (runner.conn == nil || runner.stopped) {
+		runner = nil
+	}
+	serv.mu.Unlock()
+	if runner == nil {
+		return []byte("VM is not alive")
+	}
+	return runner.machineInfo
+}
+
+func (serv *Server) RunnerStatus(name string) []byte {
+	serv.mu.Lock()
+	runner := serv.runners[name]
+	if runner != nil && (runner.conn == nil || runner.stopped) {
+		runner = nil
+	}
+	serv.mu.Unlock()
+	if runner == nil {
+		return []byte("VM is not alive")
+	}
+	resc := make(chan []byte, 1)
+	timeout := time.After(time.Minute)
+	select {
+	case runner.infoc <- resc:
+	case <-timeout:
+		return []byte("VM loop is not responding")
+	}
+	select {
+	case res := <-resc:
+		return res
+	case <-timeout:
+		return []byte("VM is not responding")
+	}
+}
+
+func (serv *Server) handleConn(conn *flatrpc.Conn) {
+	name, machineInfo, canonicalizer, err := serv.handshake(conn)
+	if err != nil {
+		log.Logf(1, "%v", err)
+		return
+	}
+
+	if serv.cfg.VMLess {
+		// There is no VM loop, so mimic what it would do.
+		serv.CreateInstance(name, nil)
+		defer func() {
+			serv.StopFuzzing(name)
+			serv.ShutdownInstance(name, true)
+		}()
+	}
+
+	serv.mu.Lock()
+	runner := serv.runners[name]
+	if runner == nil || runner.stopped {
+		serv.mu.Unlock()
+		log.Logf(2, "VM %v shut down before connect", name)
+		return
+	}
+	serv.info[name] = VMState{StateFuzzing, time.Now()}
+	runner.conn = conn
+	runner.machineInfo = machineInfo
+	runner.canonicalizer = canonicalizer
+	serv.mu.Unlock()
+	defer close(runner.finished)
+
+	if serv.triagedCorpus.Load() {
+		if err := runner.sendStartLeakChecks(); err != nil {
+			log.Logf(2, "%v", err)
+			return
+		}
+	}
+
+	err = serv.connectionLoop(runner)
+	log.Logf(2, "runner %v: %v", name, err)
+}
+
+func (serv *Server) handshake(conn *flatrpc.Conn) (string, []byte, *cover.CanonicalizerInstance, error) {
+	connectReq, err := flatrpc.Recv[*flatrpc.ConnectRequestRaw](conn)
+	if err != nil {
+		return "", nil, nil, err
+	}
+	log.Logf(1, "runner %v connected", connectReq.Name)
+	if !serv.cfg.VMLess {
+		checkRevisions(connectReq, serv.cfg.Target)
+	}
+	serv.statVMRestarts.Add(1)
+
+	leaks, races := serv.mgr.BugFrames()
+	connectReply := &flatrpc.ConnectReply{
+		Debug: serv.cfg.Debug,
+		Cover: serv.cfg.Cover,
+		Procs: int32(serv.cfg.Procs),
+		Slowdown: int32(serv.timeouts.Slowdown),
+		SyscallTimeoutMs: int32(serv.timeouts.Syscall / time.Millisecond),
+		ProgramTimeoutMs: int32(serv.timeouts.Program / time.Millisecond),
+		LeakFrames: leaks,
+		RaceFrames: races,
+	}
+	connectReply.Files = serv.checker.RequiredFiles()
+	if serv.checkDone.Load() {
+		connectReply.Features = serv.setupFeatures
+	} else {
+		connectReply.Files = append(connectReply.Files, serv.checker.CheckFiles()...)
+		connectReply.Globs = serv.target.RequiredGlobs()
+		connectReply.Features = serv.cfg.Features
+	}
+	if err := flatrpc.Send(conn, connectReply); err != nil {
+		return "", nil, nil, err
+	}
+
+	infoReq, err := flatrpc.Recv[*flatrpc.InfoRequestRaw](conn)
+	if err != nil {
+		return "", nil, nil, err
+	}
+	modules, machineInfo, err := serv.checker.MachineInfo(infoReq.Files)
+	if err != nil {
+		log.Logf(0, "parsing of machine info failed: %v", err)
+		if infoReq.Error == "" {
+			infoReq.Error = err.Error()
+		}
+	}
+	if infoReq.Error != "" {
+		log.Logf(0, "machine check failed: %v", infoReq.Error)
+		serv.checkFailures++
+		if serv.checkFailures == 10 {
+			log.Fatalf("machine check failing")
+		}
+		return "", nil, nil, errors.New("machine check failed")
+	}
+
+	serv.infoOnce.Do(func() {
+		serv.modules = modules
+		serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover)
+		serv.coverFilter = serv.mgr.CoverageFilter(modules)
+		globs := make(map[string][]string)
+		for _, glob := range infoReq.Globs {
+			globs[glob.Name] = glob.Files
+		}
+		serv.target.UpdateGlobs(globs)
+		// Flatbuffers don't do deep copy of byte slices,
+		// so clone manually since we pass it to a goroutine.
+		for _, file := range infoReq.Files {
+			file.Data = slices.Clone(file.Data)
+		}
+		// Now execute check programs.
+		go func() {
+			if err := serv.runCheck(infoReq.Files, infoReq.Features); err != nil {
+				log.Fatalf("check failed: %v", err)
+			}
+		}()
+	})
+
+	canonicalizer := serv.canonicalModules.NewInstance(modules)
+	infoReply := &flatrpc.InfoReply{
+		CoverFilter: canonicalizer.Decanonicalize(serv.coverFilter),
+	}
+	if err := flatrpc.Send(conn, infoReply); err != nil {
+		return "", nil, nil, err
+	}
+	return connectReq.Name, machineInfo, canonicalizer, nil
+}
+
+func (serv *Server) connectionLoop(runner *Runner) error {
+	if serv.cfg.Cover {
+		maxSignal := serv.mgr.MaxSignal().ToRaw()
+		for len(maxSignal) != 0 {
+			// Split coverage into batches to not grow the connection serialization
+			// buffer too much (we don't want to grow it larger than what will be needed
+			// to send programs).
+			n := min(len(maxSignal), 50000)
+			if err := runner.sendSignalUpdate(maxSignal[:n], nil); err != nil {
+				return err
+			}
+			maxSignal = maxSignal[n:]
+		}
+	}
+
+	serv.StatNumFuzzing.Add(1)
+	defer serv.StatNumFuzzing.Add(-1)
+	var infoc chan []byte
+	defer func() {
+		if infoc != nil {
+			infoc <- []byte("VM has crashed")
+		}
+	}()
+	for {
+		if infoc == nil {
+			select {
+			case infoc = <-runner.infoc:
+				msg := &flatrpc.HostMessage{
+					Msg: &flatrpc.HostMessages{
+						Type: flatrpc.HostMessagesRawStateRequest,
+						Value: &flatrpc.StateRequest{},
+					},
+				}
+				if err := flatrpc.Send(runner.conn, msg); err != nil {
+					return err
+				}
+			default:
+			}
+		}
+		for len(runner.requests)-len(runner.executing) < 2*serv.cfg.Procs {
+			req := serv.execSource.Next()
+			if req == nil {
+				break
+			}
+			if err := serv.sendRequest(runner, req); err != nil {
+				return err
+			}
+		}
+		if len(runner.requests) == 0 {
+			// The runner has no requests at all, so don't wait to receive anything from it.
+			time.Sleep(10 * time.Millisecond)
+			continue
+		}
+		raw, err := flatrpc.Recv[*flatrpc.ExecutorMessageRaw](runner.conn)
+		if err != nil {
+			return err
+		}
+		if raw.Msg == nil || raw.Msg.Value == nil {
+			return errors.New("received no message")
+		}
+		switch msg := raw.Msg.Value.(type) {
+		case *flatrpc.ExecutingMessage:
+			err = serv.handleExecutingMessage(runner, msg)
+		case *flatrpc.ExecResult:
+			err = serv.handleExecResult(runner, msg)
+		case *flatrpc.StateResult:
+			if infoc != nil {
+				buf := new(bytes.Buffer)
+				fmt.Fprintf(buf, "pending requests on the VM:")
+				for id := range runner.requests {
+					fmt.Fprintf(buf, " %v", id)
+				}
+				fmt.Fprintf(buf, "\n\n")
+				infoc <- append(buf.Bytes(), msg.Data...)
+				infoc = nil
+			}
+		default:
+			return fmt.Errorf("received unknown message type %T", msg)
+		}
+		if err != nil {
+			return err
+		}
+	}
+}
+
+func (serv *Server) sendRequest(runner *Runner, req *queue.Request) error {
+	if err := req.Validate(); err != nil {
+		panic(err)
+	}
+	runner.nextRequestID++
+	id := runner.nextRequestID
+	var flags flatrpc.RequestFlag
+	if req.ReturnOutput {
+		flags |= flatrpc.RequestFlagReturnOutput
+	}
+	if req.ReturnError {
+		flags |= flatrpc.RequestFlagReturnError
+	}
+	allSignal := make([]int32, len(req.ReturnAllSignal))
+	for i, call := range req.ReturnAllSignal {
+		allSignal[i] = int32(call)
+	}
+	// Do not let too much state accumulate.
+	const restartIn = 600
+	resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps
+	opts := req.ExecOpts
+	if req.ExecOpts.ExecFlags&resetFlags != 0 && runner.rnd.Intn(restartIn) == 0 {
+		opts.EnvFlags |= flatrpc.ExecEnvResetState
+	}
+	if serv.cfg.Debug {
+		opts.EnvFlags |= flatrpc.ExecEnvDebug
+	}
+	var data []byte
+	if req.BinaryFile == "" {
+		progData, err := req.Prog.SerializeForExec()
+		if err != nil {
+			// It's bad if we systematically fail to serialize programs,
+			// but so far we don't have a better handling than counting this.
+			// This error is observed a lot on the seeded syz_mount_image calls.
+			serv.statExecBufferTooSmall.Add(1)
+			req.Done(&queue.Result{Status: queue.ExecFailure})
+			return nil
+		}
+		data = progData
+	} else {
+		flags |= flatrpc.RequestFlagIsBinary
+		fileData, err := os.ReadFile(req.BinaryFile)
+		if err != nil {
+			req.Done(&queue.Result{
+				Status: queue.ExecFailure,
+				Err: err,
+			})
+			return nil
+		}
+		data = fileData
+	}
+	msg := &flatrpc.HostMessage{
+		Msg: &flatrpc.HostMessages{
+			Type: flatrpc.HostMessagesRawExecRequest,
+			Value: &flatrpc.ExecRequest{
+				Id: id,
+				ProgData: data,
+				Flags: flags,
+				ExecOpts: &opts,
+				AllSignal: allSignal,
+			},
+		},
+	}
+	runner.requests[id] = req
+	return flatrpc.Send(runner.conn, msg)
+}
+
+func (serv *Server) handleExecutingMessage(runner *Runner, msg *flatrpc.ExecutingMessage) error {
+	req := runner.requests[msg.Id]
+	if req == nil {
+		return fmt.Errorf("can't find executing request %v", msg.Id)
+	}
+	proc := int(msg.ProcId)
+	if proc < 0 || proc >= serv.cfg.Procs {
+		return fmt.Errorf("got bad proc id %v", proc)
+	}
+	serv.StatExecs.Add(1)
+	if msg.Try == 0 {
+		if msg.WaitDuration != 0 {
+			serv.statNoExecRequests.Add(1)
+			// Cap wait duration to 1 second to avoid extreme peaks on the graph
+			// which make it impossible to see real data (the rest becomes a flat line).
+			serv.statNoExecDuration.Add(int(min(msg.WaitDuration, 1e9)))
+		}
+	} else {
+		serv.statExecRetries.Add(1)
+	}
+	runner.lastExec.Note(int(msg.Id), proc, req.Prog.Serialize(), osutil.MonotonicNano())
+	select {
+	case runner.injectExec <- true:
+	default:
+	}
+	runner.executing[msg.Id] = true
+	return nil
+}
+
+func (serv *Server) handleExecResult(runner *Runner, msg *flatrpc.ExecResult) error {
+	req := runner.requests[msg.Id]
+	if req == nil {
+		return fmt.Errorf("can't find executed request %v", msg.Id)
+	}
+	delete(runner.requests, msg.Id)
+	delete(runner.executing, msg.Id)
+	if msg.Info != nil {
+		for len(msg.Info.Calls) < len(req.Prog.Calls) {
+			msg.Info.Calls = append(msg.Info.Calls, &flatrpc.CallInfo{
+				Error: 999,
+			})
+		}
+		msg.Info.Calls = msg.Info.Calls[:len(req.Prog.Calls)]
+		if msg.Info.Freshness == 0 {
+			serv.statExecutorRestarts.Add(1)
+		}
+		if !serv.cfg.Cover && req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal != 0 {
+			// Coverage collection is disabled, but signal was requested => use a substitute signal.
+			addFallbackSignal(req.Prog, msg.Info)
+		}
+		for i := 0; i < len(msg.Info.Calls); i++ {
+			call := msg.Info.Calls[i]
+			call.Cover = runner.canonicalizer.Canonicalize(call.Cover)
+			call.Signal = runner.canonicalizer.Canonicalize(call.Signal)
+		}
+		if len(msg.Info.ExtraRaw) != 0 {
+			msg.Info.Extra = msg.Info.ExtraRaw[0]
+			for _, info := range msg.Info.ExtraRaw[1:] {
+				// All processing in the fuzzer later will convert signal/cover to maps and dedup,
+				// so there is little point in deduping here.
+				msg.Info.Extra.Cover = append(msg.Info.Extra.Cover, info.Cover...)
+				msg.Info.Extra.Signal = append(msg.Info.Extra.Signal, info.Signal...)
+			}
+			msg.Info.Extra.Cover = runner.canonicalizer.Canonicalize(msg.Info.Extra.Cover)
+			msg.Info.Extra.Signal = runner.canonicalizer.Canonicalize(msg.Info.Extra.Signal)
+			msg.Info.ExtraRaw = nil
+		}
+	}
+	status := queue.Success
+	var resErr error
+	if msg.Error != "" {
+		status = queue.ExecFailure
+		resErr = errors.New(msg.Error)
+	}
+	req.Done(&queue.Result{
+		Status: status,
+		Info: msg.Info,
+		Output: slices.Clone(msg.Output),
+		Err: resErr,
+	})
+	return nil
+}
+
+func checkRevisions(a *flatrpc.ConnectRequest, target *prog.Target) {
+	if target.Arch != a.Arch {
+		log.Fatalf("mismatching manager/executor arches: %v vs %v", target.Arch, a.Arch)
+	}
+	if prog.GitRevision != a.GitRevision {
+		log.Fatalf("mismatching manager/executor git revisions: %v vs %v",
+			prog.GitRevision, a.GitRevision)
+	}
+	if target.Revision != a.SyzRevision {
+		log.Fatalf("mismatching manager/executor system call descriptions: %v vs %v",
+			target.Revision, a.SyzRevision)
+	}
+}
+
+func (serv *Server) runCheck(checkFilesInfo []*flatrpc.FileInfo, checkFeatureInfo []*flatrpc.FeatureInfo) error {
+	enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo)
+	enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls)
+	// Note: need to print disabled syscalls before failing due to an error.
+	// This helps to debug "all system calls are disabled".
+	if serv.cfg.PrintMachineCheck {
+		serv.printMachineCheck(checkFilesInfo, enabledCalls, disabledCalls, transitivelyDisabled, features)
+	}
+	if checkErr != nil {
+		return checkErr
+	}
+	serv.enabledFeatures = features.Enabled()
+	serv.setupFeatures = features.NeedSetup()
+	newSource := serv.mgr.MachineChecked(serv.enabledFeatures, enabledCalls)
+	serv.baseSource.Store(newSource)
+	serv.checkDone.Store(true)
+	return nil
+}
+
+func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enabledCalls map[*prog.Syscall]bool,
+	disabledCalls, transitivelyDisabled map[*prog.Syscall]string, features vminfo.Features) {
+	buf := new(bytes.Buffer)
+	if len(serv.cfg.Syscalls) != 0 || log.V(1) {
+		if len(disabledCalls) != 0 {
+			var lines []string
+			for call, reason := range disabledCalls {
+				lines = append(lines, fmt.Sprintf("%-44v: %v\n", call.Name, reason))
+			}
+			sort.Strings(lines)
+			fmt.Fprintf(buf, "disabled the following syscalls:\n%s\n", strings.Join(lines, ""))
+		}
+		if len(transitivelyDisabled) != 0 {
+			var lines []string
+			for call, reason := range transitivelyDisabled {
+				lines = append(lines, fmt.Sprintf("%-44v: %v\n", call.Name, reason))
+			}
+			sort.Strings(lines)
+			fmt.Fprintf(buf, "transitively disabled the following syscalls"+
+				" (missing resource [creating syscalls]):\n%s\n",
+				strings.Join(lines, ""))
+		}
+	}
+	hasFileErrors := false
+	for _, file := range checkFilesInfo {
+		if file.Error == "" {
+			continue
+		}
+		if !hasFileErrors {
+			fmt.Fprintf(buf, "failed to read the following files in the VM:\n")
+		}
+		fmt.Fprintf(buf, "%-44v: %v\n", file.Name, file.Error)
+		hasFileErrors = true
+	}
+	if hasFileErrors {
+		fmt.Fprintf(buf, "\n")
+	}
+	var lines []string
+	lines = append(lines, fmt.Sprintf("%-24v: %v/%v\n", "syscalls",
+		len(enabledCalls), len(serv.cfg.Target.Syscalls)))
+	for feat, info := range features {
+		lines = append(lines, fmt.Sprintf("%-24v: %v\n",
+			flatrpc.EnumNamesFeature[feat], info.Reason))
+	}
+	sort.Strings(lines)
+	buf.WriteString(strings.Join(lines, ""))
+	fmt.Fprintf(buf, "\n")
+	log.Logf(0, "machine check:\n%s", buf.Bytes())
+}
+
+func (serv *Server) CreateInstance(name string, injectExec chan<- bool) {
+	runner := &Runner{
+		injectExec: injectExec,
+		infoc: make(chan chan []byte),
+		finished: make(chan bool),
+		requests: make(map[int64]*queue.Request),
+		executing: make(map[int64]bool),
+		lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
+		rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
+	}
+	serv.mu.Lock()
+	if serv.runners[name] != nil {
+		panic(fmt.Sprintf("duplicate instance %s", name))
+	}
+	serv.runners[name] = runner
+	serv.info[name] = VMState{StateBooting, time.Now()}
+	serv.mu.Unlock()
+}
+
+// StopFuzzing prevents further request exchange with the instance.
+// To make the server fully forget an instance, ShutdownInstance must be called.
+func (serv *Server) StopFuzzing(name string) {
+	serv.mu.Lock()
+	runner := serv.runners[name]
+	runner.stopped = true
+	conn := runner.conn
+	serv.info[name] = VMState{StateStopping, time.Now()}
+	serv.mu.Unlock()
+	if conn != nil {
+		conn.Close()
+	}
+}
+
+func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) {
+	serv.mu.Lock()
+	runner := serv.runners[name]
+	delete(serv.runners, name)
+	serv.info[name] = VMState{StateOffline, time.Now()}
+	serv.mu.Unlock()
+	if runner.conn != nil {
+		// Wait for the connection goroutine to finish and stop touching data.
+		// If conn is nil before we removed the runner, then it won't touch anything.
+		<-runner.finished
+	}
+	for id, req := range runner.requests {
+		status := queue.Restarted
+		if crashed && runner.executing[id] {
+			status = queue.Crashed
+		}
+		req.Done(&queue.Result{Status: status})
+	}
+	return runner.lastExec.Collect(), runner.machineInfo
+}
+
+func (serv *Server) DistributeSignalDelta(plus, minus signal.Signal) {
+	plusRaw := plus.ToRaw()
+	minusRaw := minus.ToRaw()
+	serv.foreachRunnerAsync(func(runner *Runner) {
+		runner.sendSignalUpdate(plusRaw, minusRaw)
+	})
+}
+
+func (runner *Runner) sendSignalUpdate(plus, minus []uint64) error {
+	msg := &flatrpc.HostMessage{
+		Msg: &flatrpc.HostMessages{
+			Type: flatrpc.HostMessagesRawSignalUpdate,
+			Value: &flatrpc.SignalUpdate{
+				NewMax: runner.canonicalizer.Decanonicalize(plus),
+				DropMax: runner.canonicalizer.Decanonicalize(minus),
+			},
+		},
+	}
+	return flatrpc.Send(runner.conn, msg)
+}
+
+func (serv *Server) TriagedCorpus() {
+	serv.triagedCorpus.Store(true)
+	serv.foreachRunnerAsync(func(runner *Runner) {
+		runner.sendStartLeakChecks()
+	})
+}
+
+func (runner *Runner) sendStartLeakChecks() error {
+	msg := &flatrpc.HostMessage{
+		Msg: &flatrpc.HostMessages{
+			Type: flatrpc.HostMessagesRawStartLeakChecks,
+			Value: &flatrpc.StartLeakChecks{},
+		},
+	}
+	return flatrpc.Send(runner.conn, msg)
+}
+
+// foreachRunnerAsync runs callback fn for each connected runner asynchronously.
+// If a VM has hung without reading from the socket, we want to avoid blocking
+// important goroutines on the send operations.
+func (serv *Server) foreachRunnerAsync(fn func(runner *Runner)) {
+	serv.mu.Lock()
+	defer serv.mu.Unlock()
+	for _, runner := range serv.runners {
+		if runner.conn != nil {
+			go fn(runner)
+		}
+	}
+}
+
+// addFallbackSignal computes simple fallback signal in cases we don't have real coverage signal.
+// We use syscall number or-ed with returned errno value as signal.
+// At least this gives us all combinations of syscall+errno.
+func addFallbackSignal(p *prog.Prog, info *flatrpc.ProgInfo) {
+	callInfos := make([]prog.CallInfo, len(info.Calls))
+	for i, inf := range info.Calls {
+		if inf.Flags&flatrpc.CallFlagExecuted != 0 {
+			callInfos[i].Flags |= prog.CallExecuted
+		}
+		if inf.Flags&flatrpc.CallFlagFinished != 0 {
+			callInfos[i].Flags |= prog.CallFinished
+		}
+		if inf.Flags&flatrpc.CallFlagBlocked != 0 {
+			callInfos[i].Flags |= prog.CallBlocked
+		}
+		callInfos[i].Errno = int(inf.Error)
+	}
+	p.FallbackSignal(callInfos)
+	for i, inf := range callInfos {
+		info.Calls[i].Signal = inf.Signal
+	}
+}
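On the manager side, the counterpart to the diff above is the Manager interface plus the Server lifecycle calls (New, CreateInstance, StopFuzzing, ShutdownInstance). The following hedged sketch shows a minimal Manager stub and, in the trailing comment, the per-VM call sequence; the stub type, package name, and variable names (mgrCfg, vmName, src, injectExec, crashed) are assumptions for illustration.

```go
// Sketch only: a minimal implementation of the Manager callbacks that
// rpcserver.Server invokes, based on the interface defined in this diff.
package example

import (
	"github.com/google/syzkaller/pkg/cover"
	"github.com/google/syzkaller/pkg/flatrpc"
	"github.com/google/syzkaller/pkg/fuzzer/queue"
	"github.com/google/syzkaller/pkg/signal"
	"github.com/google/syzkaller/prog"
)

// stubManager returns empty data for everything except the request source.
type stubManager struct {
	source queue.Source // supplies exec requests once the machine check is done
}

func (m *stubManager) MaxSignal() signal.Signal { return nil }

func (m *stubManager) BugFrames() (leaks, races []string) { return nil, nil }

func (m *stubManager) CoverageFilter(modules []*cover.KernelModule) []uint64 { return nil }

func (m *stubManager) MachineChecked(features flatrpc.Feature,
	syscalls map[*prog.Syscall]bool) queue.Source {
	return m.source
}

// Assumed per-VM call sequence into the server (mirroring what a VM loop would do):
//
//	serv, err := rpcserver.New(mgrCfg, &stubManager{source: src}, false)
//	serv.CreateInstance(vmName, injectExec)   // register before the VM boots
//	// ... boot the VM; syz-executor connects back to serv.Port ...
//	serv.StopFuzzing(vmName)                  // stop request exchange
//	lastProgs, machineInfo := serv.ShutdownInstance(vmName, crashed)
```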
