diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-07-11 11:43:41 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-07-11 10:39:33 +0000 |
| commit | baf24fbe840b2b9fd49636f49fd6fa42f39290b9 (patch) | |
| tree | 84d4b385c05fc24bf66c3fd73f359ae4e65d8239 /pkg | |
| parent | eba8394c4eea3a25682631625fea564acc0c0356 (diff) | |
pkg/rpcserver: debug executor stalls
In some cases, the executor seems to be mysteriously silent when we were
awaiting a reply.
During pkg/runtest tests, give it 1 minute to prepare a reply, then try
to request the current state and abort the connection.
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/flatrpc/conn.go | 10 | ||||
| -rw-r--r-- | pkg/rpcserver/rpcserver.go | 41 | ||||
| -rw-r--r-- | pkg/rpcserver/runner.go | 74 | ||||
| -rw-r--r-- | pkg/runtest/run_test.go | 7 |
4 files changed, 91 insertions, 41 deletions
diff --git a/pkg/flatrpc/conn.go b/pkg/flatrpc/conn.go index 9d8d1ce59..56afdfca4 100644 --- a/pkg/flatrpc/conn.go +++ b/pkg/flatrpc/conn.go @@ -109,14 +109,16 @@ func Send[T sendMsg](c *Conn, msg T) error { return nil } +type RecvType[T any] interface { + UnPack() *T + flatbuffers.FlatBuffer +} + // Recv receives an RPC message. // The type T is supposed to be a pointer to a normal flatbuffers type (not ending with T, e.g. *ConnectRequestRaw). // Receiving should be done from a single goroutine, the received message is valid // only until the next Recv call (messages share the same underlying receive buffer). -func Recv[Raw interface { - UnPack() *T - flatbuffers.FlatBuffer -}, T any](c *Conn) (res *T, err0 error) { +func Recv[Raw RecvType[T], T any](c *Conn) (res *T, err0 error) { defer func() { if err1 := recover(); err1 != nil { if err2, ok := err1.(error); ok { diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index a6cae67cd..85d4a2bd7 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -41,10 +41,12 @@ type Config struct { // Disabled for gVisor/Starnix which are not Linux. FilterSignal bool PrintMachineCheck bool - Procs int - Slowdown int - pcBase uint64 - localModules []*vminfo.KernelModule + // Abort early on syz-executor not replying to requests and print extra debugging information. + DebugTimeouts bool + Procs int + Slowdown int + pcBase uint64 + localModules []*vminfo.KernelModule } type Manager interface { @@ -393,21 +395,22 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) { runner := &Runner{ - source: serv.execSource, - cover: serv.cfg.Cover, - coverEdges: serv.cfg.UseCoverEdges, - filterSignal: serv.cfg.FilterSignal, - debug: serv.cfg.Debug, - sysTarget: serv.sysTarget, - injectExec: injectExec, - infoc: make(chan chan []byte), - requests: make(map[int64]*queue.Request), - executing: make(map[int64]bool), - lastExec: MakeLastExecuting(serv.cfg.Procs, 6), - rnd: rand.New(rand.NewSource(time.Now().UnixNano())), - stats: serv.runnerStats, - procs: serv.cfg.Procs, - updInfo: updInfo, + source: serv.execSource, + cover: serv.cfg.Cover, + coverEdges: serv.cfg.UseCoverEdges, + filterSignal: serv.cfg.FilterSignal, + debug: serv.cfg.Debug, + debugTimeouts: serv.cfg.DebugTimeouts, + sysTarget: serv.sysTarget, + injectExec: injectExec, + infoc: make(chan chan []byte), + requests: make(map[int64]*queue.Request), + executing: make(map[int64]bool), + lastExec: MakeLastExecuting(serv.cfg.Procs, 6), + rnd: rand.New(rand.NewSource(time.Now().UnixNano())), + stats: serv.runnerStats, + procs: serv.cfg.Procs, + updInfo: updInfo, } serv.mu.Lock() defer serv.mu.Unlock() diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index c45598b9c..ec4ad2076 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -16,6 +16,7 @@ import ( "github.com/google/syzkaller/pkg/cover" "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/fuzzer/queue" + "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/osutil" "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/prog" @@ -30,6 +31,7 @@ type Runner struct { coverEdges bool filterSignal bool debug bool + debugTimeouts bool sysTarget *targets.Target stats *runnerStats finished chan bool @@ -163,13 +165,8 @@ func (runner *Runner) ConnectionLoop() error { if infoc == nil { select { case infoc = <-runner.infoc: - msg := &flatrpc.HostMessage{ - Msg: &flatrpc.HostMessages{ - Type: flatrpc.HostMessagesRawStateRequest, - Value: &flatrpc.StateRequest{}, - }, - } - if err := flatrpc.Send(runner.conn, msg); err != nil { + err := runner.sendStateRequest() + if err != nil { return err } default: @@ -192,7 +189,7 @@ func (runner *Runner) ConnectionLoop() error { time.Sleep(10 * time.Millisecond) continue } - raw, err := flatrpc.Recv[*flatrpc.ExecutorMessageRaw](runner.conn) + raw, err := wrappedRecv[*flatrpc.ExecutorMessageRaw](runner) if err != nil { return err } @@ -205,15 +202,19 @@ func (runner *Runner) ConnectionLoop() error { case *flatrpc.ExecResult: err = runner.handleExecResult(msg) case *flatrpc.StateResult: + buf := new(bytes.Buffer) + fmt.Fprintf(buf, "pending requests on the VM:") + for id := range runner.requests { + fmt.Fprintf(buf, " %v", id) + } + fmt.Fprintf(buf, "\n\n") + result := append(buf.Bytes(), msg.Data...) if infoc != nil { - buf := new(bytes.Buffer) - fmt.Fprintf(buf, "pending requests on the VM:") - for id := range runner.requests { - fmt.Fprintf(buf, " %v", id) - } - fmt.Fprintf(buf, "\n\n") - infoc <- append(buf.Bytes(), msg.Data...) + infoc <- result infoc = nil + } else { + // The request was solicited in detectTimeout(). + log.Logf(0, "status result: %s", result) } default: return fmt.Errorf("received unknown message type %T", msg) @@ -224,6 +225,49 @@ func (runner *Runner) ConnectionLoop() error { } } +func wrappedRecv[Raw flatrpc.RecvType[T], T any](runner *Runner) (*T, error) { + if runner.debugTimeouts { + abort := runner.detectTimeout() + defer close(abort) + } + return flatrpc.Recv[Raw](runner.conn) +} + +func (runner *Runner) detectTimeout() chan struct{} { + abort := make(chan struct{}) + go func() { + select { + case <-time.After(time.Minute): + log.Logf(0, "timed out waiting for executor reply, aborting the connection in 1 minute") + go func() { + time.Sleep(time.Minute) + runner.conn.Close() + }() + err := runner.sendStateRequest() + if err != nil { + log.Logf(0, "failed to send state request: %v", err) + return + } + + case <-abort: + return + case <-runner.finished: + return + } + }() + return abort +} + +func (runner *Runner) sendStateRequest() error { + msg := &flatrpc.HostMessage{ + Msg: &flatrpc.HostMessages{ + Type: flatrpc.HostMessagesRawStateRequest, + Value: &flatrpc.StateRequest{}, + }, + } + return flatrpc.Send(runner.conn, msg) +} + func (runner *Runner) sendRequest(req *queue.Request) error { if err := req.Validate(); err != nil { panic(err) diff --git a/pkg/runtest/run_test.go b/pkg/runtest/run_test.go index acf59c93d..f9a85bf52 100644 --- a/pkg/runtest/run_test.go +++ b/pkg/runtest/run_test.go @@ -453,9 +453,10 @@ func startRPCServer(t *testing.T, target *prog.Target, executor, vmArch string, Features: flatrpc.AllFeatures, Sandbox: flatrpc.ExecEnvSandboxNone, }, - VMArch: vmArch, - Procs: runtime.GOMAXPROCS(0), - Slowdown: 10, // to deflake slower tests + VMArch: vmArch, + Procs: runtime.GOMAXPROCS(0), + Slowdown: 10, // to deflake slower tests + DebugTimeouts: true, }, Executor: executor, Dir: dir, |
