aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2024-05-03 08:44:48 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-05-03 15:57:46 +0000
commit610f2a54d02f8cf4f2454c03bf679b602e6e59b6 (patch)
treedd1768a4e516fe9fb6945613ebf009eda82317f6 /pkg
parent3a81775029176dd4c693542e6715b985fa7ade4d (diff)
pkg/rpctype: prepare for not using for target communication
Remove things that are only needed for target VM communication: conditional compression, timeout scaling, traffic stats. To minimize diffs when we switch target VM communication to flatrpc.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/instance/instance.go8
-rw-r--r--pkg/rpctype/rpc.go94
2 files changed, 23 insertions, 79 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go
index 3e59209e2..d47c093ec 100644
--- a/pkg/instance/instance.go
+++ b/pkg/instance/instance.go
@@ -456,10 +456,9 @@ func (inst *inst) testRepro() ([]byte, error) {
}
type OptionalFuzzerArgs struct {
- Slowdown int
- SandboxArg int
- PprofPort int
- NetCompression bool
+ Slowdown int
+ SandboxArg int
+ PprofPort int
}
type FuzzerCmdArgs struct {
@@ -496,7 +495,6 @@ func FuzzerCmd(args *FuzzerCmdArgs) string {
{Name: "slowdown", Value: fmt.Sprint(args.Optional.Slowdown)},
{Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)},
{Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)},
- {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)},
}
optionalArg = " " + tool.OptionalFlags(flags)
}
diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go
index d195e993d..f6b8d1eff 100644
--- a/pkg/rpctype/rpc.go
+++ b/pkg/rpctype/rpc.go
@@ -13,18 +13,14 @@ import (
"time"
"github.com/google/syzkaller/pkg/log"
- "github.com/google/syzkaller/pkg/stats"
)
type RPCServer struct {
- ln net.Listener
- s *rpc.Server
- useCompression bool
- statSent *stats.Val
- statRecv *stats.Val
+ ln net.Listener
+ s *rpc.Server
}
-func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) {
+func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %v: %w", addr, err)
@@ -34,13 +30,8 @@ func NewRPCServer(addr, name string, receiver interface{}, useCompression bool)
return nil, err
}
serv := &RPCServer{
- ln: ln,
- s: s,
- useCompression: useCompression,
- statSent: stats.Create("go rpc sent", "Uncompressed outbound RPC traffic",
- stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
- statRecv: stats.Create("go rpc recv", "Uncompressed inbound RPC traffic",
- stats.Graph("traffic"), stats.Rate{}, stats.FormatMB),
+ ln: ln,
+ s: s,
}
return serv, nil
}
@@ -53,7 +44,7 @@ func (serv *RPCServer) Serve() {
continue
}
setupKeepAlive(conn, time.Minute)
- go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression))
+ go serv.s.ServeConn(newFlateConn(conn))
}
}
@@ -62,51 +53,35 @@ func (serv *RPCServer) Addr() net.Addr {
}
type RPCClient struct {
- conn net.Conn
- c *rpc.Client
- timeScale time.Duration
- useTimeouts bool
- useCompression bool
+ conn net.Conn
+ c *rpc.Client
}
-func Dial(addr string, timeScale time.Duration) (net.Conn, error) {
- if timeScale <= 0 {
- return nil, fmt.Errorf("bad rpc time scale %v", timeScale)
- }
+func NewRPCClient(addr string) (*RPCClient, error) {
var conn net.Conn
var err error
if addr == "stdin" {
// This is used by vm/gvisor which passes us a unix socket connection in stdin.
- return net.FileConn(os.Stdin)
- }
- if conn, err = net.DialTimeout("tcp", addr, time.Minute*timeScale); err != nil {
- return nil, err
+ // TODO: remove this once we switch to flatrpc for target communication.
+ conn, err = net.FileConn(os.Stdin)
+ } else {
+ conn, err = net.DialTimeout("tcp", addr, 3*time.Minute)
}
- setupKeepAlive(conn, time.Minute*timeScale)
- return conn, nil
-}
-
-func NewRPCClient(addr string, timeScale time.Duration, useTimeouts, useCompression bool) (*RPCClient, error) {
- conn, err := Dial(addr, timeScale)
if err != nil {
return nil, err
}
+ setupKeepAlive(conn, time.Minute)
cli := &RPCClient{
- conn: conn,
- c: rpc.NewClient(maybeFlateConn(conn, useCompression)),
- timeScale: timeScale,
- useTimeouts: useTimeouts,
- useCompression: useCompression,
+ conn: conn,
+ c: rpc.NewClient(newFlateConn(conn)),
}
return cli, nil
}
func (cli *RPCClient) Call(method string, args, reply interface{}) error {
- if cli.useTimeouts {
- // Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
- cli.conn.SetDeadline(time.Now().Add(3 * time.Minute * cli.timeScale))
- defer cli.conn.SetDeadline(time.Time{})
- }
+ // Note: SetDeadline is not implemented on fuchsia, so don't fail on error.
+ cli.conn.SetDeadline(time.Now().Add(10 * time.Minute))
+ defer cli.conn.SetDeadline(time.Time{})
return cli.c.Call(method, args, reply)
}
@@ -130,10 +105,7 @@ type flateConn struct {
c io.Closer
}
-func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser {
- if !useCompression {
- return conn
- }
+func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser {
w, err := flate.NewWriter(conn, 9)
if err != nil {
panic(err)
@@ -173,29 +145,3 @@ func (fc *flateConn) Close() error {
}
return err0
}
-
-// countedConn wraps net.Conn to record the transferred bytes.
-type countedConn struct {
- io.ReadWriteCloser
- server *RPCServer
-}
-
-func newCountedConn(server *RPCServer,
- conn io.ReadWriteCloser) io.ReadWriteCloser {
- return &countedConn{
- ReadWriteCloser: conn,
- server: server,
- }
-}
-
-func (cc countedConn) Read(p []byte) (n int, err error) {
- n, err = cc.ReadWriteCloser.Read(p)
- cc.server.statRecv.Add(n)
- return
-}
-
-func (cc countedConn) Write(b []byte) (n int, err error) {
- n, err = cc.ReadWriteCloser.Write(b)
- cc.server.statSent.Add(n)
- return
-}