From 9fc8fe026baab9959459256f2d47f4bbf21d405a Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 21 Oct 2024 11:53:44 +0200 Subject: executor: better handling for hanged test processes Currently we kill hanged processes and consider the corresponding test finished. We don't kill/wait for the actual test subprocess (we don't know its pid to kill, and waiting will presumably hang). This has 2 problems: 1. If the hanged process causes a "task hung" report, we can't reproduce it, since the test finished too long ago (manager thinks it's finished and discards the request). 2. The test process still consumes per-pid resources. Explicitly detect and handle such cases: Manager keeps these hanged tests forever, and we assign a new proc id for future processes (don't reuse the hanged one). --- executor/executor_runner.h | 82 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 12 deletions(-) (limited to 'executor/executor_runner.h') diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 30a5f3a6d..5680f8fc2 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -27,16 +27,62 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re << "\n"; } +// ProcIDPool allows reusing a set of unique proc IDs across a set of subprocesses. +// +// When a subprocess hangs, it's a bit unclear what to do (we don't have means to kill +// the whole tree of its children, and waiting for all of them will presumably hang as well). +// Later there may appear a "task hung" report from the kernel, so we don't want to terminate +// the VM immediately. But the "task hung" report may also not appear at all, so we can't +// just wait for a hanged subprocess forever. +// +// So in that case we kill/wait just the top subprocess, and give it a new proc ID +// (since some resources associated with the old proc ID may still be used by the old +// unterminated test processes). 
However, we don't have an infinite number of proc IDs, +// so we recycle them in FIFO order. This is not ideal, but it looks like the best +// practical solution. +class ProcIDPool +{ +public: + ProcIDPool(int num_procs) + { + // Theoretically we have 32 procs (prog.MaxPids), but there are some limitations in descriptions + // that make them work well only for up to 10 procs. For example, we form /dev/loopN + // device name using proc['0', 1, int8]. When these limitations are fixed, + // we can use all 32 here (prog.MaxPids) + constexpr int kNumGoodProcs = 10; + for (int i = 0; i < std::max(num_procs, kNumGoodProcs); i++) + ids_.push_back(i); + } + + int Alloc(int old = -1) + { + if (old >= 0) + ids_.push_back(old); + if (ids_.empty()) + fail("out of proc ids"); + int id = ids_.front(); + ids_.pop_front(); + return id; + } + +private: + std::deque<int> ids_; + + ProcIDPool(const ProcIDPool&) = delete; + ProcIDPool& operator=(const ProcIDPool&) = delete; +}; + // Proc represents one subprocess that runs tests (re-execed syz-executor with 'exec' argument). // The object is persistent and re-starts subprocess when it crashes. class Proc { public: - Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, + Proc(Connection& conn, const char* bin, int id, ProcIDPool& proc_id_pool, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms) : conn_(conn), bin_(bin), - id_(id), + proc_id_pool_(proc_id_pool), + id_(proc_id_pool.Alloc()), restarting_(restarting), corpus_triaged_(corpus_triaged), max_signal_fd_(max_signal_fd), @@ -92,7 +138,7 @@ public: // fork server is enabled, so we use quite large timeout. Child process can be slow // due to global locks in namespaces and other things, so let's better wait than // report false misleading crashes. 
- uint64 timeout = 2 * program_timeout_ms_; + uint64 timeout = 3 * program_timeout_ms_; #else uint64 timeout = program_timeout_ms_; #endif @@ -134,7 +180,8 @@ private: Connection& conn_; const char* const bin_; - const int id_; + ProcIDPool& proc_id_pool_; + int id_; int& restarting_; const bool& corpus_triaged_; const int max_signal_fd_; @@ -215,7 +262,15 @@ private: while (ReadOutput()) ; } - HandleCompletion(status); + bool hanged = SYZ_EXECUTOR_USES_FORK_SERVER && state_ == State::Executing; + HandleCompletion(status, hanged); + if (hanged) { + // If the process has hanged, it may still be using per-proc resources, + // so allocate a fresh proc id. + int new_id = proc_id_pool_.Alloc(id_); + debug("proc %d: changing proc id to %d\n", id_, new_id); + id_ = new_id; + } } else if (attempts_ > 3) sleep_ms(100 * attempts_); Start(); @@ -350,7 +405,7 @@ private: } } - void HandleCompletion(uint32 status) + void HandleCompletion(uint32 status, bool hanged = false) { if (!msg_) fail("don't have executed msg"); @@ -370,7 +425,7 @@ private: } } uint32 num_calls = read_input(&prog_data); - auto data = finish_output(resp_mem_, id_, msg_->id, num_calls, elapsed, freshness_++, status, output); + auto data = finish_output(resp_mem_, id_, msg_->id, num_calls, elapsed, freshness_++, status, hanged, output); conn_.Send(data.data(), data.size()); resp_mem_->Reset(); @@ -458,12 +513,14 @@ public: : conn_(conn), vm_index_(vm_index) { - size_t num_procs = Handshake(); + int num_procs = Handshake(); + proc_id_pool_.emplace(num_procs); int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; int cover_filter_fd = cover_filter_ ? 
cover_filter_->FD() : -1; - for (size_t i = 0; i < num_procs; i++) - procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd, - use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_)); + for (int i = 0; i < num_procs; i++) + procs_.emplace_back(new Proc(conn, bin, i, *proc_id_pool_, restarting_, corpus_triaged_, + max_signal_fd, cover_filter_fd, use_cover_edges_, is_kernel_64_bit_, slowdown_, + syscall_timeout_ms_, program_timeout_ms_)); for (;;) Loop(); @@ -474,6 +531,7 @@ private: const int vm_index_; std::optional<CoverFilter> max_signal_; std::optional<CoverFilter> cover_filter_; + std::optional<ProcIDPool> proc_id_pool_; std::vector<std::unique_ptr<Proc>> procs_; std::deque<rpc::ExecRequestRawT> requests_; std::vector<std::string> leak_frames_; @@ -545,7 +603,7 @@ private: failmsg("bad restarting", "restarting=%d", restarting_); } - size_t Handshake() + int Handshake() { rpc::ConnectRequestRawT conn_req; conn_req.id = vm_index_; -- cgit mrf-deployment