aboutsummaryrefslogtreecommitdiffstats
path: root/vm/dispatcher/pool_test.go
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-04-23 12:32:03 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-04-23 15:16:19 +0000
commit73a168d010b3ba0a82f850b9fe73e6907539ff20 (patch)
treed2142cc472c769438429d694020ae5dc07284c9a /vm/dispatcher/pool_test.go
parentd971f7e21bf575c68223c77d5bcb784ac4912aa1 (diff)
vm/dispatcher: make pool.Run cancellable
Make the pool.Run() function take a context.Context to be able to abort the callback passed to it or abort its scheduling if it's not yet running. Otherwise, if the callback is not yet started and the pool's Loop is aborted, we risk waiting for pool.Run() forever. It prevents the normal shutdown of repro.Run() and, consequently, the DiffFuzzer functionality.
Diffstat (limited to 'vm/dispatcher/pool_test.go')
-rw-r--r--vm/dispatcher/pool_test.go61
1 files changed, 55 insertions, 6 deletions
diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go
index 452cd598a..cd85367c2 100644
--- a/vm/dispatcher/pool_test.go
+++ b/vm/dispatcher/pool_test.go
@@ -6,6 +6,7 @@ package dispatcher
import (
"context"
"runtime"
+ "sync"
"sync/atomic"
"testing"
"time"
@@ -87,7 +88,7 @@ func TestPoolSplit(t *testing.T) {
case <-stopRuns:
}
}
- go mgr.Run(job)
+ go mgr.Run(ctx, job)
// So far, there are no reserved instances.
for i := 0; i < count; i++ {
@@ -113,7 +114,7 @@ func TestPoolSplit(t *testing.T) {
// Now let's create and finish more jobs.
for i := 0; i < 10; i++ {
- go mgr.Run(job)
+ go mgr.Run(ctx, job)
}
mgr.ReserveForRun(2)
for i := 0; i < 10; i++ {
@@ -150,8 +151,7 @@ func TestPoolStress(t *testing.T) {
}
}()
for i := 0; i < 128; i++ {
- go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
- })
+ go mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
mgr.ReserveForRun(5 + i%5)
}
@@ -221,7 +221,7 @@ func TestPoolPause(t *testing.T) {
}()
run := make(chan bool, 1)
- go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ go mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
run <- true
})
time.Sleep(10 * time.Millisecond)
@@ -231,12 +231,61 @@ func TestPoolPause(t *testing.T) {
mgr.TogglePause(false)
<-run
- mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
+ mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
cancel()
<-done
}
+func TestPoolCancelRun(t *testing.T) {
+ // The test to aid the race detector.
+ mgr := NewPool[*nilInstance](
+ 10,
+ func(idx int) (*nilInstance, error) {
+ return &nilInstance{}, nil
+ },
+ func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ <-ctx.Done()
+ },
+ )
+ var wg sync.WaitGroup
+ wg.Add(1)
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ mgr.Loop(ctx)
+ wg.Done()
+ }()
+
+ mgr.ReserveForRun(2)
+
+ started := make(chan struct{})
+ // Schedule more jobs than could be processed simultaneously.
+ for i := 0; i < 15; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ mgr.Run(ctx, func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ select {
+ case <-ctx.Done():
+ return
+ case started <- struct{}{}:
+ }
+ <-ctx.Done()
+ })
+ }()
+ }
+
+ // Two can be started.
+ <-started
+ <-started
+
+ // Now stop the loop and the jbos.
+ cancel()
+
+ // Everything must really stop.
+ wg.Wait()
+}
+
func makePool(count int) []testInstance {
var ret []testInstance
for i := 0; i < count; i++ {