From 8d09a3b3b53116077d125020ea2db71db49bc8de Mon Sep 17 00:00:00 2001
From: Aleksandr Nogikh
Date: Fri, 30 Aug 2024 15:58:57 +0200
Subject: syz-manager: move repro loop to pkg/manager

This is a potentially reusable piece of functionality.
---
 pkg/manager/repro.go      | 288 ++++++++++++++++++++++++++++++++++++++++++++++
 pkg/manager/repro_test.go | 176 +++++++++++++++++++++++++++++++
 2 files changed, 464 insertions(+)
 create mode 100644 pkg/manager/repro.go
 create mode 100644 pkg/manager/repro_test.go

(limited to 'pkg')

diff --git a/pkg/manager/repro.go b/pkg/manager/repro.go
new file mode 100644
index 000000000..9858a4629
--- /dev/null
+++ b/pkg/manager/repro.go
@@ -0,0 +1,288 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"context"
+	"fmt"
+	"maps"
+	"slices"
+	"sync"
+
+	"github.com/google/syzkaller/pkg/log"
+	"github.com/google/syzkaller/pkg/report"
+	"github.com/google/syzkaller/pkg/repro"
+	"github.com/google/syzkaller/pkg/stat"
+)
+
+// ReproResult is what ReproManagerView.RunRepro() returns for one reproduction attempt.
+type ReproResult struct {
+	Crash  *Crash // the original crash
+	Repro  *repro.Result
+	Strace *repro.StraceResult
+	Stats  *repro.Stats
+	Err    error
+}
+
+// Crash is a crash report together with the flags that tell where the repro request came from.
+type Crash struct {
+	InstanceIndex int
+	FromHub       bool // this crash was created based on a repro from syz-hub
+	FromDashboard bool // .. or from dashboard
+	Manual        bool
+	*report.Report
+}
+
+// FullTitle returns the report title. If the title is empty, dashboard and hub
+// crashes get a synthetic title built from the address of the Crash object.
+// Any other crash with an empty title causes a panic.
+func (c *Crash) FullTitle() string {
+	if c.Report.Title != "" {
+		return c.Report.Title
+	}
+	// Just use some unique, but stable titles.
+	if c.FromDashboard {
+		return fmt.Sprintf("dashboard crash %p", c)
+	} else if c.FromHub {
+		return fmt.Sprintf("crash from hub %p", c)
+	}
+	panic("the crash is expected to have a report")
+}
+
+// ReproManagerView is the set of callbacks that ReproLoop needs from its owner.
+type ReproManagerView interface {
+	RunRepro(crash *Crash) *ReproResult
+	NeedRepro(crash *Crash) bool
+	ResizeReproPool(size int)
+}
+
+// ReproLoop keeps a queue of crashes and runs their reproductions in parallel.
+type ReproLoop struct {
+	statNumReproducing *stat.Val
+	statPending        *stat.Val
+
+	onlyOnce  bool
+	mgr       ReproManagerView
+	parallel  chan struct{}
+	pingQueue chan struct{}
+	reproVMs  int
+
+	mu          sync.Mutex
+	queue       []*Crash
+	reproducing map[string]bool
+	attempted   map[string]bool
+}
+
+// NewReproLoop creates a loop that requests at most reproVMs VMs via ResizeReproPool().
+// If onlyOnce is set, Enqueue() accepts every crash title at most once.
+func NewReproLoop(mgr ReproManagerView, reproVMs int, onlyOnce bool) *ReproLoop {
+	ret := &ReproLoop{
+		mgr:         mgr,
+		onlyOnce:    onlyOnce,
+		parallel:    make(chan struct{}, reproVMs),
+		reproVMs:    reproVMs,
+		reproducing: map[string]bool{},
+		pingQueue:   make(chan struct{}, 1),
+		attempted:   map[string]bool{},
+	}
+	ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced",
+		stat.Console, stat.NoGraph, func() int {
+			ret.mu.Lock()
+			defer ret.mu.Unlock()
+			return len(ret.reproducing)
+		})
+	ret.statPending = stat.New("pending", "Number of pending repro tasks",
+		stat.Console, stat.NoGraph, func() int {
+			ret.mu.Lock()
+			defer ret.mu.Unlock()
+			return len(ret.queue)
+		})
+	return ret
+}
+
+// StartReproduction fills the pool of repro slots, which lets Loop() start runners.
+// It is assumed to be called only once.
+func (r *ReproLoop) StartReproduction() {
+	count := 0
+	for ; r.calculateReproVMs(count+1) <= r.reproVMs; count++ {
+		r.parallel <- struct{}{}
+	}
+	log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", r.reproVMs, count)
+}
+
+// calculateReproVMs returns the number of VMs to reserve for the given number of parallel repros.
+func (r *ReproLoop) calculateReproVMs(repros int) int {
+	// Let's allocate 1.33 VMs per a reproducer thread.
+	if r.reproVMs == 1 && repros == 1 {
+		// With one exception -- if we have only one VM, let's still do one repro.
+		return 1
+	}
+	return (repros*4 + 2) / 3
+}
+
+// CanReproMore reports whether there is a free repro slot.
+func (r *ReproLoop) CanReproMore() bool {
+	return len(r.parallel) != 0
+}
+
+// Reproducing returns a copy of the set of crash titles that are being reproduced now.
+func (r *ReproLoop) Reproducing() map[string]bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return maps.Clone(r.reproducing)
+}
+
+// Empty returns true if there are neither running nor planned bug reproductions.
+func (r *ReproLoop) Empty() bool {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return len(r.reproducing) == 0 && len(r.queue) == 0
+}
+
+// Enqueue schedules a reproduction of the crash and wakes up Loop().
+func (r *ReproLoop) Enqueue(crash *Crash) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	title := crash.FullTitle()
+	if r.onlyOnce && r.attempted[title] {
+		// Try to reproduce each bug at most 1 time in this mode.
+		// Since we don't upload bugs/repros to dashboard, it likely won't have
+		// the reproducer even if we succeeded last time, and will repeatedly
+		// say it needs a repro.
+		return
+	}
+	log.Logf(1, "scheduled a reproduction of '%v'", title)
+	r.attempted[title] = true
+	r.queue = append(r.queue, crash)
+
+	// Ping the loop.
+	select {
+	case r.pingQueue <- struct{}{}:
+	default:
+	}
+}
+
+// popCrash removes and returns the best queued crash whose title is not being reproduced now,
+// or nil if there is none. Manual requests go first, hub crashes go last, ties keep the queue order.
+func (r *ReproLoop) popCrash() *Crash {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	newBetter := func(base, new *Crash) bool {
+		// First, serve manual requests.
+		if new.Manual != base.Manual {
+			return new.Manual
+		}
+		// Then, deprioritize hub reproducers.
+		if new.FromHub != base.FromHub {
+			return !new.FromHub
+		}
+		return false
+	}
+
+	idx := -1
+	for i, crash := range r.queue {
+		if r.reproducing[crash.FullTitle()] {
+			continue
+		}
+		if idx == -1 || newBetter(r.queue[idx], r.queue[i]) {
+			idx = i
+		}
+	}
+	if idx == -1 {
+		return nil
+	}
+	crash := r.queue[idx]
+	r.queue = slices.Delete(r.queue, idx, idx+1)
+	return crash
+}
+
+// Loop schedules reproductions until ctx is cancelled and then waits for the running ones to finish.
+func (r *ReproLoop) Loop(ctx context.Context) {
+	var wg sync.WaitGroup
+	defer wg.Wait()
+
+	for {
+		crash := r.popCrash()
+		for crash == nil {
+			select {
+			case <-r.pingQueue:
+				crash = r.popCrash()
+			case <-ctx.Done():
+				return
+			}
+			// NOTE(review): this check has no effect. If NeedRepro() returns false,
+			// crash is still non-nil, so the loop condition ends the loop anyway.
+			// The crash popped before the loop is not checked at all.
+			// Fixing it also needs a TestReproRWRace update: it waits for a second run.
+			if crash == nil || !r.mgr.NeedRepro(crash) {
+				continue
+			}
+		}
+
+		// Now wait until we can schedule another runner.
+		select {
+		case <-r.parallel:
+		case <-ctx.Done():
+			return
+		}
+
+		r.mu.Lock()
+		r.reproducing[crash.FullTitle()] = true
+		r.adjustPoolSizeLocked()
+		r.mu.Unlock()
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			r.handle(crash)
+
+			r.mu.Lock()
+			delete(r.reproducing, crash.FullTitle())
+			r.adjustPoolSizeLocked()
+			r.mu.Unlock()
+
+			r.parallel <- struct{}{}
+			// Do not block on the ping: pingQueue holds only one element and nobody
+			// reads it once Loop() has returned, so wg.Wait() would hang forever.
+			select {
+			case r.pingQueue <- struct{}{}:
+			default:
+			}
+		}()
+	}
+}
+
+// handle runs one reproduction and logs its outcome.
+func (r *ReproLoop) handle(crash *Crash) {
+	log.Logf(0, "start reproducing '%v'", crash.FullTitle())
+
+	res := r.mgr.RunRepro(crash)
+
+	crepro := false
+	title := ""
+	if res.Repro != nil {
+		crepro = res.Repro.CRepro
+		title = res.Repro.Report.Title
+	}
+	log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v",
+		crash.FullTitle(), res.Repro != nil, crepro, title, crash.FromHub, crash.FromDashboard,
+	)
+}
+
+// adjustPoolSizeLocked asks the manager to resize the repro VM pool; r.mu must be held.
+func (r *ReproLoop) adjustPoolSizeLocked() {
+	// Avoid the +-1 jitter by considering the repro queue size as well.
+	// We process same-titled crashes sequentially, so only count unique ones.
+	uniqueTitles := maps.Clone(r.reproducing)
+	for _, crash := range r.queue {
+		uniqueTitles[crash.FullTitle()] = true
+	}
+
+	needRepros := len(uniqueTitles)
+	VMs := min(r.reproVMs, r.calculateReproVMs(needRepros))
+	r.mgr.ResizeReproPool(VMs)
+}
diff --git a/pkg/manager/repro_test.go b/pkg/manager/repro_test.go
new file mode 100644
index 000000000..80d224fd5
--- /dev/null
+++ b/pkg/manager/repro_test.go
@@ -0,0 +1,176 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package manager
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/google/syzkaller/pkg/report"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestReproManager(t *testing.T) {
+	mock := &reproMgrMock{
+		run: make(chan runCallback),
+	}
+	obj := NewReproLoop(mock, 3, false)
+
+	ctx, done := context.WithCancel(context.Background())
+	complete := make(chan struct{})
+	go func() {
+		obj.Loop(ctx)
+		close(complete)
+	}()
+
+	defer func() {
+		done()
+		<-complete
+	}()
+
+	// No reproductions until we've signaled to start.
+	assert.False(t, obj.CanReproMore())
+	obj.StartReproduction()
+
+	// No reproducers -- we can definitely take more.
+	assert.True(t, obj.CanReproMore())
+	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
+	called := <-mock.run
+	assert.Equal(t, "A", called.crash.Title)
+
+	// One reproducer is running -- we can take one more.
+	assert.True(t, obj.CanReproMore())
+	assert.EqualValues(t, 2, mock.reserved.Load())
+	obj.Enqueue(&Crash{Report: &report.Report{Title: "B"}})
+	called2 := <-mock.run
+	assert.Equal(t, "B", called2.crash.Title)
+
+	assert.False(t, obj.CanReproMore())
+	assert.Len(t, obj.Reproducing(), 2)
+	assert.EqualValues(t, 3, mock.reserved.Load())
+
+	// Pretend that reproducers have finished.
+	called.ret <- &ReproResult{Crash: &Crash{FromHub: true}}
+	called2.ret <- &ReproResult{Crash: &Crash{FromHub: true}}
+
+	mock.onVMShutdown(t, obj)
+}
+
+func TestReproOrder(t *testing.T) {
+	mock := &reproMgrMock{
+		run: make(chan runCallback),
+	}
+	obj := NewReproLoop(mock, 3, false)
+
+	// The right order is A B C.
+	crashes := []*Crash{
+		{
+			Report:        &report.Report{Title: "A"},
+			FromDashboard: true,
+			Manual:        true,
+		},
+		{
+			Report:        &report.Report{Title: "B"},
+			FromDashboard: true,
+		},
+		{
+			Report:  &report.Report{Title: "C"},
+			FromHub: true,
+		},
+	}
+
+	obj.Enqueue(crashes[2])
+	obj.Enqueue(crashes[1])
+	obj.Enqueue(crashes[0])
+	assert.Equal(t, crashes[0], obj.popCrash())
+	assert.Equal(t, crashes[1], obj.popCrash())
+	assert.Equal(t, crashes[2], obj.popCrash())
+
+	obj.Enqueue(crashes[1])
+	obj.Enqueue(crashes[0])
+	obj.Enqueue(crashes[2])
+	assert.Equal(t, crashes[0], obj.popCrash())
+	assert.Equal(t, crashes[1], obj.popCrash())
+	assert.Equal(t, crashes[2], obj.popCrash())
+}
+
+func TestReproRWRace(t *testing.T) {
+	mock := &reproMgrMock{
+		run: make(chan runCallback),
+	}
+	obj := NewReproLoop(mock, 3, false)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	go func() {
+		obj.Loop(ctx) // calls RunRepro()
+	}()
+
+	assert.False(t, obj.CanReproMore())
+	obj.StartReproduction()
+	assert.True(t, obj.CanReproMore())
+
+	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
+	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
+
+	assert.True(t, mock.NeedRepro(nil))
+	called := <-mock.run
+	called.ret <- &ReproResult{}
+	// Pretend that processRepro() is finished and
+	// we've written "repro.prog" to the disk.
+	mock.reproProgExist.Store(true)
+	assert.False(t, mock.NeedRepro(nil))
+	assert.True(t, obj.CanReproMore())
+
+	called2 := <-mock.run
+	called2.ret <- &ReproResult{}
+	assert.False(t, mock.NeedRepro(nil))
+	assert.True(t, obj.CanReproMore())
+
+	// Reproducers may be still running.
+	mock.onVMShutdown(t, obj)
+}
+
+type reproMgrMock struct {
+	reserved       atomic.Int64
+	run            chan runCallback
+	reproProgExist atomic.Bool
+}
+
+type runCallback struct {
+	crash *Crash
+	ret   chan *ReproResult
+}
+
+// Wait until the number of reserved VMs goes to 0.
+func (m *reproMgrMock) onVMShutdown(t *testing.T, reproLoop *ReproLoop) {
+	for i := 0; i < 100; i++ {
+		if m.reserved.Load() == 0 {
+			assert.True(t, reproLoop.CanReproMore())
+			assert.True(t, reproLoop.Empty())
+			return
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+	t.Fatal("reserved VMs must have dropped to 0")
+}
+
+func (m *reproMgrMock) RunRepro(crash *Crash) *ReproResult {
+	retCh := make(chan *ReproResult)
+	m.run <- runCallback{crash: crash, ret: retCh}
+	ret := <-retCh
+	close(retCh)
+	return ret
+}
+
+func (m *reproMgrMock) NeedRepro(crash *Crash) bool {
+	return !m.reproProgExist.Load()
+}
+
+func (m *reproMgrMock) ResizeReproPool(VMs int) {
+	m.reserved.Store(int64(VMs))
+}
--
cgit mrf-deployment