aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Vyukov <dvyukov@google.com>2017-01-30 15:15:37 +0100
committerDmitry Vyukov <dvyukov@google.com>2017-01-30 15:15:37 +0100
commit1f0546f0da3e93ddf818cc83eda4d1f9c7388c94 (patch)
treecb615be555eda759987734071a2bed619be549a7
parent8b2c1cb5bbde0ed2b055645d04ee3a2874569805 (diff)
manager, fuzzer, hub: move common rpc code into rpctype
If hub hangs, it causes all managers to hang as well as they call hub under the global mutex. So move common rpc code into rpctype and make it more careful about timeouts (tcp keepalives, call timeouts). Also don't call hub under the mutex, the call can be slow.
-rw-r--r--rpctype/rpc.go88
-rw-r--r--rpctype/rpctype.go1
-rw-r--r--syz-fuzzer/fuzzer.go26
-rw-r--r--syz-hub/hub.go22
-rw-r--r--syz-manager/manager.go63
5 files changed, 138 insertions, 62 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go
new file mode 100644
index 000000000..32d751be4
--- /dev/null
+++ b/rpctype/rpc.go
@@ -0,0 +1,88 @@
+// Copyright 2017 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpctype
+
+import (
+ "fmt"
+ "net"
+ "net/rpc"
+ "time"
+
+ . "github.com/google/syzkaller/log"
+)
+
+type RpcServer struct {
+ ln net.Listener
+ s *rpc.Server
+}
+
+func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) {
+ ln, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
+ }
+ s := rpc.NewServer()
+ s.Register(receiver)
+ serv := &RpcServer{
+ ln: ln,
+ s: s,
+ }
+ return serv, nil
+}
+
+func (serv *RpcServer) Serve() {
+ for {
+ conn, err := serv.ln.Accept()
+ if err != nil {
+ Logf(0, "failed to accept an rpc connection: %v", err)
+ continue
+ }
+ conn.(*net.TCPConn).SetKeepAlive(true)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
+ go serv.s.ServeConn(conn)
+ }
+}
+
+func (serv *RpcServer) Addr() net.Addr {
+ return serv.ln.Addr()
+}
+
+type RpcClient struct {
+ conn net.Conn
+ c *rpc.Client
+}
+
+func NewRpcClient(addr string) (*RpcClient, error) {
+ conn, err := net.DialTimeout("tcp", addr, 15*time.Second)
+ if err != nil {
+ return nil, err
+ }
+ conn.(*net.TCPConn).SetKeepAlive(true)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
+ cli := &RpcClient{
+ conn: conn,
+ c: rpc.NewClient(conn),
+ }
+ return cli, nil
+}
+
+func (cli *RpcClient) Call(method string, args, reply interface{}) error {
+ cli.conn.SetDeadline(time.Now().Add(30 * time.Second))
+ err := cli.c.Call(method, args, reply)
+ cli.conn.SetDeadline(time.Time{})
+ return err
+}
+
+func (cli *RpcClient) Close() {
+ cli.c.Close()
+}
+
+func RpcCall(addr, method string, args, reply interface{}) error {
+ c, err := NewRpcClient(addr)
+ if err != nil {
+ return err
+ }
+ defer c.Close()
+ return c.Call(method, args, reply)
+}
diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go
index 87148f05b..ced475156 100644
--- a/rpctype/rpctype.go
+++ b/rpctype/rpctype.go
@@ -24,6 +24,7 @@ type ConnectArgs struct {
type ConnectRes struct {
Prios [][]float32
+ Inputs []RpcInput
MaxSignal []uint32
Candidates []RpcCandidate
EnabledCalls string
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go
index 96b4dade0..e4eaf10e9 100644
--- a/syz-fuzzer/fuzzer.go
+++ b/syz-fuzzer/fuzzer.go
@@ -14,8 +14,6 @@ import (
"math/rand"
"net/http"
_ "net/http/pprof"
- "net/rpc"
- "net/rpc/jsonrpc"
"os"
"os/signal"
"runtime"
@@ -64,7 +62,7 @@ type Candidate struct {
}
var (
- manager *rpc.Client
+ manager *RpcClient
signalMu sync.RWMutex
corpusSignal map[uint32]struct{}
@@ -128,18 +126,16 @@ func main() {
corpusHashes = make(map[hash.Sig]struct{})
Logf(0, "dialing manager at %v", *flagManager)
- conn, err := jsonrpc.Dial("tcp", *flagManager)
- if err != nil {
- panic(err)
- }
- manager = conn
a := &ConnectArgs{*flagName}
r := &ConnectRes{}
- if err := manager.Call("Manager.Connect", a, r); err != nil {
+ if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil {
panic(err)
}
calls := buildCallList(r.EnabledCalls)
ct := prog.BuildChoiceTable(r.Prios, calls)
+ for _, inp := range r.Inputs {
+ addInput(inp)
+ }
for _, s := range r.MaxSignal {
maxSignal[s] = struct{}{}
}
@@ -168,11 +164,21 @@ func main() {
for c := range calls {
a.Calls = append(a.Calls, c.Name)
}
- if err := manager.Call("Manager.Check", a, nil); err != nil {
+ if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil {
panic(err)
}
}
+ // Manager.Connect reply can ve very large and that memory will be permanently cached in the connection.
+ // So we do the call on a transient connection, free all memory and reconnect.
+ // The rest of rpc requests have bounded size.
+ debug.FreeOSMemory()
+ if conn, err := NewRpcClient(*flagManager); err != nil {
+ panic(err)
+ } else {
+ manager = conn
+ }
+
kmemleakInit()
flags, timeout, err := ipc.DefaultFlags()
diff --git a/syz-hub/hub.go b/syz-hub/hub.go
index bb5128727..8b0be56d6 100644
--- a/syz-hub/hub.go
+++ b/syz-hub/hub.go
@@ -8,10 +8,7 @@ import (
"flag"
"fmt"
"io/ioutil"
- "net"
- "net/rpc"
"sync"
- "time"
. "github.com/google/syzkaller/log"
. "github.com/google/syzkaller/rpctype"
@@ -59,23 +56,12 @@ func main() {
hub.initHttp(cfg.Http)
- ln, err := net.Listen("tcp", cfg.Rpc)
+ s, err := NewRpcServer(cfg.Rpc, hub)
if err != nil {
- Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
- }
- Logf(0, "serving rpc on tcp://%v", ln.Addr())
- s := rpc.NewServer()
- s.Register(hub)
- for {
- conn, err := ln.Accept()
- if err != nil {
- Logf(0, "failed to accept an rpc connection: %v", err)
- continue
- }
- conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go s.ServeConn(conn)
+ Fatalf("failed to create rpc server: %v", err)
}
+ Logf(0, "serving rpc on tcp://%v", s.Addr())
+ s.Serve()
}
func (hub *Hub) Connect(a *HubConnectArgs, r *int) error {
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index fb36b6d75..5b7ee98ba 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -11,8 +11,6 @@ import (
"io/ioutil"
"math/rand"
"net"
- "net/rpc"
- "net/rpc/jsonrpc"
"os"
"os/signal"
"path/filepath"
@@ -73,7 +71,7 @@ type Manager struct {
prios [][]float32
fuzzers map[string]*Fuzzer
- hub *rpc.Client
+ hub *RpcClient
hubCorpus map[hash.Sig]bool
}
@@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) {
mgr.initHttp()
// Create RPC server for fuzzers.
- ln, err := net.Listen("tcp", cfg.Rpc)
+ s, err := NewRpcServer(cfg.Rpc, mgr)
if err != nil {
- Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
+ Fatalf("failed to create rpc server: %v", err)
}
- Logf(0, "serving rpc on tcp://%v", ln.Addr())
- mgr.port = ln.Addr().(*net.TCPAddr).Port
- s := rpc.NewServer()
- s.Register(mgr)
- go func() {
- for {
- conn, err := ln.Accept()
- if err != nil {
- Logf(0, "failed to accept an rpc connection: %v", err)
- continue
- }
- conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go s.ServeCodec(jsonrpc.NewServerCodec(conn))
- }
- }()
+ Logf(0, "serving rpc on tcp://%v", s.Addr())
+ mgr.port = s.Addr().(*net.TCPAddr).Port
+ go s.Serve()
go func() {
for lastTime := time.Now(); ; {
@@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error {
}
mgr.fuzzers[a.Name] = f
mgr.minimizeCorpus()
+
+ f.inputs = nil
for _, inp := range mgr.corpus {
- f.inputs = append(f.inputs, inp)
+ r.Inputs = append(r.Inputs, inp)
}
r.Prios = mgr.prios
r.EnabledCalls = mgr.enabledSyscalls
@@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() {
mgr.minimizeCorpus()
if mgr.hub == nil {
- conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr)
- if err != nil {
- Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
- return
- }
- mgr.hub = conn
a := &HubConnectArgs{
Name: mgr.cfg.Name,
Key: mgr.cfg.Hub_Key,
Fresh: mgr.fresh,
Calls: mgr.enabledCalls,
}
- mgr.hubCorpus = make(map[hash.Sig]bool)
+ hubCorpus := make(map[hash.Sig]bool)
for _, inp := range mgr.corpus {
- mgr.hubCorpus[hash.Hash(inp.Prog)] = true
+ hubCorpus[hash.Hash(inp.Prog)] = true
a.Corpus = append(a.Corpus, inp.Prog)
}
- if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil {
+ mgr.mu.Unlock()
+ // Hub.Connect request can be very large, so do it on a transient connection
+ // (rpc connection buffers never shrink).
+ // Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible.
+ if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil {
+ mgr.mu.Lock()
Logf(0, "Hub.Connect rpc failed: %v", err)
- mgr.hub.Close()
- mgr.hub = nil
- mgr.hubCorpus = nil
return
}
+ conn, err := NewRpcClient(mgr.cfg.Hub_Addr)
+ if err != nil {
+ mgr.mu.Lock()
+ Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
+ return
+ }
+ mgr.mu.Lock()
+ mgr.hub = conn
+ mgr.hubCorpus = hubCorpus
mgr.fresh = false
Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus))
}
@@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() {
delete(mgr.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
+ mgr.mu.Unlock()
r := new(HubSyncRes)
if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
+ mgr.mu.Lock()
Logf(0, "Hub.Sync rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
return
}
+ mgr.mu.Lock()
dropped := 0
for _, inp := range r.Inputs {
_, err := prog.Deserialize(inp)