diff options
| -rw-r--r-- | rpctype/rpc.go | 88 | ||||
| -rw-r--r-- | rpctype/rpctype.go | 1 | ||||
| -rw-r--r-- | syz-fuzzer/fuzzer.go | 26 | ||||
| -rw-r--r-- | syz-hub/hub.go | 22 | ||||
| -rw-r--r-- | syz-manager/manager.go | 63 |
5 files changed, 138 insertions, 62 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go new file mode 100644 index 000000000..32d751be4 --- /dev/null +++ b/rpctype/rpc.go @@ -0,0 +1,88 @@ +// Copyright 2017 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package rpctype + +import ( + "fmt" + "net" + "net/rpc" + "time" + + . "github.com/google/syzkaller/log" +) + +type RpcServer struct { + ln net.Listener + s *rpc.Server +} + +func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) { + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("failed to listen on %v: %v", addr, err) + } + s := rpc.NewServer() + s.Register(receiver) + serv := &RpcServer{ + ln: ln, + s: s, + } + return serv, nil +} + +func (serv *RpcServer) Serve() { + for { + conn, err := serv.ln.Accept() + if err != nil { + Logf(0, "failed to accept an rpc connection: %v", err) + continue + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + go serv.s.ServeConn(conn) + } +} + +func (serv *RpcServer) Addr() net.Addr { + return serv.ln.Addr() +} + +type RpcClient struct { + conn net.Conn + c *rpc.Client +} + +func NewRpcClient(addr string) (*RpcClient, error) { + conn, err := net.DialTimeout("tcp", addr, 15*time.Second) + if err != nil { + return nil, err + } + conn.(*net.TCPConn).SetKeepAlive(true) + conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) + cli := &RpcClient{ + conn: conn, + c: rpc.NewClient(conn), + } + return cli, nil +} + +func (cli *RpcClient) Call(method string, args, reply interface{}) error { + cli.conn.SetDeadline(time.Now().Add(30 * time.Second)) + err := cli.c.Call(method, args, reply) + cli.conn.SetDeadline(time.Time{}) + return err +} + +func (cli *RpcClient) Close() { + cli.c.Close() +} + +func RpcCall(addr, method string, args, reply interface{}) error { + c, err := NewRpcClient(addr) + if err != nil { + return err + } + defer c.Close() + return c.Call(method, args, reply) +} diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go index 87148f05b..ced475156 100644 --- a/rpctype/rpctype.go +++ b/rpctype/rpctype.go @@ -24,6 +24,7 @@ type ConnectArgs struct { type ConnectRes struct { Prios [][]float32 + Inputs []RpcInput MaxSignal []uint32 Candidates []RpcCandidate EnabledCalls string diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go index 96b4dade0..e4eaf10e9 100644 --- a/syz-fuzzer/fuzzer.go +++ b/syz-fuzzer/fuzzer.go @@ -14,8 +14,6 @@ import ( "math/rand" "net/http" _ "net/http/pprof" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "runtime" @@ -64,7 +62,7 @@ type Candidate struct { } var ( - manager *rpc.Client + manager *RpcClient signalMu sync.RWMutex corpusSignal map[uint32]struct{} @@ -128,18 +126,16 @@ func main() { corpusHashes = make(map[hash.Sig]struct{}) Logf(0, "dialing manager at %v", *flagManager) - conn, err := jsonrpc.Dial("tcp", *flagManager) - if err != nil { - panic(err) - } - manager = conn a := &ConnectArgs{*flagName} r := &ConnectRes{} - if err := manager.Call("Manager.Connect", a, r); err != nil { + if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil { panic(err) } calls := buildCallList(r.EnabledCalls) ct := prog.BuildChoiceTable(r.Prios, calls) + for _, inp := range r.Inputs { + addInput(inp) + } for _, s := range r.MaxSignal { maxSignal[s] = struct{}{} } @@ -168,11 +164,21 @@ func main() { for c := range calls { a.Calls = append(a.Calls, c.Name) } - if err := manager.Call("Manager.Check", a, nil); err != nil { + if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil { panic(err) } } + // Manager.Connect reply can ve very large and that memory will be permanently cached in the connection. + // So we do the call on a transient connection, free all memory and reconnect. + // The rest of rpc requests have bounded size. + debug.FreeOSMemory() + if conn, err := NewRpcClient(*flagManager); err != nil { + panic(err) + } else { + manager = conn + } + kmemleakInit() flags, timeout, err := ipc.DefaultFlags() diff --git a/syz-hub/hub.go b/syz-hub/hub.go index bb5128727..8b0be56d6 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -8,10 +8,7 @@ import ( "flag" "fmt" "io/ioutil" - "net" - "net/rpc" "sync" - "time" . "github.com/google/syzkaller/log" . "github.com/google/syzkaller/rpctype" @@ -59,23 +56,12 @@ func main() { hub.initHttp(cfg.Http) - ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, hub) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) - } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - s := rpc.NewServer() - s.Register(hub) - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeConn(conn) + Fatalf("failed to create rpc server: %v", err) } + Logf(0, "serving rpc on tcp://%v", s.Addr()) + s.Serve() } func (hub *Hub) Connect(a *HubConnectArgs, r *int) error { diff --git a/syz-manager/manager.go b/syz-manager/manager.go index fb36b6d75..5b7ee98ba 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -11,8 +11,6 @@ import ( "io/ioutil" "math/rand" "net" - "net/rpc" - "net/rpc/jsonrpc" "os" "os/signal" "path/filepath" @@ -73,7 +71,7 @@ type Manager struct { prios [][]float32 fuzzers map[string]*Fuzzer - hub *rpc.Client + hub *RpcClient hubCorpus map[hash.Sig]bool } @@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) { mgr.initHttp() // Create RPC server for fuzzers. - ln, err := net.Listen("tcp", cfg.Rpc) + s, err := NewRpcServer(cfg.Rpc, mgr) if err != nil { - Fatalf("failed to listen on %v: %v", cfg.Rpc, err) + Fatalf("failed to create rpc server: %v", err) } - Logf(0, "serving rpc on tcp://%v", ln.Addr()) - mgr.port = ln.Addr().(*net.TCPAddr).Port - s := rpc.NewServer() - s.Register(mgr) - go func() { - for { - conn, err := ln.Accept() - if err != nil { - Logf(0, "failed to accept an rpc connection: %v", err) - continue - } - conn.(*net.TCPConn).SetKeepAlive(true) - conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute) - go s.ServeCodec(jsonrpc.NewServerCodec(conn)) - } - }() + Logf(0, "serving rpc on tcp://%v", s.Addr()) + mgr.port = s.Addr().(*net.TCPAddr).Port + go s.Serve() go func() { for lastTime := time.Now(); ; { @@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error { } mgr.fuzzers[a.Name] = f mgr.minimizeCorpus() + + f.inputs = nil for _, inp := range mgr.corpus { - f.inputs = append(f.inputs, inp) + r.Inputs = append(r.Inputs, inp) } r.Prios = mgr.prios r.EnabledCalls = mgr.enabledSyscalls @@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() { mgr.minimizeCorpus() if mgr.hub == nil { - conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr) - if err != nil { - Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) - return - } - mgr.hub = conn a := &HubConnectArgs{ Name: mgr.cfg.Name, Key: mgr.cfg.Hub_Key, Fresh: mgr.fresh, Calls: mgr.enabledCalls, } - mgr.hubCorpus = make(map[hash.Sig]bool) + hubCorpus := make(map[hash.Sig]bool) for _, inp := range mgr.corpus { - mgr.hubCorpus[hash.Hash(inp.Prog)] = true + hubCorpus[hash.Hash(inp.Prog)] = true a.Corpus = append(a.Corpus, inp.Prog) } - if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil { + mgr.mu.Unlock() + // Hub.Connect request can be very large, so do it on a transient connection + // (rpc connection buffers never shrink). + // Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible. + if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Connect rpc failed: %v", err) - mgr.hub.Close() - mgr.hub = nil - mgr.hubCorpus = nil return } + conn, err := NewRpcClient(mgr.cfg.Hub_Addr) + if err != nil { + mgr.mu.Lock() + Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err) + return + } + mgr.mu.Lock() + mgr.hub = conn + mgr.hubCorpus = hubCorpus mgr.fresh = false Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus)) } @@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() { delete(mgr.hubCorpus, sig) a.Del = append(a.Del, sig.String()) } + mgr.mu.Unlock() r := new(HubSyncRes) if err := mgr.hub.Call("Hub.Sync", a, r); err != nil { + mgr.mu.Lock() Logf(0, "Hub.Sync rpc failed: %v", err) mgr.hub.Close() mgr.hub = nil return } + mgr.mu.Lock() dropped := 0 for _, inp := range r.Inputs { _, err := prog.Deserialize(inp) |
