diff options
| -rw-r--r-- | pkg/instance/instance.go | 12 | ||||
| -rw-r--r-- | pkg/rpctype/rpc.go | 47 | ||||
| -rw-r--r-- | syz-fuzzer/fuzzer.go | 23 | ||||
| -rw-r--r-- | syz-hub/hub.go | 2 | ||||
| -rw-r--r-- | syz-manager/hub.go | 10 | ||||
| -rw-r--r-- | syz-manager/manager.go | 13 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 2 | ||||
| -rw-r--r-- | syz-runner/runner.go | 2 | ||||
| -rw-r--r-- | syz-verifier/rpcserver.go | 2 | ||||
| -rw-r--r-- | tools/syz-hubtool/hubtool.go | 2 | ||||
| -rw-r--r-- | tools/syz-runtest/runtest.go | 2 | ||||
| -rw-r--r-- | vm/adb/adb.go | 2 | ||||
| -rw-r--r-- | vm/bhyve/bhyve.go | 2 | ||||
| -rw-r--r-- | vm/cuttlefish/cuttlefish.go | 2 | ||||
| -rw-r--r-- | vm/gce/gce.go | 2 | ||||
| -rw-r--r-- | vm/gvisor/gvisor.go | 2 | ||||
| -rwxr-xr-x | vm/isolated/isolated.go | 2 | ||||
| -rw-r--r-- | vm/kvm/kvm.go | 2 | ||||
| -rw-r--r-- | vm/proxyapp/init.go | 2 | ||||
| -rw-r--r-- | vm/qemu/qemu.go | 2 | ||||
| -rw-r--r-- | vm/starnix/starnix.go | 2 | ||||
| -rw-r--r-- | vm/vm.go | 6 | ||||
| -rw-r--r-- | vm/vm_test.go | 2 | ||||
| -rw-r--r-- | vm/vmimpl/vmimpl.go | 12 | ||||
| -rw-r--r-- | vm/vmm/vmm.go | 2 | ||||
| -rw-r--r-- | vm/vmware/vmware.go | 2 |
26 files changed, 90 insertions, 71 deletions
diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index 7b4ca5c71..1e4787c94 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -456,11 +456,12 @@ func (inst *inst) testRepro() ([]byte, error) { } type OptionalFuzzerArgs struct { - Slowdown int - RawCover bool - SandboxArg int - PprofPort int - ResetAccState bool + Slowdown int + RawCover bool + SandboxArg int + PprofPort int + ResetAccState bool + NetCompression bool } type FuzzerCmdArgs struct { @@ -504,6 +505,7 @@ func FuzzerCmd(args *FuzzerCmdArgs) string { {Name: "sandbox_arg", Value: fmt.Sprint(args.Optional.SandboxArg)}, {Name: "pprof_port", Value: fmt.Sprint(args.Optional.PprofPort)}, {Name: "reset_acc_state", Value: fmt.Sprint(args.Optional.ResetAccState)}, + {Name: "net_compression", Value: fmt.Sprint(args.Optional.NetCompression)}, } optionalArg = " " + tool.OptionalFlags(flags) } diff --git a/pkg/rpctype/rpc.go b/pkg/rpctype/rpc.go index 4bd0a17d0..01f0f1b02 100644 --- a/pkg/rpctype/rpc.go +++ b/pkg/rpctype/rpc.go @@ -17,12 +17,14 @@ import ( ) type RPCServer struct { - ln net.Listener - s *rpc.Server - TotalBytes atomic.Uint64 + TotalBytes atomic.Uint64 // total compressed bytes transmitted + + ln net.Listener + s *rpc.Server + useCompression bool } -func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { +func NewRPCServer(addr, name string, receiver interface{}, useCompression bool) (*RPCServer, error) { ln, err := net.Listen("tcp", addr) if err != nil { return nil, fmt.Errorf("failed to listen on %v: %w", addr, err) @@ -32,8 +34,9 @@ func NewRPCServer(addr, name string, receiver interface{}) (*RPCServer, error) { return nil, err } serv := &RPCServer{ - ln: ln, - s: s, + ln: ln, + s: s, + useCompression: useCompression, } return serv, nil } @@ -46,7 +49,7 @@ func (serv *RPCServer) Serve() { continue } setupKeepAlive(conn, time.Minute) - go serv.s.ServeConn(newFlateConn(newCountedConn(serv, conn))) + go serv.s.ServeConn(maybeFlateConn(newCountedConn(serv, conn), serv.useCompression)) } } @@ -55,9 +58,10 @@ func (serv *RPCServer) Addr() net.Addr { } type RPCClient struct { - conn net.Conn - c *rpc.Client - timeScale time.Duration + conn net.Conn + c *rpc.Client + timeScale time.Duration + useCompression bool } func Dial(addr string, timeScale time.Duration) (net.Conn, error) { @@ -77,15 +81,16 @@ func Dial(addr string, timeScale time.Duration) (net.Conn, error) { return conn, nil } -func NewRPCClient(addr string, timeScale time.Duration) (*RPCClient, error) { +func NewRPCClient(addr string, timeScale time.Duration, useCompression bool) (*RPCClient, error) { conn, err := Dial(addr, timeScale) if err != nil { return nil, err } cli := &RPCClient{ - conn: conn, - c: rpc.NewClient(newFlateConn(conn)), - timeScale: timeScale, + conn: conn, + c: rpc.NewClient(maybeFlateConn(conn, useCompression)), + timeScale: timeScale, + useCompression: useCompression, } return cli, nil } @@ -101,15 +106,6 @@ func (cli *RPCClient) Close() { cli.c.Close() } -func RPCCall(addr string, timeScale time.Duration, method string, args, reply interface{}) error { - c, err := NewRPCClient(addr, timeScale) - if err != nil { - return err - } - defer c.Close() - return c.Call(method, args, reply) -} - func setupKeepAlive(conn net.Conn, keepAlive time.Duration) { conn.(*net.TCPConn).SetKeepAlive(true) conn.(*net.TCPConn).SetKeepAlivePeriod(keepAlive) @@ -122,7 +118,10 @@ type flateConn struct { c io.Closer } -func newFlateConn(conn io.ReadWriteCloser) io.ReadWriteCloser { +func maybeFlateConn(conn io.ReadWriteCloser, useCompression bool) io.ReadWriteCloser { + if !useCompression { + return conn + } w, err := flate.NewWriter(conn, 9) if err != nil { panic(err) diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index fb7f57b30..4448718a1 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -118,16 +118,17 @@ func main() { debug.SetGCPercent(50) var ( - flagName = flag.String("name", "test", "unique name for manager") - flagOS = flag.String("os", runtime.GOOS, "target OS") - flagArch = flag.String("arch", runtime.GOARCH, "target arch") - flagManager = flag.String("manager", "", "manager rpc address") - flagProcs = flag.Int("procs", 1, "number of parallel test processes") - flagOutput = flag.String("output", "stdout", "write programs to none/stdout/dmesg/file") - flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci - flagRunTest = flag.Bool("runtest", false, "enable program testing mode") // used by pkg/runtest - flagRawCover = flag.Bool("raw_cover", false, "fetch raw coverage") - flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") + flagName = flag.String("name", "test", "unique name for manager") + flagOS = flag.String("os", runtime.GOOS, "target OS") + flagArch = flag.String("arch", runtime.GOARCH, "target arch") + flagManager = flag.String("manager", "", "manager rpc address") + flagProcs = flag.Int("procs", 1, "number of parallel test processes") + flagOutput = flag.String("output", "stdout", "write programs to none/stdout/dmesg/file") + flagTest = flag.Bool("test", false, "enable image testing mode") // used by syz-ci + flagRunTest = flag.Bool("runtest", false, "enable program testing mode") // used by pkg/runtest + flagRawCover = flag.Bool("raw_cover", false, "fetch raw coverage") + flagPprofPort = flag.Int("pprof_port", 0, "HTTP port for the pprof endpoint (disabled if 0)") + flagNetCompression = flag.Bool("net_compression", false, "use network compression for RPC calls") // Experimental flags. flagResetAccState = flag.Bool("reset_acc_state", false, "restarts executor before most executions") @@ -179,7 +180,7 @@ func main() { machineInfo, modules := collectMachineInfos(target) log.Logf(0, "dialing manager at %v", *flagManager) - manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale) + manager, err := rpctype.NewRPCClient(*flagManager, timeouts.Scale, *flagNetCompression) if err != nil { log.SyzFatalf("failed to create an RPC client: %v ", err) } diff --git a/syz-hub/hub.go b/syz-hub/hub.go index 187d4a1eb..f7e7e3e48 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -61,7 +61,7 @@ func main() { hub.initHTTP(cfg.HTTP) - s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub) + s, err := rpctype.NewRPCServer(cfg.RPC, "Hub", hub, true) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/syz-manager/hub.go b/syz-manager/hub.go index 7a38310d5..04904664d 100644 --- a/syz-manager/hub.go +++ b/syz-manager/hub.go @@ -114,6 +114,10 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if err != nil { return nil, err } + hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true) + if err != nil { + return nil, err + } a := &rpctype.HubConnectArgs{ Client: hc.cfg.HubClient, Key: key, @@ -136,12 +140,14 @@ func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { if len(a.Corpus) > max { a.Corpus = a.Corpus[:max] } + err = hub.Call("Hub.Connect", a, nil) // Hub.Connect request can be very large, so do it on a transient connection // (rpc connection buffers never shrink). - if err := rpctype.RPCCall(hc.cfg.HubAddr, 1, "Hub.Connect", a, nil); err != nil { + hub.Close() + if err != nil { return nil, err } - hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr, 1) + hub, err = rpctype.NewRPCClient(hc.cfg.HubAddr, 1, true) if err != nil { return nil, err } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 2280cf287..e7519b707 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -67,6 +67,7 @@ type Manager struct { vmStop chan bool checkResult *rpctype.CheckArgs fresh bool + netCompression bool expertMode bool numFuzzing uint32 numReproducing uint32 @@ -184,6 +185,7 @@ func RunManager(cfg *mgrconfig.Config) { memoryLeakFrames: make(map[string]bool), dataRaceFrames: make(map[string]bool), fresh: true, + netCompression: vm.UseNetCompression(cfg.Type), vmStop: make(chan bool), externalReproQueue: make(chan *Crash, 10), needMoreRepros: make(chan chan bool), @@ -836,11 +838,12 @@ func (mgr *Manager) runInstanceInner(index int, instanceName string) (*report.Re Test: false, Runtest: false, Optional: &instance.OptionalFuzzerArgs{ - Slowdown: mgr.cfg.Timeouts.Slowdown, - RawCover: mgr.cfg.RawCover, - SandboxArg: mgr.cfg.SandboxArg, - PprofPort: inst.PprofPort(), - ResetAccState: mgr.cfg.Experimental.ResetAccState, + Slowdown: mgr.cfg.Timeouts.Slowdown, + RawCover: mgr.cfg.RawCover, + SandboxArg: mgr.cfg.SandboxArg, + PprofPort: inst.PprofPort(), + ResetAccState: mgr.cfg.Experimental.ResetAccState, + NetCompression: mgr.netCompression, }, } cmd := instance.FuzzerCmd(args) diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 0c94a5793..185b44d0f 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -74,7 +74,7 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { cfg: mgr.cfg, stats: mgr.stats, } - s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv) + s, err := rpctype.NewRPCServer(mgr.cfg.RPC, "Manager", serv, mgr.netCompression) if err != nil { return nil, err } diff --git a/syz-runner/runner.go b/syz-runner/runner.go index de4b7de5b..9f97ebfd9 100644 --- a/syz-runner/runner.go +++ b/syz-runner/runner.go @@ -46,7 +46,7 @@ func main() { } timeouts := config.Timeouts - vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale) + vrf, err := rpctype.NewRPCClient(*flagAddr, timeouts.Scale, true) if err != nil { log.Fatalf("failed to connect to verifier : %v", err) } diff --git a/syz-verifier/rpcserver.go b/syz-verifier/rpcserver.go index f80fb7a46..8c0e6b705 100644 --- a/syz-verifier/rpcserver.go +++ b/syz-verifier/rpcserver.go @@ -33,7 +33,7 @@ func startRPCServer(vrf *Verifier) (*RPCServer, error) { notChecked: len(vrf.pools), } - s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv) + s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv, true) if err != nil { return nil, err } diff --git a/tools/syz-hubtool/hubtool.go b/tools/syz-hubtool/hubtool.go index 9789639db..08d55cd95 100644 --- a/tools/syz-hubtool/hubtool.go +++ b/tools/syz-hubtool/hubtool.go @@ -57,7 +57,7 @@ func main() { return } log.Printf("connecting to hub at %v...", *flagHubAddress) - conn, err := rpctype.NewRPCClient(*flagHubAddress, 1) + conn, err := rpctype.NewRPCClient(*flagHubAddress, 1, true) if err != nil { log.Fatalf("failed to connect to hub: %v", err) } diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go index 405b22255..d18bf7d07 100644 --- a/tools/syz-runtest/runtest.go +++ b/tools/syz-runtest/runtest.go @@ -61,7 +61,7 @@ func main() { reqMap: make(map[int]*runtest.RunRequest), lastReq: make(map[string]int), } - s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr) + s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr, true) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } diff --git a/vm/adb/adb.go b/vm/adb/adb.go index a108104c5..5b162bc37 100644 --- a/vm/adb/adb.go +++ b/vm/adb/adb.go @@ -27,7 +27,7 @@ import ( ) func init() { - vmimpl.Register("adb", ctor, false) + vmimpl.Register("adb", ctor, false, true) } type Device struct { diff --git a/vm/bhyve/bhyve.go b/vm/bhyve/bhyve.go index c3b53e053..3e82cbc2d 100644 --- a/vm/bhyve/bhyve.go +++ b/vm/bhyve/bhyve.go @@ -21,7 +21,7 @@ import ( ) func init() { - vmimpl.Register("bhyve", ctor, true) + vmimpl.Register("bhyve", ctor, true, false) } type Config struct { diff --git a/vm/cuttlefish/cuttlefish.go b/vm/cuttlefish/cuttlefish.go index 94610ed4e..d17bd78f2 100644 --- a/vm/cuttlefish/cuttlefish.go +++ b/vm/cuttlefish/cuttlefish.go @@ -28,7 +28,7 @@ const ( ) func init() { - vmimpl.Register("cuttlefish", ctor, true) + vmimpl.Register("cuttlefish", ctor, true, true) } type Pool struct { diff --git a/vm/gce/gce.go b/vm/gce/gce.go index 16f3f996f..6a292d4f6 100644 --- a/vm/gce/gce.go +++ b/vm/gce/gce.go @@ -35,7 +35,7 @@ import ( ) func init() { - vmimpl.Register("gce", ctor, true) + vmimpl.Register("gce", ctor, true, true) } type Config struct { diff --git a/vm/gvisor/gvisor.go b/vm/gvisor/gvisor.go index 45c076b23..f8194ab56 100644 --- a/vm/gvisor/gvisor.go +++ b/vm/gvisor/gvisor.go @@ -25,7 +25,7 @@ import ( ) func init() { - vmimpl.Register("gvisor", ctor, true) + vmimpl.Register("gvisor", ctor, true, false) } type Config struct { diff --git a/vm/isolated/isolated.go b/vm/isolated/isolated.go index 51a995b72..3383e91fa 100755 --- a/vm/isolated/isolated.go +++ b/vm/isolated/isolated.go @@ -23,7 +23,7 @@ import ( const pstoreConsoleFile = "/sys/fs/pstore/console-ramoops-0" func init() { - vmimpl.Register("isolated", ctor, false) + vmimpl.Register("isolated", ctor, false, true) } type Config struct { diff --git a/vm/kvm/kvm.go b/vm/kvm/kvm.go index 353f4300e..f22a195b3 100644 --- a/vm/kvm/kvm.go +++ b/vm/kvm/kvm.go @@ -27,7 +27,7 @@ const ( ) func init() { - vmimpl.Register("kvm", ctor, true) + vmimpl.Register("kvm", ctor, true, false) } type Config struct { diff --git a/vm/proxyapp/init.go b/vm/proxyapp/init.go index 467187f46..9c96e4855 100644 --- a/vm/proxyapp/init.go +++ b/vm/proxyapp/init.go @@ -32,7 +32,7 @@ func init() { func(env *vmimpl.Env) (vmimpl.Pool, error) { return ctor(makeDefaultParams(), env) }, - false) + false, true) } // Package configuration VARs are mostly needed for tests. diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go index 4b857066a..4496680c3 100644 --- a/vm/qemu/qemu.go +++ b/vm/qemu/qemu.go @@ -27,7 +27,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("qemu", ctor, true) + vmimpl.Register("qemu", ctor, true, false) } type Config struct { diff --git a/vm/starnix/starnix.go b/vm/starnix/starnix.go index d8beaba91..c3973fee9 100644 --- a/vm/starnix/starnix.go +++ b/vm/starnix/starnix.go @@ -22,7 +22,7 @@ import ( func init() { var _ vmimpl.Infoer = (*instance)(nil) - vmimpl.Register("starnix", ctor, true) + vmimpl.Register("starnix", ctor, true, false) } type Config struct { @@ -93,6 +93,12 @@ func AllowsOvercommit(typ string) bool { return vmimpl.Types[vmType(typ)].Overcommit } +// UseNetCompression says if it's beneficial to use network compression for this VM type. +// Local VMs (qemu) generally don't benefit from compression, while remote machines may benefit. +func UseNetCompression(typ string) bool { + return vmimpl.Types[vmType(typ)].NetCompression +} + // Create creates a VM pool that can be used to create individual VMs. func Create(cfg *mgrconfig.Config, debug bool) (*Pool, error) { typ, ok := vmimpl.Types[vmType(cfg.Type)] diff --git a/vm/vm_test.go b/vm/vm_test.go index 2a3f1171a..62b93f4c1 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -80,7 +80,7 @@ func init() { ctor := func(env *vmimpl.Env) (vmimpl.Pool, error) { return &testPool{}, nil } - vmimpl.Register("test", ctor, false) + vmimpl.Register("test", ctor, false, false) } type Test struct { diff --git a/vm/vmimpl/vmimpl.go b/vm/vmimpl/vmimpl.go index a9afdc1f1..bd565c8aa 100644 --- a/vm/vmimpl/vmimpl.go +++ b/vm/vmimpl/vmimpl.go @@ -129,16 +129,18 @@ func (err InfraError) InfraError() (string, []byte) { } // Register registers a new VM type within the package. -func Register(typ string, ctor ctorFunc, allowsOvercommit bool) { +func Register(typ string, ctor ctorFunc, allowsOvercommit, netCompression bool) { Types[typ] = Type{ - Ctor: ctor, - Overcommit: allowsOvercommit, + Ctor: ctor, + Overcommit: allowsOvercommit, + NetCompression: netCompression, } } type Type struct { - Ctor ctorFunc - Overcommit bool + Ctor ctorFunc + Overcommit bool + NetCompression bool } type ctorFunc func(env *Env) (Pool, error) diff --git a/vm/vmm/vmm.go b/vm/vmm/vmm.go index 71dfd39b4..67f12f4c7 100644 --- a/vm/vmm/vmm.go +++ b/vm/vmm/vmm.go @@ -25,7 +25,7 @@ import ( var vmctlStatusRegex = regexp.MustCompile(`^\s+([0-9]+)\b.*\brunning`) func init() { - vmimpl.Register("vmm", ctor, true) + vmimpl.Register("vmm", ctor, true, false) } type Config struct { diff --git a/vm/vmware/vmware.go b/vm/vmware/vmware.go index 4c05bac12..6bccbaa6e 100644 --- a/vm/vmware/vmware.go +++ b/vm/vmware/vmware.go @@ -22,7 +22,7 @@ import ( ) func init() { - vmimpl.Register("vmware", ctor, false) + vmimpl.Register("vmware", ctor, false, false) } type Config struct { |
