diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2024-03-27 14:36:01 +0100 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-04-03 11:26:05 +0000 |
| commit | fed899ed4a625d124a881a2da67430be5d15325c (patch) | |
| tree | 9138f5c2f506b22efb1a84af5a1d21063f9b3dfd | |
| parent | afbcc4a93d840f52b2579530d638654a4a1e5447 (diff) | |
pkg/rpctype: make RPC compression optional
RPC compression take up to 10% of CPU time in profiles,
but it's unlikely to be beneficial for local VM runs
(we are mostly copying memory in this case).
Enable RPC compression based on the VM type
(local VM don't use it, remove machines use it).
| -rw-r--r-- | pkg/instance/instance.go | 12 | ||||
| -rw-r--r-- | pkg/rpctype/rpc.go | 47 | ||||
| -rw-r--r-- | syz-fuzzer/fuzzer.go | 23 | ||||
| -rw-r--r-- | syz-hub/hub.go | 2 | ||||
| -rw-r--r-- | syz-manager/hub.go | 10 | ||||
| -rw-r--r-- | syz-manager/manager.go | 13 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 2 | ||||
| -rw-r--r-- | syz-runner/runner.go | 2 | ||||
| -rw-r--r-- | syz-verifier/rpcserver.go | 2 | ||||
| -rw-r--r-- | tools/syz-hubtool/hubtool.go | 2 | ||||
| -rw-r--r-- | tools/syz-runtest/runtest.go | 2 | ||||
| -rw-r--r-- | vm/adb/adb.go | 2 | ||||
| -rw-r--r-- | vm/bhyve/bhyve.go | 2 | ||||
| -rw-r--r-- | vm/cuttlefish/cuttlefish.go | 2 | ||||
| -rw-r--r-- | vm/gce/gce.go | 2 | ||||
| -rw-r--r-- | vm/gvisor/gvisor.go | 2 | ||||
| -rwxr-xr-x | vm/isolated/isolated.go | 2 | ||||
| -rw-r--r-- | vm/kvm/kvm.go | 2 | ||||
| -rw-r--r-- | vm/proxyapp/init.go | 2 | ||||
| -rw-r--r-- | vm/qemu/qemu.go | 2 | ||||
| -rw-r--r-- | vm/starnix/starnix.go | 2 | ||||
| -rw-r--r-- | vm/vm.go | 6 | ||||
| -rw-r--r-- | vm/vm_test.go | 2 | ||||
| -rw-r--r-- | vm/vmimpl/vmimpl.go | 12 | ||||
| -rw-r--r-- | vm/vmm/vmm.go | 2 | ||||
| -rw-r--r-- | vm/vmware/vmware.go | 2 |
26 files changed, 90 insertions, 71 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 7b4ca5c71..1e4787c94 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -456,11 +456,12 @@ func (inst *inst) testRepro() ([]byte, error) { } type OptionalFuzzerArgs struct { - Slowdown int - RawCover bool - SandboxArg int - PprofPort int - ResetAccState bool + Slowdown int + RawCover bool + SandboxArg int + PprofPort int + ResetAccState bool + NetCompression bool } type FuzzerCmdArgs struct { @@ -504,6 +505,7 @@ func FuzzerCmd(args *FuzzerCmdArgs) string { {Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)}, {Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)}, {Name: "reset_acc_state", Value: fmt.Sprint(args.Optional.ResetAccState)}, + {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)}, } optionalArg = " " + tool.OptionalFlags(flags) } diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index 4bd0a17d0..01f0f1b02 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -17,12 +17,14 @@ import ( ) type RPCServer struct { - ln net.Listener - s *rpc.Server - TotalBytes atomic.Uint64 + TotalBytes atomic.Uint64 // total compressed bytes transmitted + + ln net.Listener + s *rpc.Server + useCompression bool } -func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { +func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen on %v: %w", addr, err) @@ -32,8 +34,9 @@ func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { return nil, err } serv := &RPCServer{ - ln: ln, - s: s, + ln: ln, + s: s, + useCompression: useCompression, } return serv, nil } @@ -46,7 +49,7 @@ func (serv *RPCServer) Serve() { continue } setupKeepAlive(conn, time.Minute) - go serv.s.ServeConn(newFlateConn(newCountedConn(serv, conn))) + go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression)) } } @@ -55,9 +58,10 @@ func (serv *RPCServer) Addr() net.Addr { } type RPCClient struct { - conn net.Conn - c *rpc.Client - timeScale time.Duration + conn net.Conn + c *rpc.Client + timeScale time.Duration + useCompression bool } func Dial(addr string, timeScale time.Duration) (net.Conn, error) { @@ -77,15 +81,16 @@ func Dial(addr string, timeScale time.Duration) (net.Conn, error) { return conn, nil } -func NewRPCClient(addr string, timeScale time.Duration) (*RPCClient, error) { +func NewRPCClient(addr string, timeScale time.Duration, useCompression bool) (*RPCClient, error) { conn, err := Dial(addr, timeScale) if err != nil { return nil, err } cli := &RPCClient{ - conn: conn, - c: rpc.NewClient(newFlateConn(conn)), - timeScale: timeScale, + conn: conn, + c: rpc.NewClient(maybeFlateConn(conn, useCompression)), + timeScale: timeScale, + useCompression: useCompression, } return cli, nil } @@ -101,15 +106,6 @@ func (cli *RPCClient) Close() { cli.c.Close() } -func RPCCall(addr string, timeScale time.Duration, method string, args, reply interface{}) error { - c, err := NewRPCClient(addr, timeScale) - if err != nil { - return err - } - defer c.Close() - return c.Call(method, args, reply) -} - func setupKeepAlive(conn net.Conn, keepAlive time.Duration) { conn.(*net.TCPConn).SetKeepAlive(true) conn.(*net.TCPConn).SetKeepAlivePeriod(keepAlive) @@ -122,7 +118,10 @@ type flateConn struct { c io.Closer } -func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser { +func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser { + if !useCompression { + return conn + } w, err := flate.NewWriter(conn, 9) if err != nil { panic(err) diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index fb7f57b30..4448718a1 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -118,16 +118,17 @@ func main() { debug.SetGCPercent(50) var ( - flagName = flag.String("name", "test", "unique name for manager") - flagOS = flag.String("os", runtime.GOOS, "target OS") - flagArch = flag.String("arch", runtime.GOARCH, "target arch") - flagManager = flag.String("manager", "", "manager rpc address") - flagProcs = flag.Int("procs", 1, "number of parallel test processes") - flagOutput = flag.String("output", "stdout", "write programs to none/stdout/dmesg/file") - flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci - flagRunTest = flag.Bool("runtest", false, "enable program testing mode") // used by pkg/runtest - flagRawCover = flag.Bool("raw_cover", false, "fetch raw coverage") - flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") + flagName = flag.String("name", "test", "unique name for manager") + flagOS = flag.String("os", runtime.GOOS, "target OS") + flagArch = flag.String("arch", runtime.GOARCH, "target arch") + flagManager = flag.String("manager", "", "manager rpc address") + flagProcs = flag.Int("procs", 1, "number of parallel test processes") + flagOutput = flag.String("output", "stdout", "write programs to none/stdout/dmesg/file") + flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci + flagRunTest = flag.Bool("runtest", false, "enable program testing mode") // used by pkg/runtest + flagRawCover = flag.Bool("raw_cover", false, "fetch raw coverage") + flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") + flagNetCompression = flag.Bool("net_compression", false, "use network compression for RPC calls") // Experimental flags. flagResetAccState = flag.Bool("reset_acc_state", false, "restarts executor before most executions") @@ -179,7 +180,7 @@ func main() { machineInfo, modules := collectMachineInfos(target) log.Logf(0, "dialing manager at %v", *flagManager) - manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale) + manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale, *flagNetCompression) if err != nil { log.SyzFatalf("failed to create an RPC client: %v ", err) } diff --git a/syz-hub/hub.go b/syz-hub/hub.go index 187d4a1eb..f7e7e3e48 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -61,7 +61,7 @@ func main() { hub.initHTTP(cfg.HTTP) - s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub) + s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub, true) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/syz-manager/hub.go b/syz-manager/hub.go index 7a38310d5..04904664d 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -114,6 +114,10 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if err != nil { return nil, err } + hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true) + if err != nil { + return nil, err + } a := &rpctype.HubConnectArgs{ Client: hc.cfg.HubClient, Key: key, @@ -136,12 +140,14 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if len(a.Corpus) > max { a.Corpus = a.Corpus[:max] } + err = hub.Call("Hub.Connect", a, nil) // Hub.Connect request can be very large, so do it on a transient connection // (rpc connection buffers never shrink). - if err := rpctype.RPCCall(hc.cfg.HubAddr, 1, "Hub.Connect", a, nil); err != nil { + hub.Close() + if err != nil { return nil, err } - hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1) + hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true) if err != nil { return nil, err } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 2280cf287..e7519b707 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -67,6 +67,7 @@ type Manager struct { vmStop chan bool checkResult *rpctype.CheckArgs fresh bool + netCompression bool expertMode bool numFuzzing uint32 numReproducing uint32 @@ -184,6 +185,7 @@ func RunManager(cfg *mgrconfig.Config) { memoryLeakFrames: make(map[string]bool), dataRaceFrames: make(map[string]bool), fresh: true, + netCompression: vm.UseNetCompression(cfg.Type), vmStop: make(chan bool), externalReproQueue: make(chan *Crash, 10), needMoreRepros: make(chan chan bool), @@ -836,11 +838,12 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string) (*report.Re Test: false, Runtest: false, Optional: &instance.OptionalFuzzerArgs{ - Slowdown: mgr.cfg.Timeouts.Slowdown, - RawCover: mgr.cfg.RawCover, - SandboxArg: mgr.cfg.SandboxArg, - PprofPort: inst.PprofPort(), - ResetAccState: mgr.cfg.Experimental.ResetAccState, + Slowdown: mgr.cfg.Timeouts.Slowdown, + RawCover: mgr.cfg.RawCover, + SandboxArg: mgr.cfg.SandboxArg, + PprofPort: inst.PprofPort(), + ResetAccState: mgr.cfg.Experimental.ResetAccState, + NetCompression: mgr.netCompression, }, } cmd := instance.FuzzerCmd(args) diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 0c94a5793..185b44d0f 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -74,7 +74,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { cfg: mgr.cfg, stats: mgr.stats, } - s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) + s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv, mgr.netCompression) if err != nil { return nil, err } diff --git a/syz-runner/runner.go b/syz-runner/runner.go index de4b7de5b..9f97ebfd9 100644 --- a/syz-runner/runner.go +++ b/syz-runner/runner.go @@ -46,7 +46,7 @@ func main() { } timeouts := config.Timeouts - vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale) + vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale, true) if err != nil { log.Fatalf("failed to connect to verifier : %v", err) } diff --git a/syz-verifier/rpcserver.go b/syz-verifier/rpcserver.go index f80fb7a46..8c0e6b705 100644 --- a/syz-verifier/rpcserver.go +++ b/syz-verifier/rpcserver.go @@ -33,7 +33,7 @@ func startRPCServer(vrf *Verifier) (*RPCServer, error) { notChecked: len(vrf.pools), } - s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv) + s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv, true) if err != nil { return nil, err } diff --git a/tools/syz-hubtool/hubtool.go b/tools/syz-hubtool/hubtool.go index 9789639db..08d55cd95 100644 --- a/tools/syz-hubtool/hubtool.go +++ b/tools/syz-hubtool/hubtool.go @@ -57,7 +57,7 @@ func main() { return } log.Printf("connecting to hub at %v...", *flagHubAddress) - conn, err := rpctype.NewRPCClient(*flagHubAddress, 1) + conn, err := rpctype.NewRPCClient(*flagHubAddress, 1, true) if err != nil { log.Fatalf("failed to connect to hub: %v", err) } diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go index 405b22255..d18bf7d07 100644 --- a/tools/syz-runtest/runtest.go +++ b/tools/syz-runtest/runtest.go @@ -61,7 +61,7 @@ func main() { reqMap: make(map[int]*runtest.RunRequest), lastReq: make(map[string]int), } - s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr) + s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr, true) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/vm/adb/adb.go b/vm/adb/adb.go index a108104c5..5b162bc37 100644 --- a/vm/adb/adb.go +++ b/vm/adb/adb.go @@ -27,7 +27,7 @@ import ( ) func init() { - vmimpl.Register("adb", ctor, false) + vmimpl.Register("adb", ctor, false, true) } type Device struct { diff --git a/vm/bhyve/bhyve.go b/vm/bhyve/bhyve.go index c3b53e053..3e82cbc2d 100644 --- a/vm/bhyve/bhyve.go +++ b/vm/bhyve/bhyve.go @@ -21,7 +21,7 @@ import ( ) func init() { - vmimpl.Register("bhyve", ctor, true) + vmimpl.Register("bhyve", ctor, true, false) } type Config struct { diff --git a/vm/cuttlefish/cuttlefish.go b/vm/cuttlefish/cuttlefish.go index 94610ed4e..d17bd78f2 100644 --- a/vm/cuttlefish/cuttlefish.go +++ b/vm/cuttlefish/cuttlefish.go @@ -28,7 +28,7 @@ const ( ) func init() { - vmimpl.Register("cuttlefish", ctor, true) + vmimpl.Register("cuttlefish", ctor, true, true) } type Pool struct { diff --git a/vm/gce/gce.go b/vm/gce/gce.go index 16f3f996f..6a292d4f6 100644 --- a/vm/gce/gce.go +++ b/vm/gce/gce.go @@ -35,7 +35,7 @@ import ( ) func init() { - vmimpl.Register("gce", ctor, true) + vmimpl.Register("gce", ctor, true, true) } type Config struct { diff --git a/vm/gvisor/gvisor.go b/vm/gvisor/gvisor.go index 45c076b23..f8194ab56 100644 --- a/vm/gvisor/gvisor.go +++ b/vm/gvisor/gvisor.go @@ -25,7 +25,7 @@ import ( ) func init() { - vmimpl.Register("gvisor", ctor, true) + vmimpl.Register("gvisor", ctor, true, false) } type Config struct { diff --git a/vm/isolated/isolated.go b/vm/isolated/isolated.go index 51a995b72..3383e91fa 100755 --- a/vm/isolated/isolated.go +++ b/vm/isolated/isolated.go @@ -23,7 +23,7 @@ import ( const pstoreConsoleFile = "/sys/fs/pstore/console-ramoops-0" func init() { - vmimpl.Register("isolated", ctor, false) + vmimpl.Register("isolated", ctor, false, true) } type Config struct { diff --git a/vm/kvm/kvm.go b/vm/kvm/kvm.go index 353f4300e..f22a195b3 100644 --- a/vm/kvm/kvm.go +++ b/vm/kvm/kvm.go @@ -27,7 +27,7 @@ const ( ) func init() { - vmimpl.Register("kvm", ctor, true) + vmimpl.Register("kvm", ctor, true, false) } type Config struct { diff --git a/vm/proxyapp/init.go b/vm/proxyapp/init.go index 467187f46..9c96e4855 100644 --- a/vm/proxyapp/init.go +++ b/vm/proxyapp/init.go @@ -32,7 +32,7 @@ func init() { func(env *vmimpl.Env) (vmimpl.Pool, error) { return ctor(makeDefaultParams(), env) }, - false) + false, true) } // Package configuration VARs are mostly needed for tests. diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go index 4b857066a..4496680c3 100644 --- a/vm/qemu/qemu.go +++ b/vm/qemu/qemu.go @@ -27,7 +27,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("qemu", ctor, true) + vmimpl.Register("qemu", ctor, true, false) } type Config struct { diff --git a/vm/starnix/starnix.go b/vm/starnix/starnix.go index d8beaba91..c3973fee9 100644 --- a/vm/starnix/starnix.go +++ b/vm/starnix/starnix.go @@ -22,7 +22,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("starnix", ctor, true) + vmimpl.Register("starnix", ctor, true, false) } type Config struct { @@ -93,6 +93,12 @@ func AllowsOvercommit(typ string) bool { return vmimpl.Types[vmType(typ)].Overcommit } +// UseNetCompression says if it's beneficial to use network compression for this VM type. +// Local VMs (qemu) generally don't benefit from compression, while remote machines may benefit. +func UseNetCompression(typ string) bool { + return vmimpl.Types[vmType(typ)].NetCompression +} + // Create creates a VM pool that can be used to create individual VMs. func Create(cfg *mgrconfig.Config, debug bool) (*Pool, error) { typ, ok := vmimpl.Types[vmType(cfg.Type)] diff --git a/vm/vm_test.go b/vm/vm_test.go index 2a3f1171a..62b93f4c1 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -80,7 +80,7 @@ func init() { ctor := func(env *vmimpl.Env) (vmimpl.Pool, error) { return &testPool{}, nil } - vmimpl.Register("test", ctor, false) + vmimpl.Register("test", ctor, false, false) } type Test struct { diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go index a9afdc1f1..bd565c8aa 100644 --- a/vm/vmimpl/vmimpl.go +++ b/vm/vmimpl/vmimpl.go @@ -129,16 +129,18 @@ func (err InfraError) InfraError() (string, []byte) { } // Register registers a new VM type within the package. -func Register(typ string, ctor ctorFunc, allowsOvercommit bool) { +func Register(typ string, ctor ctorFunc, allowsOvercommit, netCompression bool) { Types[typ] = Type{ - Ctor: ctor, - Overcommit: allowsOvercommit, + Ctor: ctor, + Overcommit: allowsOvercommit, + NetCompression: netCompression, } } type Type struct { - Ctor ctorFunc - Overcommit bool + Ctor ctorFunc + Overcommit bool + NetCompression bool } type ctorFunc func(env *Env) (Pool, error) diff --git a/vm/vmm/vmm.go b/vm/vmm/vmm.go index 71dfd39b4..67f12f4c7 100644 --- a/vm/vmm/vmm.go +++ b/vm/vmm/vmm.go @@ -25,7 +25,7 @@ import ( var vmctlStatusRegex = regexp.MustCompile(`^\s+([0-9]+)\b.*\brunning`) func init() { - vmimpl.Register("vmm", ctor, true) + vmimpl.Register("vmm", ctor, true, false) } type Config struct { diff --git a/vm/vmware/vmware.go b/vm/vmware/vmware.go index 4c05bac12..6bccbaa6e 100644 --- a/vm/vmware/vmware.go +++ b/vm/vmware/vmware.go @@ -22,7 +22,7 @@ import ( ) func init() { - vmimpl.Register("vmware", ctor, false) + vmimpl.Register("vmware", ctor, false, false) } type Config struct { |
