aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpctype/rpc.go88
-rw-r--r--rpctype/rpctype.go1
-rw-r--r--syz-fuzzer/fuzzer.go26
-rw-r--r--syz-hub/hub.go22
-rw-r--r--syz-manager/manager.go63
5 files changed, 138 insertions, 62 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go
new file mode 100644
index 000000000..32d751be4
--- /dev/null
+++ b/rpctype/rpc.go
@@ -0,0 +1,88 @@
+// Copyright 2017 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package rpctype
+
+import (
+ "fmt"
+ "net"
+ "net/rpc"
+ "time"
+
+ . "github.com/google/syzkaller/log"
+)
+
+type RpcServer struct {
+ ln net.Listener
+ s *rpc.Server
+}
+
+func NewRpcServer(addr string, receiver interface{}) (*RpcServer, error) {
+ ln, err := net.Listen("tcp", addr)
+ if err != nil {
+ return nil, fmt.Errorf("failed to listen on %v: %v", addr, err)
+ }
+ s := rpc.NewServer()
+ s.Register(receiver)
+ serv := &RpcServer{
+ ln: ln,
+ s: s,
+ }
+ return serv, nil
+}
+
+func (serv *RpcServer) Serve() {
+ for {
+ conn, err := serv.ln.Accept()
+ if err != nil {
+ Logf(0, "failed to accept an rpc connection: %v", err)
+ continue
+ }
+ conn.(*net.TCPConn).SetKeepAlive(true)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
+ go serv.s.ServeConn(conn)
+ }
+}
+
+func (serv *RpcServer) Addr() net.Addr {
+ return serv.ln.Addr()
+}
+
+type RpcClient struct {
+ conn net.Conn
+ c *rpc.Client
+}
+
+func NewRpcClient(addr string) (*RpcClient, error) {
+ conn, err := net.DialTimeout("tcp", addr, 15*time.Second)
+ if err != nil {
+ return nil, err
+ }
+ conn.(*net.TCPConn).SetKeepAlive(true)
+ conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
+ cli := &RpcClient{
+ conn: conn,
+ c: rpc.NewClient(conn),
+ }
+ return cli, nil
+}
+
+func (cli *RpcClient) Call(method string, args, reply interface{}) error {
+ cli.conn.SetDeadline(time.Now().Add(30 * time.Second))
+ err := cli.c.Call(method, args, reply)
+ cli.conn.SetDeadline(time.Time{})
+ return err
+}
+
+func (cli *RpcClient) Close() {
+ cli.c.Close()
+}
+
+func RpcCall(addr, method string, args, reply interface{}) error {
+ c, err := NewRpcClient(addr)
+ if err != nil {
+ return err
+ }
+ defer c.Close()
+ return c.Call(method, args, reply)
+}
diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go
index 87148f05b..ced475156 100644
--- a/rpctype/rpctype.go
+++ b/rpctype/rpctype.go
@@ -24,6 +24,7 @@ type ConnectArgs struct {
type ConnectRes struct {
Prios [][]float32
+ Inputs []RpcInput
MaxSignal []uint32
Candidates []RpcCandidate
EnabledCalls string
diff --git a/syz-fuzzer/fuzzer.go b/syz-fuzzer/fuzzer.go
index 96b4dade0..e4eaf10e9 100644
--- a/syz-fuzzer/fuzzer.go
+++ b/syz-fuzzer/fuzzer.go
@@ -14,8 +14,6 @@ import (
"math/rand"
"net/http"
_ "net/http/pprof"
- "net/rpc"
- "net/rpc/jsonrpc"
"os"
"os/signal"
"runtime"
@@ -64,7 +62,7 @@ type Candidate struct {
}
var (
- manager *rpc.Client
+ manager *RpcClient
signalMu sync.RWMutex
corpusSignal map[uint32]struct{}
@@ -128,18 +126,16 @@ func main() {
corpusHashes = make(map[hash.Sig]struct{})
Logf(0, "dialing manager at %v", *flagManager)
- conn, err := jsonrpc.Dial("tcp", *flagManager)
- if err != nil {
- panic(err)
- }
- manager = conn
a := &ConnectArgs{*flagName}
r := &ConnectRes{}
- if err := manager.Call("Manager.Connect", a, r); err != nil {
+ if err := RpcCall(*flagManager, "Manager.Connect", a, r); err != nil {
panic(err)
}
calls := buildCallList(r.EnabledCalls)
ct := prog.BuildChoiceTable(r.Prios, calls)
+ for _, inp := range r.Inputs {
+ addInput(inp)
+ }
for _, s := range r.MaxSignal {
maxSignal[s] = struct{}{}
}
@@ -168,11 +164,21 @@ func main() {
for c := range calls {
a.Calls = append(a.Calls, c.Name)
}
- if err := manager.Call("Manager.Check", a, nil); err != nil {
+ if err := RpcCall(*flagManager, "Manager.Check", a, nil); err != nil {
panic(err)
}
}
+ // Manager.Connect reply can ve very large and that memory will be permanently cached in the connection.
+ // So we do the call on a transient connection, free all memory and reconnect.
+ // The rest of rpc requests have bounded size.
+ debug.FreeOSMemory()
+ if conn, err := NewRpcClient(*flagManager); err != nil {
+ panic(err)
+ } else {
+ manager = conn
+ }
+
kmemleakInit()
flags, timeout, err := ipc.DefaultFlags()
diff --git a/syz-hub/hub.go b/syz-hub/hub.go
index bb5128727..8b0be56d6 100644
--- a/syz-hub/hub.go
+++ b/syz-hub/hub.go
@@ -8,10 +8,7 @@ import (
"flag"
"fmt"
"io/ioutil"
- "net"
- "net/rpc"
"sync"
- "time"
. "github.com/google/syzkaller/log"
. "github.com/google/syzkaller/rpctype"
@@ -59,23 +56,12 @@ func main() {
hub.initHttp(cfg.Http)
- ln, err := net.Listen("tcp", cfg.Rpc)
+ s, err := NewRpcServer(cfg.Rpc, hub)
if err != nil {
- Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
- }
- Logf(0, "serving rpc on tcp://%v", ln.Addr())
- s := rpc.NewServer()
- s.Register(hub)
- for {
- conn, err := ln.Accept()
- if err != nil {
- Logf(0, "failed to accept an rpc connection: %v", err)
- continue
- }
- conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go s.ServeConn(conn)
+ Fatalf("failed to create rpc server: %v", err)
}
+ Logf(0, "serving rpc on tcp://%v", s.Addr())
+ s.Serve()
}
func (hub *Hub) Connect(a *HubConnectArgs, r *int) error {
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index fb36b6d75..5b7ee98ba 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -11,8 +11,6 @@ import (
"io/ioutil"
"math/rand"
"net"
- "net/rpc"
- "net/rpc/jsonrpc"
"os"
"os/signal"
"path/filepath"
@@ -73,7 +71,7 @@ type Manager struct {
prios [][]float32
fuzzers map[string]*Fuzzer
- hub *rpc.Client
+ hub *RpcClient
hubCorpus map[hash.Sig]bool
}
@@ -200,26 +198,13 @@ func RunManager(cfg *config.Config, syscalls map[int]bool) {
mgr.initHttp()
// Create RPC server for fuzzers.
- ln, err := net.Listen("tcp", cfg.Rpc)
+ s, err := NewRpcServer(cfg.Rpc, mgr)
if err != nil {
- Fatalf("failed to listen on %v: %v", cfg.Rpc, err)
+ Fatalf("failed to create rpc server: %v", err)
}
- Logf(0, "serving rpc on tcp://%v", ln.Addr())
- mgr.port = ln.Addr().(*net.TCPAddr).Port
- s := rpc.NewServer()
- s.Register(mgr)
- go func() {
- for {
- conn, err := ln.Accept()
- if err != nil {
- Logf(0, "failed to accept an rpc connection: %v", err)
- continue
- }
- conn.(*net.TCPConn).SetKeepAlive(true)
- conn.(*net.TCPConn).SetKeepAlivePeriod(time.Minute)
- go s.ServeCodec(jsonrpc.NewServerCodec(conn))
- }
- }()
+ Logf(0, "serving rpc on tcp://%v", s.Addr())
+ mgr.port = s.Addr().(*net.TCPAddr).Port
+ go s.Serve()
go func() {
for lastTime := time.Now(); ; {
@@ -645,8 +630,10 @@ func (mgr *Manager) Connect(a *ConnectArgs, r *ConnectRes) error {
}
mgr.fuzzers[a.Name] = f
mgr.minimizeCorpus()
+
+ f.inputs = nil
for _, inp := range mgr.corpus {
- f.inputs = append(f.inputs, inp)
+ r.Inputs = append(r.Inputs, inp)
}
r.Prios = mgr.prios
r.EnabledCalls = mgr.enabledSyscalls
@@ -785,30 +772,35 @@ func (mgr *Manager) hubSync() {
mgr.minimizeCorpus()
if mgr.hub == nil {
- conn, err := rpc.Dial("tcp", mgr.cfg.Hub_Addr)
- if err != nil {
- Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
- return
- }
- mgr.hub = conn
a := &HubConnectArgs{
Name: mgr.cfg.Name,
Key: mgr.cfg.Hub_Key,
Fresh: mgr.fresh,
Calls: mgr.enabledCalls,
}
- mgr.hubCorpus = make(map[hash.Sig]bool)
+ hubCorpus := make(map[hash.Sig]bool)
for _, inp := range mgr.corpus {
- mgr.hubCorpus[hash.Hash(inp.Prog)] = true
+ hubCorpus[hash.Hash(inp.Prog)] = true
a.Corpus = append(a.Corpus, inp.Prog)
}
- if err := mgr.hub.Call("Hub.Connect", a, nil); err != nil {
+ mgr.mu.Unlock()
+ // Hub.Connect request can be very large, so do it on a transient connection
+ // (rpc connection buffers never shrink).
+ // Also don't do hub rpc's under the mutex -- hub can be slow or inaccessible.
+ if err := RpcCall(mgr.cfg.Hub_Addr, "Hub.Connect", a, nil); err != nil {
+ mgr.mu.Lock()
Logf(0, "Hub.Connect rpc failed: %v", err)
- mgr.hub.Close()
- mgr.hub = nil
- mgr.hubCorpus = nil
return
}
+ conn, err := NewRpcClient(mgr.cfg.Hub_Addr)
+ if err != nil {
+ mgr.mu.Lock()
+ Logf(0, "failed to connect to hub at %v: %v", mgr.cfg.Hub_Addr, err)
+ return
+ }
+ mgr.mu.Lock()
+ mgr.hub = conn
+ mgr.hubCorpus = hubCorpus
mgr.fresh = false
Logf(0, "connected to hub at %v, corpus %v", mgr.cfg.Hub_Addr, len(mgr.corpus))
}
@@ -834,13 +826,16 @@ func (mgr *Manager) hubSync() {
delete(mgr.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
+ mgr.mu.Unlock()
r := new(HubSyncRes)
if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
+ mgr.mu.Lock()
Logf(0, "Hub.Sync rpc failed: %v", err)
mgr.hub.Close()
mgr.hub = nil
return
}
+ mgr.mu.Lock()
dropped := 0
for _, inp := range r.Inputs {
_, err := prog.Deserialize(inp)