From d59ba98314e02be939938f682fd67cd68bbb3b68 Mon Sep 17 00:00:00 2001 From: Taras Madan Date: Mon, 26 Sep 2022 09:23:11 +0200 Subject: vm: add the proxyapp support (#3269) * vm: add pool.Close() support * vm: add proxyapp client implementation * vm/proxyapp: autogenerate mocks * vm/proxyapp: add proxyapp tests * pkg/mgrconfig: add proxyapp type tests --- vm/proxyapp/init.go | 63 ++++ vm/proxyapp/mocks/ProxyAppInterface.go | 154 ++++++++++ vm/proxyapp/mocks/subProcessCmd.go | 126 ++++++++ vm/proxyapp/proxyappclient.go | 399 ++++++++++++++++++++++++ vm/proxyapp/proxyappclient_mocks_test.go | 45 +++ vm/proxyapp/proxyappclient_test.go | 503 +++++++++++++++++++++++++++++++ vm/proxyapp/proxyrpc/proxyrpc.go | 99 ++++++ 7 files changed, 1389 insertions(+) create mode 100644 vm/proxyapp/init.go create mode 100644 vm/proxyapp/mocks/ProxyAppInterface.go create mode 100644 vm/proxyapp/mocks/subProcessCmd.go create mode 100644 vm/proxyapp/proxyappclient.go create mode 100644 vm/proxyapp/proxyappclient_mocks_test.go create mode 100644 vm/proxyapp/proxyappclient_test.go create mode 100644 vm/proxyapp/proxyrpc/proxyrpc.go (limited to 'vm/proxyapp') diff --git a/vm/proxyapp/init.go b/vm/proxyapp/init.go new file mode 100644 index 000000000..17db527f9 --- /dev/null +++ b/vm/proxyapp/init.go @@ -0,0 +1,63 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package proxyapp + +import ( + "context" + "encoding/json" + "fmt" + "io" + "time" + + "github.com/google/syzkaller/pkg/config" + "github.com/google/syzkaller/pkg/osutil" + "github.com/google/syzkaller/vm/vmimpl" +) + +func makeDefaultParams() *proxyAppParams { + return &proxyAppParams{ + CommandRunner: osutilCommandContext, + InitRetryDelay: 10 * time.Second, + } +} + +func init() { + vmimpl.Register( + "proxyapp", + func(env *vmimpl.Env) (vmimpl.Pool, error) { + return ctor(makeDefaultParams(), env) + }, + false) +} + +// Package configuration VARs are mostly needed for tests. +type proxyAppParams struct { + CommandRunner func(context.Context, string, ...string) subProcessCmd + InitRetryDelay time.Duration +} + +func osutilCommandContext(ctx context.Context, bin string, args ...string) subProcessCmd { + return osutil.CommandContext(ctx, bin, args...) +} + +type subProcessCmd interface { + StdinPipe() (io.WriteCloser, error) + StdoutPipe() (io.ReadCloser, error) + StderrPipe() (io.ReadCloser, error) + Start() error + Wait() error +} + +type Config struct { + Command string `json:"cmd"` + ProxyAppConfig json.RawMessage `json:"config"` +} + +func parseConfig(conf []byte) (*Config, error) { + vmCfg := new(Config) + if err := config.LoadData(conf, vmCfg); err != nil { + return nil, fmt.Errorf("failed to parseConfig(): %w", err) + } + return vmCfg, nil +} diff --git a/vm/proxyapp/mocks/ProxyAppInterface.go b/vm/proxyapp/mocks/ProxyAppInterface.go new file mode 100644 index 000000000..38f4f1418 --- /dev/null +++ b/vm/proxyapp/mocks/ProxyAppInterface.go @@ -0,0 +1,154 @@ +// Code generated by mockery v2.12.3. DO NOT EDIT. + +package mocks + +import ( + proxyrpc "github.com/google/syzkaller/vm/proxyapp/proxyrpc" + mock "github.com/stretchr/testify/mock" +) + +// ProxyAppInterface is an autogenerated mock type for the ProxyAppInterface type +type ProxyAppInterface struct { + mock.Mock +} + +// Close provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) Close(in proxyrpc.CloseParams, out *proxyrpc.CloseReply) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.CloseParams, *proxyrpc.CloseReply) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Copy provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) Copy(in proxyrpc.CopyParams, out *proxyrpc.CopyResult) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.CopyParams, *proxyrpc.CopyResult) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// CreateInstance provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) CreateInstance(in proxyrpc.CreateInstanceParams, out *proxyrpc.CreateInstanceResult) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.CreateInstanceParams, *proxyrpc.CreateInstanceResult) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// CreatePool provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) CreatePool(in proxyrpc.CreatePoolParams, out *proxyrpc.CreatePoolResult) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.CreatePoolParams, *proxyrpc.CreatePoolResult) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Diagnose provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) Diagnose(in proxyrpc.DiagnoseParams, out *proxyrpc.DiagnoseReply) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.DiagnoseParams, *proxyrpc.DiagnoseReply) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Forward provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) Forward(in proxyrpc.ForwardParams, out *proxyrpc.ForwardResult) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.ForwardParams, *proxyrpc.ForwardResult) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RunReadProgress provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) RunReadProgress(in proxyrpc.RunReadProgressParams, out *proxyrpc.RunReadProgressReply) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.RunReadProgressParams, *proxyrpc.RunReadProgressReply) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RunStart provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) RunStart(in proxyrpc.RunStartParams, out *proxyrpc.RunStartReply) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.RunStartParams, *proxyrpc.RunStartReply) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RunStop provides a mock function with given fields: in, out +func (_m *ProxyAppInterface) RunStop(in proxyrpc.RunStopParams, out *proxyrpc.RunStopReply) error { + ret := _m.Called(in, out) + + var r0 error + if rf, ok := ret.Get(0).(func(proxyrpc.RunStopParams, *proxyrpc.RunStopReply) error); ok { + r0 = rf(in, out) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type NewProxyAppInterfaceT interface { + mock.TestingT + Cleanup(func()) +} + +// NewProxyAppInterface creates a new instance of ProxyAppInterface. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewProxyAppInterface(t NewProxyAppInterfaceT) *ProxyAppInterface { + mock := &ProxyAppInterface{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/vm/proxyapp/mocks/subProcessCmd.go b/vm/proxyapp/mocks/subProcessCmd.go new file mode 100644 index 000000000..e39e02329 --- /dev/null +++ b/vm/proxyapp/mocks/subProcessCmd.go @@ -0,0 +1,126 @@ +// Code generated by mockery v2.12.3. DO NOT EDIT. + +package mocks + +import ( + io "io" + + mock "github.com/stretchr/testify/mock" +) + +// SubProcessCmd is an autogenerated mock type for the subProcessCmd type +type SubProcessCmd struct { + mock.Mock +} + +// Start provides a mock function with given fields: +func (_m *SubProcessCmd) Start() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StderrPipe provides a mock function with given fields: +func (_m *SubProcessCmd) StderrPipe() (io.ReadCloser, error) { + ret := _m.Called() + + var r0 io.ReadCloser + if rf, ok := ret.Get(0).(func() io.ReadCloser); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.ReadCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StdinPipe provides a mock function with given fields: +func (_m *SubProcessCmd) StdinPipe() (io.WriteCloser, error) { + ret := _m.Called() + + var r0 io.WriteCloser + if rf, ok := ret.Get(0).(func() io.WriteCloser); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.WriteCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// StdoutPipe provides a mock function with given fields: +func (_m *SubProcessCmd) StdoutPipe() (io.ReadCloser, error) { + ret := _m.Called() + + var r0 io.ReadCloser + if rf, ok := ret.Get(0).(func() io.ReadCloser); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(io.ReadCloser) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Wait provides a mock function with given fields: +func (_m *SubProcessCmd) Wait() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type NewSubProcessCmdT interface { + mock.TestingT + Cleanup(func()) +} + +// NewSubProcessCmd creates a new instance of SubProcessCmd. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewSubProcessCmd(t NewSubProcessCmdT) *SubProcessCmd { + mock := &SubProcessCmd{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/vm/proxyapp/proxyappclient.go b/vm/proxyapp/proxyappclient.go new file mode 100644 index 000000000..d51039456 --- /dev/null +++ b/vm/proxyapp/proxyappclient.go @@ -0,0 +1,399 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +// Package proxyapp package implements the experimental plugins support. +// We promise interface part will not be stable until documented. +package proxyapp + +import ( + "context" + "fmt" + "io" + "net/rpc" + "net/rpc/jsonrpc" + "os" + "sync" + "time" + + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/report" + "github.com/google/syzkaller/vm/proxyapp/proxyrpc" + "github.com/google/syzkaller/vm/vmimpl" +) + +func ctor(params *proxyAppParams, env *vmimpl.Env) (vmimpl.Pool, error) { + subConfig, err := parseConfig(env.Config) + if err != nil { + return nil, fmt.Errorf("config parse error: %w", err) + } + + p := &pool{ + env: env, + close: make(chan bool, 1), + onClosed: make(chan error, 1), + } + + err = p.init(params, subConfig) + if err != nil { + return nil, fmt.Errorf("can't initialize pool: %w", err) + } + + go func() { + for { + select { + case <-p.close: + p.mu.Lock() + if p.proxy != nil { + p.proxy.terminate() + <-p.proxy.onTerminated + } + p.proxy = nil + p.onClosed <- nil + p.mu.Unlock() + return + case <-p.proxy.onTerminated: + } + p.mu.Lock() + time.Sleep(params.InitRetryDelay) + p.init(params, subConfig) + p.mu.Unlock() + } + }() + + return p, nil +} + +type pool struct { + mu sync.Mutex + env *vmimpl.Env + proxy *ProxyApp + count int + close chan bool + onClosed chan error +} + +func (p *pool) init(params *proxyAppParams, cfg *Config) error { + var err error + p.proxy, err = runProxyApp(params, cfg.Command) + if err != nil { + return fmt.Errorf("failed to run ProxyApp: %w", err) + } + + count, err := p.proxy.CreatePool(string(cfg.ProxyAppConfig), p.env.Debug) + if err != nil || count == 0 || (p.count != 0 && p.count != count) { + if err == nil { + err = fmt.Errorf("wrong pool size %v, prev was %v", count, p.count) + } + + p.proxy.terminate() + <-p.proxy.onTerminated + p.proxy = nil + + return fmt.Errorf("failed to construct pool: %w", err) + } + + if p.count == 0 { + p.count = count + } + return nil +} + +func (p *pool) Count() int { + return p.count +} + +func (p *pool) Create(workdir string, index int) (vmimpl.Instance, error) { + p.mu.Lock() + proxy := p.proxy + p.mu.Unlock() + + if proxy == nil { + return nil, fmt.Errorf("can't create instance using nil pool") + } + + return proxy.CreateInstance(workdir, index) +} + +// Close is not used now. Its support require wide code changes. +// TODO: support the pool cleanup on syz-manager level. +func (p *pool) Close() error { + close(p.close) + return <-p.onClosed +} + +type ProxyApp struct { + *rpc.Client + terminate context.CancelFunc + onTerminated chan bool +} + +func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) { + ctx, cancelContext := context.WithCancel(context.Background()) + subProcess := params.CommandRunner(ctx, cmd) + var toClose []io.Closer + freeAll := func() { + for _, closer := range toClose { + closer.Close() + } + cancelContext() + } + + subStdout, err := subProcess.StdoutPipe() + if err != nil { + freeAll() + return nil, fmt.Errorf("failed to get stdoutpipe: %w", err) + } + toClose = append(toClose, subStdout) + + subStdin, err := subProcess.StdinPipe() + if err != nil { + freeAll() + return nil, fmt.Errorf("failed to get stdinpipe: %w", err) + } + toClose = append(toClose, subStdin) + + subprocessLogs, err := subProcess.StderrPipe() + if err != nil { + freeAll() + return nil, fmt.Errorf("failed to get stderrpipe: %w", err) + } + toClose = append(toClose, subprocessLogs) + + codec := jsonrpc.NewClientCodec(stdInOutCloser{ + io.NopCloser(subStdout), + subStdin, + }) + + if err := subProcess.Start(); err != nil { + freeAll() + return nil, fmt.Errorf("failed to start command %v: %w", cmd, err) + } + + onTerminated := make(chan bool, 1) + + go func() { + io.Copy(os.Stdout, subprocessLogs) + if err := subProcess.Wait(); err != nil { + log.Logf(0, "failed to Wait() subprocess: %v", err) + } + onTerminated <- true + }() + + return &ProxyApp{ + Client: rpc.NewClientWithCodec(codec), + terminate: cancelContext, + onTerminated: onTerminated, + }, nil +} + +func (proxy *ProxyApp) CreatePool(config string, debug bool) (int, error) { + var reply proxyrpc.CreatePoolResult + err := proxy.Call( + "ProxyVM.CreatePool", + proxyrpc.CreatePoolParams{ + Debug: debug, + Param: config, + }, + &reply) + if err != nil { + return 0, err + } + + return reply.Count, nil +} + +func (proxy *ProxyApp) CreateInstance(workdir string, index int) (vmimpl.Instance, error) { + var reply proxyrpc.CreateInstanceResult + err := proxy.Call( + "ProxyVM.CreateInstance", + proxyrpc.CreateInstanceParams{ + Workdir: workdir, + Index: index}, + &reply) + if err != nil { + return nil, fmt.Errorf("failed to proxy.Call(\"ProxyVM.CreateInstance\"): %w", err) + } + + return &instance{ + ProxyApp: proxy, + ID: reply.ID, + }, nil +} + +type instance struct { + *ProxyApp + ID string +} + +// Copy copies a hostSrc file into VM and returns file name in VM. +// nolint: dupl +func (inst *instance) Copy(hostSrc string) (string, error) { + var reply proxyrpc.CopyResult + err := inst.ProxyApp.Call( + "ProxyVM.Copy", + proxyrpc.CopyParams{ + ID: inst.ID, + HostSrc: hostSrc, + }, + &reply) + if err != nil { + return "", err + } + + return reply.VMFileName, nil +} + +// Forward sets up forwarding from within VM to the given tcp +// port on the host and returns the address to use in VM. +// nolint: dupl +func (inst *instance) Forward(port int) (string, error) { + var reply proxyrpc.ForwardResult + err := inst.ProxyApp.Call( + "ProxyVM.Forward", + proxyrpc.ForwardParams{ + ID: inst.ID, + Port: port, + }, + &reply) + if err != nil { + return "", err + } + return reply.ManagerAddress, nil +} + +func buildMerger(names ...string) (*vmimpl.OutputMerger, []io.Writer) { + var wPipes []io.Writer + merger := vmimpl.NewOutputMerger(nil) + for _, name := range names { + rpipe, wpipe := io.Pipe() + wPipes = append(wPipes, wpipe) + merger.Add(name, rpipe) + } + return merger, wPipes +} + +func (inst *instance) Run( + timeout time.Duration, + stop <-chan bool, + command string, +) (<-chan []byte, <-chan error, error) { + merger, wPipes := buildMerger("stdout", "stderr", "console") + receivedStdoutChunks := wPipes[0] + receivedStderrChunks := wPipes[1] + receivedConsoleChunks := wPipes[2] + outc := merger.Output + + var reply proxyrpc.RunStartReply + err := inst.ProxyApp.Call( + "ProxyVM.RunStart", + proxyrpc.RunStartParams{ + ID: inst.ID, + Command: command}, + &reply) + + if err != nil { + return nil, nil, fmt.Errorf("error calling ProxyVM.Run with command %v: %w", command, err) + } + + runID := reply.RunID + terminationError := make(chan error, 1) + timeoutSignal := time.After(timeout) + signalClientErrorf := clientErrorf(receivedStderrChunks) + + go func() { + for { + var progress proxyrpc.RunReadProgressReply + readProgressCall := inst.ProxyApp.Go( + "ProxyVM.RunReadProgress", + proxyrpc.RunReadProgressParams{ + ID: inst.ID, + RunID: runID, + }, + &progress, + nil) + select { + case <-readProgressCall.Done: + receivedStdoutChunks.Write([]byte(progress.StdoutChunk)) + receivedStderrChunks.Write([]byte(progress.StderrChunk)) + receivedConsoleChunks.Write([]byte(progress.ConsoleOutChunk)) + if readProgressCall.Error != nil { + signalClientErrorf("error reading progress from %v:%v: %v", + inst.ID, runID, readProgressCall.Error) + } else if progress.Error != "" { + signalClientErrorf("%v", progress.Error) + } else if progress.Finished { + terminationError <- nil + } else { + continue + } + case <-timeoutSignal: + // It is the happy path. + inst.runStop(runID) + terminationError <- vmimpl.ErrTimeout + case <-stop: + inst.runStop(runID) + terminationError <- vmimpl.ErrTimeout + } + break + } + }() + return outc, terminationError, nil +} + +func (inst *instance) runStop(runID string) { + err := inst.ProxyApp.Call( + "ProxyVM.RunStop", + proxyrpc.RunStopParams{ + ID: inst.ID, + RunID: runID, + }, + &proxyrpc.RunStopParams{}) + if err != nil { + log.Logf(0, "error calling runStop(%v) on %v: %v", runID, inst.ID, err) + } +} + +func (inst *instance) Diagnose(r *report.Report) (diagnosis []byte, wait bool) { + var title string + if r != nil { + title = r.Title + } + var reply proxyrpc.DiagnoseReply + err := inst.ProxyApp.Call( + "ProxyVM.Diagnose", + proxyrpc.DiagnoseParams{ + ID: inst.ID, + ReasonTitle: title, + }, + &reply) + if err != nil { + return nil, false + } + + return []byte(reply.Diagnosis), false +} + +func (inst *instance) Close() { + var reply proxyrpc.CloseReply + err := inst.ProxyApp.Call( + "ProxyVM.Close", + proxyrpc.CloseParams{ + ID: inst.ID, + }, + &reply) + if err != nil { + log.Logf(0, "error closing instance %v: %v", inst.ID, err) + } +} + +type stdInOutCloser struct { + io.ReadCloser + io.Writer +} + +func clientErrorf(writer io.Writer) func(fmt string, s ...interface{}) { + return func(f string, s ...interface{}) { + writer.Write([]byte(fmt.Sprintf(f, s...))) + writer.Write([]byte("\nSYZFAIL: proxy app plugin error\n")) + } +} diff --git a/vm/proxyapp/proxyappclient_mocks_test.go b/vm/proxyapp/proxyappclient_mocks_test.go new file mode 100644 index 000000000..149c37c0c --- /dev/null +++ b/vm/proxyapp/proxyappclient_mocks_test.go @@ -0,0 +1,45 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package proxyapp + +//go:generate ../../tools/mockery.sh --name subProcessCmd --exported +//go:generate ../../tools/mockery.sh --name ProxyAppInterface -r + +import ( + "context" + "testing" + + "github.com/google/syzkaller/vm/proxyapp/mocks" + "github.com/google/syzkaller/vm/proxyapp/proxyrpc" +) + +var ( + _ subProcessCmd = &mocks.SubProcessCmd{} + _ proxyrpc.ProxyAppInterface = &mocks.ProxyAppInterface{} +) + +type mockCommandRunner struct { + *mocks.SubProcessCmd + ctx context.Context + onWaitCalled chan bool +} + +func makeMockCommandRunner(t *testing.T) (*mockCommandRunner, *proxyAppParams) { + cmdRunner := &mockCommandRunner{ + SubProcessCmd: mocks.NewSubProcessCmd(t), + onWaitCalled: make(chan bool, 1), + } + + params := makeTestParams() + params.CommandRunner = func(ctx context.Context, cmd string, params ...string) subProcessCmd { + cmdRunner.ctx = ctx + return cmdRunner + } + return cmdRunner, params +} + +func (cmd *mockCommandRunner) Wait() error { + cmd.onWaitCalled <- true + return cmd.SubProcessCmd.Wait() +} diff --git a/vm/proxyapp/proxyappclient_test.go b/vm/proxyapp/proxyappclient_test.go new file mode 100644 index 000000000..db9649672 --- /dev/null +++ b/vm/proxyapp/proxyappclient_test.go @@ -0,0 +1,503 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package proxyapp + +import ( + "bytes" + "fmt" + "io" + "net/rpc" + "net/rpc/jsonrpc" + "strings" + "testing" + "time" + + "github.com/google/syzkaller/pkg/report" + "github.com/google/syzkaller/vm/proxyapp/mocks" + "github.com/google/syzkaller/vm/proxyapp/proxyrpc" + "github.com/google/syzkaller/vm/vmimpl" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +var testEnv = &vmimpl.Env{ + Config: []byte(` +{ + "cmd": "/path/to/proxyapp_binary", + "config": { + "internal_values": 123 + } + } +`)} + +func makeTestParams() *proxyAppParams { + return &proxyAppParams{ + CommandRunner: osutilCommandContext, + InitRetryDelay: 0, + } +} + +func makeMockProxyAppProcess(t *testing.T) ( + *mock.Mock, io.WriteCloser, io.ReadCloser, io.ReadCloser) { + rStdin, wStdin := io.Pipe() + rStdout, wStdout := io.Pipe() + rStderr, wStderr := io.Pipe() + wStderr.Close() + + server := rpc.NewServer() + handler := mocks.NewProxyAppInterface(t) + server.RegisterName("ProxyVM", struct{ proxyrpc.ProxyAppInterface }{handler}) + + go server.ServeCodec(jsonrpc.NewServerCodec(stdInOutCloser{ + rStdin, + wStdout, + })) + + return &handler.Mock, wStdin, rStdout, rStderr +} + +type nopWriteCloser struct { + io.Writer +} + +func (nopWriteCloser) Close() error { + return nil +} + +func TestCtor_Ok(t *testing.T) { + _, mCmdRunner, params := proxyAppServerFixture(t) + p, err := ctor(params, testEnv) + + assert.Nil(t, err) + assert.Equal(t, 2, p.Count()) + + <-mCmdRunner.onWaitCalled +} + +func TestCtor_ReadBadConfig(t *testing.T) { + pool, err := ctor(makeTestParams(), &vmimpl.Env{ + Config: []byte(`{"wrong_key": 1}`), + }) + assert.NotNil(t, err) + assert.Nil(t, pool) +} + +func TestCtor_FailedPipes(t *testing.T) { + mCmdRunner, params := makeMockCommandRunner(t) + mCmdRunner. + On("StdinPipe"). + Return(nil, fmt.Errorf("stdinpipe error")). + Once(). + On("StdinPipe"). + Return(nopWriteCloser{&bytes.Buffer{}}, nil). + On("StdoutPipe"). + Return(nil, fmt.Errorf("stdoutpipe error")). + Once(). + On("StdoutPipe"). + Return(io.NopCloser(strings.NewReader("")), nil). + On("StderrPipe"). + Return(nil, fmt.Errorf("stderrpipe error")) + + for i := 0; i < 3; i++ { + p, err := ctor(params, testEnv) + assert.NotNil(t, err) + assert.Nil(t, p) + } +} + +func TestClose_waitDone(t *testing.T) { + _, mCmdRunner, params := proxyAppServerFixture(t) + mCmdRunner. + On("waitDone"). + Return(nil) + + p, _ := ctor(params, testEnv) + p.(io.Closer).Close() +} + +func TestCtor_FailedStartProxyApp(t *testing.T) { + mCmdRunner, params := makeMockCommandRunner(t) + mCmdRunner. + On("StdinPipe"). + Return(nopWriteCloser{&bytes.Buffer{}}, nil). + On("StdoutPipe"). + Return(io.NopCloser(strings.NewReader("")), nil). + On("StderrPipe"). + Return(io.NopCloser(strings.NewReader("")), nil). + On("Start"). + Return(fmt.Errorf("failed to start program")) + + p, err := ctor(params, testEnv) + assert.NotNil(t, err) + assert.Nil(t, p) +} + +// TODO: reuse proxyAppServerFixture() code: func could be called here once Mock.Unset() error +// fixed https://github.com/stretchr/testify/issues/1236 +// nolint: dupl +func TestCtor_FailedConstructPool(t *testing.T) { + mProxyAppServer, stdin, stdout, stderr := + makeMockProxyAppProcess(t) + + mProxyAppServer. + On("CreatePool", mock.Anything, mock.Anything). + Return(fmt.Errorf("failed to construct pool")) + + mCmdRunner, params := makeMockCommandRunner(t) + mCmdRunner. + On("StdinPipe"). + Return(stdin, nil). + On("StdoutPipe"). + Return(stdout, nil). + On("StderrPipe"). + Return(stderr, nil). + On("Start"). + Return(nil). + On("Wait"). + Run(func(args mock.Arguments) { + <-mCmdRunner.ctx.Done() + }). + Return(nil) + + p, err := ctor(params, testEnv) + assert.NotNil(t, err) + assert.Nil(t, p) +} + +// TODO: to remove duplicate see TestCtor_FailedConstructPool() comment +// nolint: dupl +func proxyAppServerFixture(t *testing.T) (*mock.Mock, *mockCommandRunner, *proxyAppParams) { + mProxyAppServer, stdin, stdout, stderr := + makeMockProxyAppProcess(t) + + mProxyAppServer. + On("CreatePool", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.CreatePoolResult) + out.Count = 2 + }). + Return(nil) + + mCmdRunner, params := makeMockCommandRunner(t) + mCmdRunner. + On("StdinPipe"). + Return(stdin, nil). + On("StdoutPipe"). + Return(stdout, nil). + On("StderrPipe"). + Return(stderr, nil). + On("Start"). + Return(nil). + On("Wait"). + Run(func(args mock.Arguments) { + <-mCmdRunner.ctx.Done() + mCmdRunner.MethodCalled("waitDone") + }). + Return(nil). + Maybe() + + return mProxyAppServer, mCmdRunner, params +} + +func poolFixture(t *testing.T) (*mock.Mock, *mockCommandRunner, vmimpl.Pool) { + mProxyAppServer, mCmdRunner, params := proxyAppServerFixture(t) + p, _ := ctor(params, testEnv) + return mProxyAppServer, mCmdRunner, p +} + +func TestPool_Create_Ok(t *testing.T) { + mockServer, _, p := poolFixture(t) + mockServer. + On("CreateInstance", mock.Anything, mock.Anything). + Return(nil) + + inst, err := p.Create("workdir", 0) + assert.NotNil(t, inst) + assert.Nil(t, err) +} + +func TestPool_Create_ProxyNilError(t *testing.T) { + _, mCmdRunner, p := poolFixture(t) + mCmdRunner. + On("waitDone"). + Return(nil) + + p.(io.Closer).Close() + + inst, err := p.Create("workdir", 0) + assert.Nil(t, inst) + assert.NotNil(t, err) +} + +func TestPool_Create_OutOfPoolError(t *testing.T) { + mockServer, _, p := poolFixture(t) + mockServer. + On("CreateInstance", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + in := args.Get(0).(proxyrpc.CreateInstanceParams) + assert.Equal(t, p.Count(), in.Index) + }). + Return(fmt.Errorf("out of pool size")) + + inst, err := p.Create("workdir", p.Count()) + assert.Nil(t, inst) + assert.NotNil(t, err) +} + +func TestPool_Create_ProxyFailure(t *testing.T) { + mockServer, _, p := poolFixture(t) + mockServer. + On("CreateInstance", mock.Anything, mock.Anything). + Return(fmt.Errorf("create instance failure")) + + inst, err := p.Create("workdir", 0) + assert.Nil(t, inst) + assert.NotNil(t, err) +} + +// nolint: dupl +func createInstanceFixture(t *testing.T) (*mock.Mock, vmimpl.Instance) { + mockServer, _, p := poolFixture(t) + mockServer. + On("CreateInstance", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + in := args.Get(0).(proxyrpc.CreateInstanceParams) + out := args.Get(1).(*proxyrpc.CreateInstanceResult) + out.ID = fmt.Sprintf("instance_id_%v", in.Index) + }). + Return(nil) + + inst, err := p.Create("workdir", 0) + assert.Nil(t, err) + assert.NotNil(t, inst) + + return mockServer, inst +} + +func TestInstance_Close(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Close", mock.Anything, mock.Anything). + Return(fmt.Errorf("mock error")) + + inst.Close() +} + +func TestInstance_Diagnose_Ok(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Diagnose", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.DiagnoseReply) + out.Diagnosis = "diagnostic result" + }). + Return(nil) + + diagnosis, wait := inst.Diagnose(nil) + assert.NotNil(t, diagnosis) + assert.Equal(t, wait, false) + + diagnosis, wait = inst.Diagnose(&report.Report{}) + assert.NotNil(t, diagnosis) + assert.Equal(t, wait, false) +} + +func TestInstance_Diagnose_Failure(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Diagnose", mock.Anything, mock.Anything). + Return(fmt.Errorf("diagnose failed")) + + diagnosis, wait := inst.Diagnose(&report.Report{}) + assert.Nil(t, diagnosis) + assert.Equal(t, wait, false) +} + +func TestInstance_Copy_OK(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Copy", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.CopyResult) + out.VMFileName = "remote_file_path" + }). + Return(nil) + + remotePath, err := inst.Copy("host/path") + assert.Nil(t, err) + assert.NotEmpty(t, remotePath) +} + +// nolint: dupl +func TestInstance_Copy_Failure(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Copy", mock.Anything, mock.Anything). + Return(fmt.Errorf("copy failure")) + + remotePath, err := inst.Copy("host/path") + assert.NotNil(t, err) + assert.Empty(t, remotePath) +} + +// nolint: dupl +func TestInstance_Forward_OK(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Forward", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + in := args.Get(0).(proxyrpc.ForwardParams) + out := args.Get(1).(*proxyrpc.ForwardResult) + out.ManagerAddress = fmt.Sprintf("manager_address:%v", in.Port) + }). + Return(nil) + + remoteAddressToUse, err := inst.Forward(12345) + assert.Nil(t, err) + assert.Equal(t, "manager_address:12345", remoteAddressToUse) +} + +// nolint: dupl +func TestInstance_Forward_Failure(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("Forward", mock.Anything, mock.Anything). + Return(fmt.Errorf("forward failure")) + + remoteAddressToUse, err := inst.Forward(12345) + assert.NotNil(t, err) + assert.Empty(t, remoteAddressToUse) +} + +func TestInstance_Run_SimpleOk(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(nil). + Maybe() + + outc, errc, err := inst.Run(10*time.Second, make(chan bool), "command") + assert.NotNil(t, outc) + assert.NotNil(t, errc) + assert.Nil(t, err) +} + +func TestInstance_Run_Failure(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(fmt.Errorf("run start error")) + + outc, errc, err := inst.Run(10*time.Second, make(chan bool), "command") + assert.Nil(t, outc) + assert.Nil(t, errc) + assert.NotEmpty(t, err) +} + +func TestInstance_Run_OnTimeout(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(nil).Maybe(). + On("RunStop", mock.Anything, mock.Anything). + Return(nil) + + _, errc, _ := inst.Run(time.Second, make(chan bool), "command") + err := <-errc + + assert.Equal(t, err, vmimpl.ErrTimeout) +} + +func TestInstance_Run_OnStop(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(nil). + Maybe(). + On("RunStop", mock.Anything, mock.Anything). + Return(nil) + + stop := make(chan bool) + _, errc, _ := inst.Run(10*time.Second, stop, "command") + stop <- true + err := <-errc + assert.Equal(t, err, vmimpl.ErrTimeout) +} + +func TestInstance_RunReadProgress_OnErrorReceived(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(nil). + Times(100). + On("RunReadProgress", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.RunReadProgressReply) + out.Error = "mock error" + }). + Return(nil). + Once() + + outc, _, _ := inst.Run(10*time.Second, make(chan bool), "command") + output := string(<-outc) + + assert.Equal(t, "mock error\nSYZFAIL: proxy app plugin error\n", output) +} + +// nolint: dupl +func TestInstance_RunReadProgress_OnFinished(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(nil).Times(100). + On("RunReadProgress", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.RunReadProgressReply) + out.Finished = true + }). + Return(nil). + Once() + + _, errc, _ := inst.Run(10*time.Second, make(chan bool), "command") + err := <-errc + + assert.Equal(t, err, nil) +} + +func TestInstance_RunReadProgress_Failed(t *testing.T) { + mockInstance, inst := createInstanceFixture(t) + mockInstance. + On("RunStart", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + out := args.Get(1).(*proxyrpc.RunStartReply) + out.RunID = "test_run_id" + }). + Return(nil). + On("RunReadProgress", mock.Anything, mock.Anything). + Return(fmt.Errorf("runreadprogresserror")). + Once() + + outc, _, _ := inst.Run(10*time.Second, make(chan bool), "command") + output := string(<-outc) + + assert.Equal(t, + "error reading progress from instance_id_0:test_run_id: runreadprogresserror\nSYZFAIL: proxy app plugin error\n", + output, + ) +} + +// TODO: test for periodical proxyapp subprocess crashes handling. +// [option] check pool size was changed + +// TODO: test pool.Close() calls plugin API and return error. diff --git a/vm/proxyapp/proxyrpc/proxyrpc.go b/vm/proxyapp/proxyrpc/proxyrpc.go new file mode 100644 index 000000000..9e5cfdd24 --- /dev/null +++ b/vm/proxyapp/proxyrpc/proxyrpc.go @@ -0,0 +1,99 @@ +// Copyright 2022 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package proxyrpc + +// ProxyAppInterface is the interface you need to implement. +type ProxyAppInterface interface { + CreatePool(in CreatePoolParams, out *CreatePoolResult) error + CreateInstance(in CreateInstanceParams, out *CreateInstanceResult) error + Diagnose(in DiagnoseParams, out *DiagnoseReply) error + Copy(in CopyParams, out *CopyResult) error + Forward(in ForwardParams, out *ForwardResult) error + RunStart(in RunStartParams, out *RunStartReply) error + RunStop(in RunStopParams, out *RunStopReply) error + RunReadProgress(in RunReadProgressParams, out *RunReadProgressReply) error + Close(in CloseParams, out *CloseReply) error +} + +type CreatePoolParams struct { + Debug bool + Param string +} + +type CreatePoolResult struct { + Count int // signal the created pool size +} + +type CreateInstanceParams struct { + Workdir string + Index int +} + +type CreateInstanceResult struct { + ID string // allocated instance id +} + +type CopyParams struct { + ID string + HostSrc string +} + +type CopyResult struct { + VMFileName string +} + +type ForwardParams struct { + ID string + Port int +} + +type ForwardResult struct { + ManagerAddress string +} + +type RunStartParams struct { + ID string + Command string +} + +type RunStartReply struct { + RunID string +} + +type RunStopParams struct { + ID string + RunID string +} + +type RunStopReply struct { +} + +type RunReadProgressParams struct { + ID string + RunID string +} + +type RunReadProgressReply struct { + StdoutChunk string + StderrChunk string + ConsoleOutChunk string + Error string + Finished bool +} + +type CloseParams struct { + ID string +} + +type CloseReply struct { +} + +type DiagnoseParams struct { + ID string + ReasonTitle string +} + +type DiagnoseReply struct { + Diagnosis string +} -- cgit mrf-deployment