aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-07-26 12:00:54 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-08-02 13:16:51 +0000
commite47492d07d852b143ac4f7d33bb4dc4f900d2410 (patch)
treea60162c2997a3ab00282b8edd1af6d3f70598881 /pkg
parent137fdde817df60cccfeb9a40237533e48bc143ec (diff)
pkg/rpcserver: use dense VM indices instead of string names
Using actual VM indices for VM identification allows to match these indices to VMs in the pool, allows to use dense arrays to store information about runners (e.g. in queue.Distributor), and just removes string names as unnecessary additional entities.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/flatrpc/conn_test.go4
-rw-r--r--pkg/flatrpc/flatrpc.fbs2
-rw-r--r--pkg/flatrpc/flatrpc.go21
-rw-r--r--pkg/flatrpc/flatrpc.h30
-rw-r--r--pkg/rpcserver/local.go8
-rw-r--r--pkg/rpcserver/rpcserver.go54
6 files changed, 55 insertions, 64 deletions
diff --git a/pkg/flatrpc/conn_test.go b/pkg/flatrpc/conn_test.go
index c9448872c..3ab83996e 100644
--- a/pkg/flatrpc/conn_test.go
+++ b/pkg/flatrpc/conn_test.go
@@ -18,7 +18,7 @@ import (
func TestConn(t *testing.T) {
connectReq := &ConnectRequest{
- Name: "foo",
+ Id: 1,
Arch: "arch",
GitRevision: "rev1",
SyzRevision: "rev2",
@@ -92,7 +92,7 @@ func TestConn(t *testing.T) {
func BenchmarkConn(b *testing.B) {
connectReq := &ConnectRequest{
- Name: "foo",
+ Id: 1,
Arch: "arch",
GitRevision: "rev1",
SyzRevision: "rev2",
diff --git a/pkg/flatrpc/flatrpc.fbs b/pkg/flatrpc/flatrpc.fbs
index c7516e624..690286681 100644
--- a/pkg/flatrpc/flatrpc.fbs
+++ b/pkg/flatrpc/flatrpc.fbs
@@ -36,7 +36,7 @@ enum Feature : uint64 (bit_flags) {
}
table ConnectRequestRaw {
- name :string;
+ id :int64;
arch :string;
git_revision :string;
syz_revision :string;
diff --git a/pkg/flatrpc/flatrpc.go b/pkg/flatrpc/flatrpc.go
index aa0aa0257..ff2f4c623 100644
--- a/pkg/flatrpc/flatrpc.go
+++ b/pkg/flatrpc/flatrpc.go
@@ -459,7 +459,7 @@ func (v SnapshotState) String() string {
}
type ConnectRequestRawT struct {
- Name string `json:"name"`
+ Id int64 `json:"id"`
Arch string `json:"arch"`
GitRevision string `json:"git_revision"`
SyzRevision string `json:"syz_revision"`
@@ -469,12 +469,11 @@ func (t *ConnectRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOff
if t == nil {
return 0
}
- nameOffset := builder.CreateString(t.Name)
archOffset := builder.CreateString(t.Arch)
gitRevisionOffset := builder.CreateString(t.GitRevision)
syzRevisionOffset := builder.CreateString(t.SyzRevision)
ConnectRequestRawStart(builder)
- ConnectRequestRawAddName(builder, nameOffset)
+ ConnectRequestRawAddId(builder, t.Id)
ConnectRequestRawAddArch(builder, archOffset)
ConnectRequestRawAddGitRevision(builder, gitRevisionOffset)
ConnectRequestRawAddSyzRevision(builder, syzRevisionOffset)
@@ -482,7 +481,7 @@ func (t *ConnectRequestRawT) Pack(builder *flatbuffers.Builder) flatbuffers.UOff
}
func (rcv *ConnectRequestRaw) UnPackTo(t *ConnectRequestRawT) {
- t.Name = string(rcv.Name())
+ t.Id = rcv.Id()
t.Arch = string(rcv.Arch())
t.GitRevision = string(rcv.GitRevision())
t.SyzRevision = string(rcv.SyzRevision())
@@ -524,12 +523,16 @@ func (rcv *ConnectRequestRaw) Table() flatbuffers.Table {
return rcv._tab
}
-func (rcv *ConnectRequestRaw) Name() []byte {
+func (rcv *ConnectRequestRaw) Id() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
- return rcv._tab.ByteVector(o + rcv._tab.Pos)
+ return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
- return nil
+ return 0
+}
+
+func (rcv *ConnectRequestRaw) MutateId(n int64) bool {
+ return rcv._tab.MutateInt64Slot(4, n)
}
func (rcv *ConnectRequestRaw) Arch() []byte {
@@ -559,8 +562,8 @@ func (rcv *ConnectRequestRaw) SyzRevision() []byte {
func ConnectRequestRawStart(builder *flatbuffers.Builder) {
builder.StartObject(4)
}
-func ConnectRequestRawAddName(builder *flatbuffers.Builder, name flatbuffers.UOffsetT) {
- builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(name), 0)
+func ConnectRequestRawAddId(builder *flatbuffers.Builder, id int64) {
+ builder.PrependInt64Slot(0, id, 0)
}
func ConnectRequestRawAddArch(builder *flatbuffers.Builder, arch flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(arch), 0)
diff --git a/pkg/flatrpc/flatrpc.h b/pkg/flatrpc/flatrpc.h
index bc22ee63b..d1496630e 100644
--- a/pkg/flatrpc/flatrpc.h
+++ b/pkg/flatrpc/flatrpc.h
@@ -809,7 +809,7 @@ FLATBUFFERS_STRUCT_END(ComparisonRaw, 32);
struct ConnectRequestRawT : public flatbuffers::NativeTable {
typedef ConnectRequestRaw TableType;
- std::string name{};
+ int64_t id = 0;
std::string arch{};
std::string git_revision{};
std::string syz_revision{};
@@ -819,13 +819,13 @@ struct ConnectRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef ConnectRequestRawT NativeTableType;
typedef ConnectRequestRawBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
- VT_NAME = 4,
+ VT_ID = 4,
VT_ARCH = 6,
VT_GIT_REVISION = 8,
VT_SYZ_REVISION = 10
};
- const flatbuffers::String *name() const {
- return GetPointer<const flatbuffers::String *>(VT_NAME);
+ int64_t id() const {
+ return GetField<int64_t>(VT_ID, 0);
}
const flatbuffers::String *arch() const {
return GetPointer<const flatbuffers::String *>(VT_ARCH);
@@ -838,8 +838,7 @@ struct ConnectRequestRaw FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
- VerifyOffset(verifier, VT_NAME) &&
- verifier.VerifyString(name()) &&
+ VerifyField<int64_t>(verifier, VT_ID, 8) &&
VerifyOffset(verifier, VT_ARCH) &&
verifier.VerifyString(arch()) &&
VerifyOffset(verifier, VT_GIT_REVISION) &&
@@ -857,8 +856,8 @@ struct ConnectRequestRawBuilder {
typedef ConnectRequestRaw Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
- void add_name(flatbuffers::Offset<flatbuffers::String> name) {
- fbb_.AddOffset(ConnectRequestRaw::VT_NAME, name);
+ void add_id(int64_t id) {
+ fbb_.AddElement<int64_t>(ConnectRequestRaw::VT_ID, id, 0);
}
void add_arch(flatbuffers::Offset<flatbuffers::String> arch) {
fbb_.AddOffset(ConnectRequestRaw::VT_ARCH, arch);
@@ -882,31 +881,30 @@ struct ConnectRequestRawBuilder {
inline flatbuffers::Offset<ConnectRequestRaw> CreateConnectRequestRaw(
flatbuffers::FlatBufferBuilder &_fbb,
- flatbuffers::Offset<flatbuffers::String> name = 0,
+ int64_t id = 0,
flatbuffers::Offset<flatbuffers::String> arch = 0,
flatbuffers::Offset<flatbuffers::String> git_revision = 0,
flatbuffers::Offset<flatbuffers::String> syz_revision = 0) {
ConnectRequestRawBuilder builder_(_fbb);
+ builder_.add_id(id);
builder_.add_syz_revision(syz_revision);
builder_.add_git_revision(git_revision);
builder_.add_arch(arch);
- builder_.add_name(name);
return builder_.Finish();
}
inline flatbuffers::Offset<ConnectRequestRaw> CreateConnectRequestRawDirect(
flatbuffers::FlatBufferBuilder &_fbb,
- const char *name = nullptr,
+ int64_t id = 0,
const char *arch = nullptr,
const char *git_revision = nullptr,
const char *syz_revision = nullptr) {
- auto name__ = name ? _fbb.CreateString(name) : 0;
auto arch__ = arch ? _fbb.CreateString(arch) : 0;
auto git_revision__ = git_revision ? _fbb.CreateString(git_revision) : 0;
auto syz_revision__ = syz_revision ? _fbb.CreateString(syz_revision) : 0;
return rpc::CreateConnectRequestRaw(
_fbb,
- name__,
+ id,
arch__,
git_revision__,
syz_revision__);
@@ -2874,7 +2872,7 @@ inline ConnectRequestRawT *ConnectRequestRaw::UnPack(const flatbuffers::resolver
inline void ConnectRequestRaw::UnPackTo(ConnectRequestRawT *_o, const flatbuffers::resolver_function_t *_resolver) const {
(void)_o;
(void)_resolver;
- { auto _e = name(); if (_e) _o->name = _e->str(); }
+ { auto _e = id(); _o->id = _e; }
{ auto _e = arch(); if (_e) _o->arch = _e->str(); }
{ auto _e = git_revision(); if (_e) _o->git_revision = _e->str(); }
{ auto _e = syz_revision(); if (_e) _o->syz_revision = _e->str(); }
@@ -2888,13 +2886,13 @@ inline flatbuffers::Offset<ConnectRequestRaw> CreateConnectRequestRaw(flatbuffer
(void)_rehasher;
(void)_o;
struct _VectorArgs { flatbuffers::FlatBufferBuilder *__fbb; const ConnectRequestRawT* __o; const flatbuffers::rehasher_function_t *__rehasher; } _va = { &_fbb, _o, _rehasher}; (void)_va;
- auto _name = _o->name.empty() ? 0 : _fbb.CreateString(_o->name);
+ auto _id = _o->id;
auto _arch = _o->arch.empty() ? 0 : _fbb.CreateString(_o->arch);
auto _git_revision = _o->git_revision.empty() ? 0 : _fbb.CreateString(_o->git_revision);
auto _syz_revision = _o->syz_revision.empty() ? 0 : _fbb.CreateString(_o->syz_revision);
return rpc::CreateConnectRequestRaw(
_fbb,
- _name,
+ _id,
_arch,
_git_revision,
_syz_revision);
diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go
index 1420af606..e1522aa79 100644
--- a/pkg/rpcserver/local.go
+++ b/pkg/rpcserver/local.go
@@ -57,12 +57,12 @@ func RunLocal(cfg *LocalConfig) error {
// for the race detector b/c it does not understand the synchronization via TCP socket connect/accept.
close(ctx.setupDone)
- name := "local"
- connErr := serv.CreateInstance(name, nil, nil)
- defer serv.ShutdownInstance(name, true)
+ id := 0
+ connErr := serv.CreateInstance(id, nil, nil)
+ defer serv.ShutdownInstance(id, true)
bin := cfg.Executor
- args := []string{"runner", name, "localhost", fmt.Sprint(serv.Port)}
+ args := []string{"runner", fmt.Sprint(id), "localhost", fmt.Sprint(serv.Port)}
if cfg.GDB {
bin = "gdb"
args = append([]string{
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index dd11fe0f8..5a104d81c 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -74,12 +74,9 @@ type Server struct {
canonicalModules *cover.Canonicalizer
coverFilter []uint64
- mu sync.Mutex
- runners map[string]*Runner
- execSource *queue.Distributor
- // TODO: replace names with indices.
- ids map[string]int
- idSeq int
+ mu sync.Mutex
+ runners map[int]*Runner
+ execSource *queue.Distributor
triagedCorpus atomic.Bool
statVMRestarts *stat.Val
*runnerStats
@@ -140,8 +137,7 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) {
target: cfg.Target,
sysTarget: sysTarget,
timeouts: sysTarget.Timeouts(cfg.Slowdown),
- runners: make(map[string]*Runner),
- ids: make(map[string]int),
+ runners: make(map[int]*Runner),
checker: checker,
baseSource: baseSource,
execSource: queue.Distribute(queue.Retry(baseSource)),
@@ -179,15 +175,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) {
log.Logf(1, "%s", err)
return
}
- name := connectReq.Name
- log.Logf(1, "runner %v connected", name)
+ id := int(connectReq.Id)
+ log.Logf(1, "runner %v connected", id)
if serv.cfg.VMLess {
// There is no VM loop, so minic what it would do.
- serv.CreateInstance(name, nil, nil)
+ serv.CreateInstance(id, nil, nil)
defer func() {
- serv.StopFuzzing(name)
- serv.ShutdownInstance(name, true)
+ serv.StopFuzzing(id)
+ serv.ShutdownInstance(id, true)
}()
} else {
checkRevisions(connectReq, serv.cfg.Target)
@@ -195,15 +191,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) {
serv.statVMRestarts.Add(1)
serv.mu.Lock()
- runner := serv.runners[name]
+ runner := serv.runners[id]
serv.mu.Unlock()
if runner == nil {
- log.Logf(2, "unknown VM %v tries to connect", name)
+ log.Logf(2, "unknown VM %v tries to connect", id)
return
}
err = serv.handleRunnerConn(runner, conn)
- log.Logf(2, "runner %v: %v", name, err)
+ log.Logf(2, "runner %v: %v", id, err)
runner.resultCh <- err
}
@@ -388,8 +384,9 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable
log.Logf(0, "machine check:\n%s", buf.Bytes())
}
-func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error {
+func (serv *Server) CreateInstance(id int, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error {
runner := &Runner{
+ id: id,
source: serv.execSource,
cover: serv.cfg.Cover,
coverEdges: serv.cfg.UseCoverEdges,
@@ -409,25 +406,18 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
}
serv.mu.Lock()
defer serv.mu.Unlock()
- if serv.runners[name] != nil {
- panic(fmt.Sprintf("duplicate instance %s", name))
+ if serv.runners[id] != nil {
+ panic(fmt.Sprintf("duplicate instance %v", id))
}
- serv.runners[name] = runner
- id, ok := serv.ids[name]
- if !ok {
- id = serv.idSeq
- serv.idSeq++
- serv.ids[name] = id
- }
- runner.id = id
+ serv.runners[id] = runner
return runner.resultCh
}
// stopInstance prevents further request exchange requests.
// To make RPCServer fully forget an instance, shutdownInstance() must be called.
-func (serv *Server) StopFuzzing(name string) {
+func (serv *Server) StopFuzzing(id int) {
serv.mu.Lock()
- runner := serv.runners[name]
+ runner := serv.runners[id]
serv.mu.Unlock()
if runner.updInfo != nil {
runner.updInfo(func(info *dispatcher.Info) {
@@ -437,10 +427,10 @@ func (serv *Server) StopFuzzing(name string) {
runner.Stop()
}
-func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) {
+func (serv *Server) ShutdownInstance(id int, crashed bool) ([]ExecRecord, []byte) {
serv.mu.Lock()
- runner := serv.runners[name]
- delete(serv.runners, name)
+ runner := serv.runners[id]
+ delete(serv.runners, id)
serv.mu.Unlock()
return runner.Shutdown(crashed), runner.MachineInfo()
}