aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--executor/executor_runner.h50
-rw-r--r--pkg/rpcserver/rpcserver.go3
-rw-r--r--pkg/rpcserver/runner.go8
-rw-r--r--syz-manager/manager.go2
4 files changed, 42 insertions, 21 deletions
diff --git a/executor/executor_runner.h b/executor/executor_runner.h
index 5229fa1a7..14167f361 100644
--- a/executor/executor_runner.h
+++ b/executor/executor_runner.h
@@ -32,11 +32,13 @@ inline std::ostream& operator<<(std::ostream& ss, const rpc::ExecRequestRawT& re
class Proc
{
public:
- Proc(Connection& conn, const char* bin, int id, int max_signal_fd, int cover_filter_fd,
+ Proc(Connection& conn, const char* bin, int id, int& restarting, const bool& corpus_triaged, int max_signal_fd, int cover_filter_fd,
bool use_cover_edges, bool is_kernel_64_bit, uint32 slowdown, uint32 syscall_timeout_ms, uint32 program_timeout_ms)
: conn_(conn),
bin_(bin),
id_(id),
+ restarting_(restarting),
+ corpus_triaged_(corpus_triaged),
max_signal_fd_(max_signal_fd),
cover_filter_fd_(cover_filter_fd),
use_cover_edges_(use_cover_edges),
@@ -61,8 +63,10 @@ public:
fail("already have pending msg");
if (wait_start_)
wait_end_ = current_time_ms();
- if (state_ == State::Idle &&
- (exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg()))
+ // Restart every once in a while to not let too much state accumulate.
+ constexpr uint64 kRestartEvery = 600;
+ if (state_ == State::Idle && ((corpus_triaged_ && restarting_ == 0 && freshness_ >= kRestartEvery) ||
+ exec_env_ != msg.exec_opts->env_flags() || sandbox_arg_ != msg.exec_opts->sandbox_arg()))
Restart();
attempts_ = 0;
msg_ = std::move(msg);
@@ -131,6 +135,8 @@ private:
Connection& conn_;
const char* const bin_;
const int id_;
+ int& restarting_;
+ const bool& corpus_triaged_;
const int max_signal_fd_;
const int cover_filter_fd_;
const bool use_cover_edges_;
@@ -170,6 +176,15 @@ private:
return ss;
}
+ void ChangeState(State state)
+ {
+ if (state_ == State::Handshaking)
+ restarting_--;
+ if (state == State::Handshaking)
+ restarting_++;
+ state_ = state;
+ }
+
void Restart()
{
debug("proc %d: restarting subprocess, current state %u attempts %llu\n", id_, state_, attempts_);
@@ -217,7 +232,7 @@ private:
void Start()
{
- state_ = State::Started;
+ ChangeState(State::Started);
freshness_ = 0;
int req_pipe[2];
if (pipe(req_pipe))
@@ -265,7 +280,7 @@ private:
if (state_ != State::Started || !msg_)
fail("wrong handshake state");
debug("proc %d: handshaking to execute request %llu\n", id_, static_cast<uint64>(msg_->id));
- state_ = State::Handshaking;
+ ChangeState(State::Handshaking);
exec_start_ = current_time_ms();
exec_env_ = msg_->exec_opts->env_flags() & ~rpc::ExecEnv::ResetState;
sandbox_arg_ = msg_->exec_opts->sandbox_arg();
@@ -328,7 +343,7 @@ private:
.all_extra_signal = all_extra_signal,
};
exec_start_ = current_time_ms();
- state_ = State::Executing;
+ ChangeState(State::Executing);
if (write(req_pipe_, &req, sizeof(req)) != sizeof(req)) {
debug("request pipe write failed (errno=%d)\n", errno);
Restart();
@@ -362,7 +377,7 @@ private:
msg_.reset();
output_.clear();
debug_output_pos_ = 0;
- state_ = State::Idle;
+ ChangeState(State::Idle);
#if !SYZ_EXECUTOR_USES_FORK_SERVER
if (process_)
Restart();
@@ -385,7 +400,7 @@ private:
failmsg("proc resp pipe read failed", "n=%zd", n);
if (state_ == State::Handshaking) {
debug("proc %d: got handshake reply\n", id_);
- state_ = State::Idle;
+ ChangeState(State::Idle);
Execute();
} else if (state_ == State::Executing) {
debug("proc %d: got execute reply\n", id_);
@@ -444,7 +459,7 @@ public:
int max_signal_fd = max_signal_ ? max_signal_->FD() : -1;
int cover_filter_fd = cover_filter_ ? cover_filter_->FD() : -1;
for (size_t i = 0; i < num_procs; i++)
- procs_.emplace_back(new Proc(conn, bin, i, max_signal_fd, cover_filter_fd,
+ procs_.emplace_back(new Proc(conn, bin, i, restarting_, corpus_triaged_, max_signal_fd, cover_filter_fd,
use_cover_edges_, is_kernel_64_bit_, slowdown_, syscall_timeout_ms_, program_timeout_ms_));
for (;;)
@@ -459,6 +474,8 @@ private:
std::vector<std::unique_ptr<Proc>> procs_;
std::deque<rpc::ExecRequestRawT> requests_;
std::vector<std::string> leak_frames_;
+ int restarting_ = 0;
+ bool corpus_triaged_ = false;
bool use_cover_edges_ = false;
bool is_kernel_64_bit_ = false;
uint32 slowdown_ = 0;
@@ -467,6 +484,17 @@ private:
friend std::ostream& operator<<(std::ostream& ss, const Runner& runner)
{
+ ss << "name=" << runner.name_
+ << " max_signal=" << !!runner.max_signal_
+ << " cover_filter=" << !!runner.cover_filter_
+ << " restarting=" << runner.restarting_
+ << " corpus_triaged=" << runner.corpus_triaged_
+ << " use_cover_edges=" << runner.use_cover_edges_
+ << " is_kernel_64_bit=" << runner.is_kernel_64_bit_
+ << " slowdown=" << runner.slowdown_
+ << " syscall_timeout_ms=" << runner.syscall_timeout_ms_
+ << " program_timeout_ms=" << runner.program_timeout_ms_
+ << "\n";
ss << "procs:\n";
for (const auto& proc : runner.procs_)
ss << *proc;
@@ -509,6 +537,9 @@ private:
requests_.pop_front();
}
}
+
+ if (restarting_ < 0 || restarting_ > static_cast<int>(procs_.size()))
+ failmsg("bad restarting", "restarting=%d", restarting_);
}
size_t Handshake()
@@ -627,6 +658,7 @@ private:
{
// TODO: repair leak checking (#4728).
debug("recv corpus triaged\n");
+ corpus_triaged_ = true;
}
void Handle(const rpc::StateRequestRawT& msg)
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index da3a0f076..dd11fe0f8 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -8,13 +8,11 @@ import (
"context"
"errors"
"fmt"
- "math/rand"
"slices"
"sort"
"strings"
"sync"
"sync/atomic"
- "time"
"github.com/google/syzkaller/pkg/cover"
"github.com/google/syzkaller/pkg/cover/backend"
@@ -404,7 +402,6 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
requests: make(map[int64]*queue.Request),
executing: make(map[int64]bool),
lastExec: MakeLastExecuting(serv.cfg.Procs, 6),
- rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
stats: serv.runnerStats,
procs: serv.cfg.Procs,
updInfo: updInfo,
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index f14a2ec29..6100c94f7 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -7,7 +7,6 @@ import (
"bytes"
"errors"
"fmt"
- "math/rand"
"os"
"slices"
"sync"
@@ -43,7 +42,6 @@ type Runner struct {
requests map[int64]*queue.Request
executing map[int64]bool
lastExec *LastExecuting
- rnd *rand.Rand
updInfo dispatcher.UpdateInfo
resultCh chan error
@@ -287,13 +285,7 @@ func (runner *Runner) sendRequest(req *queue.Request) error {
for i, call := range req.ReturnAllSignal {
allSignal[i] = int32(call)
}
- // Do not let too much state accumulate.
- const restartIn = 600
- resetFlags := flatrpc.ExecFlagCollectSignal | flatrpc.ExecFlagCollectCover | flatrpc.ExecFlagCollectComps
opts := req.ExecOpts
- if req.ExecOpts.ExecFlags&resetFlags != 0 && runner.rnd.Intn(restartIn) == 0 {
- opts.EnvFlags |= flatrpc.ExecEnvResetState
- }
if runner.debug {
opts.EnvFlags |= flatrpc.ExecEnvDebug
}
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 15d76dd4e..fe5e8f8f1 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -1502,7 +1502,7 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
}
mgr.mu.Lock()
if mgr.phase == phaseLoadedCorpus {
- if !mgr.cfg.Snapshot && mgr.enabledFeatures&flatrpc.FeatureLeak != 0 {
+ if !mgr.cfg.Snapshot {
mgr.serv.TriagedCorpus()
}
if mgr.cfg.HubClient != "" {