aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2018-06-26 13:52:36 +0200
committerDmitry Vyukov <dvyukov@google.com>2018-06-26 13:52:36 +0200
commit826b5aabc4b92d8e584b1ca47ad3b8fdc4a4d5ce (patch)
tree588696d5cf999d0a476e492bed66b4bceb92eb6f
parent3f9c2931992b8362c6a33be18d6cd6f831cc8623 (diff)
pkg/rpctype: compress rpc traffic
Both manager and fuzzer consume huge amount of memory (lots of gigs for manager) due to excessive caching in rpc connections. Compress traffic to reduce memory consumption.
-rw-r--r--pkg/rpctype/rpc.go56
1 files changed, 53 insertions, 3 deletions
diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go
index d45d3d49d..6d9048077 100644
--- a/pkg/rpctype/rpc.go
+++ b/pkg/rpctype/rpc.go
@@ -4,7 +4,9 @@
package rpctype
import (
+ "compress/flate"
"fmt"
+ "io"
"net"
"net/rpc"
"os"
@@ -40,8 +42,8 @@ func (serv *RPCServer) Serve() {
continue
}
conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go serv.s.ServeConn(conn)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(10 * time.Second)
+ go serv.s.ServeConn(newFlateConn(conn))
}
}
@@ -71,7 +73,7 @@ func NewRPCClient(addr string) (*RPCClient, error) {
}
cli := &RPCClient{
conn: conn,
- c: rpc.NewClient(conn),
+ c: rpc.NewClient(newFlateConn(conn)),
}
return cli, nil
}
@@ -95,3 +97,51 @@ func RPCCall(addr, method string, args, reply interface{}) error {
defer c.Close()
return c.Call(method, args, reply)
}
+
+// flateConn wraps net.Conn in flate.Reader/Writer for compressed traffic.
+type flateConn struct {
+ r io.ReadCloser
+ w *flate.Writer
+ c io.Closer
+}
+
+func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser {
+ w, err := flate.NewWriter(conn, 9)
+ if err != nil {
+ panic(err)
+ }
+ return &flateConn{
+ r: flate.NewReader(conn),
+ w: w,
+ c: conn,
+ }
+}
+
+func (fc *flateConn) Read(data []byte) (int, error) {
+ return fc.r.Read(data)
+}
+
+func (fc *flateConn) Write(data []byte) (int, error) {
+ n, err := fc.w.Write(data)
+ if err != nil {
+ return n, err
+ }
+ if err := fc.w.Flush(); err != nil {
+ return n, err
+ }
+ return n, nil
+}
+
+func (fc *flateConn) Close() error {
+ var err0 error
+ if err := fc.r.Close(); err != nil {
+ err0 = err
+ }
+ if err := fc.w.Close(); err != nil {
+ err0 = err
+ }
+ if err := fc.c.Close(); err != nil {
+ err0 = err
+ }
+ return err0
+}