aboutsummaryrefslogtreecommitdiffstats
path: root/vm
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-11-07 16:01:04 +0100
committerDmitry Vyukov <dvyukov@google.com>2024-11-08 14:15:20 +0000
commit0e6221e714759ad3d12fb1a94b62c5c607a4dbab (patch)
tree7b0c2da5c189a282ad7ea83b61a97d1c14716083 /vm
parent9c60c31484778260e7eb20d563c33554cf39e8fd (diff)
syz-manager: add pause function
The pause function is useful to make manager stop consuming CPU for some time, e.g. when it runs on a shared machine, and a user wants to use CPU for something else. Previously one would need to kill the manager process and restart later to achieve this, but the restart is costly, and aborts all bug reproductions.
Diffstat (limited to 'vm')
-rw-r--r--vm/dispatcher/pool.go36
-rw-r--r--vm/dispatcher/pool_test.go44
2 files changed, 76 insertions, 4 deletions
diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go
index 7bfedf804..aa3228cbb 100644
--- a/vm/dispatcher/pool.go
+++ b/vm/dispatcher/pool.go
@@ -34,8 +34,10 @@ type Pool[T Instance] struct {
jobs chan Runner[T]
// The mutex serializes ReserveForRun() and SetDefault() calls.
- mu sync.Mutex
+ mu *sync.Mutex
+ cv *sync.Cond
instances []*poolInstance[T]
+ paused bool
}
func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *Pool[T] {
@@ -48,12 +50,15 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P
inst.reset(func() {})
instances[i] = inst
}
+ mu := new(sync.Mutex)
return &Pool[T]{
BootErrors: make(chan error, 16),
creator: creator,
defaultJob: def,
instances: instances,
jobs: make(chan Runner[T]),
+ mu: mu,
+ cv: sync.NewCond(mu),
}
}
@@ -62,11 +67,33 @@ func (p *Pool[T]) SetDefault(def Runner[T]) {
p.mu.Lock()
defer p.mu.Unlock()
p.defaultJob = def
+ p.kickDefault()
+}
+
+func (p *Pool[T]) kickDefault() {
for _, inst := range p.instances {
- if inst.reserved() {
- continue
+ if !inst.reserved() {
+ inst.free(p.defaultJob)
}
- inst.free(def)
+ }
+}
+
+func (p *Pool[T]) TogglePause(paused bool) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.paused = paused
+ if paused {
+ p.kickDefault()
+ } else {
+ p.cv.Broadcast()
+ }
+}
+
+func (p *Pool[T]) waitUnpaused() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ for p.paused {
+ p.cv.Wait()
}
}
@@ -86,6 +113,7 @@ func (p *Pool[T]) Loop(ctx context.Context) {
}
func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) {
+ p.waitUnpaused()
ctx, cancel := context.WithCancel(ctx)
log.Logf(2, "pool: booting instance %d", inst.idx)
diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go
index 36d822b71..452cd598a 100644
--- a/vm/dispatcher/pool_test.go
+++ b/vm/dispatcher/pool_test.go
@@ -5,6 +5,7 @@ package dispatcher
import (
"context"
+ "runtime"
"sync/atomic"
"testing"
"time"
@@ -137,10 +138,17 @@ func TestPoolStress(t *testing.T) {
)
done := make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
+
go func() {
mgr.Loop(ctx)
close(done)
}()
+ go func() {
+ for i := 0; i < 128; i++ {
+ mgr.TogglePause(i%2 == 0)
+ runtime.Gosched()
+ }
+ }()
for i := 0; i < 128; i++ {
go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
})
@@ -193,6 +201,42 @@ func TestPoolNewDefault(t *testing.T) {
<-done
}
+func TestPoolPause(t *testing.T) {
+ mgr := NewPool[*nilInstance](
+ 10,
+ func(idx int) (*nilInstance, error) {
+ return &nilInstance{}, nil
+ },
+ func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ t.Fatal("must not be called")
+ },
+ )
+ mgr.ReserveForRun(10)
+ mgr.TogglePause(true)
+ done := make(chan bool)
+ ctx, cancel := context.WithCancel(context.Background())
+ go func() {
+ mgr.Loop(ctx)
+ close(done)
+ }()
+
+ run := make(chan bool, 1)
+ go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {
+ run <- true
+ })
+ time.Sleep(10 * time.Millisecond)
+ if len(run) != 0 {
+ t.Fatalf("job run while paused")
+ }
+ mgr.TogglePause(false)
+ <-run
+
+ mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {})
+
+ cancel()
+ <-done
+}
+
func makePool(count int) []testInstance {
var ret []testInstance
for i := 0; i < count; i++ {