aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pkg/manager/repro.go261
-rw-r--r--pkg/manager/repro_test.go (renamed from syz-manager/repro_test.go)38
-rw-r--r--syz-manager/http.go4
-rw-r--r--syz-manager/hub.go9
-rw-r--r--syz-manager/manager.go146
-rw-r--r--syz-manager/repro.go230
-rw-r--r--syz-manager/snapshot.go5
7 files changed, 349 insertions, 344 deletions
diff --git a/pkg/manager/repro.go b/pkg/manager/repro.go
new file mode 100644
index 000000000..9858a4629
--- /dev/null
+++ b/pkg/manager/repro.go
@@ -0,0 +1,261 @@
+// Copyright 2024 syzkaller project authors. All rights reserved.
+// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
+
+package manager
+
+import (
+ "context"
+ "fmt"
+ "maps"
+ "slices"
+ "sync"
+
+ "github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/report"
+ "github.com/google/syzkaller/pkg/repro"
+ "github.com/google/syzkaller/pkg/stat"
+)
+
+type ReproResult struct {
+ Crash *Crash // the original crash
+ Repro *repro.Result
+ Strace *repro.StraceResult
+ Stats *repro.Stats
+ Err error
+}
+
+type Crash struct {
+ InstanceIndex int
+ FromHub bool // this crash was created based on a repro from syz-hub
+ FromDashboard bool // .. or from dashboard
+ Manual bool
+ *report.Report
+}
+
+func (c *Crash) FullTitle() string {
+ if c.Report.Title != "" {
+ return c.Report.Title
+ }
+ // Just use some unique, but stable titles.
+ if c.FromDashboard {
+ return fmt.Sprintf("dashboard crash %p", c)
+ } else if c.FromHub {
+ return fmt.Sprintf("crash from hub %p", c)
+ }
+ panic("the crash is expected to have a report")
+}
+
+type ReproManagerView interface {
+ RunRepro(crash *Crash) *ReproResult
+ NeedRepro(crash *Crash) bool
+ ResizeReproPool(size int)
+}
+
+type ReproLoop struct {
+ statNumReproducing *stat.Val
+ statPending *stat.Val
+
+ onlyOnce bool
+ mgr ReproManagerView
+ parallel chan struct{}
+ pingQueue chan struct{}
+ reproVMs int
+
+ mu sync.Mutex
+ queue []*Crash
+ reproducing map[string]bool
+ attempted map[string]bool
+}
+
+func NewReproLoop(mgr ReproManagerView, reproVMs int, onlyOnce bool) *ReproLoop {
+ ret := &ReproLoop{
+ mgr: mgr,
+ onlyOnce: onlyOnce,
+ parallel: make(chan struct{}, reproVMs),
+ reproVMs: reproVMs,
+ reproducing: map[string]bool{},
+ pingQueue: make(chan struct{}, 1),
+ attempted: map[string]bool{},
+ }
+ ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced",
+ stat.Console, stat.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.reproducing)
+ })
+ ret.statPending = stat.New("pending", "Number of pending repro tasks",
+ stat.Console, stat.NoGraph, func() int {
+ ret.mu.Lock()
+ defer ret.mu.Unlock()
+ return len(ret.queue)
+ })
+ return ret
+}
+
+// StartReproduction is assumed to be called only once.
+// The reproVMs field (set at construction) is the maximum number of VMs dedicated to the bug reproduction.
+func (r *ReproLoop) StartReproduction() {
+ count := 0
+ for ; r.calculateReproVMs(count+1) <= r.reproVMs; count++ {
+ r.parallel <- struct{}{}
+ }
+ log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", r.reproVMs, count)
+}
+
+func (r *ReproLoop) calculateReproVMs(repros int) int {
+ // Let's allocate 1.33 VMs per reproducer thread.
+ if r.reproVMs == 1 && repros == 1 {
+ // With one exception -- if we have only one VM, let's still do one repro.
+ return 1
+ }
+ return (repros*4 + 2) / 3
+}
+
+func (r *ReproLoop) CanReproMore() bool {
+ return len(r.parallel) != 0
+}
+
+func (r *ReproLoop) Reproducing() map[string]bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return maps.Clone(r.reproducing)
+}
+
+// Empty returns true if there are neither running nor planned bug reproductions.
+func (r *ReproLoop) Empty() bool {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ return len(r.reproducing) == 0 && len(r.queue) == 0
+}
+
+func (r *ReproLoop) Enqueue(crash *Crash) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ title := crash.FullTitle()
+ if r.onlyOnce && r.attempted[title] {
+ // Try to reproduce each bug at most 1 time in this mode.
+ // Since we don't upload bugs/repros to dashboard, it likely won't have
+ // the reproducer even if we succeeded last time, and will repeatedly
+ // say it needs a repro.
+ return
+ }
+ log.Logf(1, "scheduled a reproduction of '%v'", title)
+ r.attempted[title] = true
+ r.queue = append(r.queue, crash)
+
+ // Ping the loop.
+ select {
+ case r.pingQueue <- struct{}{}:
+ default:
+ }
+}
+
+func (r *ReproLoop) popCrash() *Crash {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ newBetter := func(base, new *Crash) bool {
+ // First, serve manual requests.
+ if new.Manual != base.Manual {
+ return new.Manual
+ }
+ // Then, deprioritize hub reproducers.
+ if new.FromHub != base.FromHub {
+ return !new.FromHub
+ }
+ return false
+ }
+
+ idx := -1
+ for i, crash := range r.queue {
+ if r.reproducing[crash.FullTitle()] {
+ continue
+ }
+ if idx == -1 || newBetter(r.queue[idx], r.queue[i]) {
+ idx = i
+ }
+ }
+ if idx == -1 {
+ return nil
+ }
+ crash := r.queue[idx]
+ r.queue = slices.Delete(r.queue, idx, idx+1)
+ return crash
+}
+
+func (r *ReproLoop) Loop(ctx context.Context) {
+ var wg sync.WaitGroup
+ defer wg.Wait()
+
+ for {
+ crash := r.popCrash()
+ for crash == nil {
+ select {
+ case <-r.pingQueue:
+ crash = r.popCrash()
+ case <-ctx.Done():
+ return
+ }
+ if crash == nil || !r.mgr.NeedRepro(crash) {
+ continue
+ }
+ }
+
+ // Now wait until we can schedule another runner.
+ select {
+ case <-r.parallel:
+ case <-ctx.Done():
+ return
+ }
+
+ r.mu.Lock()
+ r.reproducing[crash.FullTitle()] = true
+ r.adjustPoolSizeLocked()
+ r.mu.Unlock()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ r.handle(crash)
+
+ r.mu.Lock()
+ delete(r.reproducing, crash.FullTitle())
+ r.adjustPoolSizeLocked()
+ r.mu.Unlock()
+
+ r.parallel <- struct{}{}
+ r.pingQueue <- struct{}{}
+ }()
+ }
+}
+
+func (r *ReproLoop) handle(crash *Crash) {
+ log.Logf(0, "start reproducing '%v'", crash.FullTitle())
+
+ res := r.mgr.RunRepro(crash)
+
+ crepro := false
+ title := ""
+ if res.Repro != nil {
+ crepro = res.Repro.CRepro
+ title = res.Repro.Report.Title
+ }
+ log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v",
+ crash.FullTitle(), res.Repro != nil, crepro, title, crash.FromHub, crash.FromDashboard,
+ )
+}
+
+func (r *ReproLoop) adjustPoolSizeLocked() {
+ // Avoid the +-1 jitter by considering the repro queue size as well.
+ // We process same-titled crashes sequentially, so only count unique ones.
+ uniqueTitles := maps.Clone(r.reproducing)
+ for _, crash := range r.queue {
+ uniqueTitles[crash.FullTitle()] = true
+ }
+
+ needRepros := len(uniqueTitles)
+ VMs := min(r.reproVMs, r.calculateReproVMs(needRepros))
+ r.mgr.ResizeReproPool(VMs)
+}
diff --git a/syz-manager/repro_test.go b/pkg/manager/repro_test.go
index 84ab94a6b..80d224fd5 100644
--- a/syz-manager/repro_test.go
+++ b/pkg/manager/repro_test.go
@@ -1,7 +1,7 @@
// Copyright 2024 syzkaller project authors. All rights reserved.
// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-package main
+package manager
import (
"context"
@@ -17,7 +17,7 @@ func TestReproManager(t *testing.T) {
mock := &reproMgrMock{
run: make(chan runCallback),
}
- obj := newReproManager(mock, 3, false)
+ obj := NewReproLoop(mock, 3, false)
ctx, done := context.WithCancel(context.Background())
complete := make(chan struct{})
@@ -53,8 +53,8 @@ func TestReproManager(t *testing.T) {
assert.EqualValues(t, 3, mock.reserved.Load())
// Pretend that reproducers have finished.
- called.ret <- &ReproResult{crash: &Crash{fromHub: true}}
- called2.ret <- &ReproResult{crash: &Crash{fromHub: true}}
+ called.ret <- &ReproResult{Crash: &Crash{FromHub: true}}
+ called2.ret <- &ReproResult{Crash: &Crash{FromHub: true}}
mock.onVMShutdown(t, obj)
}
@@ -63,22 +63,22 @@ func TestReproOrder(t *testing.T) {
mock := &reproMgrMock{
run: make(chan runCallback),
}
- obj := newReproManager(mock, 3, false)
+ obj := NewReproLoop(mock, 3, false)
// The right order is A B C.
crashes := []*Crash{
{
Report: &report.Report{Title: "A"},
- fromDashboard: true,
- manual: true,
+ FromDashboard: true,
+ Manual: true,
},
{
Report: &report.Report{Title: "B"},
- fromDashboard: true,
+ FromDashboard: true,
},
{
Report: &report.Report{Title: "C"},
- fromHub: true,
+ FromHub: true,
},
}
@@ -101,7 +101,7 @@ func TestReproRWRace(t *testing.T) {
mock := &reproMgrMock{
run: make(chan runCallback),
}
- obj := newReproManager(mock, 3, false)
+ obj := NewReproLoop(mock, 3, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -117,18 +117,18 @@ func TestReproRWRace(t *testing.T) {
obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
obj.Enqueue(&Crash{Report: &report.Report{Title: "A"}})
- assert.True(t, mock.needRepro(nil))
+ assert.True(t, mock.NeedRepro(nil))
called := <-mock.run
called.ret <- &ReproResult{}
// Pretend that processRepro() is finished and
// we've written "repro.prog" to the disk.
mock.reproProgExist.Store(true)
- assert.False(t, mock.needRepro(nil))
+ assert.False(t, mock.NeedRepro(nil))
assert.True(t, obj.CanReproMore())
called2 := <-mock.run
called2.ret <- &ReproResult{}
- assert.False(t, mock.needRepro(nil))
+ assert.False(t, mock.NeedRepro(nil))
assert.True(t, obj.CanReproMore())
// Reproducers may be still running.
@@ -147,11 +147,11 @@ type runCallback struct {
}
// Wait until the number of reserved VMs goes to 0.
-func (m *reproMgrMock) onVMShutdown(t *testing.T, reproMgr *reproManager) {
+func (m *reproMgrMock) onVMShutdown(t *testing.T, reproLoop *ReproLoop) {
for i := 0; i < 100; i++ {
if m.reserved.Load() == 0 {
- assert.True(t, reproMgr.CanReproMore())
- assert.True(t, reproMgr.Empty())
+ assert.True(t, reproLoop.CanReproMore())
+ assert.True(t, reproLoop.Empty())
return
}
time.Sleep(10 * time.Millisecond)
@@ -159,7 +159,7 @@ func (m *reproMgrMock) onVMShutdown(t *testing.T, reproMgr *reproManager) {
t.Fatal("reserved VMs must have dropped to 0")
}
-func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult {
+func (m *reproMgrMock) RunRepro(crash *Crash) *ReproResult {
retCh := make(chan *ReproResult)
m.run <- runCallback{crash: crash, ret: retCh}
ret := <-retCh
@@ -167,10 +167,10 @@ func (m *reproMgrMock) runRepro(crash *Crash) *ReproResult {
return ret
}
-func (m *reproMgrMock) needRepro(crash *Crash) bool {
+func (m *reproMgrMock) NeedRepro(crash *Crash) bool {
return !m.reproProgExist.Load()
}
-func (m *reproMgrMock) resizeReproPool(VMs int) {
+func (m *reproMgrMock) ResizeReproPool(VMs int) {
m.reserved.Store(int64(VMs))
}
diff --git a/syz-manager/http.go b/syz-manager/http.go
index 34cca4d14..4e4b92d00 100644
--- a/syz-manager/http.go
+++ b/syz-manager/http.go
@@ -630,8 +630,8 @@ func (mgr *Manager) httpFilterPCs(w http.ResponseWriter, r *http.Request) {
func (mgr *Manager) collectCrashes(workdir string) ([]*UICrashType, error) {
// Note: mu is not locked here.
var repros map[string]bool
- if !mgr.cfg.VMLess && mgr.reproMgr != nil {
- repros = mgr.reproMgr.Reproducing()
+ if !mgr.cfg.VMLess && mgr.reproLoop != nil {
+ repros = mgr.reproLoop.Reproducing()
}
crashdir := filepath.Join(workdir, "crashes")
diff --git a/syz-manager/hub.go b/syz-manager/hub.go
index 5fd2356f6..58fac33f2 100644
--- a/syz-manager/hub.go
+++ b/syz-manager/hub.go
@@ -14,6 +14,7 @@ import (
"github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/fuzzer"
"github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/manager"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/report"
"github.com/google/syzkaller/pkg/report/crash"
@@ -60,7 +61,7 @@ func (mgr *Manager) hubSyncLoop(keyGet keyGetter) {
}
if mgr.cfg.Reproduce && mgr.dash != nil {
// Request reproducers from hub only if there is nothing else to reproduce.
- hc.needMoreRepros = mgr.reproMgr.Empty
+ hc.needMoreRepros = mgr.reproLoop.Empty
}
hc.loop()
}
@@ -75,7 +76,7 @@ type HubConnector struct {
fresh bool
hubCorpus map[string]bool
newRepros [][]byte
- hubReproQueue chan *Crash
+ hubReproQueue chan *manager.Crash
needMoreRepros func() bool
keyGet keyGetter
@@ -304,8 +305,8 @@ func (hc *HubConnector) processRepros(repros [][]byte) int {
if hc.leak {
typ = crash.MemoryLeak
}
- hc.hubReproQueue <- &Crash{
- fromHub: true,
+ hc.hubReproQueue <- &manager.Crash{
+ FromHub: true,
Report: &report.Report{
Type: typ,
Output: repro,
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 90aa4198a..fc38b74ed 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -34,6 +34,7 @@ import (
"github.com/google/syzkaller/pkg/gce"
"github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/manager"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/osutil"
"github.com/google/syzkaller/pkg/report"
@@ -112,15 +113,15 @@ type Manager struct {
dataRaceFrames map[string]bool
saturatedCalls map[string]bool
- externalReproQueue chan *Crash
- crashes chan *Crash
+ externalReproQueue chan *manager.Crash
+ crashes chan *manager.Crash
benchMu sync.Mutex
benchFile *os.File
assetStorage *asset.Storage
- reproMgr *reproManager
+ reproLoop *manager.ReproLoop
Stats
}
@@ -153,27 +154,6 @@ const (
const currentDBVersion = 5
-type Crash struct {
- instanceIndex int
- fromHub bool // this crash was created based on a repro from syz-hub
- fromDashboard bool // .. or from dashboard
- manual bool
- *report.Report
-}
-
-func (c *Crash) FullTitle() string {
- if c.Report.Title != "" {
- return c.Report.Title
- }
- // Just use some unique, but stable titles.
- if c.fromDashboard {
- return fmt.Sprintf("dashboard crash %p", c)
- } else if c.fromHub {
- return fmt.Sprintf("crash from hub %p", c)
- }
- panic("the crash is expected to have a report")
-}
-
func main() {
if prog.GitRevision == "" {
log.Fatalf("bad syz-manager build: build with make, run bin/syz-manager")
@@ -249,8 +229,8 @@ func RunManager(mode Mode, cfg *mgrconfig.Config) {
memoryLeakFrames: make(map[string]bool),
dataRaceFrames: make(map[string]bool),
fresh: true,
- externalReproQueue: make(chan *Crash, 10),
- crashes: make(chan *Crash, 10),
+ externalReproQueue: make(chan *manager.Crash, 10),
+ crashes: make(chan *manager.Crash, 10),
saturatedCalls: make(map[string]bool),
}
@@ -313,10 +293,10 @@ func RunManager(mode Mode, cfg *mgrconfig.Config) {
return
}
mgr.pool = vm.NewDispatcher(mgr.vmPool, mgr.fuzzerInstance)
- mgr.reproMgr = newReproManager(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro)
+ mgr.reproLoop = manager.NewReproLoop(mgr, mgr.vmPool.Count()-mgr.cfg.FuzzingVMs, mgr.cfg.DashboardOnlyRepro)
ctx := vm.ShutdownCtx()
go mgr.processFuzzingResults(ctx)
- go mgr.reproMgr.Loop(ctx)
+ go mgr.reproLoop.Loop(ctx)
mgr.pool.Loop(ctx)
}
@@ -378,14 +358,6 @@ func (mgr *Manager) writeBench() {
}
}
-type ReproResult struct {
- crash *Crash // the original crash
- repro *repro.Result
- strace *repro.StraceResult
- stats *repro.Stats
- err error
-}
-
func (mgr *Manager) processFuzzingResults(ctx context.Context) {
for {
select {
@@ -394,7 +366,7 @@ func (mgr *Manager) processFuzzingResults(ctx context.Context) {
case crash := <-mgr.crashes:
needRepro := mgr.saveCrash(crash)
if mgr.cfg.Reproduce && needRepro {
- mgr.reproMgr.Enqueue(crash)
+ mgr.reproLoop.Enqueue(crash)
}
case err := <-mgr.pool.BootErrors:
crash := mgr.convertBootError(err)
@@ -402,14 +374,14 @@ func (mgr *Manager) processFuzzingResults(ctx context.Context) {
mgr.saveCrash(crash)
}
case crash := <-mgr.externalReproQueue:
- if mgr.needRepro(crash) {
- mgr.reproMgr.Enqueue(crash)
+ if mgr.NeedRepro(crash) {
+ mgr.reproLoop.Enqueue(crash)
}
}
}
}
-func (mgr *Manager) convertBootError(err error) *Crash {
+func (mgr *Manager) convertBootError(err error) *manager.Crash {
var bootErr vm.BootErrorer
if errors.As(err, &bootErr) {
title, output := bootErr.BootError()
@@ -424,7 +396,7 @@ func (mgr *Manager) convertBootError(err error) *Crash {
Output: output,
}
}
- return &Crash{
+ return &manager.Crash{
Report: rep,
}
}
@@ -453,13 +425,13 @@ func reportReproError(err error) {
log.Errorf("repro failed: %v", err)
}
-func (mgr *Manager) runRepro(crash *Crash) *ReproResult {
+func (mgr *Manager) RunRepro(crash *manager.Crash) *manager.ReproResult {
res, stats, err := repro.Run(crash.Output, mgr.cfg, mgr.enabledFeatures, mgr.reporter, mgr.pool)
- ret := &ReproResult{
- crash: crash,
- repro: res,
- stats: stats,
- err: err,
+ ret := &manager.ReproResult{
+ Crash: crash,
+ Repro: res,
+ Stats: stats,
+ Err: err,
}
if err == nil && res != nil && mgr.cfg.StraceBin != "" {
const straceAttempts = 2
@@ -471,7 +443,7 @@ func (mgr *Manager) runRepro(crash *Crash) *ReproResult {
// We only want to save strace output if it resulted in the same bug.
// Otherwise, it will be hard to reproduce on syzbot and will confuse users.
if sameBug {
- ret.strace = strace
+ ret.Strace = strace
break
}
}
@@ -482,17 +454,17 @@ func (mgr *Manager) runRepro(crash *Crash) *ReproResult {
return ret
}
-func (mgr *Manager) processRepro(res *ReproResult) {
- if res.err != nil {
- reportReproError(res.err)
+func (mgr *Manager) processRepro(res *manager.ReproResult) {
+ if res.Err != nil {
+ reportReproError(res.Err)
}
- if res.repro == nil {
- if res.crash.Title == "" {
+ if res.Repro == nil {
+ if res.Crash.Title == "" {
log.Logf(1, "repro '%v' not from dashboard, so not reporting the failure",
- res.crash.FullTitle())
+ res.Crash.FullTitle())
} else {
- log.Logf(1, "report repro failure of '%v'", res.crash.Title)
- mgr.saveFailedRepro(res.crash.Report, res.stats)
+ log.Logf(1, "report repro failure of '%v'", res.Crash.Title)
+ mgr.saveFailedRepro(res.Crash.Report, res.Stats)
}
} else {
mgr.saveRepro(res)
@@ -766,8 +738,8 @@ func (mgr *Manager) fuzzerInstance(ctx context.Context, inst *vm.Instance, updIn
rep.MachineInfo = machineInfo
}
if err == nil && rep != nil {
- mgr.crashes <- &Crash{
- instanceIndex: inst.Index(),
+ mgr.crashes <- &manager.Crash{
+ InstanceIndex: inst.Index(),
Report: rep,
}
}
@@ -820,7 +792,7 @@ func (mgr *Manager) runInstanceInner(ctx context.Context, inst *vm.Instance, inj
return rep, vmInfo, nil
}
-func (mgr *Manager) emailCrash(crash *Crash) {
+func (mgr *Manager) emailCrash(crash *manager.Crash) {
if len(mgr.cfg.EmailAddrs) == 0 {
return
}
@@ -835,7 +807,7 @@ func (mgr *Manager) emailCrash(crash *Crash) {
}
}
-func (mgr *Manager) saveCrash(crash *Crash) bool {
+func (mgr *Manager) saveCrash(crash *manager.Crash) bool {
if err := mgr.reporter.Symbolize(crash.Report); err != nil {
log.Errorf("failed to symbolize report: %v", err)
}
@@ -856,7 +828,7 @@ func (mgr *Manager) saveCrash(crash *Crash) bool {
if crash.Suppressed {
flags += " [suppressed]"
}
- log.Logf(0, "VM %v: crash: %v%v", crash.instanceIndex, crash.Title, flags)
+ log.Logf(0, "VM %v: crash: %v%v", crash.InstanceIndex, crash.Title, flags)
if mgr.mode == ModeSmokeTest {
data, err := json.Marshal(crash.Report)
@@ -949,12 +921,12 @@ func (mgr *Manager) saveCrash(crash *Crash) bool {
writeOrRemove("tag", []byte(mgr.cfg.Tag))
writeOrRemove("report", crash.Report.Report)
writeOrRemove("machineInfo", crash.MachineInfo)
- return mgr.needRepro(crash)
+ return mgr.NeedRepro(crash)
}
const maxReproAttempts = 3
-func (mgr *Manager) needLocalRepro(crash *Crash) bool {
+func (mgr *Manager) needLocalRepro(crash *manager.Crash) bool {
if !mgr.cfg.Reproduce || crash.Corrupted || crash.Suppressed {
return false
}
@@ -971,11 +943,11 @@ func (mgr *Manager) needLocalRepro(crash *Crash) bool {
return false
}
-func (mgr *Manager) needRepro(crash *Crash) bool {
+func (mgr *Manager) NeedRepro(crash *manager.Crash) bool {
if !mgr.cfg.Reproduce {
return false
}
- if crash.fromHub || crash.fromDashboard {
+ if crash.FromHub || crash.FromDashboard {
return true
}
if !mgr.checkDone.Load() || (mgr.enabledFeatures&flatrpc.FeatureLeak != 0 &&
@@ -1043,13 +1015,13 @@ func (mgr *Manager) saveFailedRepro(rep *report.Report, stats *repro.Stats) {
}
}
-func (mgr *Manager) saveRepro(res *ReproResult) {
- repro := res.repro
+func (mgr *Manager) saveRepro(res *manager.ReproResult) {
+ repro := res.Repro
opts := fmt.Sprintf("# %+v\n", repro.Opts)
progText := repro.Prog.Serialize()
// Append this repro to repro list to send to hub if it didn't come from hub originally.
- if !res.crash.fromHub {
+ if !res.Crash.FromHub {
progForHub := []byte(fmt.Sprintf("# %+v\n# %v\n# %v\n%s",
repro.Opts, repro.Report.Title, mgr.cfg.Tag, progText))
mgr.mu.Lock()
@@ -1082,11 +1054,11 @@ func (mgr *Manager) saveRepro(res *ReproResult) {
output := report.Output
var crashFlags dashapi.CrashFlags
- if res.strace != nil {
+ if res.Strace != nil {
// If syzkaller managed to successfully run the repro with strace, send
// the report and the output generated under strace.
- report = res.strace.Report
- output = res.strace.Output
+ report = res.Strace.Report
+ output = res.Strace.Output
crashFlags = dashapi.CrashUnderStrace
}
@@ -1102,9 +1074,9 @@ func (mgr *Manager) saveRepro(res *ReproResult) {
ReproOpts: repro.Opts.Serialize(),
ReproSyz: progText,
ReproC: cprogText,
- ReproLog: truncateReproLog(res.stats.FullLog()),
+ ReproLog: truncateReproLog(res.Stats.FullLog()),
Assets: mgr.uploadReproAssets(repro),
- OriginalTitle: res.crash.Title,
+ OriginalTitle: res.Crash.Title,
}
setGuiltyFiles(dc, report)
if _, err := mgr.dash.ReportCrash(dc); err != nil {
@@ -1142,22 +1114,22 @@ func (mgr *Manager) saveRepro(res *ReproResult) {
log.Logf(0, "failed to write crash asset: type %d, write error %v", typ, err)
}
})
- if res.strace != nil {
+ if res.Strace != nil {
// Unlike dashboard reporting, we save strace output separately from the original log.
- if res.strace.Error != nil {
+ if res.Strace.Error != nil {
osutil.WriteFile(filepath.Join(dir, "strace.error"),
- []byte(fmt.Sprintf("%v", res.strace.Error)))
+ []byte(fmt.Sprintf("%v", res.Strace.Error)))
}
- if len(res.strace.Output) > 0 {
- osutil.WriteFile(filepath.Join(dir, "strace.log"), res.strace.Output)
+ if len(res.Strace.Output) > 0 {
+ osutil.WriteFile(filepath.Join(dir, "strace.log"), res.Strace.Output)
}
}
- if reproLog := res.stats.FullLog(); len(reproLog) > 0 {
+ if reproLog := res.Stats.FullLog(); len(reproLog) > 0 {
osutil.WriteFile(filepath.Join(dir, "repro.stats"), reproLog)
}
}
-func (mgr *Manager) resizeReproPool(size int) {
+func (mgr *Manager) ResizeReproPool(size int) {
mgr.pool.ReserveForRun(size)
}
@@ -1534,11 +1506,11 @@ func (mgr *Manager) fuzzerLoop(fuzzer *fuzzer.Fuzzer) {
go mgr.hubSyncLoop(pickGetter(mgr.cfg.HubKey))
} else {
mgr.phase = phaseTriagedHub
- mgr.reproMgr.StartReproduction()
+ mgr.reproLoop.StartReproduction()
}
} else if mgr.phase == phaseQueriedHub {
mgr.phase = phaseTriagedHub
- mgr.reproMgr.StartReproduction()
+ mgr.reproLoop.StartReproduction()
}
mgr.mu.Unlock()
}
@@ -1555,7 +1527,7 @@ func (mgr *Manager) hubIsUnreachable() {
if mgr.phase == phaseTriagedCorpus {
dash = mgr.dash
mgr.phase = phaseTriagedHub
- mgr.reproMgr.StartReproduction()
+ mgr.reproLoop.StartReproduction()
log.Errorf("did not manage to connect to syz-hub; moving forward")
}
mgr.mu.Unlock()
@@ -1646,7 +1618,7 @@ func (mgr *Manager) dashboardReporter() {
func (mgr *Manager) dashboardReproTasks() {
for range time.NewTicker(20 * time.Minute).C {
- if !mgr.reproMgr.CanReproMore() {
+ if !mgr.reproLoop.CanReproMore() {
// We don't need reproducers at the moment.
continue
}
@@ -1656,9 +1628,9 @@ func (mgr *Manager) dashboardReproTasks() {
continue
}
if len(resp.CrashLog) > 0 {
- mgr.externalReproQueue <- &Crash{
- fromDashboard: true,
- manual: resp.Type == dashapi.ManualLog,
+ mgr.externalReproQueue <- &manager.Crash{
+ FromDashboard: true,
+ Manual: resp.Type == dashapi.ManualLog,
Report: &report.Report{
Title: resp.Title,
Output: resp.CrashLog,
diff --git a/syz-manager/repro.go b/syz-manager/repro.go
deleted file mode 100644
index 3232bd9f3..000000000
--- a/syz-manager/repro.go
+++ /dev/null
@@ -1,230 +0,0 @@
-// Copyright 2024 syzkaller project authors. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package main
-
-import (
- "context"
- "maps"
- "slices"
- "sync"
-
- "github.com/google/syzkaller/pkg/log"
- "github.com/google/syzkaller/pkg/stat"
-)
-
-type reproManagerView interface {
- runRepro(crash *Crash) *ReproResult // TODO: consider moving runRepro() to repro.go.
- needRepro(crash *Crash) bool
- resizeReproPool(size int)
-}
-
-type reproManager struct {
- statNumReproducing *stat.Val
- statPending *stat.Val
-
- onlyOnce bool
- mgr reproManagerView
- parallel chan struct{}
- pingQueue chan struct{}
- reproVMs int
-
- mu sync.Mutex
- queue []*Crash
- reproducing map[string]bool
- attempted map[string]bool
-}
-
-func newReproManager(mgr reproManagerView, reproVMs int, onlyOnce bool) *reproManager {
- ret := &reproManager{
- mgr: mgr,
- onlyOnce: onlyOnce,
- parallel: make(chan struct{}, reproVMs),
- reproVMs: reproVMs,
- reproducing: map[string]bool{},
- pingQueue: make(chan struct{}, 1),
- attempted: map[string]bool{},
- }
- ret.statNumReproducing = stat.New("reproducing", "Number of crashes being reproduced",
- stat.Console, stat.NoGraph, func() int {
- ret.mu.Lock()
- defer ret.mu.Unlock()
- return len(ret.reproducing)
- })
- ret.statPending = stat.New("pending", "Number of pending repro tasks",
- stat.Console, stat.NoGraph, func() int {
- ret.mu.Lock()
- defer ret.mu.Unlock()
- return len(ret.queue)
- })
- return ret
-}
-
-// startReproduction() is assumed to be called only once.
-// The agument is the maximum number of VMs dedicated to the bug reproduction.
-func (m *reproManager) StartReproduction() {
- count := 0
- for ; m.calculateReproVMs(count+1) <= m.reproVMs; count++ {
- m.parallel <- struct{}{}
- }
- log.Logf(0, "starting bug reproductions (max %d VMs, %d repros)", m.reproVMs, count)
-}
-
-func (m *reproManager) calculateReproVMs(repros int) int {
- // Let's allocate 1.33 VMs per a reproducer thread.
- if m.reproVMs == 1 && repros == 1 {
- // With one exception -- if we have only one VM, let's still do one repro.
- return 1
- }
- return (repros*4 + 2) / 3
-}
-
-func (m *reproManager) CanReproMore() bool {
- return len(m.parallel) != 0
-}
-
-func (m *reproManager) Reproducing() map[string]bool {
- m.mu.Lock()
- defer m.mu.Unlock()
- return maps.Clone(m.reproducing)
-}
-
-// Empty returns true if there are neither running nor planned bug reproductions.
-func (m *reproManager) Empty() bool {
- m.mu.Lock()
- defer m.mu.Unlock()
- return len(m.reproducing) == 0 && len(m.queue) == 0
-}
-
-func (m *reproManager) Enqueue(crash *Crash) {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- title := crash.FullTitle()
- if m.onlyOnce && m.attempted[title] {
- // Try to reproduce each bug at most 1 time in this mode.
- // Since we don't upload bugs/repros to dashboard, it likely won't have
- // the reproducer even if we succeeded last time, and will repeatedly
- // say it needs a repro.
- return
- }
- log.Logf(1, "scheduled a reproduction of '%v'", title)
- m.attempted[title] = true
- m.queue = append(m.queue, crash)
-
- // Ping the loop.
- select {
- case m.pingQueue <- struct{}{}:
- default:
- }
-}
-
-func (m *reproManager) popCrash() *Crash {
- m.mu.Lock()
- defer m.mu.Unlock()
-
- newBetter := func(base, new *Crash) bool {
- // First, serve manual requests.
- if new.manual != base.manual {
- return new.manual
- }
- // Then, deprioritize hub reproducers.
- if new.fromHub != base.fromHub {
- return !new.fromHub
- }
- return false
- }
-
- idx := -1
- for i, crash := range m.queue {
- if m.reproducing[crash.FullTitle()] {
- continue
- }
- if idx == -1 || newBetter(m.queue[idx], m.queue[i]) {
- idx = i
- }
- }
- if idx == -1 {
- return nil
- }
- crash := m.queue[idx]
- m.queue = slices.Delete(m.queue, idx, idx+1)
- return crash
-}
-
-func (m *reproManager) Loop(ctx context.Context) {
- var wg sync.WaitGroup
- defer wg.Wait()
-
- for {
- crash := m.popCrash()
- for crash == nil {
- select {
- case <-m.pingQueue:
- crash = m.popCrash()
- case <-ctx.Done():
- return
- }
- if crash == nil || !m.mgr.needRepro(crash) {
- continue
- }
- }
-
- // Now wait until we can schedule another runner.
- select {
- case <-m.parallel:
- case <-ctx.Done():
- return
- }
-
- m.mu.Lock()
- m.reproducing[crash.FullTitle()] = true
- m.adjustPoolSizeLocked()
- m.mu.Unlock()
-
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- m.handle(crash)
-
- m.mu.Lock()
- delete(m.reproducing, crash.FullTitle())
- m.adjustPoolSizeLocked()
- m.mu.Unlock()
-
- m.parallel <- struct{}{}
- m.pingQueue <- struct{}{}
- }()
- }
-}
-
-func (m *reproManager) handle(crash *Crash) {
- log.Logf(0, "start reproducing '%v'", crash.FullTitle())
-
- res := m.mgr.runRepro(crash)
-
- crepro := false
- title := ""
- if res.repro != nil {
- crepro = res.repro.CRepro
- title = res.repro.Report.Title
- }
- log.Logf(0, "repro finished '%v', repro=%v crepro=%v desc='%v' hub=%v from_dashboard=%v",
- crash.FullTitle(), res.repro != nil, crepro, title, crash.fromHub, crash.fromDashboard,
- )
-}
-
-func (m *reproManager) adjustPoolSizeLocked() {
- // Avoid the +-1 jitter by considering the repro queue size as well.
-
- // We process same-titled crashes sequentially, so only count unique ones.
- uniqueTitles := maps.Clone(m.reproducing)
- for _, crash := range m.queue {
- uniqueTitles[crash.FullTitle()] = true
- }
-
- needRepros := len(uniqueTitles)
- VMs := min(m.reproVMs, m.calculateReproVMs(needRepros))
- m.mgr.resizeReproPool(VMs)
-}
diff --git a/syz-manager/snapshot.go b/syz-manager/snapshot.go
index 1413ac744..962589926 100644
--- a/syz-manager/snapshot.go
+++ b/syz-manager/snapshot.go
@@ -10,10 +10,11 @@ import (
"fmt"
"time"
- "github.com/google/flatbuffers/go"
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/google/syzkaller/pkg/flatrpc"
"github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/log"
+ "github.com/google/syzkaller/pkg/manager"
"github.com/google/syzkaller/vm"
"github.com/google/syzkaller/vm/dispatcher"
)
@@ -74,7 +75,7 @@ func (mgr *Manager) snapshotLoop(ctx context.Context, inst *vm.Instance) error {
fmt.Fprintf(buf, "program:\n%s\n", req.Prog.Serialize())
buf.Write(rep.Output)
rep.Output = buf.Bytes()
- mgr.crashes <- &Crash{Report: rep}
+ mgr.crashes <- &manager.Crash{Report: rep}
}
req.Done(res)