aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author    Dmitry Vyukov <dvyukov@google.com>  2015-11-26 15:56:18 +0100
committer Dmitry Vyukov <dvyukov@google.com>  2015-11-26 15:56:18 +0100
commitb1e98de8a6c6e31544193f74130911d8b45bbd0a (patch)
tree9217e3341cc56ccb5e7d59b28e20397a1a7a6fff
parente1094916aceff77d16aea60891e613715c72818e (diff)
fuzzer: parallel execution mode
Config now has a procs parameter that controls the number of parallel executors running in each VM.
-rw-r--r--  fuzzer/fuzzer.go  290
-rw-r--r--  manager/main.go     7
-rw-r--r--  vm/qemu/qemu.go     4
-rw-r--r--  vm/vm.go            1
4 files changed, 223 insertions, 79 deletions
diff --git a/fuzzer/fuzzer.go b/fuzzer/fuzzer.go
index 252b61c31..cb0237cd5 100644
--- a/fuzzer/fuzzer.go
+++ b/fuzzer/fuzzer.go
@@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
"github.com/google/syzkaller/cover"
@@ -37,8 +38,8 @@ var (
flagSyscalls = flag.String("calls", "", "comma-delimited list of enabled syscall IDs (empty string for all syscalls)")
flagNoCover = flag.Bool("nocover", false, "disable coverage collection/handling")
flagDropPrivs = flag.Bool("dropprivs", true, "impersonate into nobody")
-
- flagV = flag.Int("v", 0, "verbosity")
+ flagProcs = flag.Int("procs", 1, "number of parallel test processes")
+ flagV = flag.Int("v", 0, "verbosity")
)
const (
@@ -58,14 +59,22 @@ type Input struct {
}
var (
- corpusCover []cover.Cover
- maxCover []cover.Cover
- flakes cover.Cover
+ manager *rpc.Client
+
+ coverMu sync.RWMutex
+ corpusCover []cover.Cover
+ maxCover []cover.Cover
+ flakes cover.Cover
+
+ corpusMu sync.RWMutex
corpus []Input
corpusHashes map[Sig]struct{}
- triage []Input
- manager *rpc.Client
- ct *prog.ChoiceTable
+
+ triageMu sync.RWMutex
+ triage []Input
+ candidates []*prog.Prog
+
+ gate *Gate
statExecGen uint64
statExecFuzz uint64
@@ -105,7 +114,7 @@ func main() {
if err := manager.Call("Manager.Connect", a, r); err != nil {
panic(err)
}
- ct = prog.BuildChoiceTable(r.Prios, calls)
+ ct := prog.BuildChoiceTable(r.Prios, calls)
flags := ipc.FlagThreaded | ipc.FlagCollide
if *flagStrace {
@@ -117,49 +126,102 @@ func main() {
if *flagDropPrivs {
flags |= ipc.FlagDropPrivs
}
- env, err := ipc.MakeEnv(*flagExecutor, 10*time.Second, flags)
- if err != nil {
- panic(err)
+ if *flagProcs <= 0 {
+ *flagProcs = 1
+ }
+
+ gate = newGate(2 * *flagProcs)
+ envs := make([]*ipc.Env, *flagProcs)
+ for pid := 0; pid < *flagProcs; pid++ {
+ env, err := ipc.MakeEnv(*flagExecutor, 10*time.Second, flags)
+ if err != nil {
+ panic(err)
+ }
+ envs[pid] = env
+
+ pid := pid
+ go func() {
+ rs := rand.NewSource(time.Now().UnixNano())
+ rnd := rand.New(rs)
+
+ for i := 0; ; i++ {
+ triageMu.RLock()
+ if len(triage) != 0 || len(candidates) != 0 {
+ triageMu.RUnlock()
+ triageMu.Lock()
+ if len(triage) != 0 {
+ last := len(triage) - 1
+ inp := triage[last]
+ triage = triage[:last]
+ triageMu.Unlock()
+ logf(1, "triaging : %s", inp.p)
+ triageInput(pid, env, inp)
+ continue
+ } else if len(candidates) != 0 {
+ last := len(candidates) - 1
+ p := candidates[last]
+ candidates = candidates[:last]
+ triageMu.Unlock()
+ execute(pid, env, p, &statExecCandidate)
+ continue
+ } else {
+ triageMu.Unlock()
+ }
+ } else {
+ triageMu.RUnlock()
+ }
+
+ corpusMu.RLock()
+ if len(corpus) == 0 || i%10 == 0 {
+ corpusMu.RUnlock()
+ p := prog.Generate(rnd, programLength, ct)
+ logf(1, "#%v: generated: %s", i, p)
+ execute(pid, env, p, &statExecGen)
+ p.Mutate(rnd, programLength, ct)
+ logf(1, "#%v: mutated: %s", i, p)
+ execute(pid, env, p, &statExecFuzz)
+ } else {
+ inp := corpus[rnd.Intn(len(corpus))]
+ corpusMu.RUnlock()
+ p := inp.p.Clone()
+ p.Mutate(rs, programLength, ct)
+ logf(1, "#%v: mutated: %s <- %s", i, p, inp.p)
+ execute(pid, env, p, &statExecFuzz)
+ }
+ }
+ }()
}
- rs := rand.NewSource(time.Now().UnixNano())
- rnd := rand.New(rs)
+
var lastPoll time.Time
var lastPrint time.Time
- for i := 0; ; i++ {
+ for range time.NewTicker(3 * time.Second).C {
if !*flagSaveProg && time.Since(lastPrint) > 10*time.Second {
// Keep-alive for manager.
- logf(0, "#%v: alive", i)
+ logf(0, "alive")
lastPrint = time.Now()
}
- if len(triage) != 0 {
- last := len(triage) - 1
- inp := triage[last]
- triage = triage[:last]
- logf(1, "#%v: triaging : %s", i, inp.p)
- triageInput(env, inp)
- continue
- }
if time.Since(lastPoll) > 10*time.Second {
+ triageMu.RLock()
+ if len(candidates) != 0 {
+ triageMu.RUnlock()
+ continue
+ }
+ triageMu.RUnlock()
+
a := &ManagerPollArgs{
Name: *flagName,
Stats: make(map[string]uint64),
}
- a.Stats["exec total"] = env.StatExecs
- env.StatExecs = 0
- a.Stats["executor restarts"] = env.StatRestarts
- env.StatRestarts = 0
- a.Stats["exec gen"] = statExecGen
- statExecGen = 0
- a.Stats["exec fuzz"] = statExecFuzz
- statExecFuzz = 0
- a.Stats["exec candidate"] = statExecCandidate
- statExecCandidate = 0
- a.Stats["exec triage"] = statExecTriage
- statExecTriage = 0
- a.Stats["exec minimize"] = statExecMinimize
- statExecMinimize = 0
- a.Stats["fuzzer new inputs"] = statNewInput
- statNewInput = 0
+ for _, env := range envs {
+ a.Stats["exec total"] += atomic.SwapUint64(&env.StatExecs, 0)
+ a.Stats["executor restarts"] += atomic.SwapUint64(&env.StatRestarts, 0)
+ }
+ a.Stats["exec gen"] = atomic.SwapUint64(&statExecGen, 0)
+ a.Stats["exec fuzz"] = atomic.SwapUint64(&statExecFuzz, 0)
+ a.Stats["exec candidate"] = atomic.SwapUint64(&statExecCandidate, 0)
+ a.Stats["exec triage"] = atomic.SwapUint64(&statExecTriage, 0)
+ a.Stats["exec minimize"] = atomic.SwapUint64(&statExecMinimize, 0)
+ a.Stats["fuzzer new inputs"] = atomic.SwapUint64(&statNewInput, 0)
r := &ManagerPollRes{}
if err := manager.Call("Manager.Poll", a, r); err != nil {
panic(err)
@@ -174,34 +236,28 @@ func main() {
}
if *flagNoCover {
inp := Input{p, 0, nil}
+ corpusMu.Lock()
corpus = append(corpus, inp)
+ corpusMu.Unlock()
} else {
- execute(env, p, &statExecCandidate)
+ triageMu.Lock()
+ candidates = append(candidates, p)
+ triageMu.Unlock()
}
}
if len(r.NewInputs) == 0 && len(r.Candidates) == 0 {
lastPoll = time.Now()
}
- continue
- }
- if len(corpus) == 0 || i%10 == 0 {
- p := prog.Generate(rnd, programLength, ct)
- logf(1, "#%v: generated: %s", i, p)
- execute(env, p, &statExecGen)
- p.Mutate(rnd, programLength, ct)
- logf(1, "#%v: mutated: %s", i, p)
- execute(env, p, &statExecFuzz)
- } else {
- inp := corpus[rnd.Intn(len(corpus))]
- p := inp.p.Clone()
- p.Mutate(rs, programLength, ct)
- logf(1, "#%v: mutated: %s <- %s", i, p, inp.p)
- execute(env, p, &statExecFuzz)
}
}
}
func addInput(inp RpcInput) {
+ corpusMu.Lock()
+ defer corpusMu.Unlock()
+ coverMu.Lock()
+ defer coverMu.Unlock()
+
if *flagNoCover {
panic("should not be called when coverage is disabled")
}
@@ -230,41 +286,55 @@ func addInput(inp RpcInput) {
corpusHashes[hash(inp.Prog)] = struct{}{}
}
-func triageInput(env *ipc.Env, inp Input) {
+func triageInput(pid int, env *ipc.Env, inp Input) {
if *flagNoCover {
panic("should not be called when coverage is disabled")
}
+
call := inp.p.Calls[inp.call].Meta
+ coverMu.RLock()
newCover := cover.Difference(inp.cover, corpusCover[call.CallID])
newCover = cover.Difference(newCover, flakes)
+ coverMu.RUnlock()
if len(newCover) == 0 {
return
}
+ corpusMu.RLock()
if _, ok := corpusHashes[hash(inp.p.Serialize())]; ok {
+ corpusMu.RUnlock()
return
}
+ corpusMu.RUnlock()
minCover := inp.cover
for i := 0; i < 3; i++ {
- allCover := execute1(env, inp.p, &statExecTriage)
+ allCover := execute1(pid, env, inp.p, &statExecTriage)
if len(allCover[inp.call]) == 0 {
// The call was not executed. Happens sometimes, reason unknown.
continue
}
+ coverMu.RLock()
cov := allCover[inp.call]
diff := cover.SymmetricDifference(inp.cover, cov)
- if len(diff) != 0 {
+ minCover = cover.Intersection(minCover, cov)
+ updateFlakes := len(diff) != 0 && len(cover.Difference(diff, flakes)) != 0
+ coverMu.RUnlock()
+ if updateFlakes {
+ coverMu.Lock()
flakes = cover.Union(flakes, diff)
+ coverMu.Unlock()
}
- minCover = cover.Intersection(minCover, cov)
}
stableNewCover := cover.Intersection(newCover, minCover)
if len(stableNewCover) == 0 {
return
}
inp.p, inp.call = prog.Minimize(inp.p, inp.call, func(p1 *prog.Prog, call1 int) bool {
- allCover := execute1(env, p1, &statExecMinimize)
+ allCover := execute1(pid, env, p1, &statExecMinimize)
+ coverMu.RLock()
+ defer coverMu.RUnlock()
+
if len(allCover[call1]) == 0 {
return false // The call was not executed.
}
@@ -276,22 +346,29 @@ func triageInput(env *ipc.Env, inp Input) {
return true
})
inp.cover = minCover
- corpusCover[call.CallID] = cover.Union(corpusCover[call.CallID], minCover)
- corpus = append(corpus, inp)
- data := inp.p.Serialize()
- corpusHashes[hash(data)] = struct{}{}
+ atomic.AddUint64(&statNewInput, 1)
+ data := inp.p.Serialize()
logf(2, "added new input for %v to corpus:\n%s", call.CallName, data)
-
- statNewInput++
- a := &NewManagerInputArgs{*flagName, RpcInput{call.CallName, inp.p.Serialize(), inp.call, []uint32(inp.cover)}}
+ a := &NewManagerInputArgs{*flagName, RpcInput{call.CallName, data, inp.call, []uint32(inp.cover)}}
if err := manager.Call("Manager.NewInput", a, nil); err != nil {
panic(err)
}
+
+ corpusMu.Lock()
+ defer corpusMu.Unlock()
+ coverMu.Lock()
+ defer coverMu.Unlock()
+
+ corpusCover[call.CallID] = cover.Union(corpusCover[call.CallID], minCover)
+ corpus = append(corpus, inp)
+ corpusHashes[hash(data)] = struct{}{}
}
-func execute(env *ipc.Env, p *prog.Prog, stat *uint64) {
- allCover := execute1(env, p, stat)
+func execute(pid int, env *ipc.Env, p *prog.Prog, stat *uint64) {
+ allCover := execute1(pid, env, p, stat)
+ coverMu.RLock()
+ defer coverMu.RUnlock()
for i, cov := range allCover {
if len(cov) == 0 {
continue
@@ -300,17 +377,37 @@ func execute(env *ipc.Env, p *prog.Prog, stat *uint64) {
diff := cover.Difference(cov, maxCover[c.CallID])
diff = cover.Difference(diff, flakes)
if len(diff) != 0 {
- maxCover[call.CallID] = cover.Union(maxCover[call.CallID], diff)
- triage = append(triage, Input{p.Clone(), i, cover.Copy(cov)})
+ coverMu.RUnlock()
+ coverMu.Lock()
+ maxCover[c.CallID] = cover.Union(maxCover[c.CallID], diff)
+ coverMu.Unlock()
+ coverMu.RLock()
+
+ inp := Input{p.Clone(), i, cover.Copy(cov)}
+ triageMu.Lock()
+ triage = append(triage, inp)
+ triageMu.Unlock()
}
}
}
var logMu sync.Mutex
-func execute1(env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover {
+func execute1(pid int, env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover {
+ if false {
+ // For debugging, this function must not be executed with locks held.
+ corpusMu.Lock()
+ corpusMu.Unlock()
+ coverMu.Lock()
+ coverMu.Unlock()
+ triageMu.Lock()
+ triageMu.Unlock()
+ }
+
+ idx := gate.Enter()
+ defer gate.Leave(idx)
if *flagSaveProg {
- f, err := os.Create(fmt.Sprintf("%v.prog", *flagName))
+ f, err := os.Create(fmt.Sprintf("%v-%v.prog", *flagName, pid))
if err == nil {
f.Write(p.Serialize())
f.Close()
@@ -318,14 +415,15 @@ func execute1(env *ipc.Env, p *prog.Prog, stat *uint64) []cover.Cover {
} else {
// The following output helps to understand what program crashed kernel.
// It must not be intermixed.
+ data := p.Serialize()
logMu.Lock()
- log.Printf("executing program:\n%s", p.Serialize())
+ log.Printf("executing program %v:\n%s", pid, data)
logMu.Unlock()
}
try := 0
retry:
- *stat++
+ atomic.AddUint64(stat, 1)
output, strace, rawCover, failed, hanged, err := env.Exec(p)
if err != nil {
if try > 10 {
@@ -352,3 +450,43 @@ func logf(v int, msg string, args ...interface{}) {
log.Printf(msg, args...)
}
}
+
+type Gate struct {
+ cv *sync.Cond
+ busy []bool
+ pos int
+}
+
+func newGate(c int) *Gate {
+ return &Gate{
+ cv: sync.NewCond(new(sync.Mutex)),
+ busy: make([]bool, c),
+ }
+}
+
+func (g *Gate) Enter() int {
+ g.cv.L.Lock()
+ for g.busy[g.pos] {
+ g.cv.Wait()
+ }
+ idx := g.pos
+ g.pos++
+ if g.pos >= len(g.busy) {
+ g.pos = 0
+ }
+ g.busy[idx] = true
+ g.cv.L.Unlock()
+ return idx
+}
+
+func (g *Gate) Leave(idx int) {
+ g.cv.L.Lock()
+ if !g.busy[idx] {
+ panic("broken gate")
+ }
+ g.busy[idx] = false
+ if idx == g.pos {
+ g.cv.Broadcast()
+ }
+ g.cv.L.Unlock()
+}
diff --git a/manager/main.go b/manager/main.go
index 8fdf8dada..92a506baa 100644
--- a/manager/main.go
+++ b/manager/main.go
@@ -32,7 +32,8 @@ type Config struct {
Workdir string
Vmlinux string
Type string
- Count int
+ Count int // number of VMs
+ Procs int // number of parallel processes inside of every VM
Port int
Nocover bool
Params map[string]interface{}
@@ -63,6 +64,7 @@ func main() {
Params: params,
EnabledSyscalls: enabledSyscalls,
NoCover: cfg.Nocover,
+ Procs: cfg.Procs,
}
// Add some builtin suppressions.
@@ -125,6 +127,9 @@ func parseConfig() (*Config, map[int]bool) {
if cfg.Count <= 0 || cfg.Count > 1000 {
fatalf("invalid config param count: %v, want (1, 1000]", cfg.Count)
}
+ if cfg.Procs <= 0 {
+ cfg.Procs = 1
+ }
match := func(call *sys.Call, str string) bool {
if str == call.CallName || str == call.Name {
diff --git a/vm/qemu/qemu.go b/vm/qemu/qemu.go
index be62c343e..49d19d4c9 100644
--- a/vm/qemu/qemu.go
+++ b/vm/qemu/qemu.go
@@ -272,8 +272,8 @@ func (inst *Instance) Run() {
inst.CreateSSHCommand("echo -n 0 > /proc/sys/debug/exception-trace").Wait(10 * time.Second)
// Run the binary.
- cmd := inst.CreateSSHCommand(fmt.Sprintf("/syzkaller_fuzzer -name %v -executor /syzkaller_executor -manager %v:%v %v",
- inst.name, hostAddr, inst.cfg.ManagerPort, inst.callsFlag))
+ cmd := inst.CreateSSHCommand(fmt.Sprintf("/syzkaller_fuzzer -name %v -executor /syzkaller_executor -manager %v:%v -procs %v %v",
+ inst.name, hostAddr, inst.cfg.ManagerPort, inst.cfg.Procs, inst.callsFlag))
deadline := start.Add(time.Hour)
lastOutput := time.Now()
diff --git a/vm/vm.go b/vm/vm.go
index e6430ec47..bd18bccb1 100644
--- a/vm/vm.go
+++ b/vm/vm.go
@@ -19,6 +19,7 @@ type Config struct {
EnabledSyscalls string
Suppressions []*regexp.Regexp
NoCover bool
+ Procs int
}
type ctorFunc func(cfg *Config, index int) (Instance, error)