aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/fuzzer/queue/queue.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-05-07 17:04:45 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-05-16 15:38:27 +0000
commitb6954dce2f21b8feb1448edaaeeefc22f5ff4944 (patch)
tree6f4358ba6609826b614847707e180662d986f98e /pkg/fuzzer/queue/queue.go
parentf694ecdc179cf43429135188934eed687ae28645 (diff)
pkg/vminfo: run programs interactively
Use the same interfaces as the fuzzer. Now syz-manager no longer needs to treat machine check executions differently.
Diffstat (limited to 'pkg/fuzzer/queue/queue.go')
-rw-r--r--pkg/fuzzer/queue/queue.go121
1 files changed, 121 insertions, 0 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go
index cb5aa134b..0c1735243 100644
--- a/pkg/fuzzer/queue/queue.go
+++ b/pkg/fuzzer/queue/queue.go
@@ -4,10 +4,14 @@
package queue
import (
+ "bytes"
"context"
+ "encoding/gob"
+ "fmt"
"sync"
"sync/atomic"
+ "github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/stats"
@@ -19,11 +23,16 @@ type Request struct {
NeedSignal SignalType
NeedCover bool
NeedHints bool
+ ExecOpts *ipc.ExecOpts
+
// If specified, the resulting signal for call SignalFilterCall
// will include subset of it even if it's not new.
SignalFilter signal.Signal
SignalFilterCall int
+ ReturnError bool
+ ReturnOutput bool
+
// This stat will be incremented on request completion.
Stat *stats.Val
@@ -78,6 +87,22 @@ func (r *Request) Wait(ctx context.Context) *Result {
}
}
+func (r *Request) hash() hash.Sig {
+ buf := new(bytes.Buffer)
+ if r.ExecOpts != nil {
+ if err := gob.NewEncoder(buf).Encode(r.ExecOpts); err != nil {
+ panic(err)
+ }
+ }
+ return hash.Hash(
+ []byte(fmt.Sprint(r.NeedSignal)),
+ []byte(fmt.Sprint(r.NeedCover)),
+ []byte(fmt.Sprint(r.NeedHints)),
+ r.Prog.Serialize(),
+ buf.Bytes(),
+ )
+}
+
func (r *Request) initChannel() {
r.mu.Lock()
if r.done == nil {
@@ -96,7 +121,17 @@ const (
type Result struct {
Info *ipc.ProgInfo
+ Output []byte
Status Status
+ Error string // More details in case of ExecFailure.
+}
+
+func (r *Result) clone() *Result {
+ ret := *r
+ if ret.Info != nil {
+ ret.Info = ret.Info.Clone()
+ }
+ return &ret
}
func (r *Result) Stop() bool {
@@ -293,3 +328,89 @@ func (pq *PriorityQueue) Next() *Request {
defer pq.mu.Unlock()
return pq.ops.Pop()
}
+
+type DynamicSource struct {
+ value atomic.Pointer[wrapSource]
+}
+
+type wrapSource struct {
+ source Source
+}
+
+func (ds *DynamicSource) Store(source Source) {
+ ds.value.Store(&wrapSource{source})
+}
+
+func (ds *DynamicSource) Next() *Request {
+ val := ds.value.Load()
+ if val == nil || val.source == nil {
+ return nil
+ }
+ return val.source.Next()
+}
+
+// Deduplicator() keeps track of the previously run requests to avoid re-running them.
+type Deduplicator struct {
+ mu sync.Mutex
+ ctx context.Context
+ source Source
+ mm map[hash.Sig]*duplicateState
+}
+
+type duplicateState struct {
+ res *Result
+ queued []*Request // duplicate requests waiting for the result.
+}
+
+func Deduplicate(ctx context.Context, source Source) Source {
+ return &Deduplicator{
+ ctx: ctx,
+ source: source,
+ mm: map[hash.Sig]*duplicateState{},
+ }
+}
+
+func (d *Deduplicator) Next() *Request {
+ for {
+ req := d.source.Next()
+ if req == nil {
+ return nil
+ }
+ hash := req.hash()
+ d.mu.Lock()
+ entry, ok := d.mm[hash]
+ if !ok {
+ d.mm[hash] = &duplicateState{}
+ } else if entry.res == nil {
+ // There's no result yet, put the request to the queue.
+ entry.queued = append(entry.queued, req)
+ } else {
+ // We already know the result.
+ req.Done(entry.res.clone())
+ }
+ d.mu.Unlock()
+ if !ok {
+ // This is the first time we see such a request.
+ req.OnDone(d.onDone)
+ return req
+ }
+ }
+}
+
+func (d *Deduplicator) onDone(req *Request, res *Result) bool {
+ hash := req.hash()
+ clonedRes := res.clone()
+
+ d.mu.Lock()
+ entry := d.mm[hash]
+ queued := entry.queued
+ entry.queued = nil
+ entry.res = clonedRes
+ d.mu.Unlock()
+
+ // Broadcast the result.
+ for _, waitingReq := range queued {
+ waitingReq.Done(res.clone())
+ }
+ return true
+}