aboutsummaryrefslogtreecommitdiffstats
path: root/vm/proxyapp/proxyappclient.go
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2022-10-24 16:00:06 +0200
committerTaras Madan <tarasmadan@google.com>2022-12-01 13:20:01 +0100
commit3b580c21132d00956c79b09377ed6e63a6e030e0 (patch)
tree7c6291021770df240c3eee63762414b8c6a84c15 /vm/proxyapp/proxyappclient.go
parentbfa3a27c476164f105c0d24c420b729234ddadca (diff)
vm/proxyapp: rpc over tcp
Diffstat (limited to 'vm/proxyapp/proxyappclient.go')
-rw-r--r--vm/proxyapp/proxyappclient.go119
1 files changed, 93 insertions, 26 deletions
diff --git a/vm/proxyapp/proxyappclient.go b/vm/proxyapp/proxyappclient.go
index e5f1fbcf4..733673d5f 100644
--- a/vm/proxyapp/proxyappclient.go
+++ b/vm/proxyapp/proxyappclient.go
@@ -38,7 +38,17 @@ func ctor(params *proxyAppParams, env *vmimpl.Env) (vmimpl.Pool, error) {
}
go func() {
+ var forceReinit <-chan time.Time
for {
+ var onTerminated chan bool
+ var onLostConnection chan bool
+ p.mu.Lock()
+ if p.proxy != nil {
+ onTerminated = p.proxy.onTerminated
+ onLostConnection = p.proxy.onLostConnection
+ }
+ p.mu.Unlock()
+
select {
case <-p.close:
p.mu.Lock()
@@ -47,11 +57,18 @@ func ctor(params *proxyAppParams, env *vmimpl.Env) (vmimpl.Pool, error) {
p.onClosed <- nil
p.mu.Unlock()
return
- case <-p.proxy.onTerminated:
+ case <-onTerminated:
+ case <-onLostConnection:
+ case <-forceReinit:
}
p.mu.Lock()
+ p.closeProxy()
time.Sleep(params.InitRetryDelay)
- p.init(params, subConfig)
+ forceReinit = nil
+ err := p.init(params, subConfig)
+ if err != nil {
+ forceReinit = time.After(100 * time.Millisecond)
+ }
p.mu.Unlock()
}
}()
@@ -69,12 +86,27 @@ type pool struct {
}
func (p *pool) init(params *proxyAppParams, cfg *Config) error {
+ usePipedRPC := cfg.RPCServerURI == ""
+ useTCPRPC := !usePipedRPC
var err error
- p.proxy, err = runProxyApp(params, cfg.Command)
+ if cfg.Command != "" {
+ p.proxy, err = runProxyApp(params, cfg.Command, usePipedRPC)
+ } else {
+ p.proxy = &ProxyApp{}
+ }
if err != nil {
return fmt.Errorf("failed to run ProxyApp: %w", err)
}
+ if useTCPRPC {
+ p.proxy.onLostConnection = make(chan bool, 1)
+ p.proxy.Client, err = initNetworkRPCClient(cfg.RPCServerURI)
+ if err != nil {
+ p.closeProxy()
+ return fmt.Errorf("failed to connect ProxyApp pipes: %w", err)
+ }
+ }
+
p.proxy.doLogPooling(params.LogOutput)
count, err := p.proxy.CreatePool(string(cfg.ProxyAppConfig), p.env.Debug)
@@ -134,13 +166,38 @@ func (p *pool) Close() error {
type ProxyApp struct {
*rpc.Client
- terminate context.CancelFunc
- onTerminated chan bool
- stopLogPooling chan bool
- logPoolingDone chan bool
+ terminate context.CancelFunc
+ onTerminated chan bool
+ onLostConnection chan bool
+ stopLogPooling chan bool
+ logPoolingDone chan bool
+}
+
+func initPipedRPCClient(cmd subProcessCmd) (*rpc.Client, []io.Closer, error) {
+ subStdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get stdoutpipe: %w", err)
+ }
+
+ subStdin, err := cmd.StdinPipe()
+ if err != nil {
+ subStdout.Close()
+ return nil, nil, fmt.Errorf("failed to get stdinpipe: %w", err)
+ }
+
+ return jsonrpc.NewClient(stdInOutCloser{
+ subStdout,
+ subStdin,
+ }),
+ []io.Closer{subStdin, subStdout},
+ nil
}
-func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) {
+func initNetworkRPCClient(uri string) (*rpc.Client, error) {
+ return jsonrpc.Dial("tcp", uri)
+}
+
+func runProxyApp(params *proxyAppParams, cmd string, initRPClient bool) (*ProxyApp, error) {
ctx, cancelContext := context.WithCancel(context.Background())
subProcess := params.CommandRunner(ctx, cmd)
var toClose []io.Closer
@@ -151,19 +208,17 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) {
cancelContext()
}
- subStdout, err := subProcess.StdoutPipe()
- if err != nil {
- freeAll()
- return nil, fmt.Errorf("failed to get stdoutpipe: %w", err)
- }
- toClose = append(toClose, subStdout)
-
- subStdin, err := subProcess.StdinPipe()
- if err != nil {
- freeAll()
- return nil, fmt.Errorf("failed to get stdinpipe: %w", err)
+ var client *rpc.Client
+ if initRPClient {
+ var err error
+ var resources []io.Closer
+ client, resources, err = initPipedRPCClient(subProcess)
+ if err != nil {
+ freeAll()
+ return nil, fmt.Errorf("failed to init piped client: %w", err)
+ }
+ toClose = append(toClose, resources...)
}
- toClose = append(toClose, subStdin)
subprocessLogs, err := subProcess.StderrPipe()
if err != nil {
@@ -172,11 +227,6 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) {
}
toClose = append(toClose, subprocessLogs)
- codec := jsonrpc.NewClientCodec(stdInOutCloser{
- io.NopCloser(subStdout),
- subStdin,
- })
-
if err := subProcess.Start(); err != nil {
freeAll()
return nil, fmt.Errorf("failed to start command %v: %w", cmd, err)
@@ -193,12 +243,27 @@ func runProxyApp(params *proxyAppParams, cmd string) (*ProxyApp, error) {
}()
return &ProxyApp{
- Client: rpc.NewClientWithCodec(codec),
+ Client: client,
terminate: cancelContext,
onTerminated: onTerminated,
}, nil
}
+func (proxy *ProxyApp) signalLostConnection() {
+ select {
+ case proxy.onLostConnection <- true:
+ default:
+ }
+}
+
+func (proxy *ProxyApp) Call(serviceMethod string, args interface{}, reply interface{}) error {
+ err := proxy.Client.Call(serviceMethod, args, reply)
+ if err == rpc.ErrShutdown {
+ proxy.signalLostConnection()
+ }
+ return err
+}
+
func (proxy *ProxyApp) doLogPooling(writer io.Writer) {
proxy.stopLogPooling = make(chan bool, 1)
proxy.logPoolingDone = make(chan bool, 1)
@@ -220,7 +285,9 @@ func (proxy *ProxyApp) doLogPooling(writer io.Writer) {
// possible errors here are:
// "unexpected EOF"
// "read tcp 127.0.0.1:56886->127.0.0.1:34603: use of closed network connection"
+ // rpc.ErrShutdown
log.Logf(0, "error pooling ProxyApp logs: %v", c.Error)
+ proxy.signalLostConnection()
return
}
if log.V(reply.Verbosity) {