diff options
| author | Taras Madan <tarasmadan@google.com> | 2022-10-24 16:00:06 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2022-12-01 13:20:01 +0100 |
| commit | 3b580c21132d00956c79b09377ed6e63a6e030e0 (patch) | |
| tree | 7c6291021770df240c3eee63762414b8c6a84c15 /vm/proxyapp/proxyappclient.go | |
| parent | bfa3a27c476164f105c0d24c420b729234ddadca (diff) | |
vm/proxyapp: rpc over tcp
Diffstat (limited to 'vm/proxyapp/proxyappclient.go')
| -rw-r--r-- | vm/proxyapp/proxyappclient.go | 119 |
1 files changed, 93 insertions, 26 deletions
diff --git a/vm/proxyapp/proxyappclient.go b/vm/proxyapp/proxyappclient.go index e5f1fbcf4..733673d5f 100644 --- a/vm/proxyapp/proxyappclient.go +++ b/vm/proxyapp/proxyappclient.go @@ -38,7 +38,17 @@ func ctor(params *proxyAppParams, env *vmimpl.Env) (vmimpl.Pool, error) { } go func() { + var forceReinit <-chan time.Time for { + var onTerminated chan bool + var onLostConnection chan bool + p.mu.Lock() + if p.proxy != nil { + onTerminated = p.proxy.onTerminated + onLostConnection = p.proxy.onLostConnection + } + p.mu.Unlock() + select { case <-p.close: p.mu.Lock() @@ -47,11 +57,18 @@ func ctor(params *proxyAppParams, env *vmimpl.Env) (vmimpl.Pool, error) { p.onClosed <- nil p.mu.Unlock() return - case <-p.proxy.onTerminated: + case <-onTerminated: + case <-onLostConnection: + case <-forceReinit: } p.mu.Lock() + p.closeProxy() time.Sleep(params.InitRetryDelay) - p.init(params, subConfig) + forceReinit = nil + err := p.init(params, subConfig) + if err != nil { + forceReinit = time.After(100 * time.Millisecond) + } p.mu.Unlock() } }() @@ -69,12 +86,27 @@ type pool struct { } func (p *pool) init(params *proxyAppParams, cfg *Config) error { + usePipedRPC := cfg.RPCServerURI == "" + useTCPRPC := !usePipedRPC var err error - p.proxy, err = runProxyApp(params, cfg.Command) + if cfg.Command != "" { + p.proxy, err = runProxyApp(params, cfg.Command, usePipedRPC) + } else { + p.proxy = &ProxyApp{} + } if err != nil { return fmt.Errorf("failed to run ProxyApp: %w", err) } + if useTCPRPC { + p.proxy.onLostConnection = make(chan bool, 1) + p.proxy.Client, err = initNetworkRPCClient(cfg.RPCServerURI) + if err != nil { + p.closeProxy() + return fmt.Errorf("failed to connect ProxyApp pipes: %w", err) + } + } + p.proxy.doLogPooling(params.LogOutput) count, err := p.proxy.CreatePool(string(cfg.ProxyAppConfig), p.env.Debug) @@ -134,13 +166,38 @@ func (p *pool) Close() error { type ProxyApp struct { *rpc.Client - terminate context.CancelFunc - onTerminated chan bool - stopLogPooling chan bool - logPoolingDone chan bool + terminate context.CancelFunc + onTerminated chan bool + onLostConnection chan bool + stopLogPooling chan bool + logPoolingDone chan bool +} + +func initPipedRPCClient(cmd subProcessCmd) (*rpc.Client, []io.Closer, error) { + subStdout, err := cmd.StdoutPipe() + if err != nil { + return nil, nil, fmt.Errorf("failed to get stdoutpipe: %w", err) + } + + subStdin, err := cmd.StdinPipe() + if err != nil { + subStdout.Close() + return nil, nil, fmt.Errorf("failed to get stdinpipe: %w", err) + } + + return jsonrpc.NewClient(stdInOutCloser{ + subStdout, + subStdin, + }), + []io.Closer{subStdin, subStdout}, + nil } -func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) { +func initNetworkRPCClient(uri string) (*rpc.Client, error) { + return jsonrpc.Dial("tcp", uri) +} + +func runProxyApp(params *proxyAppParams, cmd string, initRPClient bool) (*ProxyApp, error) { ctx, cancelContext := context.WithCancel(context.Background()) subProcess := params.CommandRunner(ctx, cmd) var toClose []io.Closer @@ -151,19 +208,17 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) { cancelContext() } - subStdout, err := subProcess.StdoutPipe() - if err != nil { - freeAll() - return nil, fmt.Errorf("failed to get stdoutpipe: %w", err) - } - toClose = append(toClose, subStdout) - - subStdin, err := subProcess.StdinPipe() - if err != nil { - freeAll() - return nil, fmt.Errorf("failed to get stdinpipe: %w", err) + var client *rpc.Client + if initRPClient { + var err error + var resources []io.Closer + client, resources, err = initPipedRPCClient(subProcess) + if err != nil { + freeAll() + return nil, fmt.Errorf("failed to init piped client: %w", err) + } + toClose = append(toClose, resources...) } - toClose = append(toClose, subStdin) subprocessLogs, err := subProcess.StderrPipe() if err != nil { @@ -172,11 +227,6 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) { } toClose = append(toClose, subprocessLogs) - codec := jsonrpc.NewClientCodec(stdInOutCloser{ - io.NopCloser(subStdout), - subStdin, - }) - if err := subProcess.Start(); err != nil { freeAll() return nil, fmt.Errorf("failed to start command %v: %w", cmd, err) @@ -193,12 +243,27 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) { }() return &ProxyApp{ - Client: rpc.NewClientWithCodec(codec), + Client: client, terminate: cancelContext, onTerminated: onTerminated, }, nil } +func (proxy *ProxyApp) signalLostConnection() { + select { + case proxy.onLostConnection <- true: + default: + } +} + +func (proxy *ProxyApp) Call(serviceMethod string, args interface{}, reply interface{}) error { + err := proxy.Client.Call(serviceMethod, args, reply) + if err == rpc.ErrShutdown { + proxy.signalLostConnection() + } + return err +} + func (proxy *ProxyApp) doLogPooling(writer io.Writer) { proxy.stopLogPooling = make(chan bool, 1) proxy.logPoolingDone = make(chan bool, 1) @@ -220,7 +285,9 @@ func (proxy *ProxyApp) doLogPooling(writer io.Writer) { // possible errors here are: // "unexpected EOF" // "read tcp 127.0.0.1:56886->127.0.0.1:34603: use of closed network connection" + // rpc.ErrShutdown log.Logf(0, "error pooling ProxyApp logs: %v", c.Error) + proxy.signalLostConnection() return } if log.V(reply.Verbosity) { |
