diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-07-01 14:26:06 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-08-02 13:16:51 +0000 |
| commit | 137fdde817df60cccfeb9a40237533e48bc143ec (patch) | |
| tree | 01df640f5820f1bcc01596134210667f59be1555 /executor | |
| parent | 66fcb0a84fcd55ad8e1444cdd0bc0ad6592f7329 (diff) | |
executor: restart procs more deterministically
Currently we force restart in rpcserver, but this has 2 problems:
1. It does not know the proc where the requets will land.
2. It does not take into account if the proc has already restarted
recently for other reasons.
Restart procs in executor only if they haven't restarted recenlty.
Also make it deterministic. Given all other randomess we have,
there does not seem to be a reason to use randomized restarts
and restart after fewer/more runs.
Also restart only after corpus triage.
Corpus triage is slow already and there does not seem to be enough
benefit to restart during corpus triage.
Also restart at most 1 proc at a time,
since there are lots of serial work in the kernel.
Diffstat (limited to 'executor')
| -rw-r--r-- | executor/executor_runner.h | 50 |
1 files changed, 41 insertions, 9 deletions
diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 5229fa1a7..14167f361 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -32,11 +32,13 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re class Proc { public: - Proc(Connection& conn, const char* bin, int id, int max_signal_fd, int cover_filter_fd, + Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms) : conn_(conn), bin_(bin), id_(id), + restarting_(restarting), + corpus_triaged_(corpus_triaged), max_signal_fd_(max_signal_fd), cover_filter_fd_(cover_filter_fd), use_cover_edges_(use_cover_edges), @@ -61,8 +63,10 @@ public: fail("already have pending msg"); if (wait_start_) wait_end_ = current_time_ms(); - if (state_ == State::Idle && - (exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) + // Restart every once in a while to not let too much state accumulate. + constexpr uint64 kRestartEvery = 600; + if (state_ == State::Idle && ((corpus_triaged_ && restarting_ == 0 && freshness_ >= kRestartEvery) || + exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) Restart(); attempts_ = 0; msg_ = std::move(msg); @@ -131,6 +135,8 @@ private: Connection& conn_; const char* const bin_; const int id_; + int& restarting_; + const bool& corpus_triaged_; const int max_signal_fd_; const int cover_filter_fd_; const bool use_cover_edges_; @@ -170,6 +176,15 @@ private: return ss; } + void ChangeState(State state) + { + if (state_ == State::Handshaking) + restarting_--; + if (state == State::Handshaking) + restarting_++; + state_ = state; + } + void Restart() { debug("proc %d: restarting subprocess, current state %u attempts %llu\n", id_, state_, attempts_); @@ -217,7 +232,7 @@ private: void Start() { - state_ = State::Started; + ChangeState(State::Started); freshness_ = 0; int req_pipe[2]; if (pipe(req_pipe)) @@ -265,7 +280,7 @@ private: if (state_ != State::Started || !msg_) fail("wrong handshake state"); debug("proc %d: handshaking to execute request %llu\n", id_, static_cast<uint64>(msg_->id)); - state_ = State::Handshaking; + ChangeState(State::Handshaking); exec_start_ = current_time_ms(); exec_env_ = msg_->exec_opts->env_flags() & ~rpc::ExecEnv::ResetState; sandbox_arg_ = msg_->exec_opts->sandbox_arg(); @@ -328,7 +343,7 @@ private: .all_extra_signal = all_extra_signal, }; exec_start_ = current_time_ms(); - state_ = State::Executing; + ChangeState(State::Executing); if (write(req_pipe_, &req, sizeof(req)) != sizeof(req)) { debug("request pipe write failed (errno=%d)\n", errno); Restart(); @@ -362,7 +377,7 @@ private: msg_.reset(); output_.clear(); debug_output_pos_ = 0; - state_ = State::Idle; + ChangeState(State::Idle); #if !SYZ_EXECUTOR_USES_FORK_SERVER if (process_) Restart(); @@ -385,7 +400,7 @@ private: failmsg("proc resp pipe read failed", "n=%zd", n); if (state_ == State::Handshaking) { debug("proc %d: got handshake reply\n", id_); - state_ = State::Idle; + ChangeState(State::Idle); Execute(); } else if (state_ == State::Executing) { debug("proc %d: got execute reply\n", id_); @@ -444,7 +459,7 @@ public: int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; int cover_filter_fd = cover_filter_ ? cover_filter_->FD() : -1; for (size_t i = 0; i < num_procs; i++) - procs_.emplace_back(new Proc(conn, bin, i, max_signal_fd, cover_filter_fd, + procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd, use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_)); for (;;) @@ -459,6 +474,8 @@ private: std::vector<std::unique_ptr<Proc>> procs_; std::deque<rpc::ExecRequestRawT> requests_; std::vector<std::string> leak_frames_; + int restarting_ = 0; + bool corpus_triaged_ = false; bool use_cover_edges_ = false; bool is_kernel_64_bit_ = false; uint32 slowdown_ = 0; @@ -467,6 +484,17 @@ private: friend std::ostream& operator<<(std::ostream& ss, const Runner& runner) { + ss << "name=" << runner.name_ + << " max_signal=" << !!runner.max_signal_ + << " cover_filter=" << !!runner.cover_filter_ + << " restarting=" << runner.restarting_ + << " corpus_triaged=" << runner.corpus_triaged_ + << " use_cover_edges=" << runner.use_cover_edges_ + << " is_kernel_64_bit=" << runner.is_kernel_64_bit_ + << " slowdown=" << runner.slowdown_ + << " syscall_timeout_ms=" << runner.syscall_timeout_ms_ + << " program_timeout_ms=" << runner.program_timeout_ms_ + << "\n"; ss << "procs:\n"; for (const auto& proc : runner.procs_) ss << *proc; @@ -509,6 +537,9 @@ private: requests_.pop_front(); } } + + if (restarting_ < 0 || restarting_ > static_cast<int>(procs_.size())) + failmsg("bad restarting", "restarting=%d", restarting_); } size_t Handshake() @@ -627,6 +658,7 @@ private: { // TODO: repair leak checking (#4728). debug("recv corpus triaged\n"); + corpus_triaged_ = true; } void Handle(const rpc::StateRequestRawT& msg) |
