aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/rpcserver
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-07-11 14:32:35 +0200
committerAleksandr Nogikh <nogikh@google.com>2024-07-15 08:01:39 +0000
commitf3af87f58e552092404c65e428d82a1a8146c622 (patch)
tree8bcd9c120b5d514fb32dac1f9b7e3b43dce52435 /pkg/rpcserver
parentc605e6a2949eb42a4ea12594e3e887837f27d2d2 (diff)
pkg/rpcserver: exit on connection loop abortion
For local rpcserver runs, we do not reboot the executor in case of errors. Moreover, if the error did not lead to the executor process exit, we may never detect that something went wrong. Return an error channel from CreateInstance() to be able to act on connection loop errors. Explicitly register the instance during local executions and exit from RunLocal() in case of connection problems.
Diffstat (limited to 'pkg/rpcserver')
-rw-r--r--pkg/rpcserver/local.go8
-rw-r--r--pkg/rpcserver/rpcserver.go19
-rw-r--r--pkg/rpcserver/runner.go1
3 files changed, 20 insertions, 8 deletions
diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go
index 1886703e9..1420af606 100644
--- a/pkg/rpcserver/local.go
+++ b/pkg/rpcserver/local.go
@@ -42,7 +42,6 @@ func RunLocal(cfg *LocalConfig) error {
cfg.UseCoverEdges = true
cfg.FilterSignal = true
cfg.RPC = ":0"
- cfg.VMLess = true
cfg.PrintMachineCheck = log.V(1)
ctx := &local{
cfg: cfg,
@@ -58,8 +57,12 @@ func RunLocal(cfg *LocalConfig) error {
// for the race detector b/c it does not understand the synchronization via TCP socket connect/accept.
close(ctx.setupDone)
+ name := "local"
+ connErr := serv.CreateInstance(name, nil, nil)
+ defer serv.ShutdownInstance(name, true)
+
bin := cfg.Executor
- args := []string{"runner", "local", "localhost", fmt.Sprint(serv.Port)}
+ args := []string{"runner", name, "localhost", fmt.Sprint(serv.Port)}
if cfg.GDB {
bin = "gdb"
args = append([]string{
@@ -91,6 +94,7 @@ func RunLocal(cfg *LocalConfig) error {
select {
case <-shutdown:
case <-cfg.Context.Done():
+ case <-connErr:
case err := <-res:
cmdErr = fmt.Errorf("executor process exited: %w", err)
}
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index 85d4a2bd7..57abafdc9 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -212,6 +212,12 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) {
return
}
+ err = serv.handleRunnerConn(runner, conn)
+ log.Logf(2, "runner %v: %v", name, err)
+ runner.resultCh <- err
+}
+
+func (serv *Server) handleRunnerConn(runner *Runner, conn *flatrpc.Conn) error {
opts := &handshakeConfig{
VMLess: serv.cfg.VMLess,
Files: serv.checker.RequiredFiles(),
@@ -227,21 +233,20 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) {
opts.Features = serv.cfg.Features
}
- err = runner.Handshake(conn, opts)
+ err := runner.Handshake(conn, opts)
if err != nil {
log.Logf(1, "%v", err)
- return
+ return err
}
if serv.triagedCorpus.Load() {
if err := runner.SendCorpusTriaged(); err != nil {
log.Logf(2, "%v", err)
- return
+ return err
}
}
- err = serv.connectionLoop(runner)
- log.Logf(2, "runner %v: %v", name, err)
+ return serv.connectionLoop(runner)
}
func (serv *Server) handleMachineInfo(infoReq *flatrpc.InfoRequestRawT) (handshakeResult, error) {
@@ -393,7 +398,7 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable
log.Logf(0, "machine check:\n%s", buf.Bytes())
}
-func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) {
+func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error {
runner := &Runner{
source: serv.execSource,
cover: serv.cfg.Cover,
@@ -411,6 +416,7 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
stats: serv.runnerStats,
procs: serv.cfg.Procs,
updInfo: updInfo,
+ resultCh: make(chan error, 1),
}
serv.mu.Lock()
defer serv.mu.Unlock()
@@ -418,6 +424,7 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
panic(fmt.Sprintf("duplicate instance %s", name))
}
serv.runners[name] = runner
+ return runner.resultCh
}
// stopInstance prevents further request exchange requests.
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index ec4ad2076..691a5b5d5 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -44,6 +44,7 @@ type Runner struct {
lastExec *LastExecuting
rnd *rand.Rand
updInfo dispatcher.UpdateInfo
+ resultCh chan error
// The mutex protects all the fields below.
mu sync.Mutex