diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-05-03 18:49:35 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-16 15:38:27 +0000 |
| commit | 5b8f52cdd0b790158fdbe88d0fd902d24f8a996d (patch) | |
| tree | 2827ebd34598426f41ad4c1e43c0549f29e8446a | |
| parent | 03820adaef911ce08278d95f034f134c3c0c852e (diff) | |
pkg/fuzzer: introduce a request restarter layer
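Make Result statuses more elaborate: replace the boolean Stop field with a
Status value (Success, ExecFailure, Crashed, Restarted) and a Stop() method
that reports true for ExecFailure and Crashed.
Instead of retrying inputs directly in rpc.go, extract this logic into a
separate entity in pkg/fuzzer/queue (queue.Retry), which resubmits requests
whose result has Status=Restarted.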
| -rw-r--r-- | pkg/fuzzer/fuzzer_test.go | 2 | ||||
| -rw-r--r-- | pkg/fuzzer/job.go | 15 | ||||
| -rw-r--r-- | pkg/fuzzer/queue/queue.go | 49 | ||||
| -rw-r--r-- | pkg/fuzzer/queue/retry.go | 37 | ||||
| -rw-r--r-- | syz-manager/manager.go | 7 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 43 |
6 files changed, 99 insertions, 54 deletions
diff --git a/pkg/fuzzer/fuzzer_test.go b/pkg/fuzzer/fuzzer_test.go index ab38fa783..5a903dedb 100644 --- a/pkg/fuzzer/fuzzer_test.go +++ b/pkg/fuzzer/fuzzer_test.go @@ -242,7 +242,7 @@ func (f *testFuzzer) registerExecutor(proc *executorProc) { return err } if crash != "" { - res = &queue.Result{Stop: true} + res = &queue.Result{Status: queue.Crashed} if !f.expectedCrashes[crash] { return fmt.Errorf("unexpected crash: %q", crash) } diff --git a/pkg/fuzzer/job.go b/pkg/fuzzer/job.go index 8f81ef9fa..9fd294ca0 100644 --- a/pkg/fuzzer/job.go +++ b/pkg/fuzzer/job.go @@ -164,7 +164,7 @@ func (job *triageJob) deflake(exec func(*queue.Request, ...execOpt) *queue.Resul NeedCover: true, Stat: stat, }, &dontTriage{}) - if result.Stop { + if result.Stop() { stop = true return } @@ -204,7 +204,7 @@ func (job *triageJob) minimize(newSignal signal.Signal) (stop bool) { SignalFilterCall: call1, Stat: job.fuzzer.statExecMinimize, }) - if result.Stop { + if result.Stop() { stop = true return false } @@ -273,7 +273,7 @@ func (job *smashJob) run(fuzzer *Fuzzer) { NeedSignal: queue.NewSignal, Stat: fuzzer.statExecSmash, }) - if result.Stop { + if result.Stop() { return } if fuzzer.Config.Collide { @@ -281,7 +281,7 @@ func (job *smashJob) run(fuzzer *Fuzzer) { Prog: randomCollide(p, rnd), Stat: fuzzer.statExecCollide, }) - if result.Stop { + if result.Stop() { return } } @@ -323,7 +323,7 @@ func (job *smashJob) faultInjection(fuzzer *Fuzzer) { Prog: newProg, Stat: fuzzer.statExecSmash, }) - if result.Stop { + if result.Stop() { return } info := result.Info @@ -343,6 +343,7 @@ func (job *hintsJob) run(fuzzer *Fuzzer) { // First execute the original program twice to get comparisons from KCOV. // The second execution lets us filter out flaky values, which seem to constitute ~30-40%. 
p := job.p + var comps prog.CompMap for i := 0; i < 2; i++ { result := fuzzer.execute(fuzzer.smashQueue, &queue.Request{ @@ -350,7 +351,7 @@ func (job *hintsJob) run(fuzzer *Fuzzer) { NeedHints: true, Stat: fuzzer.statExecSeed, }) - if result.Stop || result.Info == nil { + if result.Stop() || result.Info == nil { return } if i == 0 { @@ -373,6 +374,6 @@ func (job *hintsJob) run(fuzzer *Fuzzer) { NeedSignal: queue.NewSignal, Stat: fuzzer.statExecHint, }) - return !result.Stop + return !result.Stop() }) } diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go index 00e83a69e..cb5aa134b 100644 --- a/pkg/fuzzer/queue/queue.go +++ b/pkg/fuzzer/queue/queue.go @@ -72,7 +72,7 @@ func (r *Request) Wait(ctx context.Context) *Result { r.initChannel() select { case <-ctx.Done(): - return &Result{Stop: true} + return &Result{Status: ExecFailure} case <-r.done: return r.result } @@ -95,10 +95,23 @@ const ( ) type Result struct { - Info *ipc.ProgInfo - Stop bool + Info *ipc.ProgInfo + Status Status } +func (r *Result) Stop() bool { + return r.Status == ExecFailure || r.Status == Crashed +} + +type Status int + +const ( + Success Status = iota + ExecFailure // For e.g. serialization errors. + Crashed // The VM crashed holding the request. + Restarted // The VM was restarted holding the request. +) + // Executor describes the interface wanted by the producers of requests. // After a Request is submitted, it's expected that the consumer will eventually // take it and report the execution result via Done(). 
@@ -157,16 +170,28 @@ func (pq *PlainQueue) Submit(req *Request) { func (pq *PlainQueue) Next() *Request { pq.mu.Lock() defer pq.mu.Unlock() - if pq.pos < len(pq.queue) { - ret := pq.queue[pq.pos] - pq.queue[pq.pos] = nil - pq.pos++ - if pq.stat != nil { - pq.stat.Add(-1) - } - return ret + return pq.nextLocked() +} + +func (pq *PlainQueue) tryNext() *Request { + if !pq.mu.TryLock() { + return nil } - return nil + defer pq.mu.Unlock() + return pq.nextLocked() +} + +func (pq *PlainQueue) nextLocked() *Request { + if pq.pos == len(pq.queue) { + return nil + } + ret := pq.queue[pq.pos] + pq.queue[pq.pos] = nil + pq.pos++ + if pq.stat != nil { + pq.stat.Add(-1) + } + return ret } // Order combines several different sources in a particular order. diff --git a/pkg/fuzzer/queue/retry.go b/pkg/fuzzer/queue/retry.go new file mode 100644 index 000000000..0b2e02ba5 --- /dev/null +++ b/pkg/fuzzer/queue/retry.go @@ -0,0 +1,37 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package queue + +type retryer struct { + pq *PlainQueue + base Source +} + +// Retry adds a layer that resends results with Status=Restarted. +func Retry(base Source) Source { + return &retryer{ + base: base, + pq: Plain(), + } +} + +func (r *retryer) Next() *Request { + req := r.pq.tryNext() + if req == nil { + req = r.base.Next() + } + if req != nil { + req.OnDone(r.done) + } + return req +} + +func (r *retryer) done(req *Request, res *Result) bool { + // The input was on a restarted VM. 
+ if res.Status == Restarted { + r.pq.Submit(req) + return false + } + return true +} diff --git a/syz-manager/manager.go b/syz-manager/manager.go index c379bd3ee..864a3676f 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -26,6 +26,7 @@ import ( "github.com/google/syzkaller/pkg/db" "github.com/google/syzkaller/pkg/flatrpc" "github.com/google/syzkaller/pkg/fuzzer" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/gce" "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/instance" @@ -73,6 +74,7 @@ type Manager struct { mu sync.Mutex fuzzer atomic.Pointer[fuzzer.Fuzzer] + execSource atomic.Value // queue.Source phase int targetEnabledSyscalls map[*prog.Syscall]bool @@ -1374,6 +1376,7 @@ func (mgr *Manager) machineChecked(features flatrpc.Feature, enabledSyscalls map }, }, rnd, mgr.target) mgr.fuzzer.Store(fuzzerObj) + mgr.execSource.Store(queue.Retry(fuzzerObj)) mgr.loadCorpus() mgr.firstConnect.Store(time.Now().Unix()) @@ -1396,8 +1399,8 @@ func (mgr *Manager) corpusMinimization() { } // We need this method since we're not supposed to access Manager fields from RPCServer. -func (mgr *Manager) getFuzzer() *fuzzer.Fuzzer { - return mgr.fuzzer.Load() +func (mgr *Manager) getExecSource() queue.Source { + return mgr.execSource.Load().(queue.Source) } func (mgr *Manager) fuzzerSignalRotation() { diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index bcfc53991..4e95849b0 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -15,7 +15,6 @@ import ( "github.com/google/syzkaller/pkg/cover" "github.com/google/syzkaller/pkg/flatrpc" - "github.com/google/syzkaller/pkg/fuzzer" "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/log" @@ -54,10 +53,6 @@ type RPCServer struct { mu sync.Mutex runners sync.Map // Instead of map[string]*Runner. - // We did not finish these requests because of VM restarts. 
- // They will be eventually given to other VMs. - rescuedInputs []*queue.Request - statExecs *stats.Val statExecRetries *stats.Val statExecutorRestarts *stats.Val @@ -103,7 +98,7 @@ type BugFrames struct { type RPCManagerView interface { currentBugFrames() BugFrames machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool) - getFuzzer() *fuzzer.Fuzzer + getExecSource() queue.Source } func startRPCServer(mgr *Manager) (*RPCServer, error) { @@ -364,13 +359,17 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E return nil } - fuzzerObj := serv.mgr.getFuzzer() - if fuzzerObj == nil { + source := serv.mgr.getExecSource() + if source == nil { // ExchangeInfo calls follow MachineCheck, so the fuzzer must have been initialized. panic("exchange info call with nil fuzzer") } - appendRequest := func(inp *queue.Request) { + // First query new inputs and only then post results. + // It should foster a more even distribution of executions + // across all VMs. + for len(r.Requests) < a.NeedProgs { + inp := source.Next() if req, ok := serv.newRequest(runner, inp); ok { r.Requests = append(r.Requests, req) } else { @@ -378,27 +377,8 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E // but so far we don't have a better handling than counting this. // This error is observed a lot on the seeded syz_mount_image calls. serv.statExecBufferTooSmall.Add(1) - inp.Done(&queue.Result{Stop: true}) - } - } - - // Try to collect some of the postponed requests. - if serv.mu.TryLock() { - for len(serv.rescuedInputs) != 0 && len(r.Requests) < a.NeedProgs { - last := len(serv.rescuedInputs) - 1 - inp := serv.rescuedInputs[last] - serv.rescuedInputs[last] = nil - serv.rescuedInputs = serv.rescuedInputs[:last] - appendRequest(inp) + inp.Done(&queue.Result{Status: queue.ExecFailure}) } - serv.mu.Unlock() - } - - // First query new inputs and only then post results. 
- // It should foster a more even distribution of executions - // across all VMs. - for len(r.Requests) < a.NeedProgs { - appendRequest(fuzzerObj.Next()) } for _, result := range a.Results { @@ -485,10 +465,9 @@ func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { } for _, req := range oldRequests { if crashed && req.try >= 0 { - req.req.Done(&queue.Result{Stop: true}) + req.req.Done(&queue.Result{Status: queue.Crashed}) } else { - // We will resend these inputs to another VM. - serv.rescuedInputs = append(serv.rescuedInputs, req.req) + req.req.Done(&queue.Result{Status: queue.Restarted}) } } return runner.machineInfo |
