From 1f0546f0da3e93ddf818cc83eda4d1f9c7388c94 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Mon, 30 Jan 2017 15:15:37 +0100 Subject: manager, fuzzer, hub: move common rpc code into rpctype If hub hangs, it causes all managers to hang as well as they call hub under the global mutex. So move common rpc code into rpctype and make it more careful about timeouts (tcp keepalives, call timeouts). Also don't call hub under the mutex, the call can be slow. --- rpctype/rpc.go | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++ rpctype/rpctype.go | 1 + syz-fuzzer/fuzzer.go | 26 +++++++++------ syz-hub/hub.go | 22 +++---------- syz-manager/manager.go | 63 +++++++++++++++++------------------- 5 files changed, 138 insertions(+), 62 deletions(-) create mode 100644 rpctype/rpc.go diff --git a/rpctype/rpc.go b/rpctype/rpc.go new file mode 100644 index 000000000..32d751be4 --- /dev/null +++ b/rpctype/rpc.go @@ -0,0 +1,88 @@ +// Copyright 2017 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package rpctype + +import ( + "fmt" + "net" + "net/rpc" + "time" + + . 
"github.com/google/syzkaller/log" +) + +type RpcServer struct { + ln net.Listener + s *rpc.Server +} + +func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %v: %v", addr, err) + } + s := rpc.NewServer() + s.Register(receiver) + serv := &RpcServer{ + ln: ln, + s: s, + } + return serv, nil +} + +func (serv *RpcServer) Serve() { + for { + conn, err := serv.ln.Accept() + if err != nil { + Logf(0, "failed to accept an rpc connection: %v", err) + continue + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + go serv.s.ServeConn(conn) + } +} + +func (serv *RpcServer) Addr() net.Addr { + return serv.ln.Addr() +} + +type RpcClient struct { + conn net.Conn + c *rpc.Client +} + +func NewRpcClient(addr string) (*RpcClient, error) { + conn, err := net.DialTimeout("tcp", addr, 15*time.Second) + if err != nil { + return nil, err + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + cli := &RpcClient{ + conn: conn, + c: rpc.NewClient(conn), + } + return cli, nil +} + +func (cli *RpcClient) Call(method string, args, reply interface{}) error { + cli.conn.SetDeadline(time.Now().Add(30 * time.Second)) + err := cli.c.Call(method, args, reply) + cli.conn.SetDeadline(time.Time{}) + return err +} + +func (cli *RpcClient) Close() { + cli.c.Close() +} + +func RpcCall(addr, method string, args, reply interface{}) error { + c, err := NewRpcClient(addr) + if err != nil { + return err + } + defer c.Close() + return c.Call(method, args, reply) +} diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go index 87148f05b..ced475156 100644 --- a/rpctype/rpctype.go +++ b/rpctype/rpctype.go @@ -24,6 +24,7 @@ type ConnectArgs struct { type ConnectRes struct { Prios [][]float32 + Inputs []RpcInput MaxSignal []uint32 Candidates []RpcCandidate EnabledCalls string diff --git 
a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index 96b4dade0..e4eaf10e9 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -14,8 +14,6 @@ import ( "math/rand" "net/http" _ "net/http/pprof" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "runtime" @@ -64,7 +62,7 @@ type Candidate struct { } var ( - manager *rpc.Client + manager *RpcClient signalMu sync.RWMutex corpusSignal map[uint32]struct{} @@ -128,18 +126,16 @@ func main() { corpusHashes = make(map[hash.Sig]struct{}) Logf(0, "dialing manager at %v", *flagManager) - conn, err := jsonrpc.Dial("tcp", *flagManager) - if err != nil { - panic(err) - } - manager = conn a := &ConnectArgs{*flagName} r := &ConnectRes{} - if err := manager.Call("Manager.Connect", a, r); err != nil { + if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil { panic(err) } calls := buildCallList(r.EnabledCalls) ct := prog.BuildChoiceTable(r.Prios, calls) + for _, inp := range r.Inputs { + addInput(inp) + } for _, s := range r.MaxSignal { maxSignal[s] = struct{}{} } @@ -168,11 +164,21 @@ func main() { for c := range calls { a.Calls = append(a.Calls, c.Name) } - if err := manager.Call("Manager.Check", a, nil); err != nil { + if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil { panic(err) } } + // Manager.Connect reply can be very large and that memory will be permanently cached in the connection. + // So we do the call on a transient connection, free all memory and reconnect. + // The rest of rpc requests have bounded size. + debug.FreeOSMemory() + if conn, err := NewRpcClient(*flagManager); err != nil { + panic(err) + } else { + manager = conn + } + kmemleakInit() flags, timeout, err := ipc.DefaultFlags() diff --git a/syz-hub/hub.go b/syz-hub/hub.go index bb5128727..8b0be56d6 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -8,10 +8,7 @@ import ( "flag" "fmt" "io/ioutil" - "net" - "net/rpc" "sync" - "time" . "github.com/google/syzkaller/log" . 
"github.com/google/syzkaller/rpctype" @@ -59,23 +56,12 @@ func main() { hub.initHttp(cfg.Http) - ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, hub) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) - } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - s := rpc.NewServer() - s.Register(hub) - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeConn(conn) + Fatalf("failed to create rpc server: %v", err) } + Logf(0, "serving rpc on tcp://%v", s.Addr()) + s.Serve() } func (hub *Hub) Connect(a *HubConnectArgs, r *int) error { diff --git a/syz-manager/manager.go b/syz-manager/manager.go index fb36b6d75..5b7ee98ba 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -11,8 +11,6 @@ import ( "io/ioutil" "math/rand" "net" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "path/filepath" @@ -73,7 +71,7 @@ type Manager struct { prios [][]float32 fuzzers map[string]*Fuzzer - hub *rpc.Client + hub *RpcClient hubCorpus map[hash.Sig]bool } @@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) { mgr.initHttp() // Create RPC server for fuzzers. 
- ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, mgr) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) + Fatalf("failed to create rpc server: %v", err) } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - mgr.port = ln.Addr().(*net.TCPAddr).Port - s := rpc.NewServer() - s.Register(mgr) - go func() { - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeCodec(jsonrpc.NewServerCodec(conn)) - } - }() + Logf(0, "serving rpc on tcp://%v", s.Addr()) + mgr.port = s.Addr().(*net.TCPAddr).Port + go s.Serve() go func() { for lastTime := time.Now(); ; { @@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error { } mgr.fuzzers[a.Name] = f mgr.minimizeCorpus() + + f.inputs = nil for _, inp := range mgr.corpus { - f.inputs = append(f.inputs, inp) + r.Inputs = append(r.Inputs, inp) } r.Prios = mgr.prios r.EnabledCalls = mgr.enabledSyscalls @@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() { mgr.minimizeCorpus() if mgr.hub == nil { - conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr) - if err != nil { - Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) - return - } - mgr.hub = conn a := &HubConnectArgs{ Name: mgr.cfg.Name, Key: mgr.cfg.Hub_Key, Fresh: mgr.fresh, Calls: mgr.enabledCalls, } - mgr.hubCorpus = make(map[hash.Sig]bool) + hubCorpus := make(map[hash.Sig]bool) for _, inp := range mgr.corpus { - mgr.hubCorpus[hash.Hash(inp.Prog)] = true + hubCorpus[hash.Hash(inp.Prog)] = true a.Corpus = append(a.Corpus, inp.Prog) } - if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil { + mgr.mu.Unlock() + // Hub.Connect request can be very large, so do it on a transient connection + // (rpc connection buffers never shrink). + // Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible. 
+ if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Connect rpc failed: %v", err) - mgr.hub.Close() - mgr.hub = nil - mgr.hubCorpus = nil return } + conn, err := NewRpcClient(mgr.cfg.Hub_Addr) + if err != nil { + mgr.mu.Lock() + Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) + return + } + mgr.mu.Lock() + mgr.hub = conn + mgr.hubCorpus = hubCorpus mgr.fresh = false Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus)) } @@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() { delete(mgr.hubCorpus, sig) a.Del = append(a.Del, sig.String()) } + mgr.mu.Unlock() r := new(HubSyncRes) if err := mgr.hub.Call("Hub.Sync", a, r); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Sync rpc failed: %v", err) mgr.hub.Close() mgr.hub = nil return } + mgr.mu.Lock() dropped := 0 for _, inp := range r.Inputs { _, err := prog.Deserialize(inp) -- cgit mrf-deployment