diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2017-01-30 15:15:37 +0100 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2017-01-30 15:15:37 +0100 |
| commit | 1f0546f0da3e93ddf818cc83eda4d1f9c7388c94 (patch) | |
| tree | cb615be555eda759987734071a2bed619be549a7 | |
| parent | 8b2c1cb5bbde0ed2b055645d04ee3a2874569805 (diff) | |
manager, fuzzer, hub: move common rpc code into rpctype
If hub hangs, it causes all managers to hang as well as they call
hub under the global mutex. So move common rpc code into rpctype
and make it more careful about timeouts (tcp keepalives, call timeouts).
Also don't call hub under the mutex, the call can be slow.
| -rw-r--r-- | rpctype/rpc.go | 88 | ||||
| -rw-r--r-- | rpctype/rpctype.go | 1 | ||||
| -rw-r--r-- | syz-fuzzer/fuzzer.go | 26 | ||||
| -rw-r--r-- | syz-hub/hub.go | 22 | ||||
| -rw-r--r-- | syz-manager/manager.go | 63 |
5 files changed, 138 insertions, 62 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go new file mode 100644 index 000000000..32d751be4 --- /dev/null +++ b/rpctype/rpc.go @@ -0,0 +1,88 @@ +// Copyright 2017 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package rpctype + +import ( + "fmt" + "net" + "net/rpc" + "time" + + . "github.com/google/syzkaller/log" +) + +type RpcServer struct { + ln net.Listener + s *rpc.Server +} + +func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %v: %v", addr, err) + } + s := rpc.NewServer() + s.Register(receiver) + serv := &RpcServer{ + ln: ln, + s: s, + } + return serv, nil +} + +func (serv *RpcServer) Serve() { + for { + conn, err := serv.ln.Accept() + if err != nil { + Logf(0, "failed to accept an rpc connection: %v", err) + continue + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + go serv.s.ServeConn(conn) + } +} + +func (serv *RpcServer) Addr() net.Addr { + return serv.ln.Addr() +} + +type RpcClient struct { + conn net.Conn + c *rpc.Client +} + +func NewRpcClient(addr string) (*RpcClient, error) { + conn, err := net.DialTimeout("tcp", addr, 15*time.Second) + if err != nil { + return nil, err + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + cli := &RpcClient{ + conn: conn, + c: rpc.NewClient(conn), + } + return cli, nil +} + +func (cli *RpcClient) Call(method string, args, reply interface{}) error { + cli.conn.SetDeadline(time.Now().Add(30 * time.Second)) + err := cli.c.Call(method, args, reply) + cli.conn.SetDeadline(time.Time{}) + return err +} + +func (cli *RpcClient) Close() { + cli.c.Close() +} + +func RpcCall(addr, method string, args, reply interface{}) error { + c, err := NewRpcClient(addr) + if err != nil { + return err + } + defer c.Close() + return c.Call(method, args, reply) +} diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go index 87148f05b..ced475156 100644 --- a/rpctype/rpctype.go +++ b/rpctype/rpctype.go @@ -24,6 +24,7 @@ type ConnectArgs struct { type ConnectRes struct { Prios [][]float32 + Inputs []RpcInput MaxSignal []uint32 Candidates []RpcCandidate EnabledCalls string diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index 96b4dade0..e4eaf10e9 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -14,8 +14,6 @@ import ( "math/rand" "net/http" _ "net/http/pprof" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "runtime" @@ -64,7 +62,7 @@ type Candidate struct { } var ( - manager *rpc.Client + manager *RpcClient signalMu sync.RWMutex corpusSignal map[uint32]struct{} @@ -128,18 +126,16 @@ func main() { corpusHashes = make(map[hash.Sig]struct{}) Logf(0, "dialing manager at %v", *flagManager) - conn, err := jsonrpc.Dial("tcp", *flagManager) - if err != nil { - panic(err) - } - manager = conn a := &ConnectArgs{*flagName} r := &ConnectRes{} - if err := manager.Call("Manager.Connect", a, r); err != nil { + if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil { panic(err) } calls := buildCallList(r.EnabledCalls) ct := prog.BuildChoiceTable(r.Prios, calls) + for _, inp := range r.Inputs { + addInput(inp) + } for _, s := range r.MaxSignal { maxSignal[s] = struct{}{} } @@ -168,11 +164,21 @@ func main() { for c := range calls { a.Calls = append(a.Calls, c.Name) } - if err := manager.Call("Manager.Check", a, nil); err != nil { + if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil { panic(err) } } + // Manager.Connect reply can ve very large and that memory will be permanently cached in the connection. + // So we do the call on a transient connection, free all memory and reconnect. + // The rest of rpc requests have bounded size. + debug.FreeOSMemory() + if conn, err := NewRpcClient(*flagManager); err != nil { + panic(err) + } else { + manager = conn + } + kmemleakInit() flags, timeout, err := ipc.DefaultFlags() diff --git a/syz-hub/hub.go b/syz-hub/hub.go index bb5128727..8b0be56d6 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -8,10 +8,7 @@ import ( "flag" "fmt" "io/ioutil" - "net" - "net/rpc" "sync" - "time" . "github.com/google/syzkaller/log" . "github.com/google/syzkaller/rpctype" @@ -59,23 +56,12 @@ func main() { hub.initHttp(cfg.Http) - ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, hub) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) - } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - s := rpc.NewServer() - s.Register(hub) - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeConn(conn) + Fatalf("failed to create rpc server: %v", err) } + Logf(0, "serving rpc on tcp://%v", s.Addr()) + s.Serve() } func (hub *Hub) Connect(a *HubConnectArgs, r *int) error { diff --git a/syz-manager/manager.go b/syz-manager/manager.go index fb36b6d75..5b7ee98ba 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -11,8 +11,6 @@ import ( "io/ioutil" "math/rand" "net" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "path/filepath" @@ -73,7 +71,7 @@ type Manager struct { prios [][]float32 fuzzers map[string]*Fuzzer - hub *rpc.Client + hub *RpcClient hubCorpus map[hash.Sig]bool } @@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) { mgr.initHttp() // Create RPC server for fuzzers. - ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, mgr) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) + Fatalf("failed to create rpc server: %v", err) } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - mgr.port = ln.Addr().(*net.TCPAddr).Port - s := rpc.NewServer() - s.Register(mgr) - go func() { - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeCodec(jsonrpc.NewServerCodec(conn)) - } - }() + Logf(0, "serving rpc on tcp://%v", s.Addr()) + mgr.port = s.Addr().(*net.TCPAddr).Port + go s.Serve() go func() { for lastTime := time.Now(); ; { @@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error { } mgr.fuzzers[a.Name] = f mgr.minimizeCorpus() + + f.inputs = nil for _, inp := range mgr.corpus { - f.inputs = append(f.inputs, inp) + r.Inputs = append(r.Inputs, inp) } r.Prios = mgr.prios r.EnabledCalls = mgr.enabledSyscalls @@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() { mgr.minimizeCorpus() if mgr.hub == nil { - conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr) - if err != nil { - Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) - return - } - mgr.hub = conn a := &HubConnectArgs{ Name: mgr.cfg.Name, Key: mgr.cfg.Hub_Key, Fresh: mgr.fresh, Calls: mgr.enabledCalls, } - mgr.hubCorpus = make(map[hash.Sig]bool) + hubCorpus := make(map[hash.Sig]bool) for _, inp := range mgr.corpus { - mgr.hubCorpus[hash.Hash(inp.Prog)] = true + hubCorpus[hash.Hash(inp.Prog)] = true a.Corpus = append(a.Corpus, inp.Prog) } - if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil { + mgr.mu.Unlock() + // Hub.Connect request can be very large, so do it on a transient connection + // (rpc connection buffers never shrink). + // Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible. + if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Connect rpc failed: %v", err) - mgr.hub.Close() - mgr.hub = nil - mgr.hubCorpus = nil return } + conn, err := NewRpcClient(mgr.cfg.Hub_Addr) + if err != nil { + mgr.mu.Lock() + Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) + return + } + mgr.mu.Lock() + mgr.hub = conn + mgr.hubCorpus = hubCorpus mgr.fresh = false Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus)) } @@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() { delete(mgr.hubCorpus, sig) a.Del = append(a.Del, sig.String()) } + mgr.mu.Unlock() r := new(HubSyncRes) if err := mgr.hub.Call("Hub.Sync", a, r); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Sync rpc failed: %v", err) mgr.hub.Close() mgr.hub = nil return } + mgr.mu.Lock() dropped := 0 for _, inp := range r.Inputs { _, err := prog.Deserialize(inp) |
