aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/rpcserver
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-07-01 14:26:05 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-08-02 13:16:51 +0000
commit66fcb0a84fcd55ad8e1444cdd0bc0ad6592f7329 (patch)
tree998e52d5569938e0251da1eb7c54c3746186b488 /pkg/rpcserver
parent1e9c4cf3ae82ef82220af312606fffe65e124563 (diff)
pkg/fuzzer: try to triage on different VMs
Distribute triage requests to different VMs.
Diffstat (limited to 'pkg/rpcserver')
-rw-r--r--pkg/rpcserver/rpcserver.go19
-rw-r--r--pkg/rpcserver/runner.go19
2 files changed, 32 insertions, 6 deletions
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index 37e74c91b..da3a0f076 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -76,9 +76,12 @@ type Server struct {
canonicalModules *cover.Canonicalizer
coverFilter []uint64
- mu sync.Mutex
- runners map[string]*Runner
- execSource queue.Source
+ mu sync.Mutex
+ runners map[string]*Runner
+ execSource *queue.Distributor
+ // TODO: replace names with indices.
+ ids map[string]int
+ idSeq int
triagedCorpus atomic.Bool
statVMRestarts *stat.Val
*runnerStats
@@ -140,9 +143,10 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) {
sysTarget: sysTarget,
timeouts: sysTarget.Timeouts(cfg.Slowdown),
runners: make(map[string]*Runner),
+ ids: make(map[string]int),
checker: checker,
baseSource: baseSource,
- execSource: queue.Retry(baseSource),
+ execSource: queue.Distribute(queue.Retry(baseSource)),
statVMRestarts: stat.New("vm restarts", "Total number of VM starts",
stat.Rate{}, stat.NoGraph),
@@ -412,6 +416,13 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo
panic(fmt.Sprintf("duplicate instance %s", name))
}
serv.runners[name] = runner
+ id, ok := serv.ids[name]
+ if !ok {
+ id = serv.idSeq
+ serv.idSeq++
+ serv.ids[name] = id
+ }
+ runner.id = id
return runner.resultCh
}
diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go
index a0b519d20..f14a2ec29 100644
--- a/pkg/rpcserver/runner.go
+++ b/pkg/rpcserver/runner.go
@@ -25,7 +25,8 @@ import (
)
type Runner struct {
- source queue.Source
+ id int
+ source *queue.Distributor
procs int
cover bool
coverEdges bool
@@ -174,7 +175,7 @@ func (runner *Runner) ConnectionLoop() error {
}
}
for len(runner.requests)-len(runner.executing) < 2*runner.procs {
- req := runner.source.Next()
+ req := runner.source.Next(runner.id)
if req == nil {
break
}
@@ -320,11 +321,21 @@ func (runner *Runner) sendRequest(req *queue.Request) error {
}
data = fileData
}
+ var avoid uint64
+ for _, id := range req.Avoid {
+ if id.VM == runner.id {
+ avoid |= uint64(1 << id.Proc)
+ }
+ }
+ if avoid == (uint64(1)<<runner.procs)-1 {
+ avoid = 0
+ }
msg := &flatrpc.HostMessage{
Msg: &flatrpc.HostMessages{
Type: flatrpc.HostMessagesRawExecRequest,
Value: &flatrpc.ExecRequest{
Id: id,
+ Avoid: avoid,
ProgData: data,
Flags: flags,
ExecOpts: &opts,
@@ -408,6 +419,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error {
resErr = errors.New(msg.Error)
}
req.Done(&queue.Result{
+ Executor: queue.ExecutorID{
+ VM: runner.id,
+ Proc: int(msg.Proc),
+ },
Status: status,
Info: msg.Info,
Output: slices.Clone(msg.Output),