aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-10-14 22:44:07 +0200
committerTaras Madan <tarasmadan@google.com>2024-10-25 12:08:02 +0000
commit78ccf1e6b3d9e44604faf283b65faa3a5e0825f9 (patch)
treeb9d76d91556434ddbf543f241155ebe8affbb326
parent45491cdceb1295e9120756b1615200b494a5f97b (diff)
pkg/rpcserver: take stats as a dependency
It will enable collecting statistics for several simultaneous RPCServer objects.
-rw-r--r--pkg/fuzzer/queue/stats.go4
-rw-r--r--pkg/rpcserver/local.go1
-rw-r--r--pkg/rpcserver/rpcserver.go44
-rw-r--r--pkg/rpcserver/rpcserver_test.go4
-rw-r--r--syz-manager/manager.go8
-rw-r--r--syz-manager/snapshot.go6
6 files changed, 43 insertions, 24 deletions
diff --git a/pkg/fuzzer/queue/stats.go b/pkg/fuzzer/queue/stats.go
index 3097ca2d6..9cc4c9e5c 100644
--- a/pkg/fuzzer/queue/stats.go
+++ b/pkg/fuzzer/queue/stats.go
@@ -7,10 +7,6 @@ import "github.com/google/syzkaller/pkg/stat"
// Common stats related to fuzzing that are updated/read by different parts of the system.
var (
- StatExecs = stat.New("exec total", "Total test program executions",
- stat.Console, stat.Rate{}, stat.Prometheus("syz_exec_total"))
- StatNumFuzzing = stat.New("fuzzing VMs", "Number of VMs that are currently fuzzing",
- stat.Console, stat.Link("/vms"))
StatNoExecRequests = stat.New("no exec requests",
"Number of times fuzzer was stalled with no exec requests", stat.Rate{})
StatNoExecDuration = stat.New("no exec duration",
diff --git a/pkg/rpcserver/local.go b/pkg/rpcserver/local.go
index 09cd1868d..5faa8334b 100644
--- a/pkg/rpcserver/local.go
+++ b/pkg/rpcserver/local.go
@@ -43,6 +43,7 @@ func RunLocal(cfg *LocalConfig) error {
cfg.FilterSignal = true
cfg.RPC = ":0"
cfg.PrintMachineCheck = log.V(1)
+ cfg.Stats = NewStats()
ctx := &local{
cfg: cfg,
setupDone: make(chan bool),
diff --git a/pkg/rpcserver/rpcserver.go b/pkg/rpcserver/rpcserver.go
index 53181dd2b..76f1ddc68 100644
--- a/pkg/rpcserver/rpcserver.go
+++ b/pkg/rpcserver/rpcserver.go
@@ -31,6 +31,8 @@ import (
type Config struct {
vminfo.Config
+ Stats
+
VMArch string
VMType string
RPC string
@@ -85,15 +87,33 @@ type server struct {
canonicalModules *cover.Canonicalizer
coverFilter []uint64
- mu sync.Mutex
- runners map[int]*Runner
- execSource *queue.Distributor
- triagedCorpus atomic.Bool
- statVMRestarts *stat.Val
+ mu sync.Mutex
+ runners map[int]*Runner
+ execSource *queue.Distributor
+ triagedCorpus atomic.Bool
+
+ Stats
*runnerStats
}
-func New(cfg *mgrconfig.Config, mgr Manager, debug bool) (Server, error) {
+type Stats struct {
+ StatExecs *stat.Val
+ StatNumFuzzing *stat.Val
+ StatVMRestarts *stat.Val
+}
+
+func NewStats() Stats {
+ return Stats{
+ StatExecs: stat.New("exec total", "Total test program executions",
+ stat.Console, stat.Rate{}, stat.Prometheus("syz_exec_total")),
+ StatNumFuzzing: stat.New("fuzzing VMs",
+ "Number of VMs that are currently fuzzing", stat.Graph("fuzzing VMs")),
+ StatVMRestarts: stat.New("vm restarts", "Total number of VM starts",
+ stat.Rate{}, stat.NoGraph),
+ }
+}
+
+func New(cfg *mgrconfig.Config, mgr Manager, stats Stats, debug bool) (Server, error) {
var pcBase uint64
if cfg.KernelObj != "" {
var err error
@@ -121,6 +141,7 @@ func New(cfg *mgrconfig.Config, mgr Manager, debug bool) (Server, error) {
Sandbox: sandbox,
SandboxArg: cfg.SandboxArg,
},
+ Stats: stats,
VMArch: cfg.TargetVMArch,
RPC: cfg.RPC,
VMLess: cfg.VMLess,
@@ -153,8 +174,7 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) *server {
baseSource: baseSource,
execSource: queue.Distribute(queue.Retry(baseSource)),
- statVMRestarts: stat.New("vm restarts", "Total number of VM starts",
- stat.Rate{}, stat.NoGraph),
+ Stats: cfg.Stats,
runnerStats: &runnerStats{
statExecRetries: stat.New("exec retries",
"Number of times a test program was restarted because the first run failed",
@@ -162,7 +182,7 @@ func newImpl(ctx context.Context, cfg *Config, mgr Manager) *server {
statExecutorRestarts: stat.New("executor restarts",
"Number of times executor process was restarted", stat.Rate{}, stat.Graph("executor")),
statExecBufferTooSmall: queue.StatExecBufferTooSmall,
- statExecs: queue.StatExecs,
+ statExecs: cfg.Stats.StatExecs,
statNoExecRequests: queue.StatNoExecRequests,
statNoExecDuration: queue.StatNoExecDuration,
},
@@ -205,7 +225,7 @@ func (serv *server) handleConn(conn *flatrpc.Conn) {
} else if err := checkRevisions(connectReq, serv.cfg.Target); err != nil {
log.Fatal(err)
}
- serv.statVMRestarts.Add(1)
+ serv.StatVMRestarts.Add(1)
serv.mu.Lock()
runner := serv.runners[id]
@@ -312,8 +332,8 @@ func (serv *server) connectionLoop(runner *Runner) error {
}
}
- queue.StatNumFuzzing.Add(1)
- defer queue.StatNumFuzzing.Add(-1)
+ serv.StatNumFuzzing.Add(1)
+ defer serv.StatNumFuzzing.Add(-1)
return runner.ConnectionLoop()
}
diff --git a/pkg/rpcserver/rpcserver_test.go b/pkg/rpcserver/rpcserver_test.go
index 3252ddd4f..760ae5e13 100644
--- a/pkg/rpcserver/rpcserver_test.go
+++ b/pkg/rpcserver/rpcserver_test.go
@@ -80,7 +80,7 @@ func TestNew(t *testing.T) {
cfg.Target, err = prog.GetTarget(cfg.TargetOS, cfg.TargetArch)
assert.NoError(t, err)
- serv, err := New(cfg, nil, tt.debug)
+ serv, err := New(cfg, nil, NewStats(), tt.debug)
if tt.expectedErr != nil {
assert.Equal(t, tt.expectedErr, err)
} else if tt.expectsErr {
@@ -195,7 +195,7 @@ func TestHandleConn(t *testing.T) {
cfg.Target.Revision = tt.req.SyzRevision
assert.NoError(t, err)
- s, err := New(cfg, managerMock, debug)
+ s, err := New(cfg, managerMock, NewStats(), debug)
assert.NoError(t, err)
serv := s.(*server)
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 2a7c2efe5..ebd4b94cf 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -81,6 +81,7 @@ type Manager struct {
crashStore *manager.CrashStore
serv rpcserver.Server
http *manager.HTTPServer
+ servStats rpcserver.Stats
corpus *corpus.Corpus
corpusDB *db.DB
corpusDBMu sync.Mutex // for concurrent operations on corpusDB
@@ -250,7 +251,8 @@ func RunManager(mode Mode, cfg *mgrconfig.Config) {
go mgr.trackUsedFiles()
// Create RPC server for fuzzers.
- mgr.serv, err = rpcserver.New(mgr.cfg, mgr, *flagDebug)
+ mgr.servStats = rpcserver.NewStats()
+ mgr.serv, err = rpcserver.New(mgr.cfg, mgr, mgr.servStats, *flagDebug)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
@@ -323,7 +325,7 @@ func (mgr *Manager) heartbeatLoop() {
if mgr.firstConnect.Load() == 0 {
continue
}
- mgr.statFuzzingTime.Add(diff * queue.StatNumFuzzing.Val())
+ mgr.statFuzzingTime.Add(diff * mgr.servStats.StatNumFuzzing.Val())
buf := new(bytes.Buffer)
for _, stat := range stat.Collect(stat.Console) {
fmt.Fprintf(buf, "%v=%v ", stat.Name, stat.Value)
@@ -1307,7 +1309,7 @@ func (mgr *Manager) dashboardReporter() {
FuzzingTime: time.Duration(mgr.statFuzzingTime.Val()) - lastFuzzingTime,
Crashes: uint64(mgr.statCrashes.Val()) - lastCrashes,
SuppressedCrashes: uint64(mgr.statSuppressed.Val()) - lastSuppressedCrashes,
- Execs: uint64(queue.StatExecs.Val()) - lastExecs,
+ Execs: uint64(mgr.servStats.StatExecs.Val()) - lastExecs,
}
if mgr.phase >= phaseTriagedCorpus && !triageInfoSent {
triageInfoSent = true
diff --git a/syz-manager/snapshot.go b/syz-manager/snapshot.go
index 962589926..a7e72132f 100644
--- a/syz-manager/snapshot.go
+++ b/syz-manager/snapshot.go
@@ -20,8 +20,8 @@ import (
)
func (mgr *Manager) snapshotInstance(ctx context.Context, inst *vm.Instance, updInfo dispatcher.UpdateInfo) {
- queue.StatNumFuzzing.Add(1)
- defer queue.StatNumFuzzing.Add(-1)
+ mgr.servStats.StatNumFuzzing.Add(1)
+ defer mgr.servStats.StatNumFuzzing.Add(-1)
updInfo(func(info *dispatcher.Info) {
info.Status = "snapshot fuzzing"
@@ -48,7 +48,7 @@ func (mgr *Manager) snapshotLoop(ctx context.Context, inst *vm.Instance) error {
builder := flatbuffers.NewBuilder(0)
var envFlags flatrpc.ExecEnv
for first := true; ctx.Err() == nil; first = false {
- queue.StatExecs.Add(1)
+ mgr.servStats.StatExecs.Add(1)
req := mgr.snapshotSource.Next(inst.Index())
if first {
envFlags = req.ExecOpts.EnvFlags