From 137fdde817df60cccfeb9a40237533e48bc143ec Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 1 Jul 2024 14:26:06 +0200 Subject: executor: restart procs more deterministically Currently we force restart in rpcserver, but this has 2 problems: 1. It does not know the proc where the request will land. 2. It does not take into account if the proc has already restarted recently for other reasons. Restart procs in executor only if they haven't restarted recently. Also make it deterministic. Given all other randomness we have, there does not seem to be a reason to use randomized restarts and restart after fewer/more runs. Also restart only after corpus triage. Corpus triage is slow already and there does not seem to be enough benefit to restart during corpus triage. Also restart at most 1 proc at a time, since there is a lot of serial work in the kernel. --- executor/executor_runner.h | 50 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 9 deletions(-) (limited to 'executor/executor_runner.h') diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 5229fa1a7..14167f361 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -32,11 +32,13 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re class Proc { public: - Proc(Connection& conn, const char* bin, int id, int max_signal_fd, int cover_filter_fd, + Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms) : conn_(conn), bin_(bin), id_(id), + restarting_(restarting), + corpus_triaged_(corpus_triaged), max_signal_fd_(max_signal_fd), cover_filter_fd_(cover_filter_fd), use_cover_edges_(use_cover_edges), @@ -61,8 +63,10 @@ public: fail("already have pending msg"); if (wait_start_) wait_end_ = current_time_ms(); - if (state_ == 
State::Idle && - (exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) + // Restart every once in a while to not let too much state accumulate. + constexpr uint64 kRestartEvery = 600; + if (state_ == State::Idle && ((corpus_triaged_ && restarting_ == 0 && freshness_ >= kRestartEvery) || + exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) Restart(); attempts_ = 0; msg_ = std::move(msg); @@ -131,6 +135,8 @@ private: Connection& conn_; const char* const bin_; const int id_; + int& restarting_; + const bool& corpus_triaged_; const int max_signal_fd_; const int cover_filter_fd_; const bool use_cover_edges_; @@ -170,6 +176,15 @@ private: return ss; } + void ChangeState(State state) + { + if (state_ == State::Handshaking) + restarting_--; + if (state == State::Handshaking) + restarting_++; + state_ = state; + } + void Restart() { debug("proc %d: restarting subprocess, current state %u attempts %llu\n", id_, state_, attempts_); @@ -217,7 +232,7 @@ private: void Start() { - state_ = State::Started; + ChangeState(State::Started); freshness_ = 0; int req_pipe[2]; if (pipe(req_pipe)) @@ -265,7 +280,7 @@ private: if (state_ != State::Started || !msg_) fail("wrong handshake state"); debug("proc %d: handshaking to execute request %llu\n", id_, static_cast(msg_->id)); - state_ = State::Handshaking; + ChangeState(State::Handshaking); exec_start_ = current_time_ms(); exec_env_ = msg_->exec_opts->env_flags() & ~rpc::ExecEnv::ResetState; sandbox_arg_ = msg_->exec_opts->sandbox_arg(); @@ -328,7 +343,7 @@ private: .all_extra_signal = all_extra_signal, }; exec_start_ = current_time_ms(); - state_ = State::Executing; + ChangeState(State::Executing); if (write(req_pipe_, &req, sizeof(req)) != sizeof(req)) { debug("request pipe write failed (errno=%d)\n", errno); Restart(); @@ -362,7 +377,7 @@ private: msg_.reset(); output_.clear(); debug_output_pos_ = 0; - state_ = State::Idle; + ChangeState(State::Idle); #if 
!SYZ_EXECUTOR_USES_FORK_SERVER if (process_) Restart(); @@ -385,7 +400,7 @@ private: failmsg("proc resp pipe read failed", "n=%zd", n); if (state_ == State::Handshaking) { debug("proc %d: got handshake reply\n", id_); - state_ = State::Idle; + ChangeState(State::Idle); Execute(); } else if (state_ == State::Executing) { debug("proc %d: got execute reply\n", id_); @@ -444,7 +459,7 @@ public: int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; int cover_filter_fd = cover_filter_ ? cover_filter_->FD() : -1; for (size_t i = 0; i < num_procs; i++) - procs_.emplace_back(new Proc(conn, bin, i, max_signal_fd, cover_filter_fd, + procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd, use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_)); for (;;) @@ -459,6 +474,8 @@ private: std::vector> procs_; std::deque requests_; std::vector leak_frames_; + int restarting_ = 0; + bool corpus_triaged_ = false; bool use_cover_edges_ = false; bool is_kernel_64_bit_ = false; uint32 slowdown_ = 0; @@ -467,6 +484,17 @@ private: friend std::ostream& operator<<(std::ostream& ss, const Runner& runner) { + ss << "name=" << runner.name_ + << " max_signal=" << !!runner.max_signal_ + << " cover_filter=" << !!runner.cover_filter_ + << " restarting=" << runner.restarting_ + << " corpus_triaged=" << runner.corpus_triaged_ + << " use_cover_edges=" << runner.use_cover_edges_ + << " is_kernel_64_bit=" << runner.is_kernel_64_bit_ + << " slowdown=" << runner.slowdown_ + << " syscall_timeout_ms=" << runner.syscall_timeout_ms_ + << " program_timeout_ms=" << runner.program_timeout_ms_ + << "\n"; ss << "procs:\n"; for (const auto& proc : runner.procs_) ss << *proc; @@ -509,6 +537,9 @@ private: requests_.pop_front(); } } + + if (restarting_ < 0 || restarting_ > static_cast(procs_.size())) + failmsg("bad restarting", "restarting=%d", restarting_); } size_t Handshake() @@ -627,6 +658,7 @@ private: { // TODO: repair leak 
checking (#4728). debug("recv corpus triaged\n"); + corpus_triaged_ = true; } void Handle(const rpc::StateRequestRawT& msg) -- cgit mrf-deployment