diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2025-08-05 20:30:58 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2025-08-06 09:55:49 +0000 |
| commit | 81aa8cec401bfdbde1c8d695368c67352fca64ee (patch) | |
| tree | 0b21cb4c41a6b8183f23885cb779e5167524d6ca /vm | |
| parent | 745510422fa0fd508f681b06c98e3a74d41344c5 (diff) | |
vm/dispatcher: don't block on the boot error chan on exit
React on the context cancellation even if the boot error channel is
blocked.
Add a test that verifies this behavior.
Print a log message if the channel is full.
Diffstat (limited to 'vm')
| -rw-r--r-- | vm/dispatcher/pool.go | 17 | ||||
| -rw-r--r-- | vm/dispatcher/pool_test.go | 33 |
2 files changed, 48 insertions, 2 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go index 20e893499..d3f937b13 100644 --- a/vm/dispatcher/pool.go +++ b/vm/dispatcher/pool.go @@ -40,6 +40,8 @@ type Pool[T Instance] struct { paused bool } +const bootErrorChanCap = 16 + func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *Pool[T] { instances := make([]*poolInstance[T], count) for i := 0; i < count; i++ { @@ -52,7 +54,7 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P } mu := new(sync.Mutex) return &Pool[T]{ - BootErrors: make(chan error, 16), + BootErrors: make(chan error, bootErrorChanCap), creator: creator, defaultJob: def, instances: instances, @@ -125,7 +127,18 @@ func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) { obj, err := p.creator(inst.idx) if err != nil { - p.BootErrors <- err + select { + case p.BootErrors <- err: + return + default: + // Print some log message to make it visible. + log.Logf(0, "WARNING: boot error channel is full!") + } + select { + case p.BootErrors <- err: + case <-ctx.Done(): + // On context cancellation, no one might be listening on the channel. + } return } defer obj.Close() diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go index cd85367c2..738ddfe27 100644 --- a/vm/dispatcher/pool_test.go +++ b/vm/dispatcher/pool_test.go @@ -5,6 +5,7 @@ package dispatcher import ( "context" + "fmt" "runtime" "sync" "sync/atomic" @@ -286,6 +287,38 @@ func TestPoolCancelRun(t *testing.T) { wg.Wait() } +// Check that the loop terminates even if no one reads from the boot error channel. +func TestPoolBootErrors(t *testing.T) { + var failCount atomic.Int64 + + mgr := NewPool[*testInstance]( + 3, + func(idx int) (*testInstance, error) { + failCount.Add(1) + return nil, fmt.Errorf("boot error") + }, + func(ctx context.Context, _ *testInstance, _ UpdateInfo) { + <-ctx.Done() + }, + ) + + done := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + mgr.Loop(ctx) + close(done) + }() + + // Wait till the boot error channel saturates. + for failCount.Load() < bootErrorChanCap { + time.Sleep(10 * time.Millisecond) + } + + // Now terminate the loop. + cancel() + <-done +} + func makePool(count int) []testInstance { var ret []testInstance for i := 0; i < count; i++ { |
