diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-07-11 14:32:35 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-07-15 08:01:39 +0000 |
| commit | f3af87f58e552092404c65e428d82a1a8146c622 (patch) | |
| tree | 8bcd9c120b5d514fb32dac1f9b7e3b43dce52435 /pkg/rpcserver/rpcserver.go | |
| parent | c605e6a2949eb42a4ea12594e3e887837f27d2d2 (diff) | |
pkg/rpcserver: exit on connection loop abortion
For local rpcserver runs, we do not reboot the executor in case of
errors. Moreover, if the error did not lead to the executor process
exit, we may never detect that something went wrong.
Return an error channel from CreateInstance() to be able to act on
connection loop errors.
Explicitly register the instance during local executions and exit from
RunLocal() in case of connection problems.
Diffstat (limited to 'pkg/rpcserver/rpcserver.go')
| -rw-r--r-- | pkg/rpcserver/rpcserver.go | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 85d4a2bd7..57abafdc9 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -212,6 +212,12 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { return } + err = serv.handleRunnerConn(runner, conn) + log.Logf(2, "runner %v: %v", name, err) + runner.resultCh <- err +} + +func (serv *Server) handleRunnerConn(runner *Runner, conn *flatrpc.Conn) error { opts := &handshakeConfig{ VMLess: serv.cfg.VMLess, Files: serv.checker.RequiredFiles(), @@ -227,21 +233,20 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { opts.Features = serv.cfg.Features } - err = runner.Handshake(conn, opts) + err := runner.Handshake(conn, opts) if err != nil { log.Logf(1, "%v", err) - return + return err } if serv.triagedCorpus.Load() { if err := runner.SendCorpusTriaged(); err != nil { log.Logf(2, "%v", err) - return + return err } } - err = serv.connectionLoop(runner) - log.Logf(2, "runner %v: %v", name, err) + return serv.connectionLoop(runner) } func (serv *Server) handleMachineInfo(infoReq *flatrpc.InfoRequestRawT) (handshakeResult, error) { @@ -393,7 +398,7 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable log.Logf(0, "machine check:\n%s", buf.Bytes()) } -func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) { +func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error { runner := &Runner{ source: serv.execSource, cover: serv.cfg.Cover, @@ -411,6 +416,7 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo stats: serv.runnerStats, procs: serv.cfg.Procs, updInfo: updInfo, + resultCh: make(chan error, 1), } serv.mu.Lock() defer serv.mu.Unlock() @@ -418,6 +424,7 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo panic(fmt.Sprintf("duplicate instance %s", name)) } serv.runners[name] = runner + return runner.resultCh } // stopInstance prevents further request exchange requests. |
