diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-05-03 08:44:48 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-03 15:57:46 +0000 |
| commit | 610f2a54d02f8cf4f2454c03bf679b602e6e59b6 (patch) | |
| tree | dd1768a4e516fe9fb6945613ebf009eda82317f6 /pkg | |
| parent | 3a81775029176dd4c693542e6715b985fa7ade4d (diff) | |
pkg/rpctype: prepare for not using for target communication
Remove things that are only needed for target VM communication:
conditional compression, timeout scaling, traffic stats.
To minimize diffs when we switch target VM communication to flatrpc.
Diffstat (limited to 'pkg')
| -rw-r--r-- | pkg/instance/instance.go | 8 | ||||
| -rw-r--r-- | pkg/rpctype/rpc.go | 94 |
2 files changed, 23 insertions, 79 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 3e59209e2..d47c093ec 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -456,10 +456,9 @@ func (inst *inst) testRepro() ([]byte, error) { } type OptionalFuzzerArgs struct { - Slowdown int - SandboxArg int - PprofPort int - NetCompression bool + Slowdown int + SandboxArg int + PprofPort int } type FuzzerCmdArgs struct { @@ -496,7 +495,6 @@ func FuzzerCmd(args *FuzzerCmdArgs) string { {Name: "slowdown", Value: fmt.Sprint(args.Optional.Slowdown)}, {Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)}, {Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)}, - {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)}, } optionalArg = " " + tool.OptionalFlags(flags) } diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index d195e993d..f6b8d1eff 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -13,18 +13,14 @@ import ( "time" "github.com/google/syzkaller/pkg/log" - "github.com/google/syzkaller/pkg/stats" ) type RPCServer struct { - ln net.Listener - s *rpc.Server - useCompression bool - statSent *stats.Val - statRecv *stats.Val + ln net.Listener + s *rpc.Server } -func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) { +func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen on %v: %w", addr, err) @@ -34,13 +30,8 @@ func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) return nil, err } serv := &RPCServer{ - ln: ln, - s: s, - useCompression: useCompression, - statSent: stats.Create("go rpc sent", "Uncompressed outbound RPC traffic", - stats.Graph("traffic"), stats.Rate{}, stats.FormatMB), - statRecv: stats.Create("go rpc recv", "Uncompressed inbound RPC traffic", - stats.Graph("traffic"), stats.Rate{}, stats.FormatMB), + ln: ln, + s: s, } return serv, nil } @@ -53,7 +44,7 @@ func (serv *RPCServer) Serve() { continue } setupKeepAlive(conn, time.Minute) - go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression)) + go serv.s.ServeConn(newFlateConn(conn)) } } @@ -62,51 +53,35 @@ func (serv *RPCServer) Addr() net.Addr { } type RPCClient struct { - conn net.Conn - c *rpc.Client - timeScale time.Duration - useTimeouts bool - useCompression bool + conn net.Conn + c *rpc.Client } -func Dial(addr string, timeScale time.Duration) (net.Conn, error) { - if timeScale <= 0 { - return nil, fmt.Errorf("bad rpc time scale %v", timeScale) - } +func NewRPCClient(addr string) (*RPCClient, error) { var conn net.Conn var err error if addr == "stdin" { // This is used by vm/gvisor which passes us a unix socket connection in stdin. - return net.FileConn(os.Stdin) - } - if conn, err = net.DialTimeout("tcp", addr, time.Minute*timeScale); err != nil { - return nil, err + // TODO: remove this once we switch to flatrpc for target communication. + conn, err = net.FileConn(os.Stdin) + } else { + conn, err = net.DialTimeout("tcp", addr, 3*time.Minute) } - setupKeepAlive(conn, time.Minute*timeScale) - return conn, nil -} - -func NewRPCClient(addr string, timeScale time.Duration, useTimeouts, useCompression bool) (*RPCClient, error) { - conn, err := Dial(addr, timeScale) if err != nil { return nil, err } + setupKeepAlive(conn, time.Minute) cli := &RPCClient{ - conn: conn, - c: rpc.NewClient(maybeFlateConn(conn, useCompression)), - timeScale: timeScale, - useTimeouts: useTimeouts, - useCompression: useCompression, + conn: conn, + c: rpc.NewClient(newFlateConn(conn)), } return cli, nil } func (cli *RPCClient) Call(method string, args, reply interface{}) error { - if cli.useTimeouts { - // Note: SetDeadline is not implemented on fuchsia, so don't fail on error. - cli.conn.SetDeadline(time.Now().Add(3 * time.Minute * cli.timeScale)) - defer cli.conn.SetDeadline(time.Time{}) - } + // Note: SetDeadline is not implemented on fuchsia, so don't fail on error. + cli.conn.SetDeadline(time.Now().Add(10 * time.Minute)) + defer cli.conn.SetDeadline(time.Time{}) return cli.c.Call(method, args, reply) } @@ -130,10 +105,7 @@ type flateConn struct { c io.Closer } -func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser { - if !useCompression { - return conn - } +func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser { w, err := flate.NewWriter(conn, 9) if err != nil { panic(err) @@ -173,29 +145,3 @@ func (fc *flateConn) Close() error { } return err0 } - -// countedConn wraps net.Conn to record the transferred bytes. -type countedConn struct { - io.ReadWriteCloser - server *RPCServer -} - -func newCountedConn(server *RPCServer, - conn io.ReadWriteCloser) io.ReadWriteCloser { - return &countedConn{ - ReadWriteCloser: conn, - server: server, - } -} - -func (cc countedConn) Read(p []byte) (n int, err error) { - n, err = cc.ReadWriteCloser.Read(p) - cc.server.statRecv.Add(n) - return -} - -func (cc countedConn) Write(b []byte) (n int, err error) { - n, err = cc.ReadWriteCloser.Write(b) - cc.server.statSent.Add(n) - return -} |
