diff options
 pkg/rpcserver/rpcserver.go |  10
 vm/dispatcher/pool.go      | 266
 vm/dispatcher/pool_test.go | 171
 vm/vm.go                   |   9
 4 files changed, 451 insertions(+), 5 deletions(-)
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index c50f483b1..0ad15c040 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -183,11 +183,6 @@ func (serv *Server) Close() error { return serv.serv.Close() } -type VMState struct { - State int - Timestamp time.Time -} - const ( StateOffline = iota StateBooting @@ -195,6 +190,11 @@ const ( StateStopping ) +type VMState struct { + State int + Timestamp time.Time +} + func (serv *Server) VMState() map[string]VMState { serv.mu.Lock() defer serv.mu.Unlock() diff --git a/vm/dispatcher/pool.go b/vm/dispatcher/pool.go new file mode 100644 index 000000000..5135a231d --- /dev/null +++ b/vm/dispatcher/pool.go @@ -0,0 +1,266 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package dispatcher + +import ( + "context" + "io" + "sync" + "time" + + "github.com/google/syzkaller/pkg/log" +) + +type Instance interface { + io.Closer +} + +type UpdateInfo func(cb func(info *Info)) +type Runner[T Instance] func(ctx context.Context, inst T, updInfo UpdateInfo) +type CreateInstance[T Instance] func(int) (T, error) + +// Pool[T] provides the functionality of a generic pool of instances. +// The instance is assumed to boot, be controlled by one Runner and then be re-created. +// The pool is assumed to have one default Runner (e.g. to be used for fuzzing), while a +// dynamically controlled sub-pool might be reserved for the arbitrary Runners. +type Pool[T Instance] struct { + BootErrors chan error + + creator CreateInstance[T] + defaultJob Runner[T] + jobs chan Runner[T] + + // The mutex serializes ReserveForRun() calls. 
+ mu sync.Mutex + instances []*poolInstance[T] +} + +func NewPool[T Instance](count int, creator CreateInstance[T], def Runner[T]) *Pool[T] { + instances := make([]*poolInstance[T], count) + for i := 0; i < count; i++ { + inst := &poolInstance[T]{ + job: def, + idx: i, + } + inst.reset(func() {}) + instances[i] = inst + } + return &Pool[T]{ + BootErrors: make(chan error, 16), + creator: creator, + defaultJob: def, + instances: instances, + jobs: make(chan Runner[T]), + } +} + +func (p *Pool[T]) Loop(ctx context.Context) { + var wg sync.WaitGroup + wg.Add(len(p.instances)) + for _, inst := range p.instances { + inst := inst + go func() { + for ctx.Err() == nil { + p.runInstance(ctx, inst) + } + wg.Done() + }() + } + wg.Wait() +} + +func (p *Pool[T]) runInstance(ctx context.Context, inst *poolInstance[T]) { + ctx, cancel := context.WithCancel(ctx) + + log.Logf(2, "pool: booting instance %d", inst.idx) + + p.mu.Lock() + // Avoid races with ReserveForRun(). + inst.reset(cancel) + p.mu.Unlock() + + inst.status(StateBooting) + defer inst.status(StateOffline) + + obj, err := p.creator(inst.idx) + if err != nil { + p.BootErrors <- err + return + } + defer obj.Close() + + inst.status(StateWaiting) + // The job and jobChan fields are subject to concurrent updates. + inst.mu.Lock() + job, jobChan := inst.job, inst.jobChan + inst.mu.Unlock() + + if job == nil { + select { + case newJob := <-jobChan: + job = newJob + case newJob := <-inst.switchToJob: + job = newJob + case <-ctx.Done(): + return + } + } + + inst.status(StateRunning) + job(ctx, obj, inst.updateInfo) +} + +// ReserveForRun specifies the size of the sub-pool for the execution of custom runners. +// The reserved instances will be booted, but the pool will not start the default runner. +// To unreserve all instances, execute ReserveForRun(0). 
+func (p *Pool[T]) ReserveForRun(count int) { + p.mu.Lock() + defer p.mu.Unlock() + + if count > len(p.instances) { + panic("trying to reserve more VMs than present") + } + + var free, reserved []*poolInstance[T] + for _, inst := range p.instances { + if inst.reserved() { + reserved = append(reserved, inst) + } else { + free = append(free, inst) + } + } + + needReserve := count - len(reserved) + for i := 0; i < needReserve; i++ { + log.Logf(2, "pool: reserving instance %d", free[i].idx) + free[i].reserve(p.jobs) + } + + needFree := len(reserved) - count + for i := 0; i < needFree; i++ { + log.Logf(2, "pool: releasing instance %d", reserved[i].idx) + reserved[i].free(p.defaultJob) + } +} + +// Run blocks until it has found an instance to execute job and until job has finished. +func (p *Pool[T]) Run(job Runner[T]) { + done := make(chan struct{}) + p.jobs <- func(ctx context.Context, inst T, upd UpdateInfo) { + job(ctx, inst, upd) + close(done) + } + <-done +} + +func (p *Pool[T]) Total() int { + return len(p.instances) +} + +type Info struct { + State InstanceState + Status string + LastUpdate time.Time + Reserved bool + + // The optional callbacks. + MachineInfo func() []byte + DetailedStatus func() []byte +} + +func (p *Pool[T]) State() []Info { + p.mu.Lock() + defer p.mu.Unlock() + + ret := make([]Info, len(p.instances)) + for i, inst := range p.instances { + ret[i] = inst.getInfo() + } + return ret +} + +// poolInstance is not thread safe. +type poolInstance[T Instance] struct { + mu sync.Mutex + info Info + idx int + + // Either job or jobChan will be set. + job Runner[T] + jobChan chan Runner[T] + switchToJob chan Runner[T] + stop func() +} + +type InstanceState int + +const ( + StateOffline InstanceState = iota + StateBooting + StateWaiting + StateRunning +) + +// reset() and status() may be called concurrently to all other methods. +// Other methods themselves are serialized. 
+func (pi *poolInstance[T]) reset(stop func()) { + pi.mu.Lock() + defer pi.mu.Unlock() + + pi.info = Info{ + State: StateOffline, + LastUpdate: time.Now(), + Reserved: pi.info.Reserved, + } + pi.stop = stop + pi.switchToJob = make(chan Runner[T]) +} + +func (pi *poolInstance[T]) updateInfo(upd func(*Info)) { + pi.mu.Lock() + defer pi.mu.Unlock() + upd(&pi.info) + pi.info.LastUpdate = time.Now() +} + +func (pi *poolInstance[T]) status(status InstanceState) { + pi.updateInfo(func(info *Info) { + info.State = status + }) +} + +func (pi *poolInstance[T]) reserved() bool { + return pi.jobChan != nil +} + +func (pi *poolInstance[T]) getInfo() Info { + pi.mu.Lock() + defer pi.mu.Unlock() + return pi.info +} + +func (pi *poolInstance[T]) reserve(ch chan Runner[T]) { + pi.stop() + pi.jobChan = ch + pi.job = nil + pi.updateInfo(func(info *Info) { + info.Reserved = true + }) +} + +func (pi *poolInstance[T]) free(job Runner[T]) { + pi.job = job + pi.jobChan = nil + + pi.updateInfo(func(info *Info) { + info.Reserved = false + }) + + select { + case pi.switchToJob <- job: + // Just in case the instance has been waiting. + return + default: + } +} diff --git a/vm/dispatcher/pool_test.go b/vm/dispatcher/pool_test.go new file mode 100644 index 000000000..275186a15 --- /dev/null +++ b/vm/dispatcher/pool_test.go @@ -0,0 +1,171 @@ +// Copyright 2024 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. 
+ +package dispatcher + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestPoolDefault(t *testing.T) { + count := 3 + pool := makePool(count) + + mgr := NewPool[*testInstance]( + count, + func(idx int) (*testInstance, error) { + pool[idx].reset() + return &pool[idx], nil + }, + func(ctx context.Context, inst *testInstance, _ UpdateInfo) { + pool[inst.Index()].run(ctx) + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan bool) + go func() { + mgr.Loop(ctx) + close(done) + }() + + // Eventually all instances are up and busy. + for i := 0; i < count; i++ { + pool[i].waitRun() + } + + // The pool restarts failed jobs. + for i := 0; i < 10; i++ { + pool[0].stopRun() + pool[2].stopRun() + + pool[0].waitRun() + pool[2].waitRun() + } + + cancel() + <-done +} + +func TestPoolSplit(t *testing.T) { + count := 3 + pool := makePool(count) + var defaultCount atomic.Int64 + + mgr := NewPool[*testInstance]( + count, + func(idx int) (*testInstance, error) { + pool[idx].reset() + return &pool[idx], nil + }, + func(ctx context.Context, inst *testInstance, _ UpdateInfo) { + defaultCount.Add(1) + pool[inst.Index()].run(ctx) + defaultCount.Add(-1) + }, + ) + + done := make(chan bool) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + mgr.Loop(ctx) + close(done) + }() + + startedRuns := make(chan bool) + stopRuns := make(chan bool) + for i := 0; i < 10; i++ { + go func() { + mgr.Run(func(ctx context.Context, _ *testInstance, _ UpdateInfo) { + startedRuns <- true + select { + case <-ctx.Done(): + case <-stopRuns: + } + }) + }() + } + + // So far, there are no reserved instances. + for i := 0; i < count; i++ { + pool[i].waitRun() + } + + // Dedicate one instance to the pool. + mgr.ReserveForRun(1) + + // The first job must start. + <-startedRuns + // Two default jobs are running. 
+ assert.EqualValues(t, 2, defaultCount.Load()) + stopRuns <- true + + // Take away the pool instance. + mgr.ReserveForRun(0) + // All instances must be busy with the default jobs. + for i := 0; i < count; i++ { + pool[i].waitRun() + } + assert.EqualValues(t, 3, defaultCount.Load()) + + // Now let's finish all jobs. + mgr.ReserveForRun(2) + for i := 0; i < 9; i++ { + <-startedRuns + stopRuns <- true + } + + cancel() + <-done +} + +func makePool(count int) []testInstance { + var ret []testInstance + for i := 0; i < count; i++ { + ret = append(ret, testInstance{index: i}) + } + return ret +} + +type testInstance struct { + index int + hasRun atomic.Bool + stop chan bool +} + +func (ti *testInstance) reset() { + ti.stop = make(chan bool) + ti.hasRun.Store(false) +} + +func (ti *testInstance) run(ctx context.Context) { + ti.hasRun.Store(true) + select { + case <-ti.stop: + case <-ctx.Done(): + } +} + +func (ti *testInstance) waitRun() { + for !ti.hasRun.Load() { + time.Sleep(10 * time.Millisecond) + } +} + +func (ti *testInstance) stopRun() { + close(ti.stop) + ti.hasRun.Store(false) // make subsequent waitRun() actually wait for the next command. +} + +func (ti *testInstance) Index() int { + return ti.index +} + +func (ti *testInstance) Close() error { + return nil +} diff --git a/vm/vm.go b/vm/vm.go --- a/vm/vm.go +++ b/vm/vm.go @@ -23,6 +23,7 @@ import ( "github.com/google/syzkaller/pkg/report" "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/sys/targets" + "github.com/google/syzkaller/vm/dispatcher" "github.com/google/syzkaller/vm/vmimpl" // Import all VM implementations, so that users only need to import vm. @@ -273,6 +274,14 @@ func (inst *Instance) Close() error { return err } +func NewDispatcher(pool *Pool, def dispatcher.Runner[*Instance]) *dispatcher.Pool[*Instance] { + return dispatcher.NewPool(pool.Count(), + func(idx int) (*Instance, error) { + return pool.Create(idx) + }, + def) + } + +type monitor struct { + inst *Instance + outc <-chan []byte |
