diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-10-21 11:53:44 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-10-24 09:34:38 +0000 |
| commit | 9fc8fe026baab9959459256f2d47f4bbf21d405a (patch) | |
| tree | 6d97a7ac2b8e69f5fa7a92a4b3824b1ad9e571c7 /pkg/rpcserver/runner.go | |
| parent | a85e9d5032fdf305457a6400bd3af4a8df6c45c4 (diff) | |
executor: better handling for hanged test processes
Currently we kill hanged processes and consider the corresponding test finished.
We don't kill/wait for the actual test subprocess (we don't know its pid to kill,
and waiting will presumably hang). This has 2 problems:
1. If the hanged process causes "task hung" report, we can't reproduce it,
since the test finished too long ago (manager thinks its finished and
discards the request).
2. The test process still consumed per-pid resources.
Explicitly detect and handle such cases:
Manager keeps these hanged tests forever,
and we assign a new proc id for future processes
(don't reuse the hanged one).
Diffstat (limited to 'pkg/rpcserver/runner.go')
| -rw-r--r-- | pkg/rpcserver/runner.go | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index 45b376a79..1af35b6d2 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -42,6 +42,7 @@ type Runner struct { nextRequestID int64 requests map[int64]*queue.Request executing map[int64]bool + hanged map[int64]bool lastExec *LastExecuting updInfo dispatcher.UpdateInfo resultCh chan error @@ -343,10 +344,13 @@ func (runner *Runner) sendRequest(req *queue.Request) error { func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) error { req := runner.requests[msg.Id] if req == nil { + if runner.hanged[msg.Id] { + return nil + } return fmt.Errorf("can't find executing request %v", msg.Id) } proc := int(msg.ProcId) - if proc < 0 || proc >= runner.procs { + if proc < 0 || proc >= prog.MaxPids { return fmt.Errorf("got bad proc id %v", proc) } runner.stats.statExecs.Add(1) @@ -372,6 +376,14 @@ func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) erro func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error { req := runner.requests[msg.Id] if req == nil { + if runner.hanged[msg.Id] { + // Got result for a program that was previously reported hanged + // (probably execution was just extremely slow). Can't report result + // to pkg/fuzzer since it already handled completion of the request, + // but shouldn't report an error and crash the VM as well. + delete(runner.hanged, msg.Id) + return nil + } return fmt.Errorf("can't find executed request %v", msg.Id) } delete(runner.requests, msg.Id) @@ -410,6 +422,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error { if msg.Error != "" { status = queue.ExecFailure resErr = errors.New(msg.Error) + } else if msg.Hanged { + status = queue.Hanged + runner.lastExec.Hanged(int(msg.Id), int(msg.Proc), req.Prog.Serialize(), osutil.MonotonicNano()) + runner.hanged[msg.Id] = true } req.Done(&queue.Result{ Executor: queue.ExecutorID{ |
