about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- pkg/manager/diff.go 6
-rw-r--r-- pkg/repro/repro.go 5
-rw-r--r-- pkg/repro/strace.go 6
-rw-r--r-- vm/dispatcher/pool.go 37
-rw-r--r-- vm/dispatcher/pool_test.go 61
5 files changed, 98 insertions, 17 deletions
diff --git a/pkg/manager/diff.go b/pkg/manager/diff.go
index 379fd246c..d7860cc9a 100644
--- a/pkg/manager/diff.go
+++ b/pkg/manager/diff.go
@@ -6,6 +6,7 @@ package manager
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"math/rand"
"net"
@@ -583,7 +584,7 @@ func (rr *reproRunner) Run(ctx context.Context, r *repro.Result) {
// The third time we leave it as is in case it was important.
opts.Threaded = true
}
- pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
+ runErr := pool.Run(ctx, func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
var ret *instance.ExecProgInstance
ret, err = instance.SetupExecProg(inst, rr.kernel.cfg, rr.kernel.reporter, nil)
if err != nil {
@@ -595,6 +596,9 @@ func (rr *reproRunner) Run(ctx context.Context, r *repro.Result) {
Opts: opts,
})
})
+ if errors.Is(runErr, context.Canceled) {
+ break
+ }
crashed := result != nil && result.Report != nil
log.Logf(1, "attempt #%d to run %q on base: crashed=%v", i, ret.origReport.Title, crashed)
if crashed {
diff --git a/pkg/repro/repro.go b/pkg/repro/repro.go
index c196f71c7..1b3a70246 100644
--- a/pkg/repro/repro.go
+++ b/pkg/repro/repro.go
@@ -767,7 +767,7 @@ func (pw *poolWrapper) Run(ctx context.Context, params instance.ExecParams,
var result *instance.RunResult
var err error
- pw.pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
+ runErr := pw.pool.Run(ctx, func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
updInfo(func(info *dispatcher.Info) {
typ := "syz"
if params.CProg != nil {
@@ -787,6 +787,9 @@ func (pw *poolWrapper) Run(ctx context.Context, params instance.ExecParams,
result, err = ret.RunSyzProg(params)
}
})
+ if runErr != nil {
+ return nil, runErr
+ }
return result, err
}
diff --git a/pkg/repro/strace.go b/pkg/repro/strace.go
index e101945cf..ceb31de93 100644
--- a/pkg/repro/strace.go
+++ b/pkg/repro/strace.go
@@ -31,7 +31,7 @@ func RunStrace(result *Result, cfg *mgrconfig.Config, reporter *report.Reporter,
}
var runRes *instance.RunResult
var err error
- pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
+ runErr := pool.Run(context.Background(), func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
updInfo(func(info *dispatcher.Info) {
info.Status = "running strace"
})
@@ -58,7 +58,9 @@ func RunStrace(result *Result, cfg *mgrconfig.Config, reporter *report.Reporter,
runRes, err = ret.RunSyzProg(params)
}
})
- if err != nil {
+ if runErr != nil {
+ return straceFailed(runErr)
+ } else if err != nil {
return straceFailed(err)
}
return &StraceResult{
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go
index 331bfd062..20e893499 100644
--- a/vm/dispatcher/pool.go
+++ b/vm/dispatcher/pool.go
@@ -114,7 +114,7 @@ func (p *Pool[T]) Loop(ctx context.Context) {
func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) {
p.waitUnpaused()
ctx, cancel := context.WithCancel(ctx)
-
+ defer cancel()
log.Logf(2, "pool: booting instance %d", inst.idx)
inst.reset(cancel)
@@ -187,13 +187,24 @@ func (p *Pool[T]) ReserveForRun(count int) {
}
// Run blocks until it has found an instance to execute the job and until the job has finished.
-func (p *Pool[T]) Run(job Runner[T]) {
- done := make(chan struct{})
- p.jobs <- func(ctx context.Context, inst T, upd UpdateInfo) {
- job(ctx, inst, upd)
- close(done)
+// Returns an error if the job was aborted by cancelling the context.
+func (p *Pool[T]) Run(ctx context.Context, job Runner[T]) error {
+ done := make(chan error)
+ // Submit the job.
+ select {
+ case p.jobs <- func(jobCtx context.Context, inst T, upd UpdateInfo) {
+ mergedCtx, cancel := mergeContextCancel(jobCtx, ctx)
+ defer cancel()
+
+ job(mergedCtx, inst, upd)
+ done <- mergedCtx.Err()
+ }:
+ case <-ctx.Done():
+ // If the loop is aborted, no one is going to pick up the job.
+ return ctx.Err()
}
- <-done
+ // Await the job.
+ return <-done
}
func (p *Pool[T]) Total() int {
@@ -311,3 +322,15 @@ func (pi *poolInstance[T]) free(job Runner[T]) {
default:
}
}
+
+func mergeContextCancel(main, monitor context.Context) (context.Context, func()) {
+ withCancel, cancel := context.WithCancel(main)
+ go func() {
+ select {
+ case <-withCancel.Done():
+ case <-monitor.Done():
+ }
+ cancel()
+ }()
+ return withCancel, cancel
+}
diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go
index 452cd598a..cd85367c2 100644
--- a/vm/dispatcher/pool_test.go
+++ b/vm/dispatcher/pool_test.go
@@ -6,6 +6,7 @@ package dispatcher
import (
"context"
"runtime"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -87,7 +88,7 @@ func TestPoolSplit(t *testing.T) {
case <-stopRuns:
}
}
- go mgr.Run(job)
+ go mgr.Run(ctx, job)
// So far, there are no reserved instances.
for i := 0; i < count; i++ {
@@ -113,7 +114,7 @@ func TestPoolSplit(t *testing.T) {
// Now let's create and finish more jobs.
for i := 0; i < 10; i++ {
- go mgr.Run(job)
+ go mgr.Run(ctx, job)
}
mgr.ReserveForRun(2)
for i := 0; i < 10; i++ {
@@ -150,8 +151,7 @@ func TestPoolStress(t *testing.T) {
}
}()
for i := 0; i < 128; i++ {
- go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
- })
+ go mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
mgr.ReserveForRun(5 + i%5)
}
@@ -221,7 +221,7 @@ func TestPoolPause(t *testing.T) {
}()
run := make(chan bool, 1)
- go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ go mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
run <- true
})
time.Sleep(10 * time.Millisecond)
@@ -231,12 +231,61 @@ func TestPoolPause(t *testing.T) {
mgr.TogglePause(false)
<-run
- mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
+ mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
cancel()
<-done
}
+func TestPoolCancelRun(t *testing.T) {
+ // This test exists to aid the race detector.
+ mgr := NewPool[*nilInstance](
+ 10,
+ func(idx int) (*nilInstance, error) {
+ return &nilInstance{}, nil
+ },
+ func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ <-ctx.Done()
+ },
+ )
+ var wg sync.WaitGroup
+ wg.Add(1)
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ mgr.Loop(ctx)
+ wg.Done()
+ }()
+
+ mgr.ReserveForRun(2)
+
+ started := make(chan struct{})
+ // Schedule more jobs than could be processed simultaneously.
+ for i := 0; i < 15; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ select {
+ case <-ctx.Done():
+ return
+ case started <- struct{}{}:
+ }
+ <-ctx.Done()
+ })
+ }()
+ }
+
+ // Two can be started.
+ <-started
+ <-started
+
+ // Now stop the loop and the jobs.
+ cancel()
+
+ // Everything must really stop.
+ wg.Wait()
+}
+
func makePool(count int) []testInstance {
var ret []testInstance
for i := 0; i < count; i++ {