From 9fc8fe026baab9959459256f2d47f4bbf21d405a Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 21 Oct 2024 11:53:44 +0200 Subject: executor: better handling for hanged test processes Currently we kill hanged processes and consider the corresponding test finished. We don't kill/wait for the actual test subprocess (we don't know its pid to kill, and waiting will presumably hang). This has 2 problems: 1. If the hanged process causes "task hung" report, we can't reproduce it, since the test finished too long ago (manager thinks its finished and discards the request). 2. The test process still consumed per-pid resources. Explicitly detect and handle such cases: Manager keeps these hanged tests forever, and we assign a new proc id for future processes (don't reuse the hanged one). --- executor/executor.cc | 7 +-- executor/executor_runner.h | 82 ++++++++++++++++++++++++++++++------ executor/snapshot.h | 2 +- pkg/flatrpc/flatrpc.fbs | 3 ++ pkg/flatrpc/flatrpc.go | 28 +++++++++--- pkg/flatrpc/flatrpc.h | 22 +++++++++- pkg/fuzzer/fuzzer.go | 9 ++-- pkg/fuzzer/queue/queue.go | 10 ++++- pkg/fuzzer/queue/retry.go | 28 ++++++++---- pkg/repro/repro.go | 2 + pkg/rpcserver/last_executing.go | 21 +++++++-- pkg/rpcserver/last_executing_test.go | 24 ++++++++++- pkg/rpcserver/rpcserver.go | 12 +++--- pkg/rpcserver/runner.go | 18 +++++++- 14 files changed, 221 insertions(+), 47 deletions(-) diff --git a/executor/executor.cc b/executor/executor.cc index 305675bce..5b5e06422 100644 --- a/executor/executor.cc +++ b/executor/executor.cc @@ -470,7 +470,8 @@ static void setup_control_pipes(); static bool coverage_filter(uint64 pc); static rpc::ComparisonRaw convert(const kcov_comparison_t& cmp); static flatbuffers::span finish_output(OutputData* output, int proc_id, uint64 req_id, uint32 num_calls, - uint64 elapsed, uint64 freshness, uint32 status, const std::vector* process_output); + uint64 elapsed, uint64 freshness, uint32 status, bool hanged, + const std::vector* process_output); static void parse_execute(const execute_req& req); static void parse_handshake(const handshake_req& req); @@ -1343,7 +1344,7 @@ void write_extra_output() } flatbuffers::span finish_output(OutputData* output, int proc_id, uint64 req_id, uint32 num_calls, uint64 elapsed, - uint64 freshness, uint32 status, const std::vector* process_output) + uint64 freshness, uint32 status, bool hanged, const std::vector* process_output) { // In snapshot mode the output size is fixed and output_size is always initialized, so use it. int out_size = flag_snapshot ? output_size : output->size.load(std::memory_order_relaxed) ? @@ -1376,7 +1377,7 @@ flatbuffers::span finish_output(OutputData* output, int proc_id, uint64 flatbuffers::Offset> output_off = 0; if (process_output) output_off = fbb.CreateVector(*process_output); - auto exec_off = rpc::CreateExecResultRaw(fbb, req_id, proc_id, output_off, error_off, prog_info_off); + auto exec_off = rpc::CreateExecResultRaw(fbb, req_id, proc_id, output_off, hanged, error_off, prog_info_off); auto msg_off = rpc::CreateExecutorMessageRaw(fbb, rpc::ExecutorMessagesRaw::ExecResult, flatbuffers::Offset(exec_off.o)); fbb.FinishSizePrefixed(msg_off); diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 30a5f3a6d..5680f8fc2 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -27,16 +27,62 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re << "\n"; } +// ProcIDPool allows to reuse a set of unique proc IDs across a set of subprocesses. +// +// When a subprocess hangs, it's a bit unclear what to do (we don't have means to kill +// the whole tree of its children, and waiting for all them will presumably hang as well). +// Later there may appear a "task hung" report from the kernel, so we don't want to terminate +// the VM immidiatly. But the "task hung" report may also not appear at all, so we can't +// just wait for a hanged subprocesses forever. +// +// So in that case we kill/wait just the top subprocesses, and give it a new proc ID +// (since some resources associated with the old proc ID may still be used by the old +// unterminated test processes). However, we don't have infinite number of proc IDs, +// so we recycle them in FIFO order. This is not ideal, but it looks like the best +// practical solution. +class ProcIDPool +{ +public: + ProcIDPool(int num_procs) + { + // Theoretically we have 32 procs (prog.MaxPids), but there are some limitations in descriptions + // that make them work well only for up to 10 procs. For example, we form /dev/loopN + // device name using proc['0', 1, int8]. When these limitations are fixed, + // we can use all 32 here (prog.MaxPids) + constexpr int kNumGoodProcs = 10; + for (int i = 0; i < std::max(num_procs, kNumGoodProcs); i++) + ids_.push_back(i); + } + + int Alloc(int old = -1) + { + if (old >= 0) + ids_.push_back(old); + if (ids_.empty()) + fail("out of proc ids"); + int id = ids_.front(); + ids_.pop_front(); + return id; + } + +private: + std::deque ids_; + + ProcIDPool(const ProcIDPool&) = delete; + ProcIDPool& operator=(const ProcIDPool&) = delete; +}; + // Proc represents one subprocess that runs tests (re-execed syz-executor with 'exec' argument). // The object is persistent and re-starts subprocess when it crashes. class Proc { public: - Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, + Proc(Connection& conn, const char* bin, int id, ProcIDPool& proc_id_pool, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms) : conn_(conn), bin_(bin), - id_(id), + proc_id_pool_(proc_id_pool), + id_(proc_id_pool.Alloc()), restarting_(restarting), corpus_triaged_(corpus_triaged), max_signal_fd_(max_signal_fd), @@ -92,7 +138,7 @@ public: // fork server is enabled, so we use quite large timeout. Child process can be slow // due to global locks in namespaces and other things, so let's better wait than // report false misleading crashes. - uint64 timeout = 2 * program_timeout_ms_; + uint64 timeout = 3 * program_timeout_ms_; #else uint64 timeout = program_timeout_ms_; #endif @@ -134,7 +180,8 @@ private: Connection& conn_; const char* const bin_; - const int id_; + ProcIDPool& proc_id_pool_; + int id_; int& restarting_; const bool& corpus_triaged_; const int max_signal_fd_; @@ -215,7 +262,15 @@ private: while (ReadOutput()) ; } - HandleCompletion(status); + bool hanged = SYZ_EXECUTOR_USES_FORK_SERVER && state_ == State::Executing; + HandleCompletion(status, hanged); + if (hanged) { + // If the process has hanged, it may still be using per-proc resources, + // so allocate a fresh proc id. + int new_id = proc_id_pool_.Alloc(id_); + debug("proc %d: changing proc id to %d\n", id_, new_id); + id_ = new_id; + } } else if (attempts_ > 3) sleep_ms(100 * attempts_); Start(); @@ -350,7 +405,7 @@ private: } } - void HandleCompletion(uint32 status) + void HandleCompletion(uint32 status, bool hanged = false) { if (!msg_) fail("don't have executed msg"); @@ -370,7 +425,7 @@ private: } } uint32 num_calls = read_input(&prog_data); - auto data = finish_output(resp_mem_, id_, msg_->id, num_calls, elapsed, freshness_++, status, output); + auto data = finish_output(resp_mem_, id_, msg_->id, num_calls, elapsed, freshness_++, status, hanged, output); conn_.Send(data.data(), data.size()); resp_mem_->Reset(); @@ -458,12 +513,14 @@ public: : conn_(conn), vm_index_(vm_index) { - size_t num_procs = Handshake(); + int num_procs = Handshake(); + proc_id_pool_.emplace(num_procs); int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; int cover_filter_fd = cover_filter_ ? cover_filter_->FD() : -1; - for (size_t i = 0; i < num_procs; i++) - procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd, - use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_)); + for (int i = 0; i < num_procs; i++) + procs_.emplace_back(new Proc(conn, bin, i, *proc_id_pool_, restarting_, corpus_triaged_, + max_signal_fd, cover_filter_fd, use_cover_edges_, is_kernel_64_bit_, slowdown_, + syscall_timeout_ms_, program_timeout_ms_)); for (;;) Loop(); @@ -474,6 +531,7 @@ private: const int vm_index_; std::optional max_signal_; std::optional cover_filter_; + std::optional proc_id_pool_; std::vector> procs_; std::deque requests_; std::vector leak_frames_; @@ -545,7 +603,7 @@ private: failmsg("bad restarting", "restarting=%d", restarting_); } - size_t Handshake() + int Handshake() { rpc::ConnectRequestRawT conn_req; conn_req.id = vm_index_; diff --git a/executor/snapshot.h b/executor/snapshot.h index ce3e355b0..462fb2c56 100644 --- a/executor/snapshot.h +++ b/executor/snapshot.h @@ -251,7 +251,7 @@ NORETURN static void SnapshotDone(bool failed) debug("SnapshotDone\n"); CoverAccessScope scope(nullptr); uint32 num_calls = output_data->num_calls.load(std::memory_order_relaxed); - auto data = finish_output(output_data, 0, 0, num_calls, 0, 0, failed ? kFailStatus : 0, nullptr); + auto data = finish_output(output_data, 0, 0, num_calls, 0, 0, failed ? kFailStatus : 0, false, nullptr); ivs.hdr->output_offset = data.data() - reinterpret_cast(ivs.hdr); ivs.hdr->output_size = data.size(); SnapshotSetState(failed ? rpc::SnapshotState::Failed : rpc::SnapshotState::Executed); diff --git a/pkg/flatrpc/flatrpc.fbs b/pkg/flatrpc/flatrpc.fbs index 690286681..51066d9a4 100644 --- a/pkg/flatrpc/flatrpc.fbs +++ b/pkg/flatrpc/flatrpc.fbs @@ -243,6 +243,9 @@ table ExecResultRaw { id :int64; proc :int32; output :[uint8]; + // The program has hanged and we were not able to properly join it. + // So in some sense it's still running (e.g. can trigger a delayed kernel hang report). + hanged :bool; error :string; info :ProgInfoRaw; } diff --git a/pkg/flatrpc/flatrpc.go b/pkg/flatrpc/flatrpc.go index ff2f4c623..0a9c0e0d0 100644 --- a/pkg/flatrpc/flatrpc.go +++ b/pkg/flatrpc/flatrpc.go @@ -3023,6 +3023,7 @@ type ExecResultRawT struct { Id int64 `json:"id"` Proc int32 `json:"proc"` Output []byte `json:"output"` + Hanged bool `json:"hanged"` Error string `json:"error"` Info *ProgInfoRawT `json:"info"` } @@ -3041,6 +3042,7 @@ func (t *ExecResultRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOffsetT ExecResultRawAddId(builder, t.Id) ExecResultRawAddProc(builder, t.Proc) ExecResultRawAddOutput(builder, outputOffset) + ExecResultRawAddHanged(builder, t.Hanged) ExecResultRawAddError(builder, errorOffset) ExecResultRawAddInfo(builder, infoOffset) return ExecResultRawEnd(builder) @@ -3050,6 +3052,7 @@ func (rcv *ExecResultRaw) UnPackTo(t *ExecResultRawT) { t.Id = rcv.Id() t.Proc = rcv.Proc() t.Output = rcv.OutputBytes() + t.Hanged = rcv.Hanged() t.Error = string(rcv.Error()) t.Info = rcv.Info(nil).UnPack() } @@ -3148,8 +3151,20 @@ func (rcv *ExecResultRaw) MutateOutput(j int, n byte) bool { return false } -func (rcv *ExecResultRaw) Error() []byte { +func (rcv *ExecResultRaw) Hanged() bool { o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetBool(o + rcv._tab.Pos) + } + return false +} + +func (rcv *ExecResultRaw) MutateHanged(n bool) bool { + return rcv._tab.MutateBoolSlot(10, n) +} + +func (rcv *ExecResultRaw) Error() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) if o != 0 { return rcv._tab.ByteVector(o + rcv._tab.Pos) } @@ -3157,7 +3172,7 @@ func (rcv *ExecResultRaw) Error() []byte { } func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw { - o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + o := flatbuffers.UOffsetT(rcv._tab.Offset(14)) if o != 0 { x := rcv._tab.Indirect(o + rcv._tab.Pos) if obj == nil { @@ -3170,7 +3185,7 @@ func (rcv *ExecResultRaw) Info(obj *ProgInfoRaw) *ProgInfoRaw { } func ExecResultRawStart(builder *flatbuffers.Builder) { - builder.StartObject(5) + builder.StartObject(6) } func ExecResultRawAddId(builder *flatbuffers.Builder, id int64) { builder.PrependInt64Slot(0, id, 0) @@ -3184,11 +3199,14 @@ func ExecResultRawAddOutput(builder *flatbuffers.Builder, output flatbuffers.UOf func ExecResultRawStartOutputVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { return builder.StartVector(1, numElems, 1) } +func ExecResultRawAddHanged(builder *flatbuffers.Builder, hanged bool) { + builder.PrependBoolSlot(3, hanged, false) +} func ExecResultRawAddError(builder *flatbuffers.Builder, error flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(error), 0) + builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(error), 0) } func ExecResultRawAddInfo(builder *flatbuffers.Builder, info flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(info), 0) + builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(info), 0) } func ExecResultRawEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() diff --git a/pkg/flatrpc/flatrpc.h b/pkg/flatrpc/flatrpc.h index d1496630e..5232114b3 100644 --- a/pkg/flatrpc/flatrpc.h +++ b/pkg/flatrpc/flatrpc.h @@ -2368,6 +2368,7 @@ struct ExecResultRawT : public flatbuffers::NativeTable { int64_t id = 0; int32_t proc = 0; std::vector output{}; + bool hanged = false; std::string error{}; std::unique_ptr info{}; ExecResultRawT() = default; @@ -2383,8 +2384,9 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_ID = 4, VT_PROC = 6, VT_OUTPUT = 8, - VT_ERROR = 10, - VT_INFO = 12 + VT_HANGED = 10, + VT_ERROR = 12, + VT_INFO = 14 }; int64_t id() const { return GetField(VT_ID, 0); @@ -2395,6 +2397,9 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const flatbuffers::Vector *output() const { return GetPointer *>(VT_OUTPUT); } + bool hanged() const { + return GetField(VT_HANGED, 0) != 0; + } const flatbuffers::String *error() const { return GetPointer(VT_ERROR); } @@ -2407,6 +2412,7 @@ struct ExecResultRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VerifyField(verifier, VT_PROC, 4) && VerifyOffset(verifier, VT_OUTPUT) && verifier.VerifyVector(output()) && + VerifyField(verifier, VT_HANGED, 1) && VerifyOffset(verifier, VT_ERROR) && verifier.VerifyString(error()) && VerifyOffset(verifier, VT_INFO) && @@ -2431,6 +2437,9 @@ struct ExecResultRawBuilder { void add_output(flatbuffers::Offset> output) { fbb_.AddOffset(ExecResultRaw::VT_OUTPUT, output); } + void add_hanged(bool hanged) { + fbb_.AddElement(ExecResultRaw::VT_HANGED, static_cast(hanged), 0); + } void add_error(flatbuffers::Offset error) { fbb_.AddOffset(ExecResultRaw::VT_ERROR, error); } @@ -2453,6 +2462,7 @@ inline flatbuffers::Offset CreateExecResultRaw( int64_t id = 0, int32_t proc = 0, flatbuffers::Offset> output = 0, + bool hanged = false, flatbuffers::Offset error = 0, flatbuffers::Offset info = 0) { ExecResultRawBuilder builder_(_fbb); @@ -2461,6 +2471,7 @@ inline flatbuffers::Offset CreateExecResultRaw( builder_.add_error(error); builder_.add_output(output); builder_.add_proc(proc); + builder_.add_hanged(hanged); return builder_.Finish(); } @@ -2469,6 +2480,7 @@ inline flatbuffers::Offset CreateExecResultRawDirect( int64_t id = 0, int32_t proc = 0, const std::vector *output = nullptr, + bool hanged = false, const char *error = nullptr, flatbuffers::Offset info = 0) { auto output__ = output ? _fbb.CreateVector(*output) : 0; @@ -2478,6 +2490,7 @@ inline flatbuffers::Offset CreateExecResultRawDirect( id, proc, output__, + hanged, error__, info); } @@ -3459,6 +3472,7 @@ inline ExecResultRawT::ExecResultRawT(const ExecResultRawT &o) : id(o.id), proc(o.proc), output(o.output), + hanged(o.hanged), error(o.error), info((o.info) ? new rpc::ProgInfoRawT(*o.info) : nullptr) { } @@ -3467,6 +3481,7 @@ inline ExecResultRawT &ExecResultRawT::operator=(ExecResultRawT o) FLATBUFFERS_N std::swap(id, o.id); std::swap(proc, o.proc); std::swap(output, o.output); + std::swap(hanged, o.hanged); std::swap(error, o.error); std::swap(info, o.info); return *this; @@ -3484,6 +3499,7 @@ inline void ExecResultRaw::UnPackTo(ExecResultRawT *_o, const flatbuffers::resol { auto _e = id(); _o->id = _e; } { auto _e = proc(); _o->proc = _e; } { auto _e = output(); if (_e) { _o->output.resize(_e->size()); std::copy(_e->begin(), _e->end(), _o->output.begin()); } } + { auto _e = hanged(); _o->hanged = _e; } { auto _e = error(); if (_e) _o->error = _e->str(); } { auto _e = info(); if (_e) _o->info = std::unique_ptr(_e->UnPack(_resolver)); } } @@ -3499,6 +3515,7 @@ inline flatbuffers::Offset CreateExecResultRaw(flatbuffers::FlatB auto _id = _o->id; auto _proc = _o->proc; auto _output = _o->output.size() ? _fbb.CreateVector(_o->output) : 0; + auto _hanged = _o->hanged; auto _error = _o->error.empty() ? 0 : _fbb.CreateString(_o->error); auto _info = _o->info ? CreateProgInfoRaw(_fbb, _o->info.get(), _rehasher) : 0; return rpc::CreateExecResultRaw( @@ -3506,6 +3523,7 @@ inline flatbuffers::Offset CreateExecResultRaw(flatbuffers::FlatB _id, _proc, _output, + _hanged, _error, _info); } diff --git a/pkg/fuzzer/fuzzer.go b/pkg/fuzzer/fuzzer.go index c5f5526ea..1f2fd90a8 100644 --- a/pkg/fuzzer/fuzzer.go +++ b/pkg/fuzzer/fuzzer.go @@ -122,13 +122,14 @@ func (fuzzer *Fuzzer) enqueue(executor queue.Executor, req *queue.Request, flags } func (fuzzer *Fuzzer) processResult(req *queue.Request, res *queue.Result, flags ProgFlags, attempt int) bool { - inTriage := flags&progInTriage > 0 + // If we are already triaging this exact prog, this is flaky coverage. + // Hanged programs are harmful as they consume executor procs. + dontTriage := flags&progInTriage > 0 || res.Status == queue.Hanged // Triage the program. // We do it before unblocking the waiting threads because // it may result it concurrent modification of req.Prog. - // If we are already triaging this exact prog, this is flaky coverage. var triage map[int]*triageCall - if req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal > 0 && res.Info != nil && !inTriage { + if req.ExecOpts.ExecFlags&flatrpc.ExecFlagCollectSignal > 0 && res.Info != nil && !dontTriage { for call, info := range res.Info.Calls { fuzzer.triageProgCall(req.Prog, info, call, &triage) } @@ -168,7 +169,7 @@ func (fuzzer *Fuzzer) processResult(req *queue.Request, res *queue.Result, flags // In non-snapshot mode usually we are not sure which exactly input caused the crash, // so give it one more chance. In snapshot mode we know for sure, so don't retry. maxCandidateAttempts = 2 - if fuzzer.Config.Snapshot { + if fuzzer.Config.Snapshot || res.Status == queue.Hanged { maxCandidateAttempts = 0 } } diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go index cbdb2ba19..0a56c76d3 100644 --- a/pkg/fuzzer/queue/queue.go +++ b/pkg/fuzzer/queue/queue.go @@ -161,7 +161,14 @@ func (r *Result) clone() *Result { } func (r *Result) Stop() bool { - return r.Status == ExecFailure || r.Status == Crashed + switch r.Status { + case Success, Restarted: + return false + case ExecFailure, Crashed, Hanged: + return true + default: + panic(fmt.Sprintf("unhandled status %v", r.Status)) + } } type Status int @@ -171,6 +178,7 @@ const ( ExecFailure // For e.g. serialization errors. Crashed // The VM crashed holding the request. Restarted // The VM was restarted holding the request. + Hanged // The program has hanged (can't be killed/waited). ) // Executor describes the interface wanted by the producers of requests. diff --git a/pkg/fuzzer/queue/retry.go b/pkg/fuzzer/queue/retry.go index c59a2c048..186850b5b 100644 --- a/pkg/fuzzer/queue/retry.go +++ b/pkg/fuzzer/queue/retry.go @@ -3,6 +3,10 @@ package queue +import ( + "fmt" +) + type retryer struct { pq *PlainQueue base Source @@ -28,16 +32,22 @@ func (r *retryer) Next() *Request { } func (r *retryer) done(req *Request, res *Result) bool { - // The input was on a restarted VM. - if res.Status == Restarted { - r.pq.Submit(req) - return false - } - // Retry important requests from crashed VMs once. - if res.Status == Crashed && req.Important && !req.onceCrashed { - req.onceCrashed = true + switch res.Status { + case Success, ExecFailure, Hanged: + return true + case Restarted: + // The input was on a restarted VM. r.pq.Submit(req) return false + case Crashed: + // Retry important requests from crashed VMs once. + if req.Important && !req.onceCrashed { + req.onceCrashed = true + r.pq.Submit(req) + return false + } + return true + default: + panic(fmt.Sprintf("unhandled status %v", res.Status)) } - return true } diff --git a/pkg/repro/repro.go b/pkg/repro/repro.go index d2076ffbf..67171a368 100644 --- a/pkg/repro/repro.go +++ b/pkg/repro/repro.go @@ -262,6 +262,8 @@ func (ctx *reproContext) extractProg(entries []*prog.LogEntry) (*Result, error) var toTest []*prog.LogEntry if ctx.crashExecutor != nil { for _, entry := range entries { + // Note: we don't check ProcID b/c hanged programs are assigned fake unique proc IDs + // that don't match "Comm" in the kernel panic message. if entry.ID == ctx.crashExecutor.ExecID { toTest = append(toTest, entry) ctx.reproLogf(3, "first checking the prog from the crash report") diff --git a/pkg/rpcserver/last_executing.go b/pkg/rpcserver/last_executing.go index 29b854df2..6064cfe14 100644 --- a/pkg/rpcserver/last_executing.go +++ b/pkg/rpcserver/last_executing.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/syzkaller/pkg/report" + "github.com/google/syzkaller/prog" ) // LastExecuting keeps the given number of last executed programs @@ -17,6 +18,7 @@ import ( type LastExecuting struct { count int procs []ExecRecord + hanged []ExecRecord // hanged programs, kept forever positions []int } @@ -36,12 +38,12 @@ func MakeLastExecuting(procs, count int) *LastExecuting { } // Note execution of the 'prog' on 'proc' at time 'now'. -func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) { +func (last *LastExecuting) Note(id, proc int, progData []byte, now time.Duration) { pos := &last.positions[proc] last.procs[proc*last.count+*pos] = ExecRecord{ ID: id, Proc: proc, - Prog: prog, + Prog: progData, Time: now, } *pos++ @@ -50,13 +52,26 @@ func (last *LastExecuting) Note(id, proc int, prog []byte, now time.Duration) { } } +// Note a hanged program. +func (last *LastExecuting) Hanged(id, proc int, progData []byte, now time.Duration) { + last.hanged = append(last.hanged, ExecRecord{ + ID: id, + // Use unique proc for these programs b/c pkg/repro will either use the program with matching ID, + // of take the last program from each proc, and we want the hanged programs to be included. + Proc: prog.MaxPids + len(last.hanged), + Prog: progData, + Time: now, + }) +} + // Returns a sorted set of last executing programs. // The records are sorted by time in ascending order. // ExecRecord.Time is the difference in start executing time between this // program and the program that started executing last. func (last *LastExecuting) Collect() []ExecRecord { - procs := last.procs + procs := append(last.procs, last.hanged...) last.procs = nil // The type must not be used after this. + last.hanged = nil sort.Slice(procs, func(i, j int) bool { return procs[i].Time < procs[j].Time }) diff --git a/pkg/rpcserver/last_executing_test.go b/pkg/rpcserver/last_executing_test.go index c9f3cc2bf..4384e9a2b 100644 --- a/pkg/rpcserver/last_executing_test.go +++ b/pkg/rpcserver/last_executing_test.go @@ -15,7 +15,7 @@ func TestLastExecutingEmpty(t *testing.T) { } func TestLastExecuting(t *testing.T) { - last := MakeLastExecuting(10, 3) + last := MakeLastExecuting(21, 3) last.Note(1, 0, []byte("prog1"), 1) last.Note(2, 1, []byte("prog2"), 2) @@ -54,3 +54,25 @@ func TestLastExecuting(t *testing.T) { {ID: 13, Proc: 8, Prog: []byte("prog13"), Time: 0}, }) } + +func TestLastExecutingHanged(t *testing.T) { + last := MakeLastExecuting(1, 3) + last.Note(1, 0, []byte("prog1"), 10) + last.Note(2, 0, []byte("prog2"), 20) + last.Hanged(2, 0, []byte("prog2"), 25) + last.Note(3, 0, []byte("prog3"), 30) + last.Note(4, 0, []byte("prog4"), 40) + last.Note(5, 0, []byte("prog5"), 50) + last.Hanged(5, 0, []byte("prog5"), 55) + last.Note(6, 0, []byte("prog6"), 60) + last.Note(7, 0, []byte("prog7"), 70) + last.Note(8, 0, []byte("prog8"), 80) + last.Note(9, 0, []byte("prog9"), 90) + assert.Equal(t, last.Collect(), []ExecRecord{ + {ID: 2, Proc: 32, Prog: []byte("prog2"), Time: 65}, + {ID: 5, Proc: 33, Prog: []byte("prog5"), Time: 35}, + {ID: 7, Proc: 0, Prog: []byte("prog7"), Time: 20}, + {ID: 8, Proc: 0, Prog: []byte("prog8"), Time: 10}, + {ID: 9, Proc: 0, Prog: []byte("prog9"), Time: 0}, + }) +} diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 4a0587c53..53181dd2b 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -416,11 +416,13 @@ func (serv *server) CreateInstance(id int, injectExec chan<- bool, updInfo dispa infoc: make(chan chan []byte), requests: make(map[int64]*queue.Request), executing: make(map[int64]bool), - lastExec: MakeLastExecuting(serv.cfg.Procs, 6), - stats: serv.runnerStats, - procs: serv.cfg.Procs, - updInfo: updInfo, - resultCh: make(chan error, 1), + hanged: make(map[int64]bool), + // Executor may report proc IDs that are larger than serv.cfg.Procs. + lastExec: MakeLastExecuting(prog.MaxPids, 6), + stats: serv.runnerStats, + procs: serv.cfg.Procs, + updInfo: updInfo, + resultCh: make(chan error, 1), } serv.mu.Lock() defer serv.mu.Unlock() diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index 45b376a79..1af35b6d2 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -42,6 +42,7 @@ type Runner struct { nextRequestID int64 requests map[int64]*queue.Request executing map[int64]bool + hanged map[int64]bool lastExec *LastExecuting updInfo dispatcher.UpdateInfo resultCh chan error @@ -343,10 +344,13 @@ func (runner *Runner) sendRequest(req *queue.Request) error { func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) error { req := runner.requests[msg.Id] if req == nil { + if runner.hanged[msg.Id] { + return nil + } return fmt.Errorf("can't find executing request %v", msg.Id) } proc := int(msg.ProcId) - if proc < 0 || proc >= runner.procs { + if proc < 0 || proc >= prog.MaxPids { return fmt.Errorf("got bad proc id %v", proc) } runner.stats.statExecs.Add(1) @@ -372,6 +376,14 @@ func (runner *Runner) handleExecutingMessage(msg *flatrpc.ExecutingMessage) erro func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error { req := runner.requests[msg.Id] if req == nil { + if runner.hanged[msg.Id] { + // Got result for a program that was previously reported hanged + // (probably execution was just extremely slow). Can't report result + // to pkg/fuzzer since it already handled completion of the request, + // but shouldn't report an error and crash the VM as well. + delete(runner.hanged, msg.Id) + return nil + } return fmt.Errorf("can't find executed request %v", msg.Id) } delete(runner.requests, msg.Id) @@ -410,6 +422,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error { if msg.Error != "" { status = queue.ExecFailure resErr = errors.New(msg.Error) + } else if msg.Hanged { + status = queue.Hanged + runner.lastExec.Hanged(int(msg.Id), int(msg.Proc), req.Prog.Serialize(), osutil.MonotonicNano()) + runner.hanged[msg.Id] = true } req.Done(&queue.Result{ Executor: queue.ExecutorID{ -- cgit mrf-deployment