aboutsummaryrefslogtreecommitdiffstats
path: root/vm/dispatcher/pool.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-23 12:32:03 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-23 15:16:19 +0000
commit73a168d010b3ba0a82f850b9fe73e6907539ff20 (patch)
treed2142cc472c769438429d694020ae5dc07284c9a /vm/dispatcher/pool.go
parentd971f7e21bf575c68223c77d5bcb784ac4912aa1 (diff)
vm/dispatcher: make pool.Run cancellable
Make the pool.Run() function take a context.Context to be able to abort the callback passed to it or abort its scheduling if it's not yet running. Otherwise, if the callback is not yet started and the pool's Loop is aborted, we risk waiting for pool.Run() forever. It prevents the normal shutdown of repro.Run() and, consequently, the DiffFuzzer functionality.
Diffstat (limited to 'vm/dispatcher/pool.go')
-rw-r--r--vm/dispatcher/pool.go37
1 files changed, 30 insertions, 7 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go
index 331bfd062..20e893499 100644
--- a/vm/dispatcher/pool.go
+++ b/vm/dispatcher/pool.go
@@ -114,7 +114,7 @@ func (p *Pool[T]) Loop(ctx context.Context) {
func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) {
p.waitUnpaused()
ctx, cancel := context.WithCancel(ctx)
-
+ defer cancel()
log.Logf(2, "pool: booting instance %d", inst.idx)
inst.reset(cancel)
@@ -187,13 +187,24 @@ func (p *Pool[T]) ReserveForRun(count int) {
}
// Run blocks until it has found an instance to execute job and until job has finished.
-func (p *Pool[T]) Run(job Runner[T]) {
- done := make(chan struct{})
- p.jobs <- func(ctx context.Context, inst T, upd UpdateInfo) {
- job(ctx, inst, upd)
- close(done)
+// Returns an error if the job was aborted by cancelling the context.
+func (p *Pool[T]) Run(ctx context.Context, job Runner[T]) error {
+ done := make(chan error)
+ // Submit the job.
+ select {
+ case p.jobs <- func(jobCtx context.Context, inst T, upd UpdateInfo) {
+ mergedCtx, cancel := mergeContextCancel(jobCtx, ctx)
+ defer cancel()
+
+ job(mergedCtx, inst, upd)
+ done <- mergedCtx.Err()
+ }:
+ case <-ctx.Done():
+ // If the loop is aborted, no one is going to pick up the job.
+ return ctx.Err()
}
- <-done
+ // Await the job.
+ return <-done
}
func (p *Pool[T]) Total() int {
@@ -311,3 +322,15 @@ func (pi *poolInstance[T]) free(job Runner[T]) {
default:
}
}
+
+func mergeContextCancel(main, monitor context.Context) (context.Context, func()) {
+ withCancel, cancel := context.WithCancel(main)
+ go func() {
+ select {
+ case <-withCancel.Done():
+ case <-monitor.Done():
+ }
+ cancel()
+ }()
+ return withCancel, cancel
+}