From 0e6221e714759ad3d12fb1a94b62c5c607a4dbab Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Thu, 7 Nov 2024 16:01:04 +0100 Subject: syz-manager: add pause function The pause function is useful to make manager stop consuming CPU for some time, e.g. when it runs on a shared machine, and a user wants to use CPU for something else. Previously one would need to kill the manager process and restart later to achieve this, but the restart is costly, and aborts all bug reproductions. --- vm/dispatcher/pool.go | 36 ++++++++++++++++++++++++++++++++---- vm/dispatcher/pool_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 4 deletions(-) (limited to 'vm') diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go index 7bfedf804..aa3228cbb 100644 --- a/vm/dispatcher/pool.go +++ b/vm/dispatcher/pool.go @@ -34,8 +34,10 @@ type Pool[T Instance] struct { jobs chan Runner[T] // The mutex serializes ReserveForRun() and SetDefault() calls. - mu sync.Mutex + mu *sync.Mutex + cv *sync.Cond instances []*poolInstance[T] + paused bool } func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *Pool[T] { @@ -48,12 +50,15 @@ func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *P inst.reset(func() {}) instances[i] = inst } + mu := new(sync.Mutex) return &Pool[T]{ BootErrors: make(chan error, 16), creator: creator, defaultJob: def, instances: instances, jobs: make(chan Runner[T]), + mu: mu, + cv: sync.NewCond(mu), } } @@ -62,11 +67,33 @@ func (p *Pool[T]) SetDefault(def Runner[T]) { p.mu.Lock() defer p.mu.Unlock() p.defaultJob = def + p.kickDefault() +} + +func (p *Pool[T]) kickDefault() { for _, inst := range p.instances { - if inst.reserved() { - continue + if !inst.reserved() { + inst.free(p.defaultJob) } - inst.free(def) + } +} + +func (p *Pool[T]) TogglePause(paused bool) { + p.mu.Lock() + defer p.mu.Unlock() + p.paused = paused + if paused { + p.kickDefault() + } else { + p.cv.Broadcast() + } +} + +func (p *Pool[T]) waitUnpaused() { + p.mu.Lock() + defer p.mu.Unlock() + for p.paused { + p.cv.Wait() } } @@ -86,6 +113,7 @@ func (p *Pool[T]) Loop(ctx context.Context) { } func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) { + p.waitUnpaused() ctx, cancel := context.WithCancel(ctx) log.Logf(2, "pool: booting instance %d", inst.idx) diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go index 36d822b71..452cd598a 100644 --- a/vm/dispatcher/pool_test.go +++ b/vm/dispatcher/pool_test.go @@ -5,6 +5,7 @@ package dispatcher import ( "context" + "runtime" "sync/atomic" "testing" "time" @@ -137,10 +138,17 @@ func TestPoolStress(t *testing.T) { ) done := make(chan bool) ctx, cancel := context.WithCancel(context.Background()) + go func() { mgr.Loop(ctx) close(done) }() + go func() { + for i := 0; i < 128; i++ { + mgr.TogglePause(i%2 == 0) + runtime.Gosched() + } + }() for i := 0; i < 128; i++ { go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) { }) @@ -193,6 +201,42 @@ func TestPoolNewDefault(t *testing.T) { <-done } +func TestPoolPause(t *testing.T) { + mgr := NewPool[*nilInstance]( + 10, + func(idx int) (*nilInstance, error) { + return &nilInstance{}, nil + }, + func(ctx context.Context, _ *nilInstance, _ UpdateInfo) { + t.Fatal("must not be called") + }, + ) + mgr.ReserveForRun(10) + mgr.TogglePause(true) + done := make(chan bool) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + mgr.Loop(ctx) + close(done) + }() + + run := make(chan bool, 1) + go mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) { + run <- true + }) + time.Sleep(10 * time.Millisecond) + if len(run) != 0 { + t.Fatalf("job run while paused") + } + mgr.TogglePause(false) + <-run + + mgr.Run(func(ctx context.Context, _ *nilInstance, _ UpdateInfo) {}) + + cancel() + <-done +} + func makePool(count int) []testInstance { var ret []testInstance for i := 0; i < count; i++ { -- cgit mrf-deployment