From 6ca1eb5a578bb1421ad0f3dbde675eb34647e6d7 Mon Sep 17 00:00:00 2001 From: Aleksandr Nogikh Date: Fri, 5 Jul 2024 15:41:55 +0200 Subject: all: transition to instance.Pool Rely on instance.Pool to perform fuzzing and do bug reproductions. Extract the reproduction queue logic to separate testable class. --- pkg/repro/repro.go | 192 ++++++++++++++++++++------------------------- pkg/repro/repro_test.go | 41 ++-------- pkg/repro/strace.go | 47 ++++++----- pkg/rpcserver/rpcserver.go | 61 +++----------- pkg/rpcserver/runner.go | 21 +++++ 5 files changed, 149 insertions(+), 213 deletions(-) (limited to 'pkg') diff --git a/pkg/repro/repro.go b/pkg/repro/repro.go index 08ed610ec..5ef2a52b8 100644 --- a/pkg/repro/repro.go +++ b/pkg/repro/repro.go @@ -5,10 +5,10 @@ package repro import ( "bytes" + "context" "errors" "fmt" "sort" - "sync" "time" "github.com/google/syzkaller/pkg/bisect/minimize" @@ -22,6 +22,7 @@ import ( "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" "github.com/google/syzkaller/vm" + "github.com/google/syzkaller/vm/dispatcher" ) type Result struct { @@ -43,21 +44,14 @@ type Stats struct { SimplifyCTime time.Duration } -type reproInstance struct { - index int - execProg execInterface -} - -type context struct { +type reproContext struct { + exec execInterface logf func(string, ...interface{}) target *targets.Target - reporter *report.Reporter crashTitle string crashType crash.Type crashStart int entries []*prog.LogEntry - instances chan *reproInstance - bootRequests chan int testTimeouts []time.Duration startOpts csource.Options stats *Stats @@ -65,9 +59,8 @@ type context struct { timeouts targets.Timeouts } -// execInterface describes what's needed from a VM by a pkg/repro. +// execInterface describes the interfaces needed by pkg/repro. type execInterface interface { - Close() RunCProg(p *prog.Prog, duration time.Duration, opts csource.Options) (*instance.RunResult, error) RunSyzProg(syzProg []byte, duration time.Duration, opts csource.Options, exitCondition vm.ExitCondition) ( *instance.RunResult, error) @@ -76,31 +69,22 @@ type execInterface interface { var ErrNoPrograms = errors.New("crash log does not contain any programs") func Run(crashLog []byte, cfg *mgrconfig.Config, features flatrpc.Feature, reporter *report.Reporter, - vmPool *vm.Pool, vmIndexes []int) (*Result, *Stats, error) { - ctx, err := prepareCtx(crashLog, cfg, features, reporter, len(vmIndexes)) + pool *dispatcher.Pool[*vm.Instance]) (*Result, *Stats, error) { + exec := &poolWrapper{ + cfg: cfg, + reporter: reporter, + pool: pool, + } + ctx, err := prepareCtx(crashLog, cfg, features, reporter, exec) if err != nil { return nil, nil, err } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ctx.createInstances(cfg, vmPool) - }() - // Prepare VMs in advance. - for _, idx := range vmIndexes { - ctx.bootRequests <- idx - } - // Wait until all VMs are really released. - defer wg.Wait() + exec.logf = ctx.reproLogf return ctx.run() } func prepareCtx(crashLog []byte, cfg *mgrconfig.Config, features flatrpc.Feature, reporter *report.Reporter, - VMs int) (*context, error) { - if VMs == 0 { - return nil, fmt.Errorf("no VMs provided") - } + exec execInterface) (*reproContext, error) { entries := cfg.Target.ParseLog(crashLog) if len(entries) == 0 { return nil, ErrNoPrograms @@ -131,28 +115,23 @@ func prepareCtx(crashLog []byte, cfg *mgrconfig.Config, features flatrpc.Feature case crashType == crash.Hang: testTimeouts = testTimeouts[2:] } - ctx := &context{ + ctx := &reproContext{ + exec: exec, target: cfg.SysTarget, - reporter: reporter, crashTitle: crashTitle, crashType: crashType, crashStart: crashStart, entries: entries, - instances: make(chan *reproInstance, VMs), - bootRequests: make(chan int, VMs), testTimeouts: testTimeouts, startOpts: createStartOptions(cfg, features, crashType), stats: new(Stats), timeouts: cfg.Timeouts, } - ctx.reproLogf(0, "%v programs, %v VMs, timeouts %v", len(entries), VMs, testTimeouts) + ctx.reproLogf(0, "%v programs, timeouts %v", len(entries), testTimeouts) return ctx, nil } -func (ctx *context) run() (*Result, *Stats, error) { - // Indicate that we no longer need VMs. - defer close(ctx.bootRequests) - +func (ctx *reproContext) run() (*Result, *Stats, error) { res, err := ctx.repro() if err != nil { return nil, nil, err @@ -215,7 +194,7 @@ func createStartOptions(cfg *mgrconfig.Config, features flatrpc.Feature, return opts } -func (ctx *context) repro() (*Result, error) { +func (ctx *reproContext) repro() (*Result, error) { // Cut programs that were executed after crash. for i, ent := range ctx.entries { if ent.Start > ctx.crashStart { @@ -266,7 +245,7 @@ func (ctx *context) repro() (*Result, error) { return res, nil } -func (ctx *context) extractProg(entries []*prog.LogEntry) (*Result, error) { +func (ctx *reproContext) extractProg(entries []*prog.LogEntry) (*Result, error) { ctx.reproLogf(2, "extracting reproducer from %v programs", len(entries)) start := time.Now() defer func() { @@ -319,7 +298,7 @@ func (ctx *context) extractProg(entries []*prog.LogEntry) (*Result, error) { return nil, nil } -func (ctx *context) extractProgSingle(entries []*prog.LogEntry, duration time.Duration) (*Result, error) { +func (ctx *reproContext) extractProgSingle(entries []*prog.LogEntry, duration time.Duration) (*Result, error) { ctx.reproLogf(3, "single: executing %d programs separately with timeout %s", len(entries), duration) opts := ctx.startOpts @@ -343,7 +322,7 @@ func (ctx *context) extractProgSingle(entries []*prog.LogEntry, duration time.Du return nil, nil } -func (ctx *context) extractProgBisect(entries []*prog.LogEntry, baseDuration time.Duration) (*Result, error) { +func (ctx *reproContext) extractProgBisect(entries []*prog.LogEntry, baseDuration time.Duration) (*Result, error) { ctx.reproLogf(3, "bisect: bisecting %d programs with base timeout %s", len(entries), baseDuration) opts := ctx.startOpts @@ -385,7 +364,7 @@ func (ctx *context) extractProgBisect(entries []*prog.LogEntry, baseDuration tim // The bisected progs may exceed the prog.MaxCalls limit. // So let's first try to drop unneeded calls. -func (ctx *context) concatenateProgs(entries []*prog.LogEntry, dur time.Duration) (*Result, error) { +func (ctx *reproContext) concatenateProgs(entries []*prog.LogEntry, dur time.Duration) (*Result, error) { ctx.reproLogf(3, "bisect: concatenate %d entries", len(entries)) if len(entries) > 1 { // There's a risk of exceeding prog.MaxCalls, so let's first minimize @@ -446,7 +425,7 @@ func (ctx *context) concatenateProgs(entries []*prog.LogEntry, dur time.Duration } // Minimize calls and arguments. -func (ctx *context) minimizeProg(res *Result) (*Result, error) { +func (ctx *reproContext) minimizeProg(res *Result) (*Result, error) { ctx.reproLogf(2, "minimizing guilty program") start := time.Now() defer func() { @@ -467,7 +446,7 @@ func (ctx *context) minimizeProg(res *Result) (*Result, error) { } // Simplify repro options (threaded, sandbox, etc). -func (ctx *context) simplifyProg(res *Result) (*Result, error) { +func (ctx *reproContext) simplifyProg(res *Result) (*Result, error) { ctx.reproLogf(2, "simplifying guilty program options") start := time.Now() defer func() { @@ -502,7 +481,7 @@ func (ctx *context) simplifyProg(res *Result) (*Result, error) { } // Try triggering crash with a C reproducer. -func (ctx *context) extractC(res *Result) (*Result, error) { +func (ctx *reproContext) extractC(res *Result) (*Result, error) { ctx.reproLogf(2, "extracting C reproducer") start := time.Now() defer func() { @@ -518,7 +497,7 @@ func (ctx *context) extractC(res *Result) (*Result, error) { } // Try to simplify the C reproducer. -func (ctx *context) simplifyC(res *Result) (*Result, error) { +func (ctx *reproContext) simplifyC(res *Result) (*Result, error) { ctx.reproLogf(2, "simplifying C reproducer") start := time.Now() defer func() { @@ -565,13 +544,14 @@ func checkOpts(opts *csource.Options, timeouts targets.Timeouts, timeout time.Du return true } -func (ctx *context) testProg(p *prog.Prog, duration time.Duration, opts csource.Options) (crashed bool, err error) { +func (ctx *reproContext) testProg(p *prog.Prog, duration time.Duration, + opts csource.Options) (crashed bool, err error) { entry := prog.LogEntry{P: p} return ctx.testProgs([]*prog.LogEntry{&entry}, duration, opts) } -func (ctx *context) testWithInstance(callback func(execInterface) (rep *instance.RunResult, - err error)) (bool, error) { +func (ctx *reproContext) getVerdict(callback func() (rep *instance.RunResult, err error)) ( + bool, error) { var result *instance.RunResult var err error @@ -581,7 +561,7 @@ func (ctx *context) testWithInstance(callback func(execInterface) (rep *instance // and not. So let's just retry runs for all errors. // If the problem is transient, it will likely go away. // If the problem is permanent, it will just be the same. - result, err = ctx.runOnInstance(callback) + result, err = callback() if err == nil { break } @@ -607,17 +587,6 @@ func (ctx *context) testWithInstance(callback func(execInterface) (rep *instance var ErrNoVMs = errors.New("all VMs failed to boot") -// A helper method for testWithInstance. -func (ctx *context) runOnInstance(callback func(execInterface) (rep *instance.RunResult, - err error)) (*instance.RunResult, error) { - inst := <-ctx.instances - if inst == nil { - return nil, ErrNoVMs - } - defer ctx.returnInstance(inst) - return callback(inst.execProg) -} - func encodeEntries(entries []*prog.LogEntry) []byte { buf := new(bytes.Buffer) for _, ent := range entries { @@ -629,7 +598,7 @@ func encodeEntries(entries []*prog.LogEntry) []byte { return buf.Bytes() } -func (ctx *context) testProgs(entries []*prog.LogEntry, duration time.Duration, opts csource.Options) ( +func (ctx *reproContext) testProgs(entries []*prog.LogEntry, duration time.Duration, opts csource.Options) ( crashed bool, err error) { if len(entries) == 0 { return false, fmt.Errorf("no programs to execute") @@ -648,23 +617,19 @@ func (ctx *context) testProgs(entries []*prog.LogEntry, duration time.Duration, } ctx.reproLogf(2, "testing program (duration=%v, %+v): %s", duration, opts, program) ctx.reproLogf(3, "detailed listing:\n%s", pstr) - return ctx.testWithInstance(func(exec execInterface) (*instance.RunResult, error) { - return exec.RunSyzProg(pstr, duration, opts, instance.SyzExitConditions) + return ctx.getVerdict(func() (*instance.RunResult, error) { + return ctx.exec.RunSyzProg(pstr, duration, opts, instance.SyzExitConditions) }) } -func (ctx *context) testCProg(p *prog.Prog, duration time.Duration, opts csource.Options) (crashed bool, err error) { - return ctx.testWithInstance(func(exec execInterface) (*instance.RunResult, error) { - return exec.RunCProg(p, duration, opts) +func (ctx *reproContext) testCProg(p *prog.Prog, duration time.Duration, + opts csource.Options) (crashed bool, err error) { + return ctx.getVerdict(func() (*instance.RunResult, error) { + return ctx.exec.RunCProg(p, duration, opts) }) } -func (ctx *context) returnInstance(inst *reproInstance) { - inst.execProg.Close() - ctx.bootRequests <- inst.index -} - -func (ctx *context) reproLogf(level int, format string, args ...interface{}) { +func (ctx *reproContext) reproLogf(level int, format string, args ...interface{}) { if ctx.logf != nil { ctx.logf(format, args...) } @@ -673,7 +638,7 @@ func (ctx *context) reproLogf(level int, format string, args ...interface{}) { ctx.stats.Log = append(ctx.stats.Log, []byte(fmt.Sprintf(format, args...)+"\n")...) } -func (ctx *context) bisectProgs(progs []*prog.LogEntry, pred func([]*prog.LogEntry) (bool, error)) ( +func (ctx *reproContext) bisectProgs(progs []*prog.LogEntry, pred func([]*prog.LogEntry) (bool, error)) ( []*prog.LogEntry, error) { // Set up progs bisection. ctx.reproLogf(3, "bisect: bisecting %d programs", len(progs)) @@ -700,38 +665,49 @@ func (ctx *context) bisectProgs(progs []*prog.LogEntry, pred func([]*prog.LogEnt return ret, err } -func (ctx *context) createInstances(cfg *mgrconfig.Config, vmPool *vm.Pool) { - var wg sync.WaitGroup - for vmIndex := range ctx.bootRequests { - wg.Add(1) - vmIndex := vmIndex - go func() { - defer wg.Done() - - for try := 0; ; try++ { - select { - case <-vm.Shutdown: - return - default: - } - inst, err := instance.CreateExecProgInstance(vmPool, vmIndex, cfg, - ctx.reporter, &instance.OptionalConfig{Logf: ctx.reproLogf}) - if err != nil { - ctx.reproLogf(0, "failed to boot instance (try %v): %v", try+1, err) - time.Sleep(10 * time.Second) - continue - } - ctx.instances <- &reproInstance{execProg: inst, index: vmIndex} - break - } - }() - } - wg.Wait() - // Clean up. - close(ctx.instances) - for inst := range ctx.instances { - inst.execProg.Close() - } +type poolWrapper struct { + cfg *mgrconfig.Config + reporter *report.Reporter + pool *dispatcher.Pool[*vm.Instance] + logf func(level int, format string, args ...interface{}) +} + +func (pw *poolWrapper) RunCProg(p *prog.Prog, duration time.Duration, + opts csource.Options) (*instance.RunResult, error) { + var result *instance.RunResult + var err error + pw.pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { + updInfo(func(info *dispatcher.Info) { + info.Status = fmt.Sprintf("reproducing (C, %.1f min)", duration.Minutes()) + }) + var ret *instance.ExecProgInstance + ret, err = instance.SetupExecProg(inst, pw.cfg, pw.reporter, + &instance.OptionalConfig{Logf: pw.logf}) + if err != nil { + return + } + result, err = ret.RunCProg(p, duration, opts) + }) + return result, err +} + +func (pw *poolWrapper) RunSyzProg(syzProg []byte, duration time.Duration, + opts csource.Options, exitCondition vm.ExitCondition) (*instance.RunResult, error) { + var result *instance.RunResult + var err error + pw.pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { + updInfo(func(info *dispatcher.Info) { + info.Status = fmt.Sprintf("reproducing (syz, %.1f min)", duration.Minutes()) + }) + var ret *instance.ExecProgInstance + ret, err = instance.SetupExecProg(inst, pw.cfg, pw.reporter, + &instance.OptionalConfig{Logf: pw.logf}) + if err != nil { + return + } + result, err = ret.RunSyzProg(syzProg, duration, opts, exitCondition) + }) + return result, err } type Simplify func(opts *csource.Options) bool diff --git a/pkg/repro/repro_test.go b/pkg/repro/repro_test.go index 3945eae0f..5a378b610 100644 --- a/pkg/repro/repro_test.go +++ b/pkg/repro/repro_test.go @@ -7,7 +7,6 @@ import ( "fmt" "math/rand" "regexp" - "sync" "testing" "time" @@ -32,7 +31,7 @@ func initTest(t *testing.T) (*rand.Rand, int) { } func TestBisect(t *testing.T) { - ctx := &context{ + ctx := &reproContext{ stats: new(Stats), logf: t.Logf, } @@ -109,29 +108,11 @@ func TestSimplifies(t *testing.T) { check(opts, 0) } -func generateTestInstances(ctx *context, count int, execInterface execInterface) { - for i := 0; i < count; i++ { - ctx.bootRequests <- i - } - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - for vmIndex := range ctx.bootRequests { - ctx.instances <- &reproInstance{execProg: execInterface, index: vmIndex} - } - }() - wg.Wait() -} - type testExecInterface struct { - t *testing.T // For now only do the simplest imitation. run func([]byte) (*instance.RunResult, error) } -func (tei *testExecInterface) Close() {} - func (tei *testExecInterface) RunCProg(p *prog.Prog, duration time.Duration, opts csource.Options) (*instance.RunResult, error) { return tei.RunSyzProg(p.Serialize(), duration, opts, instance.SyzExitConditions) @@ -142,7 +123,7 @@ func (tei *testExecInterface) RunSyzProg(syzProg []byte, duration time.Duration, return tei.run(syzProg) } -func prepareTestCtx(t *testing.T, log string) *context { +func prepareTestCtx(t *testing.T, log string, exec execInterface) *reproContext { mgrConfig := &mgrconfig.Config{ Derived: mgrconfig.Derived{ TargetOS: targets.Linux, @@ -159,7 +140,7 @@ func prepareTestCtx(t *testing.T, log string) *context { if err != nil { t.Fatal(err) } - ctx, err := prepareCtx([]byte(log), mgrConfig, flatrpc.AllFeatures, reporter, 3) + ctx, err := prepareCtx([]byte(log), mgrConfig, flatrpc.AllFeatures, reporter, exec) if err != nil { t.Fatal(err) } @@ -199,9 +180,7 @@ func testExecRunner(log []byte) (*instance.RunResult, error) { // Just a pkg/repro smoke test: check that we can extract a two-call reproducer. // No focus on error handling and minor corner cases. func TestPlainRepro(t *testing.T) { - ctx := prepareTestCtx(t, testReproLog) - go generateTestInstances(ctx, 3, &testExecInterface{ - t: t, + ctx := prepareTestCtx(t, testReproLog, &testExecInterface{ run: testExecRunner, }) result, _, err := ctx.run() @@ -218,10 +197,8 @@ alarm(0xa) // There happen to be transient errors like ssh/scp connection failures. // Ensure that the code just retries. func TestVMErrorResilience(t *testing.T) { - ctx := prepareTestCtx(t, testReproLog) fail := false - go generateTestInstances(ctx, 3, &testExecInterface{ - t: t, + ctx := prepareTestCtx(t, testReproLog, &testExecInterface{ run: func(log []byte) (*instance.RunResult, error) { fail = !fail if fail { @@ -242,10 +219,8 @@ alarm(0xa) } func TestTooManyErrors(t *testing.T) { - ctx := prepareTestCtx(t, testReproLog) counter := 0 - go generateTestInstances(ctx, 3, &testExecInterface{ - t: t, + ctx := prepareTestCtx(t, testReproLog, &testExecInterface{ run: func(log []byte) (*instance.RunResult, error) { counter++ if counter%4 != 0 { @@ -279,9 +254,7 @@ func TestProgConcatenation(t *testing.T) { execLog += "getpid()\n" } } - ctx := prepareTestCtx(t, execLog) - go generateTestInstances(ctx, 3, &testExecInterface{ - t: t, + ctx := prepareTestCtx(t, execLog, &testExecInterface{ run: testExecRunner, }) result, _, err := ctx.run() diff --git a/pkg/repro/strace.go b/pkg/repro/strace.go index a34760c41..72ecc0954 100644 --- a/pkg/repro/strace.go +++ b/pkg/repro/strace.go @@ -4,6 +4,7 @@ package repro import ( + "context" "fmt" "github.com/google/syzkaller/pkg/instance" @@ -11,6 +12,7 @@ import ( "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/report" "github.com/google/syzkaller/vm" + "github.com/google/syzkaller/vm/dispatcher" ) type StraceResult struct { @@ -24,31 +26,36 @@ const ( ) func RunStrace(result *Result, cfg *mgrconfig.Config, reporter *report.Reporter, - vmPool *vm.Pool, vmIndex int) *StraceResult { + pool *dispatcher.Pool[*vm.Instance]) *StraceResult { if cfg.StraceBin == "" { return straceFailed(fmt.Errorf("strace binary is not set in the config")) } - inst, err := instance.CreateExecProgInstance(vmPool, vmIndex, cfg, reporter, - &instance.OptionalConfig{ - StraceBin: cfg.StraceBin, - BeforeContextLen: straceOutputLogSize, - }) - if err != nil { - return straceFailed(fmt.Errorf("failed to set up instance: %w", err)) - } - defer inst.VMInstance.Close() - var runRes *instance.RunResult - if result.CRepro { - log.Logf(1, "running C repro under strace") - runRes, err = inst.RunCProg(result.Prog, result.Duration, result.Opts) - } else { - log.Logf(1, "running syz repro under strace") - runRes, err = inst.RunSyzProg(result.Prog.Serialize(), result.Duration, - result.Opts, instance.SyzExitConditions) - } + var err error + pool.Run(func(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) { + updInfo(func(info *dispatcher.Info) { + info.Status = "running strace" + }) + ret, setupErr := instance.SetupExecProg(inst, cfg, reporter, + &instance.OptionalConfig{ + StraceBin: cfg.StraceBin, + BeforeContextLen: straceOutputLogSize, + }) + if setupErr != nil { + err = fmt.Errorf("failed to set up instance: %w", setupErr) + return + } + if result.CRepro { + log.Logf(1, "running C repro under strace") + runRes, err = ret.RunCProg(result.Prog, result.Duration, result.Opts) + } else { + log.Logf(1, "running syz repro under strace") + runRes, err = ret.RunSyzProg(result.Prog.Serialize(), result.Duration, + result.Opts, instance.SyzExitConditions) + } + }) if err != nil { - return straceFailed(fmt.Errorf("failed to generate strace log: %w", err)) + return straceFailed(err) } return &StraceResult{ Report: runRes.Report, diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go index 0ad15c040..a6cae67cd 100644 --- a/pkg/rpcserver/rpcserver.go +++ b/pkg/rpcserver/rpcserver.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "maps" "math/rand" "slices" "sort" @@ -28,6 +27,7 @@ import ( "github.com/google/syzkaller/pkg/vminfo" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" + "github.com/google/syzkaller/vm/dispatcher" ) type Config struct { @@ -77,7 +77,6 @@ type Server struct { mu sync.Mutex runners map[string]*Runner - info map[string]VMState execSource queue.Source triagedCorpus atomic.Bool statVMRestarts *stats.Val @@ -144,7 +143,6 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) (*Server, error) { sysTarget: sysTarget, timeouts: sysTarget.Timeouts(cfg.Slowdown), runners: make(map[string]*Runner), - info: make(map[string]VMState), checker: checker, baseSource: baseSource, execSource: queue.Retry(baseSource), @@ -183,44 +181,6 @@ func (serv *Server) Close() error { return serv.serv.Close() } -const ( - StateOffline = iota - StateBooting - StateFuzzing - StateStopping -) - -type VMState struct { - State int - Timestamp time.Time -} - -func (serv *Server) VMState() map[string]VMState { - serv.mu.Lock() - defer serv.mu.Unlock() - return maps.Clone(serv.info) -} - -func (serv *Server) MachineInfo(name string) []byte { - serv.mu.Lock() - runner := serv.runners[name] - serv.mu.Unlock() - if runner == nil || !runner.Alive() { - return []byte("VM is not alive") - } - return runner.MachineInfo() -} - -func (serv *Server) RunnerStatus(name string) []byte { - serv.mu.Lock() - runner := serv.runners[name] - serv.mu.Unlock() - if runner == nil || !runner.Alive() { - return []byte("VM is not alive") - } - return runner.QueryStatus() -} - func (serv *Server) handleConn(conn *flatrpc.Conn) { connectReq, err := flatrpc.Recv[*flatrpc.ConnectRequestRaw](conn) if err != nil { @@ -232,7 +192,7 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { if serv.cfg.VMLess { // There is no VM loop, so minic what it would do. - serv.CreateInstance(name, nil) + serv.CreateInstance(name, nil, nil) defer func() { serv.StopFuzzing(name) serv.ShutdownInstance(name, true) @@ -271,10 +231,6 @@ func (serv *Server) handleConn(conn *flatrpc.Conn) { return } - serv.mu.Lock() - serv.info[name] = VMState{StateFuzzing, time.Now()} - serv.mu.Unlock() - if serv.triagedCorpus.Load() { if err := runner.SendCorpusTriaged(); err != nil { log.Logf(2, "%v", err) @@ -435,7 +391,7 @@ func (serv *Server) printMachineCheck(checkFilesInfo []*flatrpc.FileInfo, enable log.Logf(0, "machine check:\n%s", buf.Bytes()) } -func (serv *Server) CreateInstance(name string, injectExec chan<- bool) { +func (serv *Server) CreateInstance(name string, injectExec chan<- bool, updInfo dispatcher.UpdateInfo) { runner := &Runner{ source: serv.execSource, cover: serv.cfg.Cover, @@ -451,14 +407,14 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool) { rnd: rand.New(rand.NewSource(time.Now().UnixNano())), stats: serv.runnerStats, procs: serv.cfg.Procs, + updInfo: updInfo, } serv.mu.Lock() + defer serv.mu.Unlock() if serv.runners[name] != nil { panic(fmt.Sprintf("duplicate instance %s", name)) } serv.runners[name] = runner - serv.info[name] = VMState{StateBooting, time.Now()} - serv.mu.Unlock() } // stopInstance prevents further request exchange requests. @@ -466,8 +422,12 @@ func (serv *Server) CreateInstance(name string, injectExec chan<- bool) { func (serv *Server) StopFuzzing(name string) { serv.mu.Lock() runner := serv.runners[name] - serv.info[name] = VMState{StateStopping, time.Now()} serv.mu.Unlock() + if runner.updInfo != nil { + runner.updInfo(func(info *dispatcher.Info) { + info.Status = "fuzzing is stopped" + }) + } runner.Stop() } @@ -475,7 +435,6 @@ func (serv *Server) ShutdownInstance(name string, crashed bool) ([]ExecRecord, [ serv.mu.Lock() runner := serv.runners[name] delete(serv.runners, name) - serv.info[name] = VMState{StateOffline, time.Now()} serv.mu.Unlock() return runner.Shutdown(crashed), runner.MachineInfo() } diff --git a/pkg/rpcserver/runner.go b/pkg/rpcserver/runner.go index 06fa6a581..c45598b9c 100644 --- a/pkg/rpcserver/runner.go +++ b/pkg/rpcserver/runner.go @@ -20,6 +20,7 @@ import ( "github.com/google/syzkaller/pkg/stats" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" + "github.com/google/syzkaller/vm/dispatcher" ) type Runner struct { @@ -40,6 +41,7 @@ type Runner struct { executing map[int64]bool lastExec *LastExecuting rnd *rand.Rand + updInfo dispatcher.UpdateInfo // The mutex protects all the fields below. mu sync.Mutex @@ -78,6 +80,12 @@ type handshakeResult struct { } func (runner *Runner) Handshake(conn *flatrpc.Conn, cfg *handshakeConfig) error { + if runner.updInfo != nil { + runner.updInfo(func(info *dispatcher.Info) { + info.Status = "handshake" + }) + } + connectReply := &flatrpc.ConnectReply{ Debug: runner.debug, Cover: runner.cover, @@ -115,10 +123,23 @@ func (runner *Runner) Handshake(conn *flatrpc.Conn, cfg *handshakeConfig) error runner.machineInfo = ret.MachineInfo runner.canonicalizer = ret.Canonicalizer runner.mu.Unlock() + + if runner.updInfo != nil { + runner.updInfo(func(info *dispatcher.Info) { + info.MachineInfo = runner.MachineInfo + info.DetailedStatus = runner.QueryStatus + }) + } return nil } func (runner *Runner) ConnectionLoop() error { + if runner.updInfo != nil { + runner.updInfo(func(info *dispatcher.Info) { + info.Status = "executing" + }) + } + runner.mu.Lock() stopped := runner.stopped if !stopped { -- cgit mrf-deployment