aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-08-07 14:56:26 +0200
committerAleksandr Nogikh <nogikh@google.com>2024-08-13 14:20:08 +0000
commiteed4a69f8097dc228b4e11cfb84f0162ae58adb5 (patch)
tree08177350c21031b75a598379ecd856f3ffffa353
parentf21a18ca2012d229fb7061b37f26c8fa7de96e59 (diff)
vm/dispatcher: support updating the default function
Stop all default VMs and restart them with the new handler.
-rw-r--r--vm/dispatcher/pool.go19
-rw-r--r--vm/dispatcher/pool_test.go42
2 files changed, 60 insertions, 1 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go
index d13812172..7bfedf804 100644
--- a/vm/dispatcher/pool.go
+++ b/vm/dispatcher/pool.go
@@ -33,7 +33,7 @@ type Pool[T Instance] struct {
defaultJob Runner[T]
jobs chan Runner[T]
- // The mutex serializes ReserveForRun() calls.
+ // The mutex serializes ReserveForRun() and SetDefault() calls.
mu sync.Mutex
instances []*poolInstance[T]
}
@@ -57,6 +57,19 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P
}
}
+// UpdateDefault forces all VMs to restart.
+func (p *Pool[T]) SetDefault(def Runner[T]) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.defaultJob = def
+ for _, inst := range p.instances {
+ if inst.reserved() {
+ continue
+ }
+ inst.free(def)
+ }
+}
+
func (p *Pool[T]) Loop(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(p.instances))
@@ -254,6 +267,10 @@ func (pi *poolInstance[T]) reserve(ch chan Runner[T]) {
func (pi *poolInstance[T]) free(job Runner[T]) {
pi.mu.Lock()
+ if pi.job != nil {
+ // A change of a default job, let's force restart the instance.
+ pi.stop()
+ }
pi.job = job
pi.jobChan = nil
switchToJob := pi.switchToJob
diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go
index f48534e56..36d822b71 100644
--- a/vm/dispatcher/pool_test.go
+++ b/vm/dispatcher/pool_test.go
@@ -151,6 +151,48 @@ func TestPoolStress(t *testing.T) {
<-done
}
+func TestPoolNewDefault(t *testing.T) {
+ var originalCount atomic.Int64
+
+ // The test to aid the race detector.
+ mgr := NewPool[*nilInstance](
+ 10,
+ func(idx int) (*nilInstance, error) {
+ return &nilInstance{}, nil
+ },
+ func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ originalCount.Add(1)
+ <-ctx.Done()
+ originalCount.Add(-1)
+ },
+ )
+ done := make(chan bool)
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ mgr.Loop(ctx)
+ close(done)
+ }()
+
+ for originalCount.Load() != 10 {
+ time.Sleep(time.Second / 10)
+ }
+
+ var newCount atomic.Int64
+ mgr.SetDefault(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ newCount.Add(1)
+ <-ctx.Done()
+ newCount.Add(-1)
+ })
+
+ for newCount.Load() != 10 {
+ time.Sleep(time.Second / 10)
+ }
+ assert.Equal(t, int64(0), originalCount.Load())
+
+ cancel()
+ <-done
+}
+
func makePool(count int) []testInstance {
var ret []testInstance
for i := 0; i < count; i++ {