diff options
| -rw-r--r-- | executor/executor_runner.h | 50 | ||||
| -rw-r--r-- | pkg/rpcserver/rpcserver.go | 3 | ||||
| -rw-r--r-- | pkg/rpcserver/runner.go | 8 | ||||
| -rw-r--r-- | syz-manager/manager.go | 2 |
4 files changed, 42 insertions, 21 deletions
diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 5229fa1a7..14167f361 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -32,11 +32,13 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re class Proc { public: - Proc(Connection& conn, const char* bin, int id, int max_signal_fd, int cover_filter_fd, + Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd, bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms) : conn_(conn), bin_(bin), id_(id), + restarting_(restarting), + corpus_triaged_(corpus_triaged), max_signal_fd_(max_signal_fd), cover_filter_fd_(cover_filter_fd), use_cover_edges_(use_cover_edges), @@ -61,8 +63,10 @@ public: fail("already have pending msg"); if (wait_start_) wait_end_ = current_time_ms(); - if (state_ == State::Idle && - (exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) + // Restart every once in a while to not let too much state accumulate. + constexpr uint64 kRestartEvery = 600; + if (state_ == State::Idle && ((corpus_triaged_ && restarting_ == 0 && freshness_ >= kRestartEvery) || + exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg())) Restart(); attempts_ = 0; msg_ = std::move(msg); @@ -131,6 +135,8 @@ private: Connection& conn_; const char* const bin_; const int id_; + int& restarting_; + const bool& corpus_triaged_; const int max_signal_fd_; const int cover_filter_fd_; const bool use_cover_edges_; @@ -170,6 +176,15 @@ private: return ss; } + void ChangeState(State state) + { + if (state_ == State::Handshaking) + restarting_--; + if (state == State::Handshaking) + restarting_++; + state_ = state; + } + void Restart() { debug("proc %d: restarting subprocess, current state %u attempts %llu\n", id_, state_, attempts_); @@ -217,7 +232,7 @@ private: void Start() { - state_ = State::Started; + ChangeState(State::Started); freshness_ = 0; int req_pipe[2]; if (pipe(req_pipe)) @@ -265,7 +280,7 @@ private: if (state_ != State::Started || !msg_) fail("wrong handshake state"); debug("proc %d: handshaking to execute request %llu\n", id_, static_cast<uint64>(msg_->id)); - state_ = State::Handshaking; + ChangeState(State::Handshaking); exec_start_ = current_time_ms(); exec_env_ = msg_->exec_opts->env_flags() & ~rpc::ExecEnv::ResetState; sandbox_arg_ = msg_->exec_opts->sandbox_arg(); @@ -328,7 +343,7 @@ private: .all_extra_signal = all_extra_signal, }; exec_start_ = current_time_ms(); - state_ = State::Executing; + ChangeState(State::Executing); if (write(req_pipe_, &req, sizeof(req)) != sizeof(req)) { debug("request pipe write failed (errno=%d)\n", errno); Restart(); @@ -362,7 +377,7 @@ private: msg_.reset(); output_.clear(); debug_output_pos_ = 0; - state_ = State::Idle; + ChangeState(State::Idle); #if !SYZ_EXECUTOR_USES_FORK_SERVER if (process_) Restart(); @@ -385,7 +400,7 @@ private: failmsg("proc resp pipe read failed", "n=%zd", n); if (state_ == State::Handshaking) { debug("proc %d: got handshake reply\n", id_); - state_ = State::Idle; + ChangeState(State::Idle); Execute(); } else if (state_ == State::Executing) { debug("proc %d: got execute reply\n", id_); @@ -444,7 +459,7 @@ public: int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; int cover_filter_fd = cover_filter_ ? cover_filter_->FD() : -1; for (size_t i = 0; i < num_procs; i++) - procs_.emplace_back(new Proc(conn, bin, i, max_signal_fd, cover_filter_fd, + procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd, use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_)); for (;;) @@ -459,6 +474,8 @@ private: std::vector<std::unique_ptr<Proc>> procs_; std::deque<rpc::ExecRequestRawT> requests_; std::vector<std::string> leak_frames_; + int restarting_ = 0; + bool corpus_triaged_ = false; bool use_cover_edges_ = false; bool is_kernel_64_bit_ = false; uint32 slowdown_ = 0; @@ -467,6 +484,17 @@ private: friend std::ostream& operator<<(std::ostream& ss, const Runner& runner) { + ss << "name=" << runner.name_ + << " max_signal=" << !!runner.max_signal_ + << " cover_filter=" << !!runner.cover_filter_ + << " restarting=" << runner.restarting_ + << " corpus_triaged=" << runner.corpus_triaged_ + << " use_cover_edges=" << runner.use_cover_edges_ + << " is_kernel_64_bit=" << runner.is_kernel_64_bit_ + << " slowdown=" << runner.slowdown_ + << " syscall_timeout_ms=" << runner.syscall_timeout_ms_ + << " program_timeout_ms=" << runner.program_timeout_ms_ + << "\n"; ss << "procs:\n"; for (const auto& proc : runner.procs_) ss << *proc; @@ -509,6 +537,9 @@ private: requests_.pop_front(); } } + + if (restarting_ < 0 || restarting_ > static_cast<int>(procs_.size())) + failmsg("bad restarting", "restarting=%d", restarting_); } size_t Handshake() @@ -627,6 +658,7 @@ private: { // TODO: repair leak checking (#4728). debug("recv corpus triaged\n"); + corpus_triaged_ = true; } void Handle(const rpc::StateRequestRawT& msg) diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index da3a0f076..dd11fe0f8 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -8,13 +8,11 @@ import ( "context" "errors" "fmt" - "math/rand" "slices" "sort" "strings" "sync" "sync/atomic" - "time" "github.com/google/syzkaller/pkg/cover" "github.com/google/syzkaller/pkg/cover/backend" @@ -404,7 +402,6 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo requests: make(map[int64]*queue.Request), executing: make(map[int64]bool), lastExec: MakeLastExecuting(serv.cfg.Procs, 6), - rnd: rand.New(rand.NewSource(time.Now().UnixNano())), stats: serv.runnerStats, procs: serv.cfg.Procs, updInfo: updInfo, diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index f14a2ec29..6100c94f7 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -7,7 +7,6 @@ import ( "bytes" "errors" "fmt" - "math/rand" "os" "slices" "sync" @@ -43,7 +42,6 @@ type Runner struct { requests map[int64]*queue.Request executing map[int64]bool lastExec *LastExecuting - rnd *rand.Rand updInfo dispatcher.UpdateInfo resultCh chan error @@ -287,13 +285,7 @@ func (runner *Runner) sendRequest(req *queue.Request) error { for i, call := range req.ReturnAllSignal { allSignal[i] = int32(call) } - // Do not let too much state accumulate. - const restartIn = 600 - resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps opts := req.ExecOpts - if req.ExecOpts.ExecFlags&resetFlags != 0 && runner.rnd.Intn(restartIn) == 0 { - opts.EnvFlags |= flatrpc.ExecEnvResetState - } if runner.debug { opts.EnvFlags |= flatrpc.ExecEnvDebug } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 15d76dd4e..fe5e8f8f1 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -1502,7 +1502,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) { } mgr.mu.Lock() if mgr.phase == phaseLoadedCorpus { - if !mgr.cfg.Snapshot && mgr.enabledFeatures&flatrpc.FeatureLeak != 0 { + if !mgr.cfg.Snapshot { mgr.serv.TriagedCorpus() } if mgr.cfg.HubClient != "" { |
