aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/rpcserver
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-10-21 11:53:44 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-10-24 09:34:38 +0000
commit9fc8fe026baab9959459256f2d47f4bbf21d405a (patch)
tree6d97a7ac2b8e69f5fa7a92a4b3824b1ad9e571c7 /pkg/rpcserver
parenta85e9d5032fdf305457a6400bd3af4a8df6c45c4 (diff)
executor: better handling for hanged test processes
Currently we kill hanged processes and consider the corresponding test finished. We don't kill/wait for the actual test subprocess (we don't know its pid to kill, and waiting will presumably hang). This has 2 problems: 1. If the hanged process causes "task hung" report, we can't reproduce it, since the test finished too long ago (manager thinks its finished and discards the request). 2. The test process still consumed per-pid resources. Explicitly detect and handle such cases: Manager keeps these hanged tests forever, and we assign a new proc id for future processes (don't reuse the hanged one).
Diffstat (limited to 'pkg/rpcserver')
-rw-r--r--pkg/rpcserver/last_executing.go21
-rw-r--r--pkg/rpcserver/last_executing_test.go24
-rw-r--r--pkg/rpcserver/rpcserver.go12
-rw-r--r--pkg/rpcserver/runner.go18
4 files changed, 65 insertions, 10 deletions
diff --git a/pkg/rpcserver/last_executing.go b/pkg/rpcserver/last_executing.go
index 29b854df2..6064cfe14 100644
--- a/pkg/rpcserver/last_executing.go
+++ b/pkg/rpcserver/last_executing.go
@@ -10,6 +10,7 @@ import (
"time"
"github.com/google/syzkaller/pkg/report"
+ "github.com/google/syzkaller/prog"
)
// LastExecuting keeps the given number of last executed programs
@@ -17,6 +18,7 @@ import (
type LastExecuting struct {
count int
procs []ExecRecord
+ hanged []ExecRecord // hanged programs, kept forever
positions []int
}
@@ -36,12 +38,12 @@ func MakeLastExecuting(procs, count int) *LastExecuting {
}
// Note execution of the 'prog' on 'proc' at time 'now'.
-func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) {
+func (last *LastExecuting) Note(id, proc int, progData []byte, now time.Duration) {
pos := &last.positions[proc]
last.procs[proc*last.count+*pos] = ExecRecord{
ID: id,
Proc: proc,
- Prog: prog,
+ Prog: progData,
Time: now,
}
*pos++
@@ -50,13 +52,26 @@ func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) {
}
}
+// Note a hanged program.
+func (last *LastExecuting) Hanged(id, proc int, progData []byte, now time.Duration) {
+ last.hanged = append(last.hanged, ExecRecord{
+ ID: id,
+ // Use unique proc for these programs b/c pkg/repro will either use the program with matching ID,
+ // of take the last program from each proc, and we want the hanged programs to be included.
+ Proc: prog.MaxPids + len(last.hanged),
+ Prog: progData,
+ Time: now,
+ })
+}
+
// Returns a sorted set of last executing programs.
// The records are sorted by time in ascending order.
// ExecRecord.Time is the difference in start executing time between this
// program and the program that started executing last.
func (last *LastExecuting) Collect() []ExecRecord {
- procs := last.procs
+ procs := append(last.procs, last.hanged...)
last.procs = nil // The type must not be used after this.
+ last.hanged = nil
sort.Slice(procs, func(i, j int) bool {
return procs[i].Time < procs[j].Time
})
diff --git a/pkg/rpcserver/last_executing_test.go b/pkg/rpcserver/last_executing_test.go
index c9f3cc2bf..4384e9a2b 100644
--- a/pkg/rpcserver/last_executing_test.go
+++ b/pkg/rpcserver/last_executing_test.go
@@ -15,7 +15,7 @@ func TestLastExecutingEmpty(t *testing.T) {
}
func TestLastExecuting(t *testing.T) {
- last := MakeLastExecuting(10, 3)
+ last := MakeLastExecuting(21, 3)
last.Note(1, 0, []byte("prog1"), 1)
last.Note(2, 1, []byte("prog2"), 2)
@@ -54,3 +54,25 @@ func TestLastExecuting(t *testing.T) {
{ID: 13, Proc: 8, Prog: []byte("prog13"), Time: 0},
})
}
+
+func TestLastExecutingHanged(t *testing.T) {
+ last := MakeLastExecuting(1, 3)
+ last.Note(1, 0, []byte("prog1"), 10)
+ last.Note(2, 0, []byte("prog2"), 20)
+ last.Hanged(2, 0, []byte("prog2"), 25)
+ last.Note(3, 0, []byte("prog3"), 30)
+ last.Note(4, 0, []byte("prog4"), 40)
+ last.Note(5, 0, []byte("prog5"), 50)
+ last.Hanged(5, 0, []byte("prog5"), 55)
+ last.Note(6, 0, []byte("prog6"), 60)
+ last.Note(7, 0, []byte("prog7"), 70)
+ last.Note(8, 0, []byte("prog8"), 80)
+ last.Note(9, 0, []byte("prog9"), 90)
+ assert.Equal(t, last.Collect(), []ExecRecord{
+ {ID: 2, Proc: 32, Prog: []byte("prog2"), Time: 65},
+ {ID: 5, Proc: 33, Prog: []byte("prog5"), Time: 35},
+ {ID: 7, Proc: 0, Prog: []byte("prog7"), Time: 20},
+ {ID: 8, Proc: 0, Prog: []byte("prog8"), Time: 10},
+ {ID: 9, Proc: 0, Prog: []byte("prog9"), Time: 0},
+ })
+}
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index 4a0587c53..53181dd2b 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -416,11 +416,13 @@ func (serv *server) CreateInstance(id int, injectExec chan<- bool, updInfo dispa
infoc: make(chan chan []byte),
requests: make(map[int64]*queue.Request),
executing: make(map[int64]bool),
- lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
- stats: serv.runnerStats,
- procs: serv.cfg.Procs,
- updInfo: updInfo,
- resultCh: make(chan error, 1),
+ hanged: make(map[int64]bool),
+ // Executor may report proc IDs that are larger than serv.cfg.Procs.
+ lastExec: MakeLastExecuting(prog.MaxPids, 6),
+ stats: serv.runnerStats,
+ procs: serv.cfg.Procs,
+ updInfo: updInfo,
+ resultCh: make(chan error, 1),
}
serv.mu.Lock()
defer serv.mu.Unlock()
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index 45b376a79..1af35b6d2 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -42,6 +42,7 @@ type Runner struct {
nextRequestID int64
requests map[int64]*queue.Request
executing map[int64]bool
+ hanged map[int64]bool
lastExec *LastExecuting
updInfo dispatcher.UpdateInfo
resultCh chan error
@@ -343,10 +344,13 @@ func (runner *Runner) sendRequest(req *queue.Request) error {
func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) error {
req := runner.requests[msg.Id]
if req == nil {
+ if runner.hanged[msg.Id] {
+ return nil
+ }
return fmt.Errorf("can't find executing request %v", msg.Id)
}
proc := int(msg.ProcId)
- if proc < 0 || proc >= runner.procs {
+ if proc < 0 || proc >= prog.MaxPids {
return fmt.Errorf("got bad proc id %v", proc)
}
runner.stats.statExecs.Add(1)
@@ -372,6 +376,14 @@ func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) erro
func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error {
req := runner.requests[msg.Id]
if req == nil {
+ if runner.hanged[msg.Id] {
+ // Got result for a program that was previously reported hanged
+ // (probably execution was just extremely slow). Can't report result
+ // to pkg/fuzzer since it already handled completion of the request,
+ // but shouldn't report an error and crash the VM as well.
+ delete(runner.hanged, msg.Id)
+ return nil
+ }
return fmt.Errorf("can't find executed request %v", msg.Id)
}
delete(runner.requests, msg.Id)
@@ -410,6 +422,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error {
if msg.Error != "" {
status = queue.ExecFailure
resErr = errors.New(msg.Error)
+ } else if msg.Hanged {
+ status = queue.Hanged
+ runner.lastExec.Hanged(int(msg.Id), int(msg.Proc), req.Prog.Serialize(), osutil.MonotonicNano())
+ runner.hanged[msg.Id] = true
}
req.Done(&queue.Result{
Executor: queue.ExecutorID{