From 826b5aabc4b92d8e584b1ca47ad3b8fdc4a4d5ce Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Tue, 26 Jun 2018 13:52:36 +0200 Subject: pkg/rpctype: compress rpc traffic Both manager and fuzzer consume huge amount of memory (lots of gigs for manager) due to excessive caching in rpc connections. Compress traffic to reduce memory consumption. --- pkg/rpctype/rpc.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 3 deletions(-) (limited to 'pkg') diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index d45d3d49d..6d9048077 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -4,7 +4,9 @@ package rpctype import ( + "compress/flate" "fmt" + "io" "net" "net/rpc" "os" @@ -40,8 +42,8 @@ func (serv *RPCServer) Serve() { continue } conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go serv.s.ServeConn(conn) + conn.(*net.TCPConn).SetKeepAlivePeriod(10 * time.Second) + go serv.s.ServeConn(newFlateConn(conn)) } } @@ -71,7 +73,7 @@ func NewRPCClient(addr string) (*RPCClient, error) { } cli := &RPCClient{ conn: conn, - c: rpc.NewClient(conn), + c: rpc.NewClient(newFlateConn(conn)), } return cli, nil } @@ -95,3 +97,51 @@ func RPCCall(addr, method string, args, reply interface{}) error { defer c.Close() return c.Call(method, args, reply) } + +// flateConn wraps net.Conn in flate.Reader/Writer for compressed traffic. +type flateConn struct { + r io.ReadCloser + w *flate.Writer + c io.Closer +} + +func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser { + w, err := flate.NewWriter(conn, 9) + if err != nil { + panic(err) + } + return &flateConn{ + r: flate.NewReader(conn), + w: w, + c: conn, + } +} + +func (fc *flateConn) Read(data []byte) (int, error) { + return fc.r.Read(data) +} + +func (fc *flateConn) Write(data []byte) (int, error) { + n, err := fc.w.Write(data) + if err != nil { + return n, err + } + if err := fc.w.Flush(); err != nil { + return n, err + } + return n, nil +} + +func (fc *flateConn) Close() error { + var err0 error + if err := fc.r.Close(); err != nil { + err0 = err + } + if err := fc.w.Close(); err != nil { + err0 = err + } + if err := fc.c.Close(); err != nil { + err0 = err + } + return err0 +} -- cgit mrf-deployment