diff options
| -rw-r--r-- | rpctype/rpc.go | 4 | ||||
| -rw-r--r-- | rpctype/rpctype.go | 1 | ||||
| -rw-r--r-- | syz-hub/hub.go | 5 | ||||
| -rw-r--r-- | syz-hub/state/state.go | 58 | ||||
| -rw-r--r-- | syz-hub/state/state_test.go | 2 | ||||
| -rw-r--r-- | syz-manager/manager.go | 55 |
6 files changed, 84 insertions, 41 deletions
diff --git a/rpctype/rpc.go b/rpctype/rpc.go index e2005dbcb..44569a676 100644 --- a/rpctype/rpc.go +++ b/rpctype/rpc.go @@ -54,7 +54,7 @@ type RpcClient struct { } func NewRpcClient(addr string) (*RpcClient, error) { - conn, err := net.DialTimeout("tcp", addr, 30*time.Second) + conn, err := net.DialTimeout("tcp", addr, 60*time.Second) if err != nil { return nil, err } @@ -68,7 +68,7 @@ func NewRpcClient(addr string) (*RpcClient, error) { } func (cli *RpcClient) Call(method string, args, reply interface{}) error { - cli.conn.SetDeadline(time.Now().Add(60 * time.Second)) + cli.conn.SetDeadline(time.Now().Add(5 * 60 * time.Second)) err := cli.c.Call(method, args, reply) cli.conn.SetDeadline(time.Time{}) return err diff --git a/rpctype/rpctype.go b/rpctype/rpctype.go index ced475156..3adef1962 100644 --- a/rpctype/rpctype.go +++ b/rpctype/rpctype.go @@ -71,4 +71,5 @@ type HubSyncArgs struct { type HubSyncRes struct { Inputs [][]byte + More int } diff --git a/syz-hub/hub.go b/syz-hub/hub.go index 8b0be56d6..74a8740d9 100644 --- a/syz-hub/hub.go +++ b/syz-hub/hub.go @@ -88,13 +88,14 @@ func (hub *Hub) Sync(a *HubSyncArgs, r *HubSyncRes) error { hub.mu.Lock() defer hub.mu.Unlock() - inputs, err := hub.st.Sync(a.Name, a.Add, a.Del) + inputs, more, err := hub.st.Sync(a.Name, a.Add, a.Del) if err != nil { Logf(0, "sync error: %v", err) return err } r.Inputs = inputs - Logf(0, "sync from %v: add=%v del=%v new=%v", a.Name, len(a.Add), len(a.Del), len(inputs)) + r.More = more + Logf(0, "sync from %v: add=%v del=%v new=%v pending=%v", a.Name, len(a.Add), len(a.Del), len(inputs), more) return nil } diff --git a/syz-hub/state/state.go b/syz-hub/state/state.go index eb718cd73..fa8ea081e 100644 --- a/syz-hub/state/state.go +++ b/syz-hub/state/state.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sort" "strconv" "time" @@ -136,10 +137,10 @@ func (st *State) Connect(name string, fresh bool, calls []string, corpus [][]byt return nil } -func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error) { +func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, int, error) { mgr := st.Managers[name] if mgr == nil || mgr.Connected.IsZero() { - return nil, fmt.Errorf("unconnected manager %v", name) + return nil, 0, fmt.Errorf("unconnected manager %v", name) } if len(del) != 0 { for _, sig := range del { @@ -151,20 +152,20 @@ func (st *State) Sync(name string, add [][]byte, del []string) ([][]byte, error) st.purgeCorpus() } st.addInputs(mgr, add) - inputs, err := st.pendingInputs(mgr) + inputs, more, err := st.pendingInputs(mgr) mgr.Added += len(add) mgr.Deleted += len(del) mgr.New += len(inputs) - return inputs, err + return inputs, more, err } -func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) { +func (st *State) pendingInputs(mgr *Manager) ([][]byte, int, error) { if mgr.seq == st.seq { - return nil, nil + return nil, 0, nil } - var inputs [][]byte + var records []db.Record for key, rec := range st.Corpus.Records { - if mgr.seq > rec.Seq { + if mgr.seq >= rec.Seq { continue } if _, ok := mgr.Corpus.Records[key]; ok { @@ -172,16 +173,35 @@ func (st *State) pendingInputs(mgr *Manager) ([][]byte, error) { } calls, err := prog.CallSet(rec.Val) if err != nil { - return nil, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val) + return nil, 0, fmt.Errorf("failed to extract call set: %v\nprogram: %s", err, rec.Val) } if !managerSupportsAllCalls(mgr.Calls, calls) { continue } - inputs = append(inputs, rec.Val) + records = append(records, rec) } - mgr.seq = st.seq + maxSeq := st.seq + more := 0 + // Send at most that many records (rounded up to next seq number). + const maxRecords = 1000 + if len(records) > maxRecords { + sort.Sort(recordSeqSorter(records)) + pos := maxRecords + maxSeq = records[pos].Seq + for pos+1 < len(records) && records[pos+1].Seq == maxSeq { + pos++ + } + pos++ + more = len(records) - pos + records = records[:pos] + } + inputs := make([][]byte, len(records)) + for i, rec := range records { + inputs[i] = rec.Val + } + mgr.seq = maxSeq writeFile(filepath.Join(mgr.dir, "seq"), []byte(fmt.Sprint(mgr.seq))) - return inputs, nil + return inputs, more, nil } func (st *State) addInputs(mgr *Manager, inputs [][]byte) { @@ -244,3 +264,17 @@ func managerSupportsAllCalls(mgr, prog map[string]struct{}) bool { } return true } + +type recordSeqSorter []db.Record + +func (a recordSeqSorter) Len() int { + return len(a) +} + +func (a recordSeqSorter) Less(i, j int) bool { + return a[i].Seq < a[j].Seq +} + +func (a recordSeqSorter) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} diff --git a/syz-hub/state/state_test.go b/syz-hub/state/state_test.go index 0fd66da2c..bfdf4b3a7 100644 --- a/syz-hub/state/state_test.go +++ b/syz-hub/state/state_test.go @@ -20,7 +20,7 @@ func TestState(t *testing.T) { if err != nil { t.Fatalf("failed to make state: %v", err) } - _, err = st.Sync("foo", nil, nil) + _, _, err = st.Sync("foo", nil, nil) if err == nil { t.Fatalf("synced with unconnected manager") } diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 6c0a1b3ac..55f387473 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -901,31 +901,38 @@ func (mgr *Manager) hubSync() { delete(mgr.hubCorpus, sig) a.Del = append(a.Del, sig.String()) } - mgr.mu.Unlock() - r := new(HubSyncRes) - if err := mgr.hub.Call("Hub.Sync", a, r); err != nil { + for { + mgr.mu.Unlock() + r := new(HubSyncRes) + if err := mgr.hub.Call("Hub.Sync", a, r); err != nil { + mgr.mu.Lock() + Logf(0, "Hub.Sync rpc failed: %v", err) + mgr.hub.Close() + mgr.hub = nil + return + } mgr.mu.Lock() - Logf(0, "Hub.Sync rpc failed: %v", err) - mgr.hub.Close() - mgr.hub = nil - return - } - mgr.mu.Lock() - dropped := 0 - for _, inp := range r.Inputs { - _, err := prog.Deserialize(inp) - if err != nil { - dropped++ - continue + dropped := 0 + for _, inp := range r.Inputs { + _, err := prog.Deserialize(inp) + if err != nil { + dropped++ + continue + } + mgr.candidates = append(mgr.candidates, RpcCandidate{ + Prog: inp, + Minimized: false, // don't trust programs from hub + }) + } + mgr.stats["hub add"] += uint64(len(a.Add)) + mgr.stats["hub del"] += uint64(len(a.Del)) + mgr.stats["hub drop"] += uint64(dropped) + mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped) + Logf(0, "hub sync: add %v, del %v, drop %v, new %v, more %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped, r.More) + if len(r.Inputs)+r.More == 0 { + break } - mgr.candidates = append(mgr.candidates, RpcCandidate{ - Prog: inp, - Minimized: false, // don't trust programs from hub - }) + a.Add = nil + a.Del = nil } - mgr.stats["hub add"] += uint64(len(a.Add)) - mgr.stats["hub del"] += uint64(len(a.Del)) - mgr.stats["hub drop"] += uint64(dropped) - mgr.stats["hub new"] += uint64(len(r.Inputs) - dropped) - Logf(0, "hub sync: add %v, del %v, drop %v, new %v", len(a.Add), len(a.Del), dropped, len(r.Inputs)-dropped) } |
