diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-07-01 14:26:05 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-08-02 13:16:51 +0000 |
| commit | 66fcb0a84fcd55ad8e1444cdd0bc0ad6592f7329 (patch) | |
| tree | 998e52d5569938e0251da1eb7c54c3746186b488 /pkg/rpcserver | |
| parent | 1e9c4cf3ae82ef82220af312606fffe65e124563 (diff) | |
pkg/fuzzer: try to triage on different VMs
Distribute triage requests to different VMs.
Diffstat (limited to 'pkg/rpcserver')
| -rw-r--r-- | pkg/rpcserver/rpcserver.go | 19 | ||||
| -rw-r--r-- | pkg/rpcserver/runner.go | 19 |
2 files changed, 32 insertions, 6 deletions
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 37e74c91b..da3a0f076 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -76,9 +76,12 @@ type Server struct { canonicalModules *cover.Canonicalizer coverFilter []uint64 - mu sync.Mutex - runners map[string]*Runner - execSource queue.Source + mu sync.Mutex + runners map[string]*Runner + execSource *queue.Distributor + // TODO: replace names with indices. + ids map[string]int + idSeq int triagedCorpus atomic.Bool statVMRestarts *stat.Val *runnerStats @@ -140,9 +143,10 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) { sysTarget: sysTarget, timeouts: sysTarget.Timeouts(cfg.Slowdown), runners: make(map[string]*Runner), + ids: make(map[string]int), checker: checker, baseSource: baseSource, - execSource: queue.Retry(baseSource), + execSource: queue.Distribute(queue.Retry(baseSource)), statVMRestarts: stat.New("vm restarts", "Total number of VM starts", stat.Rate{}, stat.NoGraph), @@ -412,6 +416,13 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo panic(fmt.Sprintf("duplicate instance %s", name)) } serv.runners[name] = runner + id, ok := serv.ids[name] + if !ok { + id = serv.idSeq + serv.idSeq++ + serv.ids[name] = id + } + runner.id = id return runner.resultCh } diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index a0b519d20..f14a2ec29 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -25,7 +25,8 @@ import ( ) type Runner struct { - source queue.Source + id int + source *queue.Distributor procs int cover bool coverEdges bool @@ -174,7 +175,7 @@ func (runner *Runner) ConnectionLoop() error { } } for len(runner.requests)-len(runner.executing) < 2*runner.procs { - req := runner.source.Next() + req := runner.source.Next(runner.id) if req == nil { break } @@ -320,11 +321,21 @@ func (runner *Runner) sendRequest(req *queue.Request) error { } data = fileData } + var avoid uint64 + for _, id := range req.Avoid { + if id.VM == runner.id { + avoid |= uint64(1 << id.Proc) + } + } + if avoid == (uint64(1)<<runner.procs)-1 { + avoid = 0 + } msg := &flatrpc.HostMessage{ Msg: &flatrpc.HostMessages{ Type: flatrpc.HostMessagesRawExecRequest, Value: &flatrpc.ExecRequest{ Id: id, + Avoid: avoid, ProgData: data, Flags: flags, ExecOpts: &opts, @@ -408,6 +419,10 @@ func (runner *Runner) handleExecResult(msg *flatrpc.ExecResult) error { resErr = errors.New(msg.Error) } req.Done(&queue.Result{ + Executor: queue.ExecutorID{ + VM: runner.id, + Proc: int(msg.Proc), + }, Status: status, Info: msg.Info, Output: slices.Clone(msg.Output), |
