From e47492d07d852b143ac4f7d33bb4dc4f900d2410 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Fri, 26 Jul 2024 12:00:54 +0200 Subject: pkg/rpcserver: use dense VM indices instead of string names Using actual VM indices for VM identification allows to match these indices to VMs in the pool, allows to use dense arrays to store information about runners (e.g. in queue.Distributor), and just removes string names as unnecessary additional entities. --- executor/executor_runner.h | 19 +++++++++------- pkg/flatrpc/conn_test.go | 4 ++-- pkg/flatrpc/flatrpc.fbs | 2 +- pkg/flatrpc/flatrpc.go | 21 ++++++++++-------- pkg/flatrpc/flatrpc.h | 30 ++++++++++++-------------- pkg/rpcserver/local.go | 8 +++---- pkg/rpcserver/rpcserver.go | 54 +++++++++++++++++++--------------------------- syz-manager/manager.go | 26 +++++++++++----------- 8 files changed, 78 insertions(+), 86 deletions(-) diff --git a/executor/executor_runner.h b/executor/executor_runner.h index 14167f361..f1a4f701d 100644 --- a/executor/executor_runner.h +++ b/executor/executor_runner.h @@ -451,9 +451,9 @@ private: class Runner { public: - Runner(Connection& conn, const char* name, const char* bin) + Runner(Connection& conn, int vm_index, const char* bin) : conn_(conn), - name_(name) + vm_index_(vm_index) { size_t num_procs = Handshake(); int max_signal_fd = max_signal_ ? max_signal_->FD() : -1; @@ -468,7 +468,7 @@ public: private: Connection& conn_; - const char* const name_; + const int vm_index_; std::optional max_signal_; std::optional cover_filter_; std::vector> procs_; @@ -484,7 +484,7 @@ private: friend std::ostream& operator<<(std::ostream& ss, const Runner& runner) { - ss << "name=" << runner.name_ + ss << "vm_index=" << runner.vm_index_ << " max_signal=" << !!runner.max_signal_ << " cover_filter=" << !!runner.cover_filter_ << " restarting=" << runner.restarting_ @@ -545,7 +545,7 @@ private: size_t Handshake() { rpc::ConnectRequestRawT conn_req; - conn_req.name = name_; + conn_req.id = vm_index_; conn_req.arch = GOARCH; conn_req.git_revision = GIT_REVISION; conn_req.syz_revision = SYZ_REVISION; @@ -800,8 +800,11 @@ static void FatalHandler(int sig, siginfo_t* info, void* ucontext) static void runner(char** argv, int argc) { if (argc != 5) - fail("usage: syz-executor runner "); - const char* const name = argv[2]; + fail("usage: syz-executor runner "); + char* endptr = nullptr; + int vm_index = strtol(argv[2], &endptr, 10); + if (vm_index < 0 || *endptr != 0) + failmsg("failed to parse VM index", "str='%s'", argv[2]); const char* const manager_addr = argv[3]; const char* const manager_port = argv[4]; @@ -837,5 +840,5 @@ static void runner(char** argv, int argc) for (int fd = conn.FD(); fd < kCoverFilterFd;) fd = dup(fd); - Runner(conn, name, argv[0]); + Runner(conn, vm_index, argv[0]); } diff --git a/pkg/flatrpc/conn_test.go b/pkg/flatrpc/conn_test.go index c9448872c..3ab83996e 100644 --- a/pkg/flatrpc/conn_test.go +++ b/pkg/flatrpc/conn_test.go @@ -18,7 +18,7 @@ import ( func TestConn(t *testing.T) { connectReq := &ConnectRequest{ - Name: "foo", + Id: 1, Arch: "arch", GitRevision: "rev1", SyzRevision: "rev2", @@ -92,7 +92,7 @@ func TestConn(t *testing.T) { func BenchmarkConn(b *testing.B) { connectReq := &ConnectRequest{ - Name: "foo", + Id: 1, Arch: "arch", GitRevision: "rev1", SyzRevision: "rev2", diff --git a/pkg/flatrpc/flatrpc.fbs b/pkg/flatrpc/flatrpc.fbs index c7516e624..690286681 100644 --- a/pkg/flatrpc/flatrpc.fbs +++ b/pkg/flatrpc/flatrpc.fbs @@ -36,7 +36,7 @@ enum Feature : uint64 (bit_flags) { } table ConnectRequestRaw { - name :string; + id :int64; arch :string; git_revision :string; syz_revision :string; diff --git a/pkg/flatrpc/flatrpc.go b/pkg/flatrpc/flatrpc.go index aa0aa0257..ff2f4c623 100644 --- a/pkg/flatrpc/flatrpc.go +++ b/pkg/flatrpc/flatrpc.go @@ -459,7 +459,7 @@ func (v SnapshotState) String() string { } type ConnectRequestRawT struct { - Name string `json:"name"` + Id int64 `json:"id"` Arch string `json:"arch"` GitRevision string `json:"git_revision"` SyzRevision string `json:"syz_revision"` @@ -469,12 +469,11 @@ func (t *ConnectRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOff if t == nil { return 0 } - nameOffset := builder.CreateString(t.Name) archOffset := builder.CreateString(t.Arch) gitRevisionOffset := builder.CreateString(t.GitRevision) syzRevisionOffset := builder.CreateString(t.SyzRevision) ConnectRequestRawStart(builder) - ConnectRequestRawAddName(builder, nameOffset) + ConnectRequestRawAddId(builder, t.Id) ConnectRequestRawAddArch(builder, archOffset) ConnectRequestRawAddGitRevision(builder, gitRevisionOffset) ConnectRequestRawAddSyzRevision(builder, syzRevisionOffset) @@ -482,7 +481,7 @@ func (t *ConnectRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOff } func (rcv *ConnectRequestRaw) UnPackTo(t *ConnectRequestRawT) { - t.Name = string(rcv.Name()) + t.Id = rcv.Id() t.Arch = string(rcv.Arch()) t.GitRevision = string(rcv.GitRevision()) t.SyzRevision = string(rcv.SyzRevision()) @@ -524,12 +523,16 @@ func (rcv *ConnectRequestRaw) Table() flatbuffers.Table { return rcv._tab } -func (rcv *ConnectRequestRaw) Name() []byte { +func (rcv *ConnectRequestRaw) Id() int64 { o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) + return rcv._tab.GetInt64(o + rcv._tab.Pos) } - return nil + return 0 +} + +func (rcv *ConnectRequestRaw) MutateId(n int64) bool { + return rcv._tab.MutateInt64Slot(4, n) } func (rcv *ConnectRequestRaw) Arch() []byte { @@ -559,8 +562,8 @@ func (rcv *ConnectRequestRaw) SyzRevision() []byte { func ConnectRequestRawStart(builder *flatbuffers.Builder) { builder.StartObject(4) } -func ConnectRequestRawAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0) +func ConnectRequestRawAddId(builder *flatbuffers.Builder, id int64) { + builder.PrependInt64Slot(0, id, 0) } func ConnectRequestRawAddArch(builder *flatbuffers.Builder, arch flatbuffers.UOffsetT) { builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(arch), 0) diff --git a/pkg/flatrpc/flatrpc.h b/pkg/flatrpc/flatrpc.h index bc22ee63b..d1496630e 100644 --- a/pkg/flatrpc/flatrpc.h +++ b/pkg/flatrpc/flatrpc.h @@ -809,7 +809,7 @@ FLATBUFFERS_STRUCT_END(ComparisonRaw, 32); struct ConnectRequestRawT : public flatbuffers::NativeTable { typedef ConnectRequestRaw TableType; - std::string name{}; + int64_t id = 0; std::string arch{}; std::string git_revision{}; std::string syz_revision{}; @@ -819,13 +819,13 @@ struct ConnectRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef ConnectRequestRawT NativeTableType; typedef ConnectRequestRawBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_NAME = 4, + VT_ID = 4, VT_ARCH = 6, VT_GIT_REVISION = 8, VT_SYZ_REVISION = 10 }; - const flatbuffers::String *name() const { - return GetPointer(VT_NAME); + int64_t id() const { + return GetField(VT_ID, 0); } const flatbuffers::String *arch() const { return GetPointer(VT_ARCH); @@ -838,8 +838,7 @@ struct ConnectRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_NAME) && - verifier.VerifyString(name()) && + VerifyField(verifier, VT_ID, 8) && VerifyOffset(verifier, VT_ARCH) && verifier.VerifyString(arch()) && VerifyOffset(verifier, VT_GIT_REVISION) && @@ -857,8 +856,8 @@ struct ConnectRequestRawBuilder { typedef ConnectRequestRaw Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_name(flatbuffers::Offset name) { - fbb_.AddOffset(ConnectRequestRaw::VT_NAME, name); + void add_id(int64_t id) { + fbb_.AddElement(ConnectRequestRaw::VT_ID, id, 0); } void add_arch(flatbuffers::Offset arch) { fbb_.AddOffset(ConnectRequestRaw::VT_ARCH, arch); @@ -882,31 +881,30 @@ struct ConnectRequestRawBuilder { inline flatbuffers::Offset CreateConnectRequestRaw( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset name = 0, + int64_t id = 0, flatbuffers::Offset arch = 0, flatbuffers::Offset git_revision = 0, flatbuffers::Offset syz_revision = 0) { ConnectRequestRawBuilder builder_(_fbb); + builder_.add_id(id); builder_.add_syz_revision(syz_revision); builder_.add_git_revision(git_revision); builder_.add_arch(arch); - builder_.add_name(name); return builder_.Finish(); } inline flatbuffers::Offset CreateConnectRequestRawDirect( flatbuffers::FlatBufferBuilder &_fbb, - const char *name = nullptr, + int64_t id = 0, const char *arch = nullptr, const char *git_revision = nullptr, const char *syz_revision = nullptr) { - auto name__ = name ? _fbb.CreateString(name) : 0; auto arch__ = arch ? _fbb.CreateString(arch) : 0; auto git_revision__ = git_revision ? _fbb.CreateString(git_revision) : 0; auto syz_revision__ = syz_revision ? _fbb.CreateString(syz_revision) : 0; return rpc::CreateConnectRequestRaw( _fbb, - name__, + id, arch__, git_revision__, syz_revision__); @@ -2874,7 +2872,7 @@ inline ConnectRequestRawT *ConnectRequestRaw::UnPack(const flatbuffers::resolver inline void ConnectRequestRaw::UnPackTo(ConnectRequestRawT *_o, const flatbuffers::resolver_function_t *_resolver) const { (void)_o; (void)_resolver; - { auto _e = name(); if (_e) _o->name = _e->str(); } + { auto _e = id(); _o->id = _e; } { auto _e = arch(); if (_e) _o->arch = _e->str(); } { auto _e = git_revision(); if (_e) _o->git_revision = _e->str(); } { auto _e = syz_revision(); if (_e) _o->syz_revision = _e->str(); } @@ -2888,13 +2886,13 @@ inline flatbuffers::Offset CreateConnectRequestRaw(flatbuffer (void)_rehasher; (void)_o; struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ConnectRequestRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va; - auto _name = _o->name.empty() ? 0 : _fbb.CreateString(_o->name); + auto _id = _o->id; auto _arch = _o->arch.empty() ? 0 : _fbb.CreateString(_o->arch); auto _git_revision = _o->git_revision.empty() ? 0 : _fbb.CreateString(_o->git_revision); auto _syz_revision = _o->syz_revision.empty() ? 0 : _fbb.CreateString(_o->syz_revision); return rpc::CreateConnectRequestRaw( _fbb, - _name, + _id, _arch, _git_revision, _syz_revision); diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go index 1420af606..e1522aa79 100644 --- a/pkg/rpcserver/local.go +++ b/pkg/rpcserver/local.go @@ -57,12 +57,12 @@ func RunLocal(cfg *LocalConfig) error { // for the race detector b/c it does not understand the synchronization via TCP socket connect/accept. close(ctx.setupDone) - name := "local" - connErr := serv.CreateInstance(name, nil, nil) - defer serv.ShutdownInstance(name, true) + id := 0 + connErr := serv.CreateInstance(id, nil, nil) + defer serv.ShutdownInstance(id, true) bin := cfg.Executor - args := []string{"runner", name, "localhost", fmt.Sprint(serv.Port)} + args := []string{"runner", fmt.Sprint(id), "localhost", fmt.Sprint(serv.Port)} if cfg.GDB { bin = "gdb" args = append([]string{ diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index dd11fe0f8..5a104d81c 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -74,12 +74,9 @@ type Server struct { canonicalModules *cover.Canonicalizer coverFilter []uint64 - mu sync.Mutex - runners map[string]*Runner - execSource *queue.Distributor - // TODO: replace names with indices. - ids map[string]int - idSeq int + mu sync.Mutex + runners map[int]*Runner + execSource *queue.Distributor triagedCorpus atomic.Bool statVMRestarts *stat.Val *runnerStats @@ -140,8 +137,7 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) { target: cfg.Target, sysTarget: sysTarget, timeouts: sysTarget.Timeouts(cfg.Slowdown), - runners: make(map[string]*Runner), - ids: make(map[string]int), + runners: make(map[int]*Runner), checker: checker, baseSource: baseSource, execSource: queue.Distribute(queue.Retry(baseSource)), @@ -179,15 +175,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { log.Logf(1, "%s", err) return } - name := connectReq.Name - log.Logf(1, "runner %v connected", name) + id := int(connectReq.Id) + log.Logf(1, "runner %v connected", id) if serv.cfg.VMLess { // There is no VM loop, so minic what it would do. - serv.CreateInstance(name, nil, nil) + serv.CreateInstance(id, nil, nil) defer func() { - serv.StopFuzzing(name) - serv.ShutdownInstance(name, true) + serv.StopFuzzing(id) + serv.ShutdownInstance(id, true) }() } else { checkRevisions(connectReq, serv.cfg.Target) @@ -195,15 +191,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { serv.statVMRestarts.Add(1) serv.mu.Lock() - runner := serv.runners[name] + runner := serv.runners[id] serv.mu.Unlock() if runner == nil { - log.Logf(2, "unknown VM %v tries to connect", name) + log.Logf(2, "unknown VM %v tries to connect", id) return } err = serv.handleRunnerConn(runner, conn) - log.Logf(2, "runner %v: %v", name, err) + log.Logf(2, "runner %v: %v", id, err) runner.resultCh <- err } @@ -388,8 +384,9 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable log.Logf(0, "machine check:\n%s", buf.Bytes()) } -func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error { +func (serv *Server) CreateInstance(id int, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error { runner := &Runner{ + id: id, source: serv.execSource, cover: serv.cfg.Cover, coverEdges: serv.cfg.UseCoverEdges, @@ -409,25 +406,18 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo } serv.mu.Lock() defer serv.mu.Unlock() - if serv.runners[name] != nil { - panic(fmt.Sprintf("duplicate instance %s", name)) + if serv.runners[id] != nil { + panic(fmt.Sprintf("duplicate instance %v", id)) } - serv.runners[name] = runner - id, ok := serv.ids[name] - if !ok { - id = serv.idSeq - serv.idSeq++ - serv.ids[name] = id - } - runner.id = id + serv.runners[id] = runner return runner.resultCh } // stopInstance prevents further request exchange requests. // To make RPCServer fully forget an instance, shutdownInstance() must be called. -func (serv *Server) StopFuzzing(name string) { +func (serv *Server) StopFuzzing(id int) { serv.mu.Lock() - runner := serv.runners[name] + runner := serv.runners[id] serv.mu.Unlock() if runner.updInfo != nil { runner.updInfo(func(info *dispatcher.Info) { @@ -437,10 +427,10 @@ func (serv *Server) StopFuzzing(name string) { runner.Stop() } -func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) { +func (serv *Server) ShutdownInstance(id int, crashed bool) ([]ExecRecord, []byte) { serv.mu.Lock() - runner := serv.runners[name] - delete(serv.runners, name) + runner := serv.runners[id] + delete(serv.runners, id) serv.mu.Unlock() return runner.Shutdown(crashed), runner.MachineInfo() } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index fe5e8f8f1..c167e171f 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -152,7 +152,7 @@ const ( const currentDBVersion = 5 type Crash struct { - instanceName string + instanceIndex int fromHub bool // this crash was created based on a repro from syz-hub fromDashboard bool // .. or from dashboard manual bool @@ -701,13 +701,11 @@ func containsDisabled(p *prog.Prog, enabled map[*prog.Syscall]bool) bool { } func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { - index := inst.Index() - instanceName := fmt.Sprintf("vm-%d", index) injectExec := make(chan bool, 10) - mgr.serv.CreateInstance(instanceName, injectExec, updInfo) + mgr.serv.CreateInstance(inst.Index(), injectExec, updInfo) - rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, instanceName, injectExec) - lastExec, machineInfo := mgr.serv.ShutdownInstance(instanceName, rep != nil) + rep, vmInfo, err := mgr.runInstanceInner(ctx, inst, injectExec) + lastExec, machineInfo := mgr.serv.ShutdownInstance(inst.Index(), rep != nil) if rep != nil { prependExecuting(rep, lastExec) if len(vmInfo) != 0 { @@ -717,16 +715,16 @@ func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updIn } if err == nil && rep != nil { mgr.crashes <- &Crash{ - instanceName: instanceName, - Report: rep, + instanceIndex: inst.Index(), + Report: rep, } } if err != nil { - log.Logf(1, "%s: failed with error: %v", instanceName, err) + log.Logf(1, "VM %v: failed with error: %v", inst.Index(), err) } } -func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, instanceName string, +func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, injectExec <-chan bool) (*report.Report, []byte, error) { fwdAddr, err := inst.Forward(mgr.serv.Port) if err != nil { @@ -750,14 +748,14 @@ func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, ins if err != nil { return nil, nil, fmt.Errorf("failed to parse manager's address") } - cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, instanceName, host, port) + cmd := fmt.Sprintf("%v runner %v %v %v", executorBin, inst.Index(), host, port) _, rep, err := inst.Run(mgr.cfg.Timeouts.VMRunningTime, mgr.reporter, cmd, vm.ExitTimeout, vm.StopContext(ctx), vm.InjectExecuting(injectExec), vm.EarlyFinishCb(func() { // Depending on the crash type and kernel config, fuzzing may continue // running for several seconds even after kernel has printed a crash report. // This litters the log and we want to prevent it. - mgr.serv.StopFuzzing(instanceName) + mgr.serv.StopFuzzing(inst.Index()) }), ) if err != nil { @@ -765,7 +763,7 @@ func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, ins } if rep == nil { // This is the only "OK" outcome. - log.Logf(0, "%s: running for %v, restarting", instanceName, time.Since(start)) + log.Logf(0, "VM %v: running for %v, restarting", inst.Index(), time.Since(start)) return nil, nil, nil } vmInfo, err := inst.Info() @@ -825,7 +823,7 @@ func (mgr *Manager) saveCrash(crash *Crash) bool { if crash.Suppressed { flags += " [suppressed]" } - log.Logf(0, "%s: crash: %v%v", crash.instanceName, crash.Title, flags) + log.Logf(0, "VM %v: crash: %v%v", crash.instanceIndex, crash.Title, flags) if mgr.mode == ModeSmokeTest { data, err := json.Marshal(crash.Report) -- cgit mrf-deployment