diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-05-03 13:12:00 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-16 15:38:27 +0000 |
| commit | 03820adaef911ce08278d95f034f134c3c0c852e (patch) | |
| tree | 57f87ce0f3dedda459fb1771d3b79ff96e0853bb /pkg/fuzzer/queue/queue.go | |
| parent | ef5d53ed7e3c7d30481a88301f680e37a5cc4775 (diff) | |
pkg/fuzzer: use queue layers
Instead of relying on a fuzzer-internal priority queue, utilize
stackable layers of request-generating steps.
Move the functionality to a separate pkg/fuzzer/queue package.
The pkg/fuzzer/queue package can be reused to add extra processing
layers on top of the fuzzing and to combine machine checking and fuzzing
execution pipelines.
Diffstat (limited to 'pkg/fuzzer/queue/queue.go')
| -rw-r--r-- | pkg/fuzzer/queue/queue.go | 270 |
1 files changed, 270 insertions, 0 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go new file mode 100644 index 000000000..00e83a69e --- /dev/null +++ b/pkg/fuzzer/queue/queue.go @@ -0,0 +1,270 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package queue + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/google/syzkaller/pkg/ipc" + "github.com/google/syzkaller/pkg/signal" + "github.com/google/syzkaller/pkg/stats" + "github.com/google/syzkaller/prog" +) + +type Request struct { + Prog *prog.Prog + NeedSignal SignalType + NeedCover bool + NeedHints bool + // If specified, the resulting signal for call SignalFilterCall + // will include subset of it even if it's not new. + SignalFilter signal.Signal + SignalFilterCall int + + // This stat will be incremented on request completion. + Stat *stats.Val + + // The callback will be called on request completion in the LIFO order. + // If it returns false, all further processing will be stopped. + // It allows wrappers to intercept Done() requests. + callback DoneCallback + + mu sync.Mutex + result *Result + done chan struct{} +} + +type DoneCallback func(*Request, *Result) bool + +func (r *Request) OnDone(cb DoneCallback) { + oldCallback := r.callback + r.callback = func(req *Request, res *Result) bool { + r.callback = oldCallback + if !cb(req, res) { + return false + } + if oldCallback == nil { + return true + } + return oldCallback(req, res) + } +} + +func (r *Request) Done(res *Result) { + if r.callback != nil { + if !r.callback(r, res) { + return + } + } + if r.Stat != nil { + r.Stat.Add(1) + } + r.initChannel() + r.result = res + close(r.done) +} + +// Wait() blocks until we have the result. +func (r *Request) Wait(ctx context.Context) *Result { + r.initChannel() + select { + case <-ctx.Done(): + return &Result{Stop: true} + case <-r.done: + return r.result + } +} + +func (r *Request) initChannel() { + r.mu.Lock() + if r.done == nil { + r.done = make(chan struct{}) + } + r.mu.Unlock() +} + +type SignalType int + +const ( + NoSignal SignalType = iota // we don't need any signal + NewSignal // we need the newly seen signal + AllSignal // we need all signal +) + +type Result struct { + Info *ipc.ProgInfo + Stop bool +} + +// Executor describes the interface wanted by the producers of requests. +// After a Request is submitted, it's expected that the consumer will eventually +// take it and report the execution result via Done(). +type Executor interface { + Submit(req *Request) +} + +// Source describes the interface wanted by the consumers of requests. +type Source interface { + Next() *Request +} + +// PlainQueue is a straighforward thread-safe Request queue implementation. +type PlainQueue struct { + stat *stats.Val + mu sync.Mutex + queue []*Request + pos int +} + +func Plain() *PlainQueue { + return &PlainQueue{} +} + +func PlainWithStat(val *stats.Val) *PlainQueue { + return &PlainQueue{stat: val} +} + +func (pq *PlainQueue) Len() int { + pq.mu.Lock() + defer pq.mu.Unlock() + return len(pq.queue) - pq.pos +} + +func (pq *PlainQueue) Submit(req *Request) { + if pq.stat != nil { + pq.stat.Add(1) + } + pq.mu.Lock() + defer pq.mu.Unlock() + + // It doesn't make sense to compact the queue too often. + const minSizeToCompact = 128 + if pq.pos > len(pq.queue)/2 && len(pq.queue) >= minSizeToCompact { + copy(pq.queue, pq.queue[pq.pos:]) + for pq.pos > 0 { + newLen := len(pq.queue) - 1 + pq.queue[newLen] = nil + pq.queue = pq.queue[:newLen] + pq.pos-- + } + } + pq.queue = append(pq.queue, req) +} + +func (pq *PlainQueue) Next() *Request { + pq.mu.Lock() + defer pq.mu.Unlock() + if pq.pos < len(pq.queue) { + ret := pq.queue[pq.pos] + pq.queue[pq.pos] = nil + pq.pos++ + if pq.stat != nil { + pq.stat.Add(-1) + } + return ret + } + return nil +} + +// Order combines several different sources in a particular order. +type orderImpl struct { + sources []Source +} + +func Order(sources ...Source) Source { + return &orderImpl{sources: sources} +} + +func (o *orderImpl) Next() *Request { + for _, s := range o.sources { + req := s.Next() + if req != nil { + return req + } + } + return nil +} + +type callback struct { + cb func() *Request +} + +// Callback produces a source that calls the callback to serve every Next() request. +func Callback(cb func() *Request) Source { + return &callback{cb} +} + +func (cb *callback) Next() *Request { + return cb.cb() +} + +type alternate struct { + base Source + nth int + seq atomic.Int64 +} + +// Alternate proxies base, but returns nil every nth Next() call. +func Alternate(base Source, nth int) Source { + return &alternate{ + base: base, + nth: nth, + } +} + +func (a *alternate) Next() *Request { + if a.seq.Add(1)%int64(a.nth) == 0 { + return nil + } + return a.base.Next() +} + +type PriorityQueue struct { + mu *sync.Mutex + ops *priorityQueueOps[*Request] + currPrio priority +} + +func Priority() *PriorityQueue { + return &PriorityQueue{ + mu: &sync.Mutex{}, + ops: &priorityQueueOps[*Request]{}, + currPrio: priority{0}, + } +} + +// AppendQueue() can be used to form nested queues. +// That is, if +// q1 := pq.AppendQueue() +// q2 := pq.AppendQueue() +// All elements added via q2.Submit() will always have a *lower* priority +// than all elements added via q1.Submit(). +func (pq *PriorityQueue) AppendQueue() *PriorityQueue { + pq.mu.Lock() + defer pq.mu.Unlock() + pq.currPrio = pq.currPrio.next() + nextPrio := append(priority{}, pq.currPrio...) + return &PriorityQueue{ + // We use the same queue, therefore the same mutex. + mu: pq.mu, + ops: pq.ops, + currPrio: append(nextPrio, 0), + } +} + +// Each subsequent element added via Submit() will have a lower priority. +func (pq *PriorityQueue) Submit(req *Request) { + pq.mu.Lock() + defer pq.mu.Unlock() + pq.currPrio = pq.currPrio.next() + pq.ops.Push(req, pq.currPrio) +} + +func (pq *PriorityQueue) Next() *Request { + pq.mu.Lock() + defer pq.mu.Unlock() + return pq.ops.Pop() +} |
