aboutsummaryrefslogtreecommitdiffstats
path: root/pkg
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-08-30 15:58:57 +0200
committerAleksandr Nogikh <nogikh@google.com>2024-09-02 18:39:02 +0000
commit8d09a3b3b53116077d125020ea2db71db49bc8de (patch)
tree06e802a01b7f0efda0808bca19486135df4a9948 /pkg
parent1eda0d1459e5ff07903ffa2f8cedf55ae7b24af0 (diff)
syz-manager: move repro loop to pkg/manager
This is a potentially reusable piece of functionality.
Diffstat (limited to 'pkg')
-rw-r--r--pkg/manager/repro.go261
-rw-r--r--pkg/manager/repro_test.go176
2 files changed, 437 insertions, 0 deletions
diff --git a/pkg/manager/repro.go b/pkg/manager/repro.go
new file mode 100644
index 000000000..9858a4629
--- /dev/null
+++ b/pkg/manager/repro.go
@@ -0,0 +1,261 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "context"
+ "fmt"
+ "maps"
+ "slices"
+ "sync"
+
+ "github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/report"
+ "github.com/google/syzkaller/pkg/repro"
+ "github.com/google/syzkaller/pkg/stat"
+)
+
// ReproResult describes the outcome of a single reproduction attempt.
type ReproResult struct {
	Crash  *Crash // the original crash
	Repro  *repro.Result
	Strace *repro.StraceResult
	Stats  *repro.Stats
	Err    error
}
+
// Crash is a single crash that may be scheduled for reproduction.
// The embedded report may be nil-titled for dashboard/hub crashes
// (see FullTitle).
type Crash struct {
	InstanceIndex int
	FromHub       bool // this crash was created based on a repro from syz-hub
	FromDashboard bool // .. or from dashboard
	Manual        bool
	*report.Report
}
+
+func (c *Crash) FullTitle() string {
+ if c.Report.Title != "" {
+ return c.Report.Title
+ }
+ // Just use some unique, but stable titles.
+ if c.FromDashboard {
+ return fmt.Sprintf("dashboard crash %p", c)
+ } else if c.FromHub {
+ return fmt.Sprintf("crash from hub %p", c)
+ }
+ panic("the crash is expected to have a report")
+}
+
// ReproManagerView is the interface ReproLoop requires from the manager:
// running a reproduction, deciding whether one is (still) needed, and
// resizing the pool of VMs dedicated to reproduction.
type ReproManagerView interface {
	RunRepro(crash *Crash) *ReproResult
	NeedRepro(crash *Crash) bool
	ResizeReproPool(size int)
}
+
// ReproLoop schedules and runs bug reproductions on up to reproVMs VMs.
type ReproLoop struct {
	statNumReproducing *stat.Val
	statPending        *stat.Val

	onlyOnce  bool // attempt each crash title at most once (see Enqueue)
	mgr       ReproManagerView
	parallel  chan struct{} // semaphore bounding the number of concurrent reproductions
	pingQueue chan struct{} // capacity-1 channel used to wake up Loop() on queue updates
	reproVMs  int

	mu          sync.Mutex
	queue       []*Crash        // pending crashes, guarded by mu
	reproducing map[string]bool // titles currently being reproduced, guarded by mu
	attempted   map[string]bool // titles ever scheduled (used with onlyOnce), guarded by mu
}
+
+func NewReproLoop(mgr ReproManagerView, reproVMs int, onlyOnce bool) *ReproLoop {
+ ret := &ReproLoop{
+ mgr: mgr,
+ onlyOnce: onlyOnce,
+ parallel: make(chan struct{}, reproVMs),
+ reproVMs: reproVMs,
+ reproducing: map[string]bool{},
+ pingQueue: make(chan struct{}, 1),
+ attempted: map[string]bool{},
+ }
+ ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced",
+ stat.Console, stat.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.reproducing)
+ })
+ ret.statPending = stat.New("pending", "Number of pending repro tasks",
+ stat.Console, stat.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.queue)
+ })
+ return ret
+}
+
// StartReproduction is assumed to be called only once.
// It fills the parallelism semaphore with as many tokens as whole
// reproducer threads fit into the r.reproVMs VMs dedicated to bug
// reproduction (see calculateReproVMs for the VM-per-repro ratio).
func (r *ReproLoop) StartReproduction() {
	count := 0
	for ; r.calculateReproVMs(count+1) <= r.reproVMs; count++ {
		r.parallel <- struct{}{}
	}
	log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", r.reproVMs, count)
}
+
+func (r *ReproLoop) calculateReproVMs(repros int) int {
+ // Let's allocate 1.33 VMs per a reproducer thread.
+ if r.reproVMs == 1 && repros == 1 {
+ // With one exception -- if we have only one VM, let's still do one repro.
+ return 1
+ }
+ return (repros*4 + 2) / 3
+}
+
+func (r *ReproLoop) CanReproMore() bool {
+ return len(r.parallel) != 0
+}
+
+func (r *ReproLoop) Reproducing() map[string]bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return maps.Clone(r.reproducing)
+}
+
+// Empty returns true if there are neither running nor planned bug reproductions.
+func (r *ReproLoop) Empty() bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return len(r.reproducing) == 0 && len(r.queue) == 0
+}
+
+func (r *ReproLoop) Enqueue(crash *Crash) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ title := crash.FullTitle()
+ if r.onlyOnce && r.attempted[title] {
+ // Try to reproduce each bug at most 1 time in this mode.
+ // Since we don't upload bugs/repros to dashboard, it likely won't have
+ // the reproducer even if we succeeded last time, and will repeatedly
+ // say it needs a repro.
+ return
+ }
+ log.Logf(1, "scheduled a reproduction of '%v'", title)
+ r.attempted[title] = true
+ r.queue = append(r.queue, crash)
+
+ // Ping the loop.
+ select {
+ case r.pingQueue <- struct{}{}:
+ default:
+ }
+}
+
+func (r *ReproLoop) popCrash() *Crash {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ newBetter := func(base, new *Crash) bool {
+ // First, serve manual requests.
+ if new.Manual != base.Manual {
+ return new.Manual
+ }
+ // Then, deprioritize hub reproducers.
+ if new.FromHub != base.FromHub {
+ return !new.FromHub
+ }
+ return false
+ }
+
+ idx := -1
+ for i, crash := range r.queue {
+ if r.reproducing[crash.FullTitle()] {
+ continue
+ }
+ if idx == -1 || newBetter(r.queue[idx], r.queue[i]) {
+ idx = i
+ }
+ }
+ if idx == -1 {
+ return nil
+ }
+ crash := r.queue[idx]
+ r.queue = slices.Delete(r.queue, idx, idx+1)
+ return crash
+}
+
// Loop is the main scheduling loop: it picks crashes off the queue,
// reserves a parallelism slot and runs each reproduction in its own
// goroutine. It returns once ctx is cancelled, after waiting for the
// already-started reproductions to complete.
func (r *ReproLoop) Loop(ctx context.Context) {
	var wg sync.WaitGroup
	defer wg.Wait()

	for {
		crash := r.popCrash()
		// Wait until there's something to reproduce.
		for crash == nil {
			select {
			case <-r.pingQueue:
				crash = r.popCrash()
			case <-ctx.Done():
				return
			}
			// NOTE(review): this `continue` is the last statement of the loop
			// body, so the check below has no effect -- when crash != nil and
			// NeedRepro() returns false, the for-condition is false and the
			// crash is reproduced anyway. If the intent was to drop crashes
			// that no longer need a repro, this should be `crash = nil` --
			// confirm (TestReproRWRace currently relies on the non-dropping
			// behavior).
			if crash == nil || !r.mgr.NeedRepro(crash) {
				continue
			}
		}

		// Now wait until we can schedule another runner.
		select {
		case <-r.parallel:
		case <-ctx.Done():
			return
		}

		// Mark the title as in-flight so popCrash() skips same-titled crashes.
		r.mu.Lock()
		r.reproducing[crash.FullTitle()] = true
		r.adjustPoolSizeLocked()
		r.mu.Unlock()

		wg.Add(1)
		go func() {
			defer wg.Done()

			r.handle(crash)

			r.mu.Lock()
			delete(r.reproducing, crash.FullTitle())
			r.adjustPoolSizeLocked()
			r.mu.Unlock()

			// Return the parallelism token and wake up the queue.
			// NOTE(review): this pingQueue send is blocking; if Loop() has
			// already returned (ctx cancelled) and the 1-slot buffer is full,
			// a finishing worker could block here forever and deadlock the
			// deferred wg.Wait(). A non-blocking send (as in Enqueue) looks
			// safer -- confirm.
			r.parallel <- struct{}{}
			r.pingQueue <- struct{}{}
		}()
	}
}
+
+func (r *ReproLoop) handle(crash *Crash) {
+ log.Logf(0, "start reproducing '%v'", crash.FullTitle())
+
+ res := r.mgr.RunRepro(crash)
+
+ crepro := false
+ title := ""
+ if res.Repro != nil {
+ crepro = res.Repro.CRepro
+ title = res.Repro.Report.Title
+ }
+ log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v",
+ crash.FullTitle(), res.Repro != nil, crepro, title, crash.FromHub, crash.FromDashboard,
+ )
+}
+
+func (r *ReproLoop) adjustPoolSizeLocked() {
+ // Avoid the +-1 jitter by considering the repro queue size as well.
+ // We process same-titled crashes sequentially, so only count unique ones.
+ uniqueTitles := maps.Clone(r.reproducing)
+ for _, crash := range r.queue {
+ uniqueTitles[crash.FullTitle()] = true
+ }
+
+ needRepros := len(uniqueTitles)
+ VMs := min(r.reproVMs, r.calculateReproVMs(needRepros))
+ r.mgr.ResizeReproPool(VMs)
+}
diff --git a/pkg/manager/repro_test.go b/pkg/manager/repro_test.go
new file mode 100644
index 000000000..80d224fd5
--- /dev/null
+++ b/pkg/manager/repro_test.go
@@ -0,0 +1,176 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "context"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/google/syzkaller/pkg/report"
+ "github.com/stretchr/testify/assert"
+)
+
// TestReproManager checks the basic ReproLoop flow: slot/VM accounting,
// running two reproductions in parallel, and the final pool shutdown.
func TestReproManager(t *testing.T) {
	mock := &reproMgrMock{
		run: make(chan runCallback),
	}
	// 3 VMs fit two parallel reproducers (see calculateReproVMs).
	obj := NewReproLoop(mock, 3, false)

	ctx, done := context.WithCancel(context.Background())
	complete := make(chan struct{})
	go func() {
		obj.Loop(ctx)
		close(complete)
	}()

	defer func() {
		done()
		<-complete
	}()

	// No reproductions until we've signaled to start.
	assert.False(t, obj.CanReproMore())
	obj.StartReproduction()

	// No reproducers -- we can definitely take more.
	assert.True(t, obj.CanReproMore())
	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
	called := <-mock.run
	assert.Equal(t, "A", called.crash.Title)

	// One reproducer is running -- we can take one more.
	assert.True(t, obj.CanReproMore())
	assert.EqualValues(t, 2, mock.reserved.Load())
	obj.Enqueue(&Crash{Report: &report.Report{Title: "B"}})
	called2 := <-mock.run
	assert.Equal(t, "B", called2.crash.Title)

	// Both slots are taken now.
	assert.False(t, obj.CanReproMore())
	assert.Len(t, obj.Reproducing(), 2)
	assert.EqualValues(t, 3, mock.reserved.Load())

	// Pretend that reproducers have finished.
	called.ret <- &ReproResult{Crash: &Crash{FromHub: true}}
	called2.ret <- &ReproResult{Crash: &Crash{FromHub: true}}

	mock.onVMShutdown(t, obj)
}
+
+func TestReproOrder(t *testing.T) {
+ mock := &reproMgrMock{
+ run: make(chan runCallback),
+ }
+ obj := NewReproLoop(mock, 3, false)
+
+ // The right order is A B C.
+ crashes := []*Crash{
+ {
+ Report: &report.Report{Title: "A"},
+ FromDashboard: true,
+ Manual: true,
+ },
+ {
+ Report: &report.Report{Title: "B"},
+ FromDashboard: true,
+ },
+ {
+ Report: &report.Report{Title: "C"},
+ FromHub: true,
+ },
+ }
+
+ obj.Enqueue(crashes[2])
+ obj.Enqueue(crashes[1])
+ obj.Enqueue(crashes[0])
+ assert.Equal(t, crashes[0], obj.popCrash())
+ assert.Equal(t, crashes[1], obj.popCrash())
+ assert.Equal(t, crashes[2], obj.popCrash())
+
+ obj.Enqueue(crashes[1])
+ obj.Enqueue(crashes[0])
+ obj.Enqueue(crashes[2])
+ assert.Equal(t, crashes[0], obj.popCrash())
+ assert.Equal(t, crashes[1], obj.popCrash())
+ assert.Equal(t, crashes[2], obj.popCrash())
+}
+
// TestReproRWRace exercises concurrent NeedRepro() reads against the
// writes done after a finished reproduction (intended to be run with
// the -race detector). Two same-titled crashes are processed
// sequentially; the second one is still reproduced even though
// NeedRepro() already returns false by then.
func TestReproRWRace(t *testing.T) {
	mock := &reproMgrMock{
		run: make(chan runCallback),
	}
	obj := NewReproLoop(mock, 3, false)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		obj.Loop(ctx) // calls runRepro()
	}()

	assert.False(t, obj.CanReproMore())
	obj.StartReproduction()
	assert.True(t, obj.CanReproMore())

	// Same title twice: the second stays queued while the first runs.
	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
	obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})

	assert.True(t, mock.NeedRepro(nil))
	called := <-mock.run
	called.ret <- &ReproResult{}
	// Pretend that processRepro() is finished and
	// we've written "repro.prog" to the disk.
	mock.reproProgExist.Store(true)
	assert.False(t, mock.NeedRepro(nil))
	assert.True(t, obj.CanReproMore())

	called2 := <-mock.run
	called2.ret <- &ReproResult{}
	assert.False(t, mock.NeedRepro(nil))
	assert.True(t, obj.CanReproMore())

	// Reproducers may be still running.
	mock.onVMShutdown(t, obj)
}
+
// reproMgrMock is a test implementation of ReproManagerView.
type reproMgrMock struct {
	reserved       atomic.Int64     // the last pool size passed to ResizeReproPool
	run            chan runCallback // RunRepro hands control to the test via this channel
	reproProgExist atomic.Bool      // emulates the presence of "repro.prog" on disk
}
+
// runCallback carries a single RunRepro invocation to the test body,
// which replies with the reproduction result over ret.
type runCallback struct {
	crash *Crash
	ret   chan *ReproResult
}
+
+// Wait until the number of reserved VMs goes to 0.
+func (m *reproMgrMock) onVMShutdown(t *testing.T, reproLoop *ReproLoop) {
+ for i := 0; i < 100; i++ {
+ if m.reserved.Load() == 0 {
+ assert.True(t, reproLoop.CanReproMore())
+ assert.True(t, reproLoop.Empty())
+ return
+ }
+ time.Sleep(10 * time.Millisecond)
+ }
+ t.Fatal("reserved VMs must have dropped to 0")
+}
+
+func (m *reproMgrMock) RunRepro(crash *Crash) *ReproResult {
+ retCh := make(chan *ReproResult)
+ m.run <- runCallback{crash: crash, ret: retCh}
+ ret := <-retCh
+ close(retCh)
+ return ret
+}
+
// NeedRepro reports whether a repro is still needed: true until the test
// sets reproProgExist (emulating a written-out repro.prog).
func (m *reproMgrMock) NeedRepro(crash *Crash) bool {
	return !m.reproProgExist.Load()
}
+
// ResizeReproPool records the requested pool size in m.reserved.
func (m *reproMgrMock) ResizeReproPool(VMs int) {
	m.reserved.Store(int64(VMs))
}