aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rpctype/rpc.go4
-rw-r--r--rpctype/rpctype.go1
-rw-r--r--syz-hub/hub.go5
-rw-r--r--syz-hub/state/state.go58
-rw-r--r--syz-hub/state/state_test.go2
-rw-r--r--syz-manager/manager.go55
6 files changed, 84 insertions, 41 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go
index e2005dbcb..44569a676 100644
--- a/rpctype/rpc.go
+++ b/rpctype/rpc.go
@@ -54,7 +54,7 @@ type RpcClient struct {
}
func NewRpcClient(addr string) (*RpcClient, error) {
- conn, err := net.DialTimeout("tcp", addr, 30*time.Second)
+ conn, err := net.DialTimeout("tcp", addr, 60*time.Second)
if err != nil {
return nil, err
}
@@ -68,7 +68,7 @@ func NewRpcClient(addr string) (*RpcClient, error) {
}
func (cli *RpcClient) Call(method string, args, reply interface{}) error {
- cli.conn.SetDeadline(time.Now().Add(60 * time.Second))
+ cli.conn.SetDeadline(time.Now().Add(5 * 60 * time.Second))
err := cli.c.Call(method, args, reply)
cli.conn.SetDeadline(time.Time{})
return err
diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go
index ced475156..3adef1962 100644
--- a/rpctype/rpctype.go
+++ b/rpctype/rpctype.go
@@ -71,4 +71,5 @@ type HubSyncArgs struct {
type HubSyncRes struct {
Inputs [][]byte
+ More int
}
diff --git a/syz-hub/hub.go b/syz-hub/hub.go
index 8b0be56d6..74a8740d9 100644
--- a/syz-hub/hub.go
+++ b/syz-hub/hub.go
@@ -88,13 +88,14 @@ func (hub *Hub) Sync(a *HubSyncArgs, r *HubSyncRes) error {
hub.mu.Lock()
defer hub.mu.Unlock()
- inputs, err := hub.st.Sync(a.Name, a.Add, a.Del)
+ inputs, more, err := hub.st.Sync(a.Name, a.Add, a.Del)
if err != nil {
Logf(0, "sync error: %v", err)
return err
}
r.Inputs = inputs
- Logf(0, "sync from %v: add=%v del=%v new=%v", a.Name, len(a.Add), len(a.Del), len(inputs))
+ r.More = more
+ Logf(0, "sync from %v: add=%v del=%v new=%v pending=%v", a.Name, len(a.Add), len(a.Del), len(inputs), more)
return nil
}
diff --git a/syz-hub/state/state.go b/syz-hub/state/state.go
index eb718cd73..fa8ea081e 100644
--- a/syz-hub/state/state.go
+++ b/syz-hub/state/state.go
@@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
+ "sort"
"strconv"
"time"
@@ -136,10 +137,10 @@ func (st *State) Connect(name string, fresh bool, calls []string, corpus [][]byt
return nil
}
-func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error) {
+func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, int, error) {
mgr := st.Managers[name]
if mgr == nil || mgr.Connected.IsZero() {
- return nil, fmt.Errorf("unconnected manager %v", name)
+ return nil, 0, fmt.Errorf("unconnected manager %v", name)
}
if len(del) != 0 {
for _, sig := range del {
@@ -151,20 +152,20 @@ func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error)
st.purgeCorpus()
}
st.addInputs(mgr, add)
- inputs, err := st.pendingInputs(mgr)
+ inputs, more, err := st.pendingInputs(mgr)
mgr.Added += len(add)
mgr.Deleted += len(del)
mgr.New += len(inputs)
- return inputs, err
+ return inputs, more, err
}
-func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) {
+func (st *State) pendingInputs(mgr *Manager) ([][]byte, int, error) {
if mgr.seq == st.seq {
- return nil, nil
+ return nil, 0, nil
}
- var inputs [][]byte
+ var records []db.Record
for key, rec := range st.Corpus.Records {
- if mgr.seq > rec.Seq {
+ if mgr.seq >= rec.Seq {
continue
}
if _, ok := mgr.Corpus.Records[key]; ok {
@@ -172,16 +173,35 @@ func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) {
}
calls, err := prog.CallSet(rec.Val)
if err != nil {
- return nil, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val)
+ return nil, 0, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val)
}
if !managerSupportsAllCalls(mgr.Calls, calls) {
continue
}
- inputs = append(inputs, rec.Val)
+ records = append(records, rec)
}
- mgr.seq = st.seq
+ maxSeq := st.seq
+ more := 0
+ // Send at most that many records (rounded up to next seq number).
+ const maxRecords = 1000
+ if len(records) > maxRecords {
+ sort.Sort(recordSeqSorter(records))
+ pos := maxRecords
+ maxSeq = records[pos].Seq
+ for pos+1 < len(records) && records[pos+1].Seq == maxSeq {
+ pos++
+ }
+ pos++
+ more = len(records) - pos
+ records = records[:pos]
+ }
+ inputs := make([][]byte, len(records))
+ for i, rec := range records {
+ inputs[i] = rec.Val
+ }
+ mgr.seq = maxSeq
writeFile(filepath.Join(mgr.dir, "seq"), []byte(fmt.Sprint(mgr.seq)))
- return inputs, nil
+ return inputs, more, nil
}
func (st *State) addInputs(mgr *Manager, inputs [][]byte) {
@@ -244,3 +264,17 @@ func managerSupportsAllCalls(mgr, prog map[string]struct{}) bool {
}
return true
}
+
+type recordSeqSorter []db.Record
+
+func (a recordSeqSorter) Len() int {
+ return len(a)
+}
+
+func (a recordSeqSorter) Less(i, j int) bool {
+ return a[i].Seq < a[j].Seq
+}
+
+func (a recordSeqSorter) Swap(i, j int) {
+ a[i], a[j] = a[j], a[i]
+}
diff --git a/syz-hub/state/state_test.go b/syz-hub/state/state_test.go
index 0fd66da2c..bfdf4b3a7 100644
--- a/syz-hub/state/state_test.go
+++ b/syz-hub/state/state_test.go
@@ -20,7 +20,7 @@ func TestState(t *testing.T) {
if err != nil {
t.Fatalf("failed to make state: %v", err)
}
- _, err = st.Sync("foo", nil, nil)
+ _, _, err = st.Sync("foo", nil, nil)
if err == nil {
t.Fatalf("synced with unconnected manager")
}
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 6c0a1b3ac..55f387473 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -901,31 +901,38 @@ func (mgr *Manager) hubSync() {
delete(mgr.hubCorpus, sig)
a.Del = append(a.Del, sig.String())
}
- mgr.mu.Unlock()
- r := new(HubSyncRes)
- if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
+ for {
+ mgr.mu.Unlock()
+ r := new(HubSyncRes)
+ if err := mgr.hub.Call("Hub.Sync", a, r); err != nil {
+ mgr.mu.Lock()
+ Logf(0, "Hub.Sync rpc failed: %v", err)
+ mgr.hub.Close()
+ mgr.hub = nil
+ return
+ }
mgr.mu.Lock()
- Logf(0, "Hub.Sync rpc failed: %v", err)
- mgr.hub.Close()
- mgr.hub = nil
- return
- }
- mgr.mu.Lock()
- dropped := 0
- for _, inp := range r.Inputs {
- _, err := prog.Deserialize(inp)
- if err != nil {
- dropped++
- continue
+ dropped := 0
+ for _, inp := range r.Inputs {
+ _, err := prog.Deserialize(inp)
+ if err != nil {
+ dropped++
+ continue
+ }
+ mgr.candidates = append(mgr.candidates, RpcCandidate{
+ Prog: inp,
+ Minimized: false, // don't trust programs from hub
+ })
+ }
+ mgr.stats["hub add"] += uint64(len(a.Add))
+ mgr.stats["hub del"] += uint64(len(a.Del))
+ mgr.stats["hub drop"] += uint64(dropped)
+ mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped)
+ Logf(0, "hub sync: add %v, del %v, drop %v, new %v, more %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped, r.More)
+ if len(r.Inputs)+r.More == 0 {
+ break
}
- mgr.candidates = append(mgr.candidates, RpcCandidate{
- Prog: inp,
- Minimized: false, // don't trust programs from hub
- })
+ a.Add = nil
+ a.Del = nil
}
- mgr.stats["hub add"] += uint64(len(a.Add))
- mgr.stats["hub del"] += uint64(len(a.Del))
- mgr.stats["hub drop"] += uint64(dropped)
- mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped)
- Logf(0, "hub sync: add %v, del %v, drop %v, new %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped)
}