diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-08-07 14:56:26 +0200 |
|---|---|---|
| committer | Aleksandr Nogikh <nogikh@google.com> | 2024-08-13 14:20:08 +0000 |
| commit | eed4a69f8097dc228b4e11cfb84f0162ae58adb5 (patch) | |
| tree | 08177350c21031b75a598379ecd856f3ffffa353 | |
| parent | f21a18ca2012d229fb7061b37f26c8fa7de96e59 (diff) | |
vm/dispatcher: support updating the default function
Stop all default VMs and restart them with the new handler.
| -rw-r--r-- | vm/dispatcher/pool.go | 19 | ||||
| -rw-r--r-- | vm/dispatcher/pool_test.go | 42 |
2 files changed, 60 insertions, 1 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go index d13812172..7bfedf804 100644 --- a/vm/dispatcher/pool.go +++ b/vm/dispatcher/pool.go @@ -33,7 +33,7 @@ type Pool[T Instance] struct { defaultJob Runner[T] jobs chan Runner[T] - // The mutex serializes ReserveForRun() calls. + // The mutex serializes ReserveForRun() and SetDefault() calls. mu sync.Mutex instances []*poolInstance[T] } @@ -57,6 +57,19 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P } } +// UpdateDefault forces all VMs to restart. +func (p *Pool[T]) SetDefault(def Runner[T]) { + p.mu.Lock() + defer p.mu.Unlock() + p.defaultJob = def + for _, inst := range p.instances { + if inst.reserved() { + continue + } + inst.free(def) + } +} + func (p *Pool[T]) Loop(ctx context.Context) { var wg sync.WaitGroup wg.Add(len(p.instances)) @@ -254,6 +267,10 @@ func (pi *poolInstance[T]) reserve(ch chan Runner[T]) { func (pi *poolInstance[T]) free(job Runner[T]) { pi.mu.Lock() + if pi.job != nil { + // A change of a default job, let's force restart the instance. + pi.stop() + } pi.job = job pi.jobChan = nil switchToJob := pi.switchToJob diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go index f48534e56..36d822b71 100644 --- a/vm/dispatcher/pool_test.go +++ b/vm/dispatcher/pool_test.go @@ -151,6 +151,48 @@ func TestPoolStress(t *testing.T) { <-done } +func TestPoolNewDefault(t *testing.T) { + var originalCount atomic.Int64 + + // The test to aid the race detector. + mgr := NewPool[*nilInstance]( + 10, + func(idx int) (*nilInstance, error) { + return &nilInstance{}, nil + }, + func(ctx context.Context, _ *nilInstance, _ UpdateInfo) { + originalCount.Add(1) + <-ctx.Done() + originalCount.Add(-1) + }, + ) + done := make(chan bool) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + mgr.Loop(ctx) + close(done) + }() + + for originalCount.Load() != 10 { + time.Sleep(time.Second / 10) + } + + var newCount atomic.Int64 + mgr.SetDefault(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) { + newCount.Add(1) + <-ctx.Done() + newCount.Add(-1) + }) + + for newCount.Load() != 10 { + time.Sleep(time.Second / 10) + } + assert.Equal(t, int64(0), originalCount.Load()) + + cancel() + <-done +} + func makePool(count int) []testInstance { var ret []testInstance for i := 0; i < count; i++ { |
