aboutsummaryrefslogtreecommitdiffstats
path: root/pkg/fuzzer/queue/queue.go
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/fuzzer/queue/queue.go')
-rw-r--r--pkg/fuzzer/queue/queue.go49
1 files changed, 37 insertions, 12 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go
index 00e83a69e..cb5aa134b 100644
--- a/pkg/fuzzer/queue/queue.go
+++ b/pkg/fuzzer/queue/queue.go
@@ -72,7 +72,7 @@ func (r *Request) Wait(ctx context.Context) *Result {
r.initChannel()
select {
case <-ctx.Done():
- return &Result{Stop: true}
+ return &Result{Status: ExecFailure}
case <-r.done:
return r.result
}
@@ -95,10 +95,23 @@ const (
)
type Result struct {
- Info *ipc.ProgInfo
- Stop bool
+ Info *ipc.ProgInfo
+ Status Status
}
+func (r *Result) Stop() bool {
+ return r.Status == ExecFailure || r.Status == Crashed
+}
+
+type Status int
+
+const (
+ Success Status = iota
+ ExecFailure // For e.g. serialization errors.
+ Crashed // The VM crashed holding the request.
+ Restarted // The VM was restarted holding the request.
+)
+
// Executor describes the interface wanted by the producers of requests.
// After a Request is submitted, it's expected that the consumer will eventually
// take it and report the execution result via Done().
@@ -157,16 +170,28 @@ func (pq *PlainQueue) Submit(req *Request) {
func (pq *PlainQueue) Next() *Request {
pq.mu.Lock()
defer pq.mu.Unlock()
- if pq.pos < len(pq.queue) {
- ret := pq.queue[pq.pos]
- pq.queue[pq.pos] = nil
- pq.pos++
- if pq.stat != nil {
- pq.stat.Add(-1)
- }
- return ret
+ return pq.nextLocked()
+}
+
+func (pq *PlainQueue) tryNext() *Request {
+ if !pq.mu.TryLock() {
+ return nil
}
- return nil
+ defer pq.mu.Unlock()
+ return pq.nextLocked()
+}
+
+func (pq *PlainQueue) nextLocked() *Request {
+ if pq.pos == len(pq.queue) {
+ return nil
+ }
+ ret := pq.queue[pq.pos]
+ pq.queue[pq.pos] = nil
+ pq.pos++
+ if pq.stat != nil {
+ pq.stat.Add(-1)
+ }
+ return ret
}
// Order combines several different sources in a particular order.