diff options
| author | Dmitry Vyukov <dvyukov@google.com> | 2018-08-02 16:44:21 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2018-08-02 16:57:32 +0200 |
| commit | 976e4de0480dff760fdc9bd1b21cbe81e46625e5 (patch) | |
| tree | c674e2bb09392030fb02ccc74e2f6271d40a2ba4 /syz-manager/hub.go | |
| parent | fbedd425b575868124577640fe02c9c0a73c4be3 (diff) | |
syz-manager: refactor work with hub
Move work with hub into a separate file and fully separate
its state from the rest of the manager state.
First step towards splitting manager into managable parts.
This also required to rework stats as they are used throughout the code.
Update #538
Update #605
Diffstat (limited to 'syz-manager/hub.go')
| -rw-r--r-- | syz-manager/hub.go | 192 |
1 files changed, 192 insertions, 0 deletions
diff --git a/syz-manager/hub.go b/syz-manager/hub.go new file mode 100644 index 000000000..dee4bcfd9 --- /dev/null +++ b/syz-manager/hub.go @@ -0,0 +1,192 @@ +// Copyright 2018 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package main + +import ( + "time" + + "github.com/google/syzkaller/pkg/hash" + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/mgrconfig" + "github.com/google/syzkaller/pkg/report" + "github.com/google/syzkaller/pkg/rpctype" + "github.com/google/syzkaller/prog" +) + +func (mgr *Manager) hubSyncLoop() { + hc := &HubConnector{ + mgr: mgr, + cfg: mgr.cfg, + target: mgr.target, + stats: mgr.stats, + enabledCalls: mgr.checkResult.EnabledCalls, + fresh: mgr.fresh, + hubReproQueue: mgr.hubReproQueue, + } + if mgr.cfg.Reproduce && mgr.dash != nil { + hc.needMoreRepros = mgr.needMoreRepros + } + hc.loop() +} + +type HubConnector struct { + mgr HubManagerView + cfg *mgrconfig.Config + target *prog.Target + stats *Stats + enabledCalls []int + fresh bool + hubCorpus map[hash.Sig]bool + newRepros [][]byte + hubReproQueue chan *Crash + needMoreRepros chan chan bool +} + +// HubManagerView restricts interface between HubConnector and Manager. +type HubManagerView interface { + getMinimizedCorpus() (corpus, repros [][]byte) + addNewCandidates(progs [][]byte) +} + +func (hc *HubConnector) loop() { + var hub *rpctype.RPCClient + for { + time.Sleep(time.Minute) + corpus, repros := hc.mgr.getMinimizedCorpus() + hc.newRepros = append(hc.newRepros, repros...) + if hub == nil { + var err error + if hub, err = hc.connect(corpus); err != nil { + log.Logf(0, "failed to connect to hub at %v: %v", hc.cfg.HubAddr, err) + continue + } + log.Logf(0, "connected to hub at %v, corpus %v", hc.cfg.HubAddr, len(corpus)) + } + if err := hc.sync(hub, corpus); err != nil { + log.Logf(0, "hub sync failed: %v", err) + hub.Close() + hub = nil + } + } +} + +func (hc *HubConnector) connect(corpus [][]byte) (*rpctype.RPCClient, error) { + a := &rpctype.HubConnectArgs{ + Client: hc.cfg.HubClient, + Key: hc.cfg.HubKey, + Manager: hc.cfg.Name, + Fresh: hc.fresh, + } + for _, id := range hc.enabledCalls { + a.Calls = append(a.Calls, hc.target.Syscalls[id].Name) + } + hubCorpus := make(map[hash.Sig]bool) + for _, inp := range corpus { + hubCorpus[hash.Hash(inp)] = true + a.Corpus = append(a.Corpus, inp) + } + // Hub.Connect request can be very large, so do it on a transient connection + // (rpc connection buffers never shrink). + if err := rpctype.RPCCall(hc.cfg.HubAddr, "Hub.Connect", a, nil); err != nil { + return nil, err + } + hub, err := rpctype.NewRPCClient(hc.cfg.HubAddr) + if err != nil { + return nil, err + } + hc.hubCorpus = hubCorpus + hc.fresh = false + return hub, nil +} + +func (hc *HubConnector) sync(hub *rpctype.RPCClient, corpus [][]byte) error { + a := &rpctype.HubSyncArgs{ + Client: hc.cfg.HubClient, + Key: hc.cfg.HubKey, + Manager: hc.cfg.Name, + } + sigs := make(map[hash.Sig]bool) + for _, inp := range corpus { + sig := hash.Hash(inp) + sigs[sig] = true + if hc.hubCorpus[sig] { + continue + } + hc.hubCorpus[sig] = true + a.Add = append(a.Add, inp) + } + for sig := range hc.hubCorpus { + if sigs[sig] { + continue + } + delete(hc.hubCorpus, sig) + a.Del = append(a.Del, sig.String()) + } + if hc.needMoreRepros != nil { + needReproReply := make(chan bool) + hc.needMoreRepros <- needReproReply + a.NeedRepros = <-needReproReply + } + a.Repros = hc.newRepros + for { + r := new(rpctype.HubSyncRes) + if err := hub.Call("Hub.Sync", a, r); err != nil { + return err + } + progDropped := hc.processProgs(r.Progs) + reproDropped := hc.processRepros(r.Repros) + hc.stats.hubSendProgAdd.add(len(a.Add)) + hc.stats.hubSendProgDel.add(len(a.Del)) + hc.stats.hubSendRepro.add(len(a.Repros)) + hc.stats.hubRecvProg.add(len(r.Progs) - progDropped) + hc.stats.hubRecvProgDrop.add(progDropped) + hc.stats.hubRecvRepro.add(len(r.Repros) - reproDropped) + hc.stats.hubRecvReproDrop.add(reproDropped) + log.Logf(0, "hub sync: send: add %v, del %v, repros %v;"+ + " recv: progs %v, repros %v; more %v", + len(a.Add), len(a.Del), len(a.Repros), + len(r.Progs)-progDropped, len(r.Repros)-reproDropped, r.More) + a.Add = nil + a.Del = nil + a.Repros = nil + a.NeedRepros = false + hc.newRepros = nil + if len(r.Progs)+r.More == 0 { + return nil + } + } +} + +func (hc *HubConnector) processProgs(progs [][]byte) int { + dropped := 0 + candidates := make([][]byte, 0, len(progs)) + for _, inp := range progs { + if _, err := hc.target.Deserialize(inp); err != nil { + dropped++ + continue + } + candidates = append(candidates, inp) + } + hc.mgr.addNewCandidates(candidates) + return dropped +} + +func (hc *HubConnector) processRepros(repros [][]byte) int { + dropped := 0 + for _, repro := range repros { + if _, err := hc.target.Deserialize(repro); err != nil { + dropped++ + continue + } + hc.hubReproQueue <- &Crash{ + vmIndex: -1, + hub: true, + Report: &report.Report{ + Title: "external repro", + Output: repro, + }, + } + } + return dropped +} |
