From e47492d07d852b143ac4f7d33bb4dc4f900d2410 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Fri, 26 Jul 2024 12:00:54 +0200 Subject: pkg/rpcserver: use dense VM indices instead of string names Using actual VM indices for VM identification allows to match these indices to VMs in the pool, allows to use dense arrays to store information about runners (e.g. in queue.Distributor), and just removes string names as unnecessary additional entities. --- pkg/rpcserver/local.go | 8 +++---- pkg/rpcserver/rpcserver.go | 54 +++++++++++++++++++--------------------------- 2 files changed, 26 insertions(+), 36 deletions(-) (limited to 'pkg/rpcserver') diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go index 1420af606..e1522aa79 100644 --- a/pkg/rpcserver/local.go +++ b/pkg/rpcserver/local.go @@ -57,12 +57,12 @@ func RunLocal(cfg *LocalConfig) error { // for the race detector b/c it does not understand the synchronization via TCP socket connect/accept. close(ctx.setupDone) - name := "local" - connErr := serv.CreateInstance(name, nil, nil) - defer serv.ShutdownInstance(name, true) + id := 0 + connErr := serv.CreateInstance(id, nil, nil) + defer serv.ShutdownInstance(id, true) bin := cfg.Executor - args := []string{"runner", name, "localhost", fmt.Sprint(serv.Port)} + args := []string{"runner", fmt.Sprint(id), "localhost", fmt.Sprint(serv.Port)} if cfg.GDB { bin = "gdb" args = append([]string{ diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index dd11fe0f8..5a104d81c 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -74,12 +74,9 @@ type Server struct { canonicalModules *cover.Canonicalizer coverFilter []uint64 - mu sync.Mutex - runners map[string]*Runner - execSource *queue.Distributor - // TODO: replace names with indices. - ids map[string]int - idSeq int + mu sync.Mutex + runners map[int]*Runner + execSource *queue.Distributor triagedCorpus atomic.Bool statVMRestarts *stat.Val *runnerStats @@ -140,8 +137,7 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) { target: cfg.Target, sysTarget: sysTarget, timeouts: sysTarget.Timeouts(cfg.Slowdown), - runners: make(map[string]*Runner), - ids: make(map[string]int), + runners: make(map[int]*Runner), checker: checker, baseSource: baseSource, execSource: queue.Distribute(queue.Retry(baseSource)), @@ -179,15 +175,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { log.Logf(1, "%s", err) return } - name := connectReq.Name - log.Logf(1, "runner %v connected", name) + id := int(connectReq.Id) + log.Logf(1, "runner %v connected", id) if serv.cfg.VMLess { // There is no VM loop, so minic what it would do. - serv.CreateInstance(name, nil, nil) + serv.CreateInstance(id, nil, nil) defer func() { - serv.StopFuzzing(name) - serv.ShutdownInstance(name, true) + serv.StopFuzzing(id) + serv.ShutdownInstance(id, true) }() } else { checkRevisions(connectReq, serv.cfg.Target) @@ -195,15 +191,15 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { serv.statVMRestarts.Add(1) serv.mu.Lock() - runner := serv.runners[name] + runner := serv.runners[id] serv.mu.Unlock() if runner == nil { - log.Logf(2, "unknown VM %v tries to connect", name) + log.Logf(2, "unknown VM %v tries to connect", id) return } err = serv.handleRunnerConn(runner, conn) - log.Logf(2, "runner %v: %v", name, err) + log.Logf(2, "runner %v: %v", id, err) runner.resultCh <- err } @@ -388,8 +384,9 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable log.Logf(0, "machine check:\n%s", buf.Bytes()) } -func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error { +func (serv *Server) CreateInstance(id int, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) chan error { runner := &Runner{ + id: id, source: serv.execSource, cover: serv.cfg.Cover, coverEdges: serv.cfg.UseCoverEdges, @@ -409,25 +406,18 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo } serv.mu.Lock() defer serv.mu.Unlock() - if serv.runners[name] != nil { - panic(fmt.Sprintf("duplicate instance %s", name)) + if serv.runners[id] != nil { + panic(fmt.Sprintf("duplicate instance %v", id)) } - serv.runners[name] = runner - id, ok := serv.ids[name] - if !ok { - id = serv.idSeq - serv.idSeq++ - serv.ids[name] = id - } - runner.id = id + serv.runners[id] = runner return runner.resultCh } // stopInstance prevents further request exchange requests. // To make RPCServer fully forget an instance, shutdownInstance() must be called. -func (serv *Server) StopFuzzing(name string) { +func (serv *Server) StopFuzzing(id int) { serv.mu.Lock() - runner := serv.runners[name] + runner := serv.runners[id] serv.mu.Unlock() if runner.updInfo != nil { runner.updInfo(func(info *dispatcher.Info) { @@ -437,10 +427,10 @@ func (serv *Server) StopFuzzing(name string) { runner.Stop() } -func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, []byte) { +func (serv *Server) ShutdownInstance(id int, crashed bool) ([]ExecRecord, []byte) { serv.mu.Lock() - runner := serv.runners[name] - delete(serv.runners, name) + runner := serv.runners[id] + delete(serv.runners, id) serv.mu.Unlock() return runner.Shutdown(crashed), runner.MachineInfo() } -- cgit mrf-deployment