aboutsummaryrefslogtreecommitdiffstats
path: root/vm
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2025-08-05 20:30:58 +0200
committerAleksandr Nogikh <nogikh@google.com>2025-08-06 09:55:49 +0000
commit81aa8cec401bfdbde1c8d695368c67352fca64ee (patch)
tree0b21cb4c41a6b8183f23885cb779e5167524d6ca /vm
parent745510422fa0fd508f681b06c98e3a74d41344c5 (diff)
vm/dispatcher: don't block on the boot error chan on exit
React on the context cancellation even if the boot error channel is blocked. Add a test that verifies this behavior. Print a log message if the channel is full.
Diffstat (limited to 'vm')
-rw-r--r--vm/dispatcher/pool.go17
-rw-r--r--vm/dispatcher/pool_test.go33
2 files changed, 48 insertions, 2 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go
index 20e893499..d3f937b13 100644
--- a/vm/dispatcher/pool.go
+++ b/vm/dispatcher/pool.go
@@ -40,6 +40,8 @@ type Pool[T Instance] struct {
paused bool
}
+const bootErrorChanCap = 16
+
func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *Pool[T] {
instances := make([]*poolInstance[T], count)
for i := 0; i < count; i++ {
@@ -52,7 +54,7 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P
}
mu := new(sync.Mutex)
return &Pool[T]{
- BootErrors: make(chan error, 16),
+ BootErrors: make(chan error, bootErrorChanCap),
creator: creator,
defaultJob: def,
instances: instances,
@@ -125,7 +127,18 @@ func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) {
obj, err := p.creator(inst.idx)
if err != nil {
- p.BootErrors <- err
+ select {
+ case p.BootErrors <- err:
+ return
+ default:
+ // Print some log message to make it visible.
+ log.Logf(0, "WARNING: boot error channel is full!")
+ }
+ select {
+ case p.BootErrors <- err:
+ case <-ctx.Done():
+ // On context cancellation, no one might be listening on the channel.
+ }
return
}
defer obj.Close()
diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go
index cd85367c2..738ddfe27 100644
--- a/vm/dispatcher/pool_test.go
+++ b/vm/dispatcher/pool_test.go
@@ -5,6 +5,7 @@ package dispatcher
import (
"context"
+ "fmt"
"runtime"
"sync"
"sync/atomic"
@@ -286,6 +287,38 @@ func TestPoolCancelRun(t *testing.T) {
wg.Wait()
}
+// Check that the loop terminates even if no one reads from the boot error channel.
+func TestPoolBootErrors(t *testing.T) {
+ var failCount atomic.Int64
+
+ mgr := NewPool[*testInstance](
+ 3,
+ func(idx int) (*testInstance, error) {
+ failCount.Add(1)
+ return nil, fmt.Errorf("boot error")
+ },
+ func(ctx context.Context, _ *testInstance, _ UpdateInfo) {
+ <-ctx.Done()
+ },
+ )
+
+ done := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ mgr.Loop(ctx)
+ close(done)
+ }()
+
+ // Wait till the boot error channel saturates.
+ for failCount.Load() < bootErrorChanCap {
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ // Now terminate the loop.
+ cancel()
+ <-done
+}
+
func makePool(count int) []testInstance {
var ret []testInstance
for i := 0; i < count; i++ {