aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-07-11 11:43:41 +0200
committerAleksandr Nogikh <nogikh@google.com>2024-07-11 10:39:33 +0000
commitbaf24fbe840b2b9fd49636f49fd6fa42f39290b9 (patch)
tree84d4b385c05fc24bf66c3fd73f359ae4e65d8239 /pkg
parenteba8394c4eea3a25682631625fea564acc0c0356 (diff)
pkg/rpcserver: debug executor stalls
In some cases, the executor seems to be mysteriously silent when we were awaiting a reply. During pkg/runtest tests, give it 1 minute to prepare a reply, then try to request the current state and abort the connection.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/flatrpc/conn.go10
-rw-r--r--pkg/rpcserver/rpcserver.go41
-rw-r--r--pkg/rpcserver/runner.go74
-rw-r--r--pkg/runtest/run_test.go7
4 files changed, 91 insertions, 41 deletions
diff --git a/pkg/flatrpc/conn.go b/pkg/flatrpc/conn.go
index 9d8d1ce59..56afdfca4 100644
--- a/pkg/flatrpc/conn.go
+++ b/pkg/flatrpc/conn.go
@@ -109,14 +109,16 @@ func Send[T sendMsg](c *Conn, msg T) error {
return nil
}
+type RecvType[T any] interface {
+ UnPack() *T
+ flatbuffers.FlatBuffer
+}
+
// Recv receives an RPC message.
// The type T is supposed to be a pointer to a normal flatbuffers type (not ending with T, e.g. *ConnectRequestRaw).
// Receiving should be done from a single goroutine, the received message is valid
// only until the next Recv call (messages share the same underlying receive buffer).
-func Recv[Raw interface {
- UnPack() *T
- flatbuffers.FlatBuffer
-}, T any](c *Conn) (res *T, err0 error) {
+func Recv[Raw RecvType[T], T any](c *Conn) (res *T, err0 error) {
defer func() {
if err1 := recover(); err1 != nil {
if err2, ok := err1.(error); ok {
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index a6cae67cd..85d4a2bd7 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -41,10 +41,12 @@ type Config struct {
// Disabled for gVisor/Starnix which are not Linux.
FilterSignal bool
PrintMachineCheck bool
- Procs int
- Slowdown int
- pcBase uint64
- localModules []*vminfo.KernelModule
+ // Abort early on syz-executor not replying to requests and print extra debugging information.
+ DebugTimeouts bool
+ Procs int
+ Slowdown int
+ pcBase uint64
+ localModules []*vminfo.KernelModule
}
type Manager interface {
@@ -393,21 +395,22 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable
func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) {
runner := &Runner{
- source: serv.execSource,
- cover: serv.cfg.Cover,
- coverEdges: serv.cfg.UseCoverEdges,
- filterSignal: serv.cfg.FilterSignal,
- debug: serv.cfg.Debug,
- sysTarget: serv.sysTarget,
- injectExec: injectExec,
- infoc: make(chan chan []byte),
- requests: make(map[int64]*queue.Request),
- executing: make(map[int64]bool),
- lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
- rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
- stats: serv.runnerStats,
- procs: serv.cfg.Procs,
- updInfo: updInfo,
+ source: serv.execSource,
+ cover: serv.cfg.Cover,
+ coverEdges: serv.cfg.UseCoverEdges,
+ filterSignal: serv.cfg.FilterSignal,
+ debug: serv.cfg.Debug,
+ debugTimeouts: serv.cfg.DebugTimeouts,
+ sysTarget: serv.sysTarget,
+ injectExec: injectExec,
+ infoc: make(chan chan []byte),
+ requests: make(map[int64]*queue.Request),
+ executing: make(map[int64]bool),
+ lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
+ rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
+ stats: serv.runnerStats,
+ procs: serv.cfg.Procs,
+ updInfo: updInfo,
}
serv.mu.Lock()
defer serv.mu.Unlock()
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index c45598b9c..ec4ad2076 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -16,6 +16,7 @@ import (
"github.com/google/syzkaller/pkg/cover"
"github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/fuzzer/queue"
+ "github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/osutil"
"github.com/google/syzkaller/pkg/stats"
"github.com/google/syzkaller/prog"
@@ -30,6 +31,7 @@ type Runner struct {
coverEdges bool
filterSignal bool
debug bool
+ debugTimeouts bool
sysTarget *targets.Target
stats *runnerStats
finished chan bool
@@ -163,13 +165,8 @@ func (runner *Runner) ConnectionLoop() error {
if infoc == nil {
select {
case infoc = <-runner.infoc:
- msg := &flatrpc.HostMessage{
- Msg: &flatrpc.HostMessages{
- Type: flatrpc.HostMessagesRawStateRequest,
- Value: &flatrpc.StateRequest{},
- },
- }
- if err := flatrpc.Send(runner.conn, msg); err != nil {
+ err := runner.sendStateRequest()
+ if err != nil {
return err
}
default:
@@ -192,7 +189,7 @@ func (runner *Runner) ConnectionLoop() error {
time.Sleep(10 * time.Millisecond)
continue
}
- raw, err := flatrpc.Recv[*flatrpc.ExecutorMessageRaw](runner.conn)
+ raw, err := wrappedRecv[*flatrpc.ExecutorMessageRaw](runner)
if err != nil {
return err
}
@@ -205,15 +202,19 @@ func (runner *Runner) ConnectionLoop() error {
case *flatrpc.ExecResult:
err = runner.handleExecResult(msg)
case *flatrpc.StateResult:
+ buf := new(bytes.Buffer)
+ fmt.Fprintf(buf, "pending requests on the VM:")
+ for id := range runner.requests {
+ fmt.Fprintf(buf, " %v", id)
+ }
+ fmt.Fprintf(buf, "\n\n")
+ result := append(buf.Bytes(), msg.Data...)
if infoc != nil {
- buf := new(bytes.Buffer)
- fmt.Fprintf(buf, "pending requests on the VM:")
- for id := range runner.requests {
- fmt.Fprintf(buf, " %v", id)
- }
- fmt.Fprintf(buf, "\n\n")
- infoc <- append(buf.Bytes(), msg.Data...)
+ infoc <- result
infoc = nil
+ } else {
+ // The request was solicited in detectTimeout().
+ log.Logf(0, "status result: %s", result)
}
default:
return fmt.Errorf("received unknown message type %T", msg)
@@ -224,6 +225,49 @@ func (runner *Runner) ConnectionLoop() error {
}
}
+func wrappedRecv[Raw flatrpc.RecvType[T], T any](runner *Runner) (*T, error) {
+ if runner.debugTimeouts {
+ abort := runner.detectTimeout()
+ defer close(abort)
+ }
+ return flatrpc.Recv[Raw](runner.conn)
+}
+
+func (runner *Runner) detectTimeout() chan struct{} {
+ abort := make(chan struct{})
+ go func() {
+ select {
+ case <-time.After(time.Minute):
+ log.Logf(0, "timed out waiting for executor reply, aborting the connection in 1 minute")
+ go func() {
+ time.Sleep(time.Minute)
+ runner.conn.Close()
+ }()
+ err := runner.sendStateRequest()
+ if err != nil {
+ log.Logf(0, "failed to send state request: %v", err)
+ return
+ }
+
+ case <-abort:
+ return
+ case <-runner.finished:
+ return
+ }
+ }()
+ return abort
+}
+
+func (runner *Runner) sendStateRequest() error {
+ msg := &flatrpc.HostMessage{
+ Msg: &flatrpc.HostMessages{
+ Type: flatrpc.HostMessagesRawStateRequest,
+ Value: &flatrpc.StateRequest{},
+ },
+ }
+ return flatrpc.Send(runner.conn, msg)
+}
+
func (runner *Runner) sendRequest(req *queue.Request) error {
if err := req.Validate(); err != nil {
panic(err)
diff --git a/pkg/runtest/run_test.go b/pkg/runtest/run_test.go
index acf59c93d..f9a85bf52 100644
--- a/pkg/runtest/run_test.go
+++ b/pkg/runtest/run_test.go
@@ -453,9 +453,10 @@ func startRPCServer(t *testing.T, target *prog.Target, executor, vmArch string,
Features: flatrpc.AllFeatures,
Sandbox: flatrpc.ExecEnvSandboxNone,
},
- VMArch: vmArch,
- Procs: runtime.GOMAXPROCS(0),
- Slowdown: 10, // to deflake slower tests
+ VMArch: vmArch,
+ Procs: runtime.GOMAXPROCS(0),
+ Slowdown: 10, // to deflake slower tests
+ DebugTimeouts: true,
},
Executor: executor,
Dir: dir,