aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2018-06-26 13:52:36 +0200
committerDmitry Vyukov <dvyukov@google.com>2018-06-26 13:52:36 +0200
commit826b5aabc4b92d8e584b1ca47ad3b8fdc4a4d5ce (patch)
tree588696d5cf999d0a476e492bed66b4bceb92eb6f /pkg
parent3f9c2931992b8362c6a33be18d6cd6f831cc8623 (diff)
pkg/rpctype: compress rpc traffic
Both manager and fuzzer consume huge amount of memory (lots of gigs for manager) due to excessive caching in rpc connections. Compress traffic to reduce memory consumption.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/rpctype/rpc.go56
1 files changed, 53 insertions, 3 deletions
diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go
index d45d3d49d..6d9048077 100644
--- a/pkg/rpctype/rpc.go
+++ b/pkg/rpctype/rpc.go
@@ -4,7 +4,9 @@
package rpctype
import (
+ "compress/flate"
"fmt"
+ "io"
"net"
"net/rpc"
"os"
@@ -40,8 +42,8 @@ func (serv *RPCServer) Serve() {
continue
}
conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go serv.s.ServeConn(conn)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(10 * time.Second)
+ go serv.s.ServeConn(newFlateConn(conn))
}
}
@@ -71,7 +73,7 @@ func NewRPCClient(addr string) (*RPCClient, error) {
}
cli := &RPCClient{
conn: conn,
- c: rpc.NewClient(conn),
+ c: rpc.NewClient(newFlateConn(conn)),
}
return cli, nil
}
@@ -95,3 +97,51 @@ func RPCCall(addr, method string, args, reply interface{}) error {
defer c.Close()
return c.Call(method, args, reply)
}
+
+// flateConn wraps net.Conn in flate.Reader/Writer for compressed traffic.
+type flateConn struct {
+ r io.ReadCloser
+ w *flate.Writer
+ c io.Closer
+}
+
+func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser {
+ w, err := flate.NewWriter(conn, 9)
+ if err != nil {
+ panic(err)
+ }
+ return &flateConn{
+ r: flate.NewReader(conn),
+ w: w,
+ c: conn,
+ }
+}
+
+func (fc *flateConn) Read(data []byte) (int, error) {
+ return fc.r.Read(data)
+}
+
+func (fc *flateConn) Write(data []byte) (int, error) {
+ n, err := fc.w.Write(data)
+ if err != nil {
+ return n, err
+ }
+ if err := fc.w.Flush(); err != nil {
+ return n, err
+ }
+ return n, nil
+}
+
+func (fc *flateConn) Close() error {
+ var err0 error
+ if err := fc.r.Close(); err != nil {
+ err0 = err
+ }
+ if err := fc.w.Close(); err != nil {
+ err0 = err
+ }
+ if err := fc.c.Close(); err != nil {
+ err0 = err
+ }
+ return err0
+}