From b1e98de8a6c6e31544193f74130911d8b45bbd0a Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Thu, 26 Nov 2015 15:56:18 +0100 Subject: fuzzer: parallel execution mode Config now have procs parameter that controls number of parallel executor's running in each VM. --- fuzzer/fuzzer.go | 290 ++++++++++++++++++++++++++++++++++++++++--------------- manager/main.go | 7 +- vm/qemu/qemu.go | 4 +- vm/vm.go | 1 + 4 files changed, 223 insertions(+), 79 deletions(-) diff --git a/fuzzer/fuzzer.go b/fuzzer/fuzzer.go index 252b61c31..cb0237cd5 100644 --- a/fuzzer/fuzzer.go +++ b/fuzzer/fuzzer.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/google/syzkaller/cover" @@ -37,8 +38,8 @@ var ( flagSyscalls = flag.String("calls", "", "comma-delimited list of enabled syscall IDs (empty string for all syscalls)") flagNoCover = flag.Bool("nocover", false, "disable coverage collection/handling") flagDropPrivs = flag.Bool("dropprivs", true, "impersonate into nobody") - - flagV = flag.Int("v", 0, "verbosity") + flagProcs = flag.Int("procs", 1, "number of parallel test processes") + flagV = flag.Int("v", 0, "verbosity") ) const ( @@ -58,14 +59,22 @@ type Input struct { } var ( - corpusCover []cover.Cover - maxCover []cover.Cover - flakes cover.Cover + manager *rpc.Client + + coverMu sync.RWMutex + corpusCover []cover.Cover + maxCover []cover.Cover + flakes cover.Cover + + corpusMu sync.RWMutex corpus []Input corpusHashes map[Sig]struct{} - triage []Input - manager *rpc.Client - ct *prog.ChoiceTable + + triageMu sync.RWMutex + triage []Input + candidates []*prog.Prog + + gate *Gate statExecGen uint64 statExecFuzz uint64 @@ -105,7 +114,7 @@ func main() { if err := manager.Call("Manager.Connect", a, r); err != nil { panic(err) } - ct = prog.BuildChoiceTable(r.Prios, calls) + ct := prog.BuildChoiceTable(r.Prios, calls) flags := ipc.FlagThreaded | ipc.FlagCollide if *flagStrace { @@ -117,49 +126,102 @@ func main() { if *flagDropPrivs { flags |= ipc.FlagDropPrivs } - env, err := ipc.MakeEnv(*flagExecutor, 10*time.Second, flags) - if err != nil { - panic(err) + if *flagProcs <= 0 { + *flagProcs = 1 + } + + gate = newGate(2 * *flagProcs) + envs := make([]*ipc.Env, *flagProcs) + for pid := 0; pid < *flagProcs; pid++ { + env, err := ipc.MakeEnv(*flagExecutor, 10*time.Second, flags) + if err != nil { + panic(err) + } + envs[pid] = env + + pid := pid + go func() { + rs := rand.NewSource(time.Now().UnixNano()) + rnd := rand.New(rs) + + for i := 0; ; i++ { + triageMu.RLock() + if len(triage) != 0 || len(candidates) != 0 { + triageMu.RUnlock() + triageMu.Lock() + if len(triage) != 0 { + last := len(triage) - 1 + inp := triage[last] + triage = triage[:last] + triageMu.Unlock() + logf(1, "triaging : %s", inp.p) + triageInput(pid, env, inp) + continue + } else if len(candidates) != 0 { + last := len(candidates) - 1 + p := candidates[last] + candidates = candidates[:last] + triageMu.Unlock() + execute(pid, env, p, &statExecCandidate) + continue + } else { + triageMu.Unlock() + } + } else { + triageMu.RUnlock() + } + + corpusMu.RLock() + if len(corpus) == 0 || i%10 == 0 { + corpusMu.RUnlock() + p := prog.Generate(rnd, programLength, ct) + logf(1, "#%v: generated: %s", i, p) + execute(pid, env, p, &statExecGen) + p.Mutate(rnd, programLength, ct) + logf(1, "#%v: mutated: %s", i, p) + execute(pid, env, p, &statExecFuzz) + } else { + inp := corpus[rnd.Intn(len(corpus))] + corpusMu.RUnlock() + p := inp.p.Clone() + p.Mutate(rs, programLength, ct) + logf(1, "#%v: mutated: %s <- %s", i, p, inp.p) + execute(pid, env, p, &statExecFuzz) + } + } + }() } - rs := rand.NewSource(time.Now().UnixNano()) - rnd := rand.New(rs) + var lastPoll time.Time var lastPrint time.Time - for i := 0; ; i++ { + for range time.NewTicker(3 * time.Second).C { if !*flagSaveProg && time.Since(lastPrint) > 10*time.Second { // Keep-alive for manager. - logf(0, "#%v: alive", i) + logf(0, "alive") lastPrint = time.Now() } - if len(triage) != 0 { - last := len(triage) - 1 - inp := triage[last] - triage = triage[:last] - logf(1, "#%v: triaging : %s", i, inp.p) - triageInput(env, inp) - continue - } if time.Since(lastPoll) > 10*time.Second { + triageMu.RLock() + if len(candidates) != 0 { + triageMu.RUnlock() + continue + } + triageMu.RUnlock() + a := &ManagerPollArgs{ Name: *flagName, Stats: make(map[string]uint64), } - a.Stats["exec total"] = env.StatExecs - env.StatExecs = 0 - a.Stats["executor restarts"] = env.StatRestarts - env.StatRestarts = 0 - a.Stats["exec gen"] = statExecGen - statExecGen = 0 - a.Stats["exec fuzz"] = statExecFuzz - statExecFuzz = 0 - a.Stats["exec candidate"] = statExecCandidate - statExecCandidate = 0 - a.Stats["exec triage"] = statExecTriage - statExecTriage = 0 - a.Stats["exec minimize"] = statExecMinimize - statExecMinimize = 0 - a.Stats["fuzzer new inputs"] = statNewInput - statNewInput = 0 + for _, env := range envs { + a.Stats["exec total"] += atomic.SwapUint64(&env.StatExecs, 0) + a.Stats["executor restarts"] += atomic.SwapUint64(&env.StatRestarts, 0) + } + a.Stats["exec gen"] = atomic.SwapUint64(&statExecGen, 0) + a.Stats["exec fuzz"] = atomic.SwapUint64(&statExecFuzz, 0) + a.Stats["exec candidate"] = atomic.SwapUint64(&statExecCandidate, 0) + a.Stats["exec triage"] = atomic.SwapUint64(&statExecTriage, 0) + a.Stats["exec minimize"] = atomic.SwapUint64(&statExecMinimize, 0) + a.Stats["fuzzer new inputs"] = atomic.SwapUint64(&statNewInput, 0) r := &ManagerPollRes{} if err := manager.Call("Manager.Poll", a, r); err != nil { panic(err) @@ -174,34 +236,28 @@ func main() { } if *flagNoCover { inp := Input{p, 0, nil} + corpusMu.Lock() corpus = append(corpus, inp) + corpusMu.Unlock() } else { - execute(env, p, &statExecCandidate) + triageMu.Lock() + candidates = append(candidates, p) + triageMu.Unlock() } } if len(r.NewInputs) == 0 && len(r.Candidates) == 0 { lastPoll = time.Now() } - continue - } - if len(corpus) == 0 || i%10 == 0 { - p := prog.Generate(rnd, programLength, ct) - logf(1, "#%v: generated: %s", i, p) - execute(env, p, &statExecGen) - p.Mutate(rnd, programLength, ct) - logf(1, "#%v: mutated: %s", i, p) - execute(env, p, &statExecFuzz) - } else { - inp := corpus[rnd.Intn(len(corpus))] - p := inp.p.Clone() - p.Mutate(rs, programLength, ct) - logf(1, "#%v: mutated: %s <- %s", i, p, inp.p) - execute(env, p, &statExecFuzz) } } } func addInput(inp RpcInput) { + corpusMu.Lock() + defer corpusMu.Unlock() + coverMu.Lock() + defer coverMu.Unlock() + if *flagNoCover { panic("should not be called when coverage is disabled") } @@ -230,41 +286,55 @@ func addInput(inp RpcInput) { corpusHashes[hash(inp.Prog)] = struct{}{} } -func triageInput(env *ipc.Env, inp Input) { +func triageInput(pid int, env *ipc.Env, inp Input) { if *flagNoCover { panic("should not be called when coverage is disabled") } + call := inp.p.Calls[inp.call].Meta + coverMu.RLock() newCover := cover.Difference(inp.cover, corpusCover[call.CallID]) newCover = cover.Difference(newCover, flakes) + coverMu.RUnlock() if len(newCover) == 0 { return } + corpusMu.RLock() if _, ok := corpusHashes[hash(inp.p.Serialize())]; ok { + corpusMu.RUnlock() return } + corpusMu.RUnlock() minCover := inp.cover for i := 0; i < 3; i++ { - allCover := execute1(env, inp.p, &statExecTriage) + allCover := execute1(pid, env, inp.p, &statExecTriage) if len(allCover[inp.call]) == 0 { // The call was not executed. Happens sometimes, reason unknown. continue } + coverMu.RLock() cov := allCover[inp.call] diff := cover.SymmetricDifference(inp.cover, cov) - if len(diff) != 0 { + minCover = cover.Intersection(minCover, cov) + updateFlakes := len(diff) != 0 && len(cover.Difference(diff, flakes)) != 0 + coverMu.RUnlock() + if updateFlakes { + coverMu.Lock() flakes = cover.Union(flakes, diff) + coverMu.Unlock() } - minCover = cover.Intersection(minCover, cov) } stableNewCover := cover.Intersection(newCover, minCover) if len(stableNewCover) == 0 { return } inp.p, inp.call = prog.Minimize(inp.p, inp.call, func(p1 *prog.Prog, call1 int) bool { - allCover := execute1(env, p1, &statExecMinimize) + allCover := execute1(pid, env, p1, &statExecMinimize) + coverMu.RLock() + defer coverMu.RUnlock() + if len(allCover[call1]) == 0 { return false // The call was not executed. } @@ -276,22 +346,29 @@ func triageInput(env *ipc.Env, inp Input) { return true }) inp.cover = minCover - corpusCover[call.CallID] = cover.Union(corpusCover[call.CallID], minCover) - corpus = append(corpus, inp) - data := inp.p.Serialize() - corpusHashes[hash(data)] = struct{}{} + atomic.AddUint64(&statNewInput, 1) + data := inp.p.Serialize() logf(2, "added new input for %v to corpus:\n%s", call.CallName, data) - - statNewInput++ - a := &NewManagerInputArgs{*flagName, RpcInput{call.CallName, inp.p.Serialize(), inp.call, []uint32(inp.cover)}} + a := &NewManagerInputArgs{*flagName, RpcInput{call.CallName, data, inp.call, []uint32(inp.cover)}} if err := manager.Call("Manager.NewInput", a, nil); err != nil { panic(err) } + + corpusMu.Lock() + defer corpusMu.Unlock() + coverMu.Lock() + defer coverMu.Unlock() + + corpusCover[call.CallID] = cover.Union(corpusCover[call.CallID], minCover) + corpus = append(corpus, inp) + corpusHashes[hash(data)] = struct{}{} } -func execute(env *ipc.Env, p *prog.Prog, stat *uint64) { - allCover := execute1(env, p, stat) +func execute(pid int, env *ipc.Env, p *prog.Prog, stat *uint64) { + allCover := execute1(pid, env, p, stat) + coverMu.RLock() + defer coverMu.RUnlock() for i, cov := range allCover { if len(cov) == 0 { continue @@ -300,17 +377,37 @@ func execute(env *ipc.Env, p *prog.Prog, stat *uint64) { diff := cover.Difference(cov, maxCover[c.CallID]) diff = cover.Difference(diff, flakes) if len(diff) != 0 { - maxCover[call.CallID] = cover.Union(maxCover[call.CallID], diff) - triage = append(triage, Input{p.Clone(), i, cover.Copy(cov)}) + coverMu.RUnlock() + coverMu.Lock() + maxCover[c.CallID] = cover.Union(maxCover[c.CallID], diff) + coverMu.Unlock() + coverMu.RLock() + + inp := Input{p.Clone(), i, cover.Copy(cov)} + triageMu.Lock() + triage = append(triage, inp) + triageMu.Unlock() } } } var logMu sync.Mutex -func execute1(env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover { +func execute1(pid int, env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover { + if false { + // For debugging, this function must not be executed with locks held. + corpusMu.Lock() + corpusMu.Unlock() + coverMu.Lock() + coverMu.Unlock() + triageMu.Lock() + triageMu.Unlock() + } + + idx := gate.Enter() + defer gate.Leave(idx) if *flagSaveProg { - f, err := os.Create(fmt.Sprintf("%v.prog", *flagName)) + f, err := os.Create(fmt.Sprintf("%v-%v.prog", *flagName, pid)) if err == nil { f.Write(p.Serialize()) f.Close() @@ -318,14 +415,15 @@ func execute1(env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover { } else { // The following output helps to understand what program crashed kernel. // It must not be intermixed. + data := p.Serialize() logMu.Lock() - log.Printf("executing program:\n%s", p.Serialize()) + log.Printf("executing program %v:\n%s", pid, data) logMu.Unlock() } try := 0 retry: - *stat++ + atomic.AddUint64(stat, 1) output, strace, rawCover, failed, hanged, err := env.Exec(p) if err != nil { if try > 10 { @@ -352,3 +450,43 @@ func logf(v int, msg string, args ...interface{}) { log.Printf(msg, args...) } } + +type Gate struct { + cv *sync.Cond + busy []bool + pos int +} + +func newGate(c int) *Gate { + return &Gate{ + cv: sync.NewCond(new(sync.Mutex)), + busy: make([]bool, c), + } +} + +func (g *Gate) Enter() int { + g.cv.L.Lock() + for g.busy[g.pos] { + g.cv.Wait() + } + idx := g.pos + g.pos++ + if g.pos >= len(g.busy) { + g.pos = 0 + } + g.busy[idx] = true + g.cv.L.Unlock() + return idx +} + +func (g *Gate) Leave(idx int) { + g.cv.L.Lock() + if !g.busy[idx] { + panic("broken gate") + } + g.busy[idx] = false + if idx == g.pos { + g.cv.Broadcast() + } + g.cv.L.Unlock() +} diff --git a/manager/main.go b/manager/main.go index 8fdf8dada..92a506baa 100644 --- a/manager/main.go +++ b/manager/main.go @@ -32,7 +32,8 @@ type Config struct { Workdir string Vmlinux string Type string - Count int + Count int // number of VMs + Procs int // number of parallel processes inside of every VM Port int Nocover bool Params map[string]interface{} @@ -63,6 +64,7 @@ func main() { Params: params, EnabledSyscalls: enabledSyscalls, NoCover: cfg.Nocover, + Procs: cfg.Procs, } // Add some builtin suppressions. @@ -125,6 +127,9 @@ func parseConfig() (*Config, map[int]bool) { if cfg.Count <= 0 || cfg.Count > 1000 { fatalf("invalid config param count: %v, want (1, 1000]", cfg.Count) } + if cfg.Procs <= 0 { + cfg.Procs = 1 + } match := func(call *sys.Call, str string) bool { if str == call.CallName || str == call.Name { diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go index be62c343e..49d19d4c9 100644 --- a/vm/qemu/qemu.go +++ b/vm/qemu/qemu.go @@ -272,8 +272,8 @@ func (inst *Instance) Run() { inst.CreateSSHCommand("echo -n 0 > /proc/sys/debug/exception-trace").Wait(10 * time.Second) // Run the binary. - cmd := inst.CreateSSHCommand(fmt.Sprintf("/syzkaller_fuzzer -name %v -executor /syzkaller_executor -manager %v:%v %v", - inst.name, hostAddr, inst.cfg.ManagerPort, inst.callsFlag)) + cmd := inst.CreateSSHCommand(fmt.Sprintf("/syzkaller_fuzzer -name %v -executor /syzkaller_executor -manager %v:%v -procs %v %v", + inst.name, hostAddr, inst.cfg.ManagerPort, inst.cfg.Procs, inst.callsFlag)) deadline := start.Add(time.Hour) lastOutput := time.Now() diff --git a/vm/vm.go b/vm/vm.go index e6430ec47..bd18bccb1 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -19,6 +19,7 @@ type Config struct { EnabledSyscalls string Suppressions []*regexp.Regexp NoCover bool + Procs int } type ctorFunc func(cfg *Config, index int) (Instance, error) -- cgit mrf-deployment