aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Nogikh <nogikh@google.com>2024-05-07 17:04:45 +0200
committerDmitry Vyukov <dvyukov@google.com>2024-05-16 15:38:27 +0000
commitb6954dce2f21b8feb1448edaaeeefc22f5ff4944 (patch)
tree6f4358ba6609826b614847707e180662d986f98e
parentf694ecdc179cf43429135188934eed687ae28645 (diff)
pkg/vminfo: run programs interactively
Use the same interfaces as the fuzzer. Now syz-manager no longer needs to treat machine check executions differently.
-rw-r--r--pkg/fuzzer/queue/queue.go121
-rw-r--r--pkg/ipc/ipc.go19
-rw-r--r--pkg/vminfo/features.go22
-rw-r--r--pkg/vminfo/linux_test.go25
-rw-r--r--pkg/vminfo/syscalls.go203
-rw-r--r--pkg/vminfo/vminfo.go27
-rw-r--r--pkg/vminfo/vminfo_test.go61
-rw-r--r--syz-manager/manager.go11
-rw-r--r--syz-manager/rpc.go113
-rw-r--r--tools/syz-execprog/execprog.go74
10 files changed, 377 insertions, 299 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go
index cb5aa134b..0c1735243 100644
--- a/pkg/fuzzer/queue/queue.go
+++ b/pkg/fuzzer/queue/queue.go
@@ -4,10 +4,14 @@
package queue
import (
+ "bytes"
"context"
+ "encoding/gob"
+ "fmt"
"sync"
"sync/atomic"
+ "github.com/google/syzkaller/pkg/hash"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/signal"
"github.com/google/syzkaller/pkg/stats"
@@ -19,11 +23,16 @@ type Request struct {
NeedSignal SignalType
NeedCover bool
NeedHints bool
+ ExecOpts *ipc.ExecOpts
+
// If specified, the resulting signal for call SignalFilterCall
// will include subset of it even if it's not new.
SignalFilter signal.Signal
SignalFilterCall int
+ ReturnError bool
+ ReturnOutput bool
+
// This stat will be incremented on request completion.
Stat *stats.Val
@@ -78,6 +87,22 @@ func (r *Request) Wait(ctx context.Context) *Result {
}
}
+func (r *Request) hash() hash.Sig {
+ buf := new(bytes.Buffer)
+ if r.ExecOpts != nil {
+ if err := gob.NewEncoder(buf).Encode(r.ExecOpts); err != nil {
+ panic(err)
+ }
+ }
+ return hash.Hash(
+ []byte(fmt.Sprint(r.NeedSignal)),
+ []byte(fmt.Sprint(r.NeedCover)),
+ []byte(fmt.Sprint(r.NeedHints)),
+ r.Prog.Serialize(),
+ buf.Bytes(),
+ )
+}
+
func (r *Request) initChannel() {
r.mu.Lock()
if r.done == nil {
@@ -96,7 +121,17 @@ const (
type Result struct {
Info *ipc.ProgInfo
+ Output []byte
Status Status
+ Error string // More details in case of ExecFailure.
+}
+
+func (r *Result) clone() *Result {
+ ret := *r
+ if ret.Info != nil {
+ ret.Info = ret.Info.Clone()
+ }
+ return &ret
}
func (r *Result) Stop() bool {
@@ -293,3 +328,89 @@ func (pq *PriorityQueue) Next() *Request {
defer pq.mu.Unlock()
return pq.ops.Pop()
}
+
+type DynamicSource struct {
+ value atomic.Pointer[wrapSource]
+}
+
+type wrapSource struct {
+ source Source
+}
+
+func (ds *DynamicSource) Store(source Source) {
+ ds.value.Store(&wrapSource{source})
+}
+
+func (ds *DynamicSource) Next() *Request {
+ val := ds.value.Load()
+ if val == nil || val.source == nil {
+ return nil
+ }
+ return val.source.Next()
+}
+
+// Deduplicator keeps track of the previously run requests to avoid re-running them.
+type Deduplicator struct {
+ mu sync.Mutex
+ ctx context.Context
+ source Source
+ mm map[hash.Sig]*duplicateState
+}
+
+type duplicateState struct {
+ res *Result
+ queued []*Request // duplicate requests waiting for the result.
+}
+
+func Deduplicate(ctx context.Context, source Source) Source {
+ return &Deduplicator{
+ ctx: ctx,
+ source: source,
+ mm: map[hash.Sig]*duplicateState{},
+ }
+}
+
+func (d *Deduplicator) Next() *Request {
+ for {
+ req := d.source.Next()
+ if req == nil {
+ return nil
+ }
+ hash := req.hash()
+ d.mu.Lock()
+ entry, ok := d.mm[hash]
+ if !ok {
+ d.mm[hash] = &duplicateState{}
+ } else if entry.res == nil {
+ // There's no result yet, put the request in the queue.
+ entry.queued = append(entry.queued, req)
+ } else {
+ // We already know the result.
+ req.Done(entry.res.clone())
+ }
+ d.mu.Unlock()
+ if !ok {
+ // This is the first time we see such a request.
+ req.OnDone(d.onDone)
+ return req
+ }
+ }
+}
+
+func (d *Deduplicator) onDone(req *Request, res *Result) bool {
+ hash := req.hash()
+ clonedRes := res.clone()
+
+ d.mu.Lock()
+ entry := d.mm[hash]
+ queued := entry.queued
+ entry.queued = nil
+ entry.res = clonedRes
+ d.mu.Unlock()
+
+ // Broadcast the result.
+ for _, waitingReq := range queued {
+ waitingReq.Done(res.clone())
+ }
+ return true
+}
diff --git a/pkg/ipc/ipc.go b/pkg/ipc/ipc.go
index 826d3068d..d87cb8b0e 100644
--- a/pkg/ipc/ipc.go
+++ b/pkg/ipc/ipc.go
@@ -6,6 +6,7 @@ package ipc
import (
"fmt"
"io"
+ "maps"
"os"
"os/exec"
"path/filepath"
@@ -98,6 +99,14 @@ type CallInfo struct {
Errno int // call errno (0 if the call was successful)
}
+func (ci *CallInfo) Clone() CallInfo {
+ ret := *ci
+ ret.Signal = slices.Clone(ret.Signal)
+ ret.Cover = slices.Clone(ret.Cover)
+ ret.Comps = maps.Clone(ret.Comps)
+ return ret
+}
+
type ProgInfo struct {
Calls []CallInfo
Extra CallInfo // stores Signal and Cover collected from background threads
@@ -105,6 +114,16 @@ type ProgInfo struct {
Freshness int // number of programs executed in the same process before this one
}
+func (pi *ProgInfo) Clone() *ProgInfo {
+ ret := *pi
+ ret.Extra = ret.Extra.Clone()
+ ret.Calls = slices.Clone(ret.Calls)
+ for i, call := range ret.Calls {
+ ret.Calls[i] = call.Clone()
+ }
+ return &ret
+}
+
func EmptyProgInfo(calls int) *ProgInfo {
info := &ProgInfo{Calls: make([]CallInfo, calls)}
for i := range info.Calls {
diff --git a/pkg/vminfo/features.go b/pkg/vminfo/features.go
index 2ba76a255..ab71e1168 100644
--- a/pkg/vminfo/features.go
+++ b/pkg/vminfo/features.go
@@ -8,8 +8,8 @@ import (
"strings"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/ipc"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
)
@@ -46,14 +46,24 @@ type featureResult struct {
reason string
}
-func (ctx *checkContext) checkFeatures() {
+func (ctx *checkContext) startFeaturesCheck() {
testProg := ctx.target.DataMmapProg()
for feat := range flatrpc.EnumNamesFeature {
feat := feat
- ctx.pendingRequests++
go func() {
envFlags, execFlags := ctx.featureToFlags(feat)
- res := ctx.execProg(testProg, envFlags, execFlags)
+ req := &queue.Request{
+ Prog: testProg,
+ ReturnOutput: true,
+ ReturnError: true,
+ ExecOpts: &ipc.ExecOpts{
+ EnvFlags: envFlags,
+ ExecFlags: execFlags,
+ SandboxArg: ctx.cfg.SandboxArg,
+ },
+ }
+ ctx.executor.Submit(req)
+ res := req.Wait(ctx.ctx)
reason := ctx.featureSucceeded(feat, testProg, res)
ctx.features <- featureResult{feat, reason}
}()
@@ -161,8 +171,8 @@ func (ctx *checkContext) featureToFlags(feat flatrpc.Feature) (ipc.EnvFlags, ipc
// This generally checks that just all syscalls were executed and succeed,
// for coverage features we also check that we got actual coverage.
func (ctx *checkContext) featureSucceeded(feat flatrpc.Feature, testProg *prog.Prog,
- res rpctype.ExecutionResult) string {
- if res.Error != "" {
+ res *queue.Result) string {
+ if res.Status != queue.Success {
if len(res.Output) != 0 {
return string(res.Output)
}
diff --git a/pkg/vminfo/linux_test.go b/pkg/vminfo/linux_test.go
index e9be133c1..2a09a66cf 100644
--- a/pkg/vminfo/linux_test.go
+++ b/pkg/vminfo/linux_test.go
@@ -21,14 +21,6 @@ import (
func TestLinuxSyscalls(t *testing.T) {
cfg := testConfig(t, targets.Linux, targets.AMD64)
checker := New(cfg)
- _, checkProgs := checker.StartCheck()
- t.Logf("got %v test programs", len(checkProgs))
- if len(checkProgs) > 1000 {
- // This is just a sanity check that we don't do something stupid accidentally.
- // If it grows above the limit intentionally, the limit can be increased.
- // Currently we have 641 (when we failed to properly dedup syscall tests, it was 4349).
- t.Fatal("too many test programs")
- }
filesystems := []string{
// Without sysfs, the checks would also disable mount().
"", "sysfs", "ext4", "binder", "",
@@ -45,8 +37,10 @@ func TestLinuxSyscalls(t *testing.T) {
Data: []byte(strings.Join(filesystems, "\nnodev\t")),
},
}
- results, featureInfos := createSuccessfulResults(t, cfg.Target, checkProgs)
- enabled, disabled, features, err := checker.FinishCheck(files, results, featureInfos)
+ stop := make(chan struct{})
+ go createSuccessfulResults(checker, stop)
+ enabled, disabled, features, err := checker.Run(files, allFeatures())
+ close(stop)
if err != nil {
t.Fatal(err)
}
@@ -70,6 +64,17 @@ func TestLinuxSyscalls(t *testing.T) {
}
t.Errorf("disabled call %v: %v", call.Name, reason)
}
+ for _, id := range cfg.Syscalls {
+ call := cfg.Target.Syscalls[id]
+ if enabled[call] && disabled[call] != "" {
+ t.Fatalf("%s is both enabled and disabled", call.Name)
+ }
+ expected := !expectDisabled[call.Name]
+ got := enabled[call]
+ if expected != got {
+ t.Errorf("%s: expected %t, got %t", call.Name, expected, got)
+ }
+ }
expectEnabled := len(cfg.Syscalls) - len(expectDisabled)
if len(enabled) != expectEnabled {
t.Errorf("enabled only %v calls out of %v", len(enabled), expectEnabled)
diff --git a/pkg/vminfo/syscalls.go b/pkg/vminfo/syscalls.go
index 78cc973c8..863c995bd 100644
--- a/pkg/vminfo/syscalls.go
+++ b/pkg/vminfo/syscalls.go
@@ -4,18 +4,15 @@
package vminfo
import (
- "bytes"
- "encoding/gob"
+ "context"
"fmt"
- "slices"
"strings"
"syscall"
"github.com/google/syzkaller/pkg/flatrpc"
- "github.com/google/syzkaller/pkg/hash"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/mgrconfig"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
)
@@ -26,44 +23,19 @@ import (
// and provides primitives for reading target VM files, checking if a file can be opened,
// executing test programs on the target VM, etc.
//
-// To make use of this type simpler, we collect all test programs that need
-// to be executed on the target into a batch, send them to the target VM once,
-// then get results and finish the check. This means that impl.syscallCheck
-// cannot e.g. issue one test program, look at results, and then issue another one.
-// This is achieved by starting each impl.syscallCheck in a separate goroutine
-// and then waiting when it will call ctx.execRaw to submit syscalls that
-// need to be executed on the target. Once all impl.syscallCheck submit
-// their test syscalls, we know that we collected all of them.
-// impl.syscallCheck may also decide to read a file on the target VM instead
-// of executing a test program, this also counts as submitting an empty test program.
-// This means that impl.syscallCheck cannot execute a test program after reading a file,
-// but can do these things in opposite order (since all files are known ahead of time).
-// These rules are bit brittle, but all of the checkers are unit-tested
-// and misuse (trying to execute 2 programs, etc) will either crash or hang in tests.
-// Theoretically we can execute more than 1 program per checker, but it will
-// require some special arrangements, e.g. see handling of PseudoSyscallDeps.
-//
// The external interface of this type contains only 2 methods:
// startCheck - starts impl.syscallCheck goroutines and collects all test programs in progs,
// finishCheck - accepts results of program execution, unblocks impl.syscallCheck goroutines,
//
// waits and returns results of checking.
type checkContext struct {
- impl checker
- cfg *mgrconfig.Config
- target *prog.Target
- sandbox ipc.EnvFlags
- // Checkers use requests channel to submit their test programs,
- // main goroutine will wait for exactly pendingRequests message on this channel
- // (similar to sync.WaitGroup, pendingRequests is incremented before starting
- // a goroutine that will send on requests).
- requests chan []*rpctype.ExecutionRequest
- pendingRequests int
- // Ready channel is closed after we've recevied results of execution of test
- // programs and file contents. After this results maps and fs are populated.
- ready chan bool
- results map[int64]rpctype.ExecutionResult
- fs filesystem
+ ctx context.Context
+ impl checker
+ cfg *mgrconfig.Config
+ target *prog.Target
+ sandbox ipc.EnvFlags
+ executor queue.Executor
+ fs filesystem
// Once checking of a syscall is finished, the result is sent to syscalls.
// The main goroutine will wait for exactly pendingSyscalls messages.
syscalls chan syscallResult
@@ -76,25 +48,26 @@ type syscallResult struct {
reason string
}
-func newCheckContext(cfg *mgrconfig.Config, impl checker) *checkContext {
+func newCheckContext(ctx context.Context, cfg *mgrconfig.Config, impl checker,
+ executor queue.Executor) *checkContext {
sandbox, err := ipc.SandboxToFlags(cfg.Sandbox)
if err != nil {
panic(fmt.Sprintf("failed to parse sandbox: %v", err))
}
return &checkContext{
+ ctx: ctx,
impl: impl,
cfg: cfg,
target: cfg.Target,
sandbox: sandbox,
- requests: make(chan []*rpctype.ExecutionRequest),
- results: make(map[int64]rpctype.ExecutionResult),
+ executor: executor,
syscalls: make(chan syscallResult),
features: make(chan featureResult, 100),
- ready: make(chan bool),
}
}
-func (ctx *checkContext) startCheck() []rpctype.ExecutionRequest {
+func (ctx *checkContext) start(fileInfos []flatrpc.FileInfo) {
+ ctx.fs = createVirtualFilesystem(fileInfos)
for _, id := range ctx.cfg.Syscalls {
call := ctx.target.Syscalls[id]
if call.Attrs.Disabled {
@@ -112,52 +85,24 @@ func (ctx *checkContext) startCheck() []rpctype.ExecutionRequest {
if ctx.cfg.SysTarget.HostFuzzer || ctx.target.OS == targets.TestOS {
syscallCheck = alwaysSupported
}
-
- var depsReason chan string
- deps := ctx.cfg.SysTarget.PseudoSyscallDeps[call.CallName]
- if len(deps) != 0 {
- ctx.pendingRequests++
- depsReason = make(chan string, 1)
- go func() {
- depsReason <- ctx.supportedSyscalls(deps)
- }()
- }
- ctx.pendingRequests++
go func() {
- reason := syscallCheck(ctx, call)
- ctx.waitForResults()
- if reason == "" && depsReason != nil {
- reason = <-depsReason
+ var reason string
+ deps := ctx.cfg.SysTarget.PseudoSyscallDeps[call.CallName]
+ if len(deps) != 0 {
+ reason = ctx.supportedSyscalls(deps)
+ }
+ // Only check the call if all its dependencies are satisfied.
+ if reason == "" {
+ reason = syscallCheck(ctx, call)
}
ctx.syscalls <- syscallResult{call, reason}
}()
}
- ctx.checkFeatures()
- var progs []rpctype.ExecutionRequest
- dedup := make(map[hash.Sig]int64)
- for i := 0; i < ctx.pendingRequests; i++ {
- for _, req := range <-ctx.requests {
- sig := hashReq(req)
- req.ID = dedup[sig]
- if req.ID != 0 {
- continue
- }
- req.ID = int64(len(dedup) + 1)
- dedup[sig] = req.ID
- progs = append(progs, *req)
- }
- }
- ctx.requests = nil
- return progs
+ ctx.startFeaturesCheck()
}
-func (ctx *checkContext) finishCheck(fileInfos []flatrpc.FileInfo, progs []rpctype.ExecutionResult,
- featureInfos []flatrpc.FeatureInfo) (map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) {
- ctx.fs = createVirtualFilesystem(fileInfos)
- for _, res := range progs {
- ctx.results[res.ID] = res
- }
- close(ctx.ready)
+func (ctx *checkContext) wait(featureInfos []flatrpc.FeatureInfo) (
+ map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) {
enabled := make(map[*prog.Syscall]bool)
disabled := make(map[*prog.Syscall]string)
for i := 0; i < ctx.pendingSyscalls; i++ {
@@ -292,16 +237,12 @@ func (ctx *checkContext) val(name string) uint64 {
}
func (ctx *checkContext) execRaw(calls []string, mode prog.DeserializeMode, root bool) *ipc.ProgInfo {
- if ctx.requests == nil {
- panic("only one test execution per checker is supported")
- }
sandbox := ctx.sandbox
if root {
sandbox = 0
}
- remain := calls
- var requests []*rpctype.ExecutionRequest
- for len(remain) != 0 {
+ info := &ipc.ProgInfo{}
+ for remain := calls; len(remain) != 0; {
// Don't put too many syscalls into a single program,
// it will have higher chances to time out.
ncalls := min(len(remain), prog.MaxCalls/2)
@@ -311,93 +252,39 @@ func (ctx *checkContext) execRaw(calls []string, mode prog.DeserializeMode, root
if err != nil {
panic(fmt.Sprintf("failed to deserialize: %v\n%v", err, progStr))
}
- data, err := p.SerializeForExec()
- if err != nil {
- panic(fmt.Sprintf("failed to serialize test program: %v\n%s", err, progStr))
- }
- requests = append(requests, &rpctype.ExecutionRequest{
- ProgData: slices.Clone(data), // clone to reduce memory usage
- ExecOpts: ipc.ExecOpts{
+ // TODO: request that the program must be re-executed on the first failure.
+ req := &queue.Request{
+ Prog: p,
+ ExecOpts: &ipc.ExecOpts{
EnvFlags: sandbox,
ExecFlags: 0,
SandboxArg: ctx.cfg.SandboxArg,
},
- })
- }
- ctx.requests <- requests
- <-ctx.ready
- info := &ipc.ProgInfo{}
- for _, req := range requests {
- res, ok := ctx.results[req.ID]
- if !ok {
- panic(fmt.Sprintf("no result for request %v", req.ID))
}
- if len(res.Info.Calls) == 0 {
- panic(fmt.Sprintf("result for request %v has no calls", req.ID))
+ ctx.executor.Submit(req)
+ res := req.Wait(ctx.ctx)
+ if res.Status == queue.Success {
+ info.Calls = append(info.Calls, res.Info.Calls...)
+ } else if res.Status == queue.Crashed {
+ // Pretend these calls were not executed.
+ info.Calls = append(info.Calls, ipc.EmptyProgInfo(ncalls).Calls...)
+ } else {
+ // The program must have been either executed, or not executed due to a crash.
+ panic(fmt.Sprintf("got unexpected execution status (%d) for the prog %s",
+ res.Status, progStr))
}
- info.Calls = append(info.Calls, res.Info.Calls...)
}
if len(info.Calls) != len(calls) {
- panic(fmt.Sprintf("got only %v results for program %v with %v calls:\n%s",
- len(info.Calls), requests[0].ID, len(calls), strings.Join(calls, "\n")))
+ panic(fmt.Sprintf("got %v != %v results for program:\n%s",
+ len(info.Calls), len(calls), strings.Join(calls, "\n")))
}
return info
}
-func (ctx *checkContext) execProg(p *prog.Prog, envFlags ipc.EnvFlags,
- execFlags ipc.ExecFlags) rpctype.ExecutionResult {
- if ctx.requests == nil {
- panic("only one test execution per checker is supported")
- }
- data, err := p.SerializeForExec()
- if err != nil {
- panic(fmt.Sprintf("failed to serialize test program: %v\n%s", err, p.Serialize()))
- }
- req := &rpctype.ExecutionRequest{
- ProgData: data,
- ReturnOutput: true,
- ReturnError: true,
- ExecOpts: ipc.ExecOpts{
- EnvFlags: envFlags,
- ExecFlags: execFlags,
- SandboxArg: ctx.cfg.SandboxArg,
- },
- }
- ctx.requests <- []*rpctype.ExecutionRequest{req}
- <-ctx.ready
- res, ok := ctx.results[req.ID]
- if !ok {
- panic(fmt.Sprintf("no result for request %v", req.ID))
- }
- return res
-}
-
func (ctx *checkContext) readFile(name string) ([]byte, error) {
- ctx.waitForResults()
return ctx.fs.ReadFile(name)
}
-func (ctx *checkContext) waitForResults() {
- // If syscallCheck has already executed a program, then it's also waited for ctx.ready.
- // If it hasn't, then we need to unblock the loop in startCheck by sending a nil request.
- if ctx.requests == nil {
- return
- }
- ctx.requests <- nil
- <-ctx.ready
- if ctx.fs == nil {
- panic("filesystem should be initialized by now")
- }
-}
-
-func hashReq(req *rpctype.ExecutionRequest) hash.Sig {
- buf := new(bytes.Buffer)
- if err := gob.NewEncoder(buf).Encode(req.ExecOpts); err != nil {
- panic(err)
- }
- return hash.Hash(req.ProgData, buf.Bytes())
-}
-
func alwaysSupported(ctx *checkContext, call *prog.Syscall) string {
return ""
}
diff --git a/pkg/vminfo/vminfo.go b/pkg/vminfo/vminfo.go
index 8c5af606e..4f6ab7425 100644
--- a/pkg/vminfo/vminfo.go
+++ b/pkg/vminfo/vminfo.go
@@ -14,6 +14,7 @@ package vminfo
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
@@ -23,14 +24,15 @@ import (
"github.com/google/syzkaller/pkg/cover"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/mgrconfig"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
)
type Checker struct {
checker
+ source queue.Source
checkContext *checkContext
}
@@ -46,9 +48,12 @@ func New(cfg *mgrconfig.Config) *Checker {
default:
impl = new(stub)
}
+ ctx := context.Background()
+ executor := queue.Plain()
return &Checker{
checker: impl,
- checkContext: newCheckContext(cfg, impl),
+ source: queue.Deduplicate(ctx, executor),
+ checkContext: newCheckContext(ctx, cfg, impl, executor),
}
}
@@ -77,17 +82,25 @@ func (checker *Checker) MachineInfo(fileInfos []flatrpc.FileInfo) ([]cover.Kerne
return modules, info.Bytes(), nil
}
-func (checker *Checker) StartCheck() ([]string, []rpctype.ExecutionRequest) {
- return checker.checkFiles(), checker.checkContext.startCheck()
+func (checker *Checker) CheckFiles() []string {
+ return checker.checkFiles()
}
-func (checker *Checker) FinishCheck(files []flatrpc.FileInfo, progs []rpctype.ExecutionResult,
- featureInfos []flatrpc.FeatureInfo) (map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) {
+func (checker *Checker) Run(files []flatrpc.FileInfo, featureInfos []flatrpc.FeatureInfo) (
+ map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) {
ctx := checker.checkContext
checker.checkContext = nil
- return ctx.finishCheck(files, progs, featureInfos)
+ ctx.start(files)
+ return ctx.wait(featureInfos)
}
+// Implementation of the queue.Source interface.
+func (checker *Checker) Next() *queue.Request {
+ return checker.source.Next()
+}
+
+var _ queue.Source = &Checker{}
+
type machineInfoFunc func(files filesystem, w io.Writer) (string, error)
type checker interface {
diff --git a/pkg/vminfo/vminfo_test.go b/pkg/vminfo/vminfo_test.go
index 2f72131a5..535782a64 100644
--- a/pkg/vminfo/vminfo_test.go
+++ b/pkg/vminfo/vminfo_test.go
@@ -7,12 +7,13 @@ import (
"runtime"
"strings"
"testing"
+ "time"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/host"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/mgrconfig"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
)
@@ -55,9 +56,10 @@ func TestSyscalls(t *testing.T) {
t.Parallel()
cfg := testConfig(t, target.OS, target.Arch)
checker := New(cfg)
- _, checkProgs := checker.StartCheck()
- results, featureInfos := createSuccessfulResults(t, cfg.Target, checkProgs)
- enabled, disabled, _, err := checker.FinishCheck(nil, results, featureInfos)
+ stop := make(chan struct{})
+ go createSuccessfulResults(checker, stop)
+ enabled, disabled, _, err := checker.Run(nil, allFeatures())
+ close(stop)
if err != nil {
t.Fatal(err)
}
@@ -72,19 +74,38 @@ func TestSyscalls(t *testing.T) {
}
}
-func createSuccessfulResults(t *testing.T, target *prog.Target,
- progs []rpctype.ExecutionRequest) ([]rpctype.ExecutionResult, []flatrpc.FeatureInfo) {
- var results []rpctype.ExecutionResult
- for _, req := range progs {
- p, err := target.DeserializeExec(req.ProgData, nil)
- if err != nil {
- t.Fatal(err)
+func allFeatures() []flatrpc.FeatureInfo {
+ var features []flatrpc.FeatureInfo
+ for feat := range flatrpc.EnumNamesFeature {
+ features = append(features, flatrpc.FeatureInfo{
+ Id: feat,
+ })
+ }
+ return features
+}
+
+func createSuccessfulResults(source queue.Source, stop chan struct{}) {
+ var count int
+ for {
+ select {
+ case <-stop:
+ return
+ case <-time.After(time.Millisecond):
+ }
+ req := source.Next()
+ if req == nil {
+ continue
}
- res := rpctype.ExecutionResult{
- ID: req.ID,
+ count++
+ if count > 1000 {
+ // This is just a sanity check that we don't do something stupid accidentally.
+ // If it grows above the limit intentionally, the limit can be increased.
+ // Currently we have 641 (when we failed to properly dedup syscall tests, it was 4349).
+ panic("too many test programs")
}
- for range p.Calls {
- res.Info.Calls = append(res.Info.Calls, ipc.CallInfo{
+ info := &ipc.ProgInfo{}
+ for range req.Prog.Calls {
+ info.Calls = append(info.Calls, ipc.CallInfo{
Cover: []uint32{1},
Signal: []uint32{1},
Comps: map[uint64]map[uint64]bool{
@@ -92,15 +113,11 @@ func createSuccessfulResults(t *testing.T, target *prog.Target,
},
})
}
- results = append(results, res)
- }
- var features []flatrpc.FeatureInfo
- for feat := range flatrpc.EnumNamesFeature {
- features = append(features, flatrpc.FeatureInfo{
- Id: feat,
+ req.Done(&queue.Result{
+ Status: queue.Success,
+ Info: info,
})
}
- return results, features
}
func hostChecker(t *testing.T) (*Checker, []flatrpc.FileInfo) {
diff --git a/syz-manager/manager.go b/syz-manager/manager.go
index 864a3676f..648020482 100644
--- a/syz-manager/manager.go
+++ b/syz-manager/manager.go
@@ -74,7 +74,7 @@ type Manager struct {
mu sync.Mutex
fuzzer atomic.Pointer[fuzzer.Fuzzer]
- execSource atomic.Value // queue.Source
+ execSource queue.DynamicSource
phase int
targetEnabledSyscalls map[*prog.Syscall]bool
@@ -194,7 +194,7 @@ func RunManager(cfg *mgrconfig.Config) {
go mgr.corpusInputHandler(corpusUpdates)
// Create RPC server for fuzzers.
- mgr.serv, err = startRPCServer(mgr)
+ mgr.serv, err = startRPCServer(mgr, &mgr.execSource)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
}
@@ -1376,7 +1376,7 @@ func (mgr *Manager) machineChecked(features flatrpc.Feature, enabledSyscalls map
},
}, rnd, mgr.target)
mgr.fuzzer.Store(fuzzerObj)
- mgr.execSource.Store(queue.Retry(fuzzerObj))
+ mgr.execSource.Store(fuzzerObj)
mgr.loadCorpus()
mgr.firstConnect.Store(time.Now().Unix())
@@ -1398,11 +1398,6 @@ func (mgr *Manager) corpusMinimization() {
}
}
-// We need this method since we're not supposed to access Manager fields from RPCServer.
-func (mgr *Manager) getExecSource() queue.Source {
- return mgr.execSource.Load().(queue.Source)
-}
-
func (mgr *Manager) fuzzerSignalRotation() {
const (
rotateSignals = 1000
diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go
index 4e95849b0..4fda07f07 100644
--- a/syz-manager/rpc.go
+++ b/syz-manager/rpc.go
@@ -36,13 +36,8 @@ type RPCServer struct {
infoDone bool
checkDone atomic.Bool
- checkFiles []string
- checkFilesInfo []flatrpc.FileInfo
- checkFeatureInfo []flatrpc.FeatureInfo
- checkProgs []rpctype.ExecutionRequest
- checkResults []rpctype.ExecutionResult
- needCheckResults int
checkFailures int
+ checkerSource *queue.DynamicSource
enabledFeatures flatrpc.Feature
setupFeatures flatrpc.Feature
modules []cover.KernelModule
@@ -50,8 +45,9 @@ type RPCServer struct {
execCoverFilter map[uint32]uint32
coverFilter map[uint32]uint32
- mu sync.Mutex
- runners sync.Map // Instead of map[string]*Runner.
+ mu sync.Mutex
+ runners sync.Map // Instead of map[string]*Runner.
+ execSource queue.Source
statExecs *stats.Val
statExecRetries *stats.Val
@@ -98,16 +94,18 @@ type BugFrames struct {
type RPCManagerView interface {
currentBugFrames() BugFrames
machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool)
- getExecSource() queue.Source
}
-func startRPCServer(mgr *Manager) (*RPCServer, error) {
+func startRPCServer(mgr *Manager, source queue.Source) (*RPCServer, error) {
+ var checkSource queue.DynamicSource
serv := &RPCServer{
- mgr: mgr,
- cfg: mgr.cfg,
- target: mgr.target,
- checker: vminfo.New(mgr.cfg),
- statExecs: mgr.statExecs,
+ mgr: mgr,
+ cfg: mgr.cfg,
+ target: mgr.target,
+ checker: vminfo.New(mgr.cfg),
+ checkerSource: &checkSource,
+ execSource: queue.Retry(queue.Order(&checkSource, source)),
+ statExecs: mgr.statExecs,
statExecRetries: stats.Create("exec retries",
"Number of times a test program was restarted because the first run failed",
stats.Rate{}, stats.Graph("executor")),
@@ -131,8 +129,8 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) {
if err != nil {
return nil, err
}
- serv.checkFiles, serv.checkProgs = serv.checker.StartCheck()
- serv.needCheckResults = len(serv.checkProgs)
+ checkSource.Store(serv.checker)
+
log.Logf(0, "serving rpc on tcp://%v", s.Addr())
serv.port = s.Addr().(*net.TCPAddr).Port
serv.server = s
@@ -155,7 +153,7 @@ func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) er
if serv.checkDone.Load() {
r.Features = serv.setupFeatures
} else {
- r.ReadFiles = append(r.ReadFiles, serv.checkFiles...)
+ r.ReadFiles = append(r.ReadFiles, serv.checker.CheckFiles()...)
r.ReadGlobs = serv.target.RequiredGlobs()
r.Features = flatrpc.AllFeatures
}
@@ -207,8 +205,10 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error {
if !serv.infoDone {
serv.infoDone = true
- serv.checkFeatureInfo = a.Features
- serv.checkFilesInfo = a.Files
+
+ // Now execute check programs.
+ go serv.runCheck(a.Files, a.Features)
+
serv.modules = modules
serv.target.UpdateGlobs(a.Globs)
serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover)
@@ -237,11 +237,19 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error {
return nil
}
-func (serv *RPCServer) finishCheck() error {
+func (serv *RPCServer) runCheck(checkFilesInfo []flatrpc.FileInfo, checkFeatureInfo []flatrpc.FeatureInfo) {
+ if err := serv.finishCheck(checkFilesInfo, checkFeatureInfo); err != nil {
+ log.Fatalf("check failed: %v", err)
+ }
+ serv.checkerSource.Store(nil) // There's no sense in calling checker's Next() each time.
+ serv.checkDone.Store(true)
+}
+
+func (serv *RPCServer) finishCheck(checkFilesInfo []flatrpc.FileInfo, checkFeatureInfo []flatrpc.FeatureInfo) error {
// Note: need to print disbled syscalls before failing due to an error.
// This helps to debug "all system calls are disabled".
- enabledCalls, disabledCalls, features, checkErr := serv.checker.FinishCheck(
- serv.checkFilesInfo, serv.checkResults, serv.checkFeatureInfo)
+
+ enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo)
enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls)
buf := new(bytes.Buffer)
if len(serv.cfg.EnabledSyscalls) != 0 || log.V(1) {
@@ -265,7 +273,7 @@ func (serv *RPCServer) finishCheck() error {
}
}
hasFileErrors := false
- for _, file := range serv.checkFilesInfo {
+ for _, file := range checkFilesInfo {
if file.Error == "" {
continue
}
@@ -334,42 +342,15 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E
if runner == nil {
return nil
}
-
- if !serv.checkDone.Load() {
- serv.mu.Lock()
- if !serv.checkDone.Load() {
- serv.checkResults = append(serv.checkResults, a.Results...)
- if len(serv.checkResults) < serv.needCheckResults {
- numRequests := min(len(serv.checkProgs), a.NeedProgs)
- r.Requests = serv.checkProgs[:numRequests]
- serv.checkProgs = serv.checkProgs[numRequests:]
- } else {
- if err := serv.finishCheck(); err != nil {
- log.Fatalf("check failed: %v", err)
- }
- serv.checkProgs = nil
- serv.checkResults = nil
- serv.checkFiles = nil
- serv.checkFilesInfo = nil
- serv.checkFeatureInfo = nil
- serv.checkDone.Store(true)
- }
- }
- serv.mu.Unlock()
- return nil
- }
-
- source := serv.mgr.getExecSource()
- if source == nil {
- // ExchangeInfo calls follow MachineCheck, so the fuzzer must have been initialized.
- panic("exchange info call with nil fuzzer")
- }
-
// First query new inputs and only then post results.
// It should foster a more even distribution of executions
// across all VMs.
for len(r.Requests) < a.NeedProgs {
- inp := source.Next()
+ inp := serv.execSource.Next()
+ if inp == nil {
+ // It's unlikely that subsequent Next() calls will yield something.
+ break
+ }
if req, ok := serv.newRequest(runner, inp); ok {
r.Requests = append(r.Requests, req)
} else {
@@ -460,9 +441,6 @@ func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte {
serv.mu.Lock()
defer serv.mu.Unlock()
- if !serv.checkDone.Load() {
- log.Fatalf("VM is exited while checking is not done")
- }
for _, req := range oldRequests {
if crashed && req.try >= 0 {
req.req.Done(&queue.Result{Status: queue.Crashed})
@@ -531,7 +509,11 @@ func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult)
}
info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover)
info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal)
- req.req.Done(&queue.Result{Info: info})
+ req.req.Done(&queue.Result{
+ Info: info,
+ Output: resp.Output,
+ Error: resp.Error,
+ })
}
func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.ExecutionRequest, bool) {
@@ -560,14 +542,23 @@ func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.E
}
}
runner.mu.Unlock()
+
+ var execOpts ipc.ExecOpts
+ if req.ExecOpts != nil {
+ execOpts = *req.ExecOpts
+ } else {
+ execOpts = serv.createExecOpts(req)
+ }
return rpctype.ExecutionRequest{
ID: id,
ProgData: progData,
- ExecOpts: serv.createExecOpts(req),
+ ExecOpts: execOpts,
NewSignal: req.NeedSignal == queue.NewSignal,
SignalFilter: signalFilter,
SignalFilterCall: req.SignalFilterCall,
ResetState: serv.cfg.Experimental.ResetAccState,
+ ReturnError: req.ReturnError,
+ ReturnOutput: req.ReturnOutput,
}, true
}
diff --git a/tools/syz-execprog/execprog.go b/tools/syz-execprog/execprog.go
index 518e7511f..b246c4263 100644
--- a/tools/syz-execprog/execprog.go
+++ b/tools/syz-execprog/execprog.go
@@ -7,6 +7,7 @@ package main
import (
"bytes"
+ "context"
"flag"
"fmt"
"math/rand"
@@ -22,13 +23,13 @@ import (
"github.com/google/syzkaller/pkg/csource"
"github.com/google/syzkaller/pkg/db"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/host"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/ipc/ipcconfig"
"github.com/google/syzkaller/pkg/log"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/osutil"
- "github.com/google/syzkaller/pkg/rpctype"
"github.com/google/syzkaller/pkg/tool"
"github.com/google/syzkaller/pkg/vminfo"
"github.com/google/syzkaller/prog"
@@ -428,36 +429,17 @@ func createConfig(target *prog.Target, featuresFlags csource.Features, syscalls
},
}
checker := vminfo.New(cfg)
- files, requests := checker.StartCheck()
- fileInfos := host.ReadFiles(files)
+ fileInfos := host.ReadFiles(checker.RequiredFiles())
featureInfos, err := host.SetupFeatures(target, config.Executor, flatrpc.AllFeatures, featuresFlags)
if err != nil {
log.Fatal(err)
}
- env, err := ipc.MakeEnv(config, 0)
- if err != nil {
- log.Fatalf("failed to create ipc env: %v", err)
- }
- defer env.Close()
- var results []rpctype.ExecutionResult
- for _, req := range requests {
- output, info, hanged, err := env.ExecProg(&req.ExecOpts, req.ProgData)
- res := rpctype.ExecutionResult{
- ID: req.ID,
- Output: output,
- }
- if info != nil {
- res.Info = *info
- }
- if err != nil {
- res.Error = err.Error()
- }
- if hanged && err == nil {
- res.Error = "hanged"
- }
- results = append(results, res)
- }
- enabledSyscalls, disabledSyscalls, features, err := checker.FinishCheck(fileInfos, results, featureInfos)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go checkerExecutor(ctx, checker, config)
+
+ enabledSyscalls, disabledSyscalls, features, err := checker.Run(fileInfos, featureInfos)
if err != nil {
log.Fatal(err)
}
@@ -476,3 +458,41 @@ func createConfig(target *prog.Target, featuresFlags csource.Features, syscalls
execOpts.EnvFlags |= ipc.FeaturesToFlags(features.Enabled(), featuresFlags)
return config, execOpts, enabledSyscalls, features.Enabled()
}
+
+func checkerExecutor(ctx context.Context, source queue.Source, config *ipc.Config) {
+ env, err := ipc.MakeEnv(config, 0)
+ if err != nil {
+ log.Fatalf("failed to create ipc env: %v", err)
+ }
+ defer env.Close()
+ for {
+ req := source.Next()
+ if req == nil {
+ select {
+ case <-time.After(time.Second / 100):
+ case <-ctx.Done():
+ return
+ }
+ continue
+ }
+ progData, err := req.Prog.SerializeForExec()
+ if err != nil {
+ log.Fatalf("failed to serialize %s: %v", req.Prog.Serialize(), err)
+ }
+ output, info, hanged, err := env.ExecProg(req.ExecOpts, progData)
+ res := &queue.Result{
+ Status: queue.Success,
+ Info: info,
+ Output: output,
+ }
+ if err != nil {
+ res.Status = queue.ExecFailure
+ res.Error = err.Error()
+ }
+ if hanged && err == nil {
+ res.Status = queue.ExecFailure
+ res.Error = "hanged"
+ }
+ req.Done(res)
+ }
+}