diff options
| -rw-r--r-- | pkg/fuzzer/queue/queue.go | 121 | ||||
| -rw-r--r-- | pkg/ipc/ipc.go | 19 | ||||
| -rw-r--r-- | pkg/vminfo/features.go | 22 | ||||
| -rw-r--r-- | pkg/vminfo/linux_test.go | 25 | ||||
| -rw-r--r-- | pkg/vminfo/syscalls.go | 203 | ||||
| -rw-r--r-- | pkg/vminfo/vminfo.go | 27 | ||||
| -rw-r--r-- | pkg/vminfo/vminfo_test.go | 61 | ||||
| -rw-r--r-- | syz-manager/manager.go | 11 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 113 | ||||
| -rw-r--r-- | tools/syz-execprog/execprog.go | 74 |
10 files changed, 377 insertions, 299 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go index cb5aa134b..0c1735243 100644 --- a/pkg/fuzzer/queue/queue.go +++ b/pkg/fuzzer/queue/queue.go @@ -4,10 +4,14 @@ package queue import ( + "bytes" "context" + "encoding/gob" + "fmt" "sync" "sync/atomic" + "github.com/google/syzkaller/pkg/hash" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/signal" "github.com/google/syzkaller/pkg/stats" @@ -19,11 +23,16 @@ type Request struct { NeedSignal SignalType NeedCover bool NeedHints bool + ExecOpts *ipc.ExecOpts + // If specified, the resulting signal for call SignalFilterCall // will include subset of it even if it's not new. SignalFilter signal.Signal SignalFilterCall int + ReturnError bool + ReturnOutput bool + // This stat will be incremented on request completion. Stat *stats.Val @@ -78,6 +87,22 @@ func (r *Request) Wait(ctx context.Context) *Result { } } +func (r *Request) hash() hash.Sig { + buf := new(bytes.Buffer) + if r.ExecOpts != nil { + if err := gob.NewEncoder(buf).Encode(r.ExecOpts); err != nil { + panic(err) + } + } + return hash.Hash( + []byte(fmt.Sprint(r.NeedSignal)), + []byte(fmt.Sprint(r.NeedCover)), + []byte(fmt.Sprint(r.NeedHints)), + r.Prog.Serialize(), + buf.Bytes(), + ) +} + func (r *Request) initChannel() { r.mu.Lock() if r.done == nil { @@ -96,7 +121,17 @@ const ( type Result struct { Info *ipc.ProgInfo + Output []byte Status Status + Error string // More details in case of ExecFailure. +} + +func (r *Result) clone() *Result { + ret := *r + if ret.Info != nil { + ret.Info = ret.Info.Clone() + } + return &ret } func (r *Result) Stop() bool { @@ -293,3 +328,89 @@ func (pq *PriorityQueue) Next() *Request { defer pq.mu.Unlock() return pq.ops.Pop() } + +type DynamicSource struct { + value atomic.Pointer[wrapSource] +} + +type wrapSource struct { + source Source +} + +func (ds *DynamicSource) Store(source Source) { + ds.value.Store(&wrapSource{source}) +} + +func (ds *DynamicSource) Next() *Request { + val := ds.value.Load() + if val == nil || val.source == nil { + return nil + } + return val.source.Next() +} + +// Deduplicator() keeps track of the previously run requests to avoid re-running them. +type Deduplicator struct { + mu sync.Mutex + ctx context.Context + source Source + mm map[hash.Sig]*duplicateState +} + +type duplicateState struct { + res *Result + queued []*Request // duplicate requests waiting for the result. +} + +func Deduplicate(ctx context.Context, source Source) Source { + return &Deduplicator{ + ctx: ctx, + source: source, + mm: map[hash.Sig]*duplicateState{}, + } +} + +func (d *Deduplicator) Next() *Request { + for { + req := d.source.Next() + if req == nil { + return nil + } + hash := req.hash() + d.mu.Lock() + entry, ok := d.mm[hash] + if !ok { + d.mm[hash] = &duplicateState{} + } else if entry.res == nil { + // There's no result yet, put the request to the queue. + entry.queued = append(entry.queued, req) + } else { + // We already know the result. + req.Done(entry.res.clone()) + } + d.mu.Unlock() + if !ok { + // This is the first time we see such a request. + req.OnDone(d.onDone) + return req + } + } +} + +func (d *Deduplicator) onDone(req *Request, res *Result) bool { + hash := req.hash() + clonedRes := res.clone() + + d.mu.Lock() + entry := d.mm[hash] + queued := entry.queued + entry.queued = nil + entry.res = clonedRes + d.mu.Unlock() + + // Broadcast the result. + for _, waitingReq := range queued { + waitingReq.Done(res.clone()) + } + return true +} diff --git a/pkg/ipc/ipc.go b/pkg/ipc/ipc.go index 826d3068d..d87cb8b0e 100644 --- a/pkg/ipc/ipc.go +++ b/pkg/ipc/ipc.go @@ -6,6 +6,7 @@ package ipc import ( "fmt" "io" + "maps" "os" "os/exec" "path/filepath" @@ -98,6 +99,14 @@ type CallInfo struct { Errno int // call errno (0 if the call was successful) } +func (ci *CallInfo) Clone() CallInfo { + ret := *ci + ret.Signal = slices.Clone(ret.Signal) + ret.Cover = slices.Clone(ret.Cover) + ret.Comps = maps.Clone(ret.Comps) + return ret +} + type ProgInfo struct { Calls []CallInfo Extra CallInfo // stores Signal and Cover collected from background threads @@ -105,6 +114,16 @@ type ProgInfo struct { Freshness int // number of programs executed in the same process before this one } +func (pi *ProgInfo) Clone() *ProgInfo { + ret := *pi + ret.Extra = ret.Extra.Clone() + ret.Calls = slices.Clone(ret.Calls) + for i, call := range ret.Calls { + ret.Calls[i] = call.Clone() + } + return &ret +} + func EmptyProgInfo(calls int) *ProgInfo { info := &ProgInfo{Calls: make([]CallInfo, calls)} for i := range info.Calls { diff --git a/pkg/vminfo/features.go b/pkg/vminfo/features.go index 2ba76a255..ab71e1168 100644 --- a/pkg/vminfo/features.go +++ b/pkg/vminfo/features.go @@ -8,8 +8,8 @@ import ( "strings" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/ipc" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/prog" ) @@ -46,14 +46,24 @@ type featureResult struct { reason string } -func (ctx *checkContext) checkFeatures() { +func (ctx *checkContext) startFeaturesCheck() { testProg := ctx.target.DataMmapProg() for feat := range flatrpc.EnumNamesFeature { feat := feat - ctx.pendingRequests++ go func() { envFlags, execFlags := ctx.featureToFlags(feat) - res := ctx.execProg(testProg, envFlags, execFlags) + req := &queue.Request{ + Prog: testProg, + ReturnOutput: true, + ReturnError: true, + ExecOpts: &ipc.ExecOpts{ + EnvFlags: envFlags, + ExecFlags: execFlags, + SandboxArg: ctx.cfg.SandboxArg, + }, + } + ctx.executor.Submit(req) + res := req.Wait(ctx.ctx) reason := ctx.featureSucceeded(feat, testProg, res) ctx.features <- featureResult{feat, reason} }() @@ -161,8 +171,8 @@ func (ctx *checkContext) featureToFlags(feat flatrpc.Feature) (ipc.EnvFlags, ipc // This generally checks that just all syscalls were executed and succeed, // for coverage features we also check that we got actual coverage. func (ctx *checkContext) featureSucceeded(feat flatrpc.Feature, testProg *prog.Prog, - res rpctype.ExecutionResult) string { - if res.Error != "" { + res *queue.Result) string { + if res.Status != queue.Success { if len(res.Output) != 0 { return string(res.Output) } diff --git a/pkg/vminfo/linux_test.go b/pkg/vminfo/linux_test.go index e9be133c1..2a09a66cf 100644 --- a/pkg/vminfo/linux_test.go +++ b/pkg/vminfo/linux_test.go @@ -21,14 +21,6 @@ import ( func TestLinuxSyscalls(t *testing.T) { cfg := testConfig(t, targets.Linux, targets.AMD64) checker := New(cfg) - _, checkProgs := checker.StartCheck() - t.Logf("got %v test programs", len(checkProgs)) - if len(checkProgs) > 1000 { - // This is just a sanity check that we don't do something stupid accidentally. - // If it grows above the limit intentionally, the limit can be increased. - // Currently we have 641 (when we failed to properly dedup syscall tests, it was 4349). - t.Fatal("too many test programs") - } filesystems := []string{ // Without sysfs, the checks would also disable mount(). "", "sysfs", "ext4", "binder", "", @@ -45,8 +37,10 @@ func TestLinuxSyscalls(t *testing.T) { Data: []byte(strings.Join(filesystems, "\nnodev\t")), }, } - results, featureInfos := createSuccessfulResults(t, cfg.Target, checkProgs) - enabled, disabled, features, err := checker.FinishCheck(files, results, featureInfos) + stop := make(chan struct{}) + go createSuccessfulResults(checker, stop) + enabled, disabled, features, err := checker.Run(files, allFeatures()) + close(stop) if err != nil { t.Fatal(err) } @@ -70,6 +64,17 @@ func TestLinuxSyscalls(t *testing.T) { } t.Errorf("disabled call %v: %v", call.Name, reason) } + for _, id := range cfg.Syscalls { + call := cfg.Target.Syscalls[id] + if enabled[call] && disabled[call] != "" { + t.Fatalf("%s is both enabled and disabled", call.Name) + } + expected := !expectDisabled[call.Name] + got := enabled[call] + if expected != got { + t.Errorf("%s: expected %t, got %t", call.Name, expected, got) + } + } expectEnabled := len(cfg.Syscalls) - len(expectDisabled) if len(enabled) != expectEnabled { t.Errorf("enabled only %v calls out of %v", len(enabled), expectEnabled) diff --git a/pkg/vminfo/syscalls.go b/pkg/vminfo/syscalls.go index 78cc973c8..863c995bd 100644 --- a/pkg/vminfo/syscalls.go +++ b/pkg/vminfo/syscalls.go @@ -4,18 +4,15 @@ package vminfo import ( - "bytes" - "encoding/gob" + "context" "fmt" - "slices" "strings" "syscall" "github.com/google/syzkaller/pkg/flatrpc" - "github.com/google/syzkaller/pkg/hash" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/mgrconfig" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" ) @@ -26,44 +23,19 @@ import ( // and provides primitives for reading target VM files, checking if a file can be opened, // executing test programs on the target VM, etc. // -// To make use of this type simpler, we collect all test programs that need -// to be executed on the target into a batch, send them to the target VM once, -// then get results and finish the check. This means that impl.syscallCheck -// cannot e.g. issue one test program, look at results, and then issue another one. -// This is achieved by starting each impl.syscallCheck in a separate goroutine -// and then waiting when it will call ctx.execRaw to submit syscalls that -// need to be executed on the target. Once all impl.syscallCheck submit -// their test syscalls, we know that we collected all of them. -// impl.syscallCheck may also decide to read a file on the target VM instead -// of executing a test program, this also counts as submitting an empty test program. -// This means that impl.syscallCheck cannot execute a test program after reading a file, -// but can do these things in opposite order (since all files are known ahead of time). -// These rules are bit brittle, but all of the checkers are unit-tested -// and misuse (trying to execute 2 programs, etc) will either crash or hang in tests. -// Theoretically we can execute more than 1 program per checker, but it will -// require some special arrangements, e.g. see handling of PseudoSyscallDeps. -// // The external interface of this type contains only 2 methods: // startCheck - starts impl.syscallCheck goroutines and collects all test programs in progs, // finishCheck - accepts results of program execution, unblocks impl.syscallCheck goroutines, // // waits and returns results of checking. type checkContext struct { - impl checker - cfg *mgrconfig.Config - target *prog.Target - sandbox ipc.EnvFlags - // Checkers use requests channel to submit their test programs, - // main goroutine will wait for exactly pendingRequests message on this channel - // (similar to sync.WaitGroup, pendingRequests is incremented before starting - // a goroutine that will send on requests). - requests chan []*rpctype.ExecutionRequest - pendingRequests int - // Ready channel is closed after we've recevied results of execution of test - // programs and file contents. After this results maps and fs are populated. - ready chan bool - results map[int64]rpctype.ExecutionResult - fs filesystem + ctx context.Context + impl checker + cfg *mgrconfig.Config + target *prog.Target + sandbox ipc.EnvFlags + executor queue.Executor + fs filesystem // Once checking of a syscall is finished, the result is sent to syscalls. // The main goroutine will wait for exactly pendingSyscalls messages. syscalls chan syscallResult @@ -76,25 +48,26 @@ type syscallResult struct { reason string } -func newCheckContext(cfg *mgrconfig.Config, impl checker) *checkContext { +func newCheckContext(ctx context.Context, cfg *mgrconfig.Config, impl checker, + executor queue.Executor) *checkContext { sandbox, err := ipc.SandboxToFlags(cfg.Sandbox) if err != nil { panic(fmt.Sprintf("failed to parse sandbox: %v", err)) } return &checkContext{ + ctx: ctx, impl: impl, cfg: cfg, target: cfg.Target, sandbox: sandbox, - requests: make(chan []*rpctype.ExecutionRequest), - results: make(map[int64]rpctype.ExecutionResult), + executor: executor, syscalls: make(chan syscallResult), features: make(chan featureResult, 100), - ready: make(chan bool), } } -func (ctx *checkContext) startCheck() []rpctype.ExecutionRequest { +func (ctx *checkContext) start(fileInfos []flatrpc.FileInfo) { + ctx.fs = createVirtualFilesystem(fileInfos) for _, id := range ctx.cfg.Syscalls { call := ctx.target.Syscalls[id] if call.Attrs.Disabled { @@ -112,52 +85,24 @@ func (ctx *checkContext) startCheck() []rpctype.ExecutionRequest { if ctx.cfg.SysTarget.HostFuzzer || ctx.target.OS == targets.TestOS { syscallCheck = alwaysSupported } - - var depsReason chan string - deps := ctx.cfg.SysTarget.PseudoSyscallDeps[call.CallName] - if len(deps) != 0 { - ctx.pendingRequests++ - depsReason = make(chan string, 1) - go func() { - depsReason <- ctx.supportedSyscalls(deps) - }() - } - ctx.pendingRequests++ go func() { - reason := syscallCheck(ctx, call) - ctx.waitForResults() - if reason == "" && depsReason != nil { - reason = <-depsReason + var reason string + deps := ctx.cfg.SysTarget.PseudoSyscallDeps[call.CallName] + if len(deps) != 0 { + reason = ctx.supportedSyscalls(deps) + } + // Only check the call if all its dependencies are satisfied. + if reason == "" { + reason = syscallCheck(ctx, call) } ctx.syscalls <- syscallResult{call, reason} }() } - ctx.checkFeatures() - var progs []rpctype.ExecutionRequest - dedup := make(map[hash.Sig]int64) - for i := 0; i < ctx.pendingRequests; i++ { - for _, req := range <-ctx.requests { - sig := hashReq(req) - req.ID = dedup[sig] - if req.ID != 0 { - continue - } - req.ID = int64(len(dedup) + 1) - dedup[sig] = req.ID - progs = append(progs, *req) - } - } - ctx.requests = nil - return progs + ctx.startFeaturesCheck() } -func (ctx *checkContext) finishCheck(fileInfos []flatrpc.FileInfo, progs []rpctype.ExecutionResult, - featureInfos []flatrpc.FeatureInfo) (map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) { - ctx.fs = createVirtualFilesystem(fileInfos) - for _, res := range progs { - ctx.results[res.ID] = res - } - close(ctx.ready) +func (ctx *checkContext) wait(featureInfos []flatrpc.FeatureInfo) ( + map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) { enabled := make(map[*prog.Syscall]bool) disabled := make(map[*prog.Syscall]string) for i := 0; i < ctx.pendingSyscalls; i++ { @@ -292,16 +237,12 @@ func (ctx *checkContext) val(name string) uint64 { } func (ctx *checkContext) execRaw(calls []string, mode prog.DeserializeMode, root bool) *ipc.ProgInfo { - if ctx.requests == nil { - panic("only one test execution per checker is supported") - } sandbox := ctx.sandbox if root { sandbox = 0 } - remain := calls - var requests []*rpctype.ExecutionRequest - for len(remain) != 0 { + info := &ipc.ProgInfo{} + for remain := calls; len(remain) != 0; { // Don't put too many syscalls into a single program, // it will have higher chances to time out. ncalls := min(len(remain), prog.MaxCalls/2) @@ -311,93 +252,39 @@ func (ctx *checkContext) execRaw(calls []string, mode prog.DeserializeMode, root if err != nil { panic(fmt.Sprintf("failed to deserialize: %v\n%v", err, progStr)) } - data, err := p.SerializeForExec() - if err != nil { - panic(fmt.Sprintf("failed to serialize test program: %v\n%s", err, progStr)) - } - requests = append(requests, &rpctype.ExecutionRequest{ - ProgData: slices.Clone(data), // clone to reduce memory usage - ExecOpts: ipc.ExecOpts{ + // TODO: request that the program must be re-executed on the first failure. + req := &queue.Request{ + Prog: p, + ExecOpts: &ipc.ExecOpts{ EnvFlags: sandbox, ExecFlags: 0, SandboxArg: ctx.cfg.SandboxArg, }, - }) - } - ctx.requests <- requests - <-ctx.ready - info := &ipc.ProgInfo{} - for _, req := range requests { - res, ok := ctx.results[req.ID] - if !ok { - panic(fmt.Sprintf("no result for request %v", req.ID)) } - if len(res.Info.Calls) == 0 { - panic(fmt.Sprintf("result for request %v has no calls", req.ID)) + ctx.executor.Submit(req) + res := req.Wait(ctx.ctx) + if res.Status == queue.Success { + info.Calls = append(info.Calls, res.Info.Calls...) + } else if res.Status == queue.Crashed { + // Pretend these calls were not executed. + info.Calls = append(info.Calls, ipc.EmptyProgInfo(ncalls).Calls...) + } else { + // The program must have been either executed or not due to a crash. + panic(fmt.Sprintf("got unexpected execution status (%d) for the prog %s", + res.Status, progStr)) } - info.Calls = append(info.Calls, res.Info.Calls...) } if len(info.Calls) != len(calls) { - panic(fmt.Sprintf("got only %v results for program %v with %v calls:\n%s", - len(info.Calls), requests[0].ID, len(calls), strings.Join(calls, "\n"))) + panic(fmt.Sprintf("got %v != %v results for program:\n%s", + len(info.Calls), len(calls), strings.Join(calls, "\n"))) } return info } -func (ctx *checkContext) execProg(p *prog.Prog, envFlags ipc.EnvFlags, - execFlags ipc.ExecFlags) rpctype.ExecutionResult { - if ctx.requests == nil { - panic("only one test execution per checker is supported") - } - data, err := p.SerializeForExec() - if err != nil { - panic(fmt.Sprintf("failed to serialize test program: %v\n%s", err, p.Serialize())) - } - req := &rpctype.ExecutionRequest{ - ProgData: data, - ReturnOutput: true, - ReturnError: true, - ExecOpts: ipc.ExecOpts{ - EnvFlags: envFlags, - ExecFlags: execFlags, - SandboxArg: ctx.cfg.SandboxArg, - }, - } - ctx.requests <- []*rpctype.ExecutionRequest{req} - <-ctx.ready - res, ok := ctx.results[req.ID] - if !ok { - panic(fmt.Sprintf("no result for request %v", req.ID)) - } - return res -} - func (ctx *checkContext) readFile(name string) ([]byte, error) { - ctx.waitForResults() return ctx.fs.ReadFile(name) } -func (ctx *checkContext) waitForResults() { - // If syscallCheck has already executed a program, then it's also waited for ctx.ready. - // If it hasn't, then we need to unblock the loop in startCheck by sending a nil request. - if ctx.requests == nil { - return - } - ctx.requests <- nil - <-ctx.ready - if ctx.fs == nil { - panic("filesystem should be initialized by now") - } -} - -func hashReq(req *rpctype.ExecutionRequest) hash.Sig { - buf := new(bytes.Buffer) - if err := gob.NewEncoder(buf).Encode(req.ExecOpts); err != nil { - panic(err) - } - return hash.Hash(req.ProgData, buf.Bytes()) -} - func alwaysSupported(ctx *checkContext, call *prog.Syscall) string { return "" } diff --git a/pkg/vminfo/vminfo.go b/pkg/vminfo/vminfo.go index 8c5af606e..4f6ab7425 100644 --- a/pkg/vminfo/vminfo.go +++ b/pkg/vminfo/vminfo.go @@ -14,6 +14,7 @@ package vminfo import ( "bytes" + "context" "errors" "fmt" "io" @@ -23,14 +24,15 @@ import ( "github.com/google/syzkaller/pkg/cover" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/mgrconfig" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" ) type Checker struct { checker + source queue.Source checkContext *checkContext } @@ -46,9 +48,12 @@ func New(cfg *mgrconfig.Config) *Checker { default: impl = new(stub) } + ctx := context.Background() + executor := queue.Plain() return &Checker{ checker: impl, - checkContext: newCheckContext(cfg, impl), + source: queue.Deduplicate(ctx, executor), + checkContext: newCheckContext(ctx, cfg, impl, executor), } } @@ -77,17 +82,25 @@ func (checker *Checker) MachineInfo(fileInfos []flatrpc.FileInfo) ([]cover.Kerne return modules, info.Bytes(), nil } -func (checker *Checker) StartCheck() ([]string, []rpctype.ExecutionRequest) { - return checker.checkFiles(), checker.checkContext.startCheck() +func (checker *Checker) CheckFiles() []string { + return checker.checkFiles() } -func (checker *Checker) FinishCheck(files []flatrpc.FileInfo, progs []rpctype.ExecutionResult, - featureInfos []flatrpc.FeatureInfo) (map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) { +func (checker *Checker) Run(files []flatrpc.FileInfo, featureInfos []flatrpc.FeatureInfo) ( + map[*prog.Syscall]bool, map[*prog.Syscall]string, Features, error) { ctx := checker.checkContext checker.checkContext = nil - return ctx.finishCheck(files, progs, featureInfos) + ctx.start(files) + return ctx.wait(featureInfos) } +// Implementation of the queue.Source interface. +func (checker *Checker) Next() *queue.Request { + return checker.source.Next() +} + +var _ queue.Source = &Checker{} + type machineInfoFunc func(files filesystem, w io.Writer) (string, error) type checker interface { diff --git a/pkg/vminfo/vminfo_test.go b/pkg/vminfo/vminfo_test.go index 2f72131a5..535782a64 100644 --- a/pkg/vminfo/vminfo_test.go +++ b/pkg/vminfo/vminfo_test.go @@ -7,12 +7,13 @@ import ( "runtime" "strings" "testing" + "time" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/host" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/mgrconfig" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" ) @@ -55,9 +56,10 @@ func TestSyscalls(t *testing.T) { t.Parallel() cfg := testConfig(t, target.OS, target.Arch) checker := New(cfg) - _, checkProgs := checker.StartCheck() - results, featureInfos := createSuccessfulResults(t, cfg.Target, checkProgs) - enabled, disabled, _, err := checker.FinishCheck(nil, results, featureInfos) + stop := make(chan struct{}) + go createSuccessfulResults(checker, stop) + enabled, disabled, _, err := checker.Run(nil, allFeatures()) + close(stop) if err != nil { t.Fatal(err) } @@ -72,19 +74,38 @@ func TestSyscalls(t *testing.T) { } } -func createSuccessfulResults(t *testing.T, target *prog.Target, - progs []rpctype.ExecutionRequest) ([]rpctype.ExecutionResult, []flatrpc.FeatureInfo) { - var results []rpctype.ExecutionResult - for _, req := range progs { - p, err := target.DeserializeExec(req.ProgData, nil) - if err != nil { - t.Fatal(err) +func allFeatures() []flatrpc.FeatureInfo { + var features []flatrpc.FeatureInfo + for feat := range flatrpc.EnumNamesFeature { + features = append(features, flatrpc.FeatureInfo{ + Id: feat, + }) + } + return features +} + +func createSuccessfulResults(source queue.Source, stop chan struct{}) { + var count int + for { + select { + case <-stop: + return + case <-time.After(time.Millisecond): + } + req := source.Next() + if req == nil { + continue } - res := rpctype.ExecutionResult{ - ID: req.ID, + count++ + if count > 1000 { + // This is just a sanity check that we don't do something stupid accidentally. + // If it grows above the limit intentionally, the limit can be increased. + // Currently we have 641 (when we failed to properly dedup syscall tests, it was 4349). + panic("too many test programs") } - for range p.Calls { - res.Info.Calls = append(res.Info.Calls, ipc.CallInfo{ + info := &ipc.ProgInfo{} + for range req.Prog.Calls { + info.Calls = append(info.Calls, ipc.CallInfo{ Cover: []uint32{1}, Signal: []uint32{1}, Comps: map[uint64]map[uint64]bool{ @@ -92,15 +113,11 @@ func createSuccessfulResults(t *testing.T, target *prog.Target, }, }) } - results = append(results, res) - } - var features []flatrpc.FeatureInfo - for feat := range flatrpc.EnumNamesFeature { - features = append(features, flatrpc.FeatureInfo{ - Id: feat, + req.Done(&queue.Result{ + Status: queue.Success, + Info: info, }) } - return results, features } func hostChecker(t *testing.T) (*Checker, []flatrpc.FileInfo) { diff --git a/syz-manager/manager.go b/syz-manager/manager.go index 864a3676f..648020482 100644 --- a/syz-manager/manager.go +++ b/syz-manager/manager.go @@ -74,7 +74,7 @@ type Manager struct { mu sync.Mutex fuzzer atomic.Pointer[fuzzer.Fuzzer] - execSource atomic.Value // queue.Source + execSource queue.DynamicSource phase int targetEnabledSyscalls map[*prog.Syscall]bool @@ -194,7 +194,7 @@ func RunManager(cfg *mgrconfig.Config) { go mgr.corpusInputHandler(corpusUpdates) // Create RPC server for fuzzers. - mgr.serv, err = startRPCServer(mgr) + mgr.serv, err = startRPCServer(mgr, &mgr.execSource) if err != nil { log.Fatalf("failed to create rpc server: %v", err) } @@ -1376,7 +1376,7 @@ func (mgr *Manager) machineChecked(features flatrpc.Feature, enabledSyscalls map }, }, rnd, mgr.target) mgr.fuzzer.Store(fuzzerObj) - mgr.execSource.Store(queue.Retry(fuzzerObj)) + mgr.execSource.Store(fuzzerObj) mgr.loadCorpus() mgr.firstConnect.Store(time.Now().Unix()) @@ -1398,11 +1398,6 @@ func (mgr *Manager) corpusMinimization() { } } -// We need this method since we're not supposed to access Manager fields from RPCServer. -func (mgr *Manager) getExecSource() queue.Source { - return mgr.execSource.Load().(queue.Source) -} - func (mgr *Manager) fuzzerSignalRotation() { const ( rotateSignals = 1000 diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 4e95849b0..4fda07f07 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -36,13 +36,8 @@ type RPCServer struct { infoDone bool checkDone atomic.Bool - checkFiles []string - checkFilesInfo []flatrpc.FileInfo - checkFeatureInfo []flatrpc.FeatureInfo - checkProgs []rpctype.ExecutionRequest - checkResults []rpctype.ExecutionResult - needCheckResults int checkFailures int + checkerSource *queue.DynamicSource enabledFeatures flatrpc.Feature setupFeatures flatrpc.Feature modules []cover.KernelModule @@ -50,8 +45,9 @@ type RPCServer struct { execCoverFilter map[uint32]uint32 coverFilter map[uint32]uint32 - mu sync.Mutex - runners sync.Map // Instead of map[string]*Runner. + mu sync.Mutex + runners sync.Map // Instead of map[string]*Runner. + execSource queue.Source statExecs *stats.Val statExecRetries *stats.Val @@ -98,16 +94,18 @@ type BugFrames struct { type RPCManagerView interface { currentBugFrames() BugFrames machineChecked(features flatrpc.Feature, enabledSyscalls map[*prog.Syscall]bool) - getExecSource() queue.Source } -func startRPCServer(mgr *Manager) (*RPCServer, error) { +func startRPCServer(mgr *Manager, source queue.Source) (*RPCServer, error) { + var checkSource queue.DynamicSource serv := &RPCServer{ - mgr: mgr, - cfg: mgr.cfg, - target: mgr.target, - checker: vminfo.New(mgr.cfg), - statExecs: mgr.statExecs, + mgr: mgr, + cfg: mgr.cfg, + target: mgr.target, + checker: vminfo.New(mgr.cfg), + checkerSource: &checkSource, + execSource: queue.Retry(queue.Order(&checkSource, source)), + statExecs: mgr.statExecs, statExecRetries: stats.Create("exec retries", "Number of times a test program was restarted because the first run failed", stats.Rate{}, stats.Graph("executor")), @@ -131,8 +129,8 @@ func startRPCServer(mgr *Manager) (*RPCServer, error) { if err != nil { return nil, err } - serv.checkFiles, serv.checkProgs = serv.checker.StartCheck() - serv.needCheckResults = len(serv.checkProgs) + checkSource.Store(serv.checker) + log.Logf(0, "serving rpc on tcp://%v", s.Addr()) serv.port = s.Addr().(*net.TCPAddr).Port serv.server = s @@ -155,7 +153,7 @@ func (serv *RPCServer) Connect(a *rpctype.ConnectArgs, r *rpctype.ConnectRes) er if serv.checkDone.Load() { r.Features = serv.setupFeatures } else { - r.ReadFiles = append(r.ReadFiles, serv.checkFiles...) + r.ReadFiles = append(r.ReadFiles, serv.checker.CheckFiles()...) r.ReadGlobs = serv.target.RequiredGlobs() r.Features = flatrpc.AllFeatures } @@ -207,8 +205,10 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error { if !serv.infoDone { serv.infoDone = true - serv.checkFeatureInfo = a.Features - serv.checkFilesInfo = a.Files + + // Now execute check programs. + go serv.runCheck(a.Files, a.Features) + serv.modules = modules serv.target.UpdateGlobs(a.Globs) serv.canonicalModules = cover.NewCanonicalizer(modules, serv.cfg.Cover) @@ -237,11 +237,19 @@ func (serv *RPCServer) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error { return nil } -func (serv *RPCServer) finishCheck() error { +func (serv *RPCServer) runCheck(checkFilesInfo []flatrpc.FileInfo, checkFeatureInfo []flatrpc.FeatureInfo) { + if err := serv.finishCheck(checkFilesInfo, checkFeatureInfo); err != nil { + log.Fatalf("check failed: %v", err) + } + serv.checkerSource.Store(nil) // There's no sense in calling checker's Next() each time. + serv.checkDone.Store(true) +} + +func (serv *RPCServer) finishCheck(checkFilesInfo []flatrpc.FileInfo, checkFeatureInfo []flatrpc.FeatureInfo) error { // Note: need to print disbled syscalls before failing due to an error. // This helps to debug "all system calls are disabled". - enabledCalls, disabledCalls, features, checkErr := serv.checker.FinishCheck( - serv.checkFilesInfo, serv.checkResults, serv.checkFeatureInfo) + + enabledCalls, disabledCalls, features, checkErr := serv.checker.Run(checkFilesInfo, checkFeatureInfo) enabledCalls, transitivelyDisabled := serv.target.TransitivelyEnabledCalls(enabledCalls) buf := new(bytes.Buffer) if len(serv.cfg.EnabledSyscalls) != 0 || log.V(1) { @@ -265,7 +273,7 @@ func (serv *RPCServer) finishCheck() error { } } hasFileErrors := false - for _, file := range serv.checkFilesInfo { + for _, file := range checkFilesInfo { if file.Error == "" { continue } @@ -334,42 +342,15 @@ func (serv *RPCServer) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.E if runner == nil { return nil } - - if !serv.checkDone.Load() { - serv.mu.Lock() - if !serv.checkDone.Load() { - serv.checkResults = append(serv.checkResults, a.Results...) - if len(serv.checkResults) < serv.needCheckResults { - numRequests := min(len(serv.checkProgs), a.NeedProgs) - r.Requests = serv.checkProgs[:numRequests] - serv.checkProgs = serv.checkProgs[numRequests:] - } else { - if err := serv.finishCheck(); err != nil { - log.Fatalf("check failed: %v", err) - } - serv.checkProgs = nil - serv.checkResults = nil - serv.checkFiles = nil - serv.checkFilesInfo = nil - serv.checkFeatureInfo = nil - serv.checkDone.Store(true) - } - } - serv.mu.Unlock() - return nil - } - - source := serv.mgr.getExecSource() - if source == nil { - // ExchangeInfo calls follow MachineCheck, so the fuzzer must have been initialized. - panic("exchange info call with nil fuzzer") - } - // First query new inputs and only then post results. // It should foster a more even distribution of executions // across all VMs. for len(r.Requests) < a.NeedProgs { - inp := source.Next() + inp := serv.execSource.Next() + if inp == nil { + // It's unlikely that subsequent Next() calls will yield something. + break + } if req, ok := serv.newRequest(runner, inp); ok { r.Requests = append(r.Requests, req) } else { @@ -460,9 +441,6 @@ func (serv *RPCServer) shutdownInstance(name string, crashed bool) []byte { serv.mu.Lock() defer serv.mu.Unlock() - if !serv.checkDone.Load() { - log.Fatalf("VM is exited while checking is not done") - } for _, req := range oldRequests { if crashed && req.try >= 0 { req.req.Done(&queue.Result{Status: queue.Crashed}) @@ -531,7 +509,11 @@ func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult) } info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) - req.req.Done(&queue.Result{Info: info}) + req.req.Done(&queue.Result{ + Info: info, + Output: resp.Output, + Error: resp.Error, + }) } func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.ExecutionRequest, bool) { @@ -560,14 +542,23 @@ func (serv *RPCServer) newRequest(runner *Runner, req *queue.Request) (rpctype.E } } runner.mu.Unlock() + + var execOpts ipc.ExecOpts + if req.ExecOpts != nil { + execOpts = *req.ExecOpts + } else { + execOpts = serv.createExecOpts(req) + } return rpctype.ExecutionRequest{ ID: id, ProgData: progData, - ExecOpts: serv.createExecOpts(req), + ExecOpts: execOpts, NewSignal: req.NeedSignal == queue.NewSignal, SignalFilter: signalFilter, SignalFilterCall: req.SignalFilterCall, ResetState: serv.cfg.Experimental.ResetAccState, + ReturnError: req.ReturnError, + ReturnOutput: req.ReturnOutput, }, true } diff --git a/tools/syz-execprog/execprog.go b/tools/syz-execprog/execprog.go index 518e7511f..b246c4263 100644 --- a/tools/syz-execprog/execprog.go +++ b/tools/syz-execprog/execprog.go @@ -7,6 +7,7 @@ package main import ( "bytes" + "context" "flag" "fmt" "math/rand" @@ -22,13 +23,13 @@ import ( "github.com/google/syzkaller/pkg/csource" "github.com/google/syzkaller/pkg/db" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/host" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/ipc/ipcconfig" "github.com/google/syzkaller/pkg/log" "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/osutil" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/tool" "github.com/google/syzkaller/pkg/vminfo" "github.com/google/syzkaller/prog" @@ -428,36 +429,17 @@ func createConfig(target *prog.Target, featuresFlags csource.Features, syscalls }, } checker := vminfo.New(cfg) - files, requests := checker.StartCheck() - fileInfos := host.ReadFiles(files) + fileInfos := host.ReadFiles(checker.RequiredFiles()) featureInfos, err := host.SetupFeatures(target, config.Executor, flatrpc.AllFeatures, featuresFlags) if err != nil { log.Fatal(err) } - env, err := ipc.MakeEnv(config, 0) - if err != nil { - log.Fatalf("failed to create ipc env: %v", err) - } - defer env.Close() - var results []rpctype.ExecutionResult - for _, req := range requests { - output, info, hanged, err := env.ExecProg(&req.ExecOpts, req.ProgData) - res := rpctype.ExecutionResult{ - ID: req.ID, - Output: output, - } - if info != nil { - res.Info = *info - } - if err != nil { - res.Error = err.Error() - } - if hanged && err == nil { - res.Error = "hanged" - } - results = append(results, res) - } - enabledSyscalls, disabledSyscalls, features, err := checker.FinishCheck(fileInfos, results, featureInfos) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go checkerExecutor(ctx, checker, config) + + enabledSyscalls, disabledSyscalls, features, err := checker.Run(fileInfos, featureInfos) if err != nil { log.Fatal(err) } @@ -476,3 +458,41 @@ func createConfig(target *prog.Target, featuresFlags csource.Features, syscalls execOpts.EnvFlags |= ipc.FeaturesToFlags(features.Enabled(), featuresFlags) return config, execOpts, enabledSyscalls, features.Enabled() } + +func checkerExecutor(ctx context.Context, source queue.Source, config *ipc.Config) { + env, err := ipc.MakeEnv(config, 0) + if err != nil { + log.Fatalf("failed to create ipc env: %v", err) + } + defer env.Close() + for { + req := source.Next() + if req == nil { + select { + case <-time.After(time.Second / 100): + case <-ctx.Done(): + return + } + continue + } + progData, err := req.Prog.SerializeForExec() + if err != nil { + log.Fatalf("failed to serialize %s: %v", req.Prog.Serialize(), err) + } + output, info, hanged, err := env.ExecProg(req.ExecOpts, progData) + res := &queue.Result{ + Status: queue.Success, + Info: info, + Output: output, + } + if err != nil { + res.Status = queue.ExecFailure + res.Error = err.Error() + } + if hanged && err == nil { + res.Status = queue.ExecFailure + res.Error = "hanged" + } + req.Done(res) + } +} |
