aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pkg/fuzzer/queue/queue.go6
-rw-r--r--pkg/runtest/run.go129
-rw-r--r--pkg/runtest/run_test.go87
-rw-r--r--pkg/vminfo/features.go5
-rw-r--r--syz-manager/rpc.go6
-rw-r--r--tools/syz-execprog/execprog.go4
-rw-r--r--tools/syz-runtest/runtest.go120
7 files changed, 190 insertions, 167 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go
index 0c1735243..14d4ace78 100644
--- a/pkg/fuzzer/queue/queue.go
+++ b/pkg/fuzzer/queue/queue.go
@@ -36,6 +36,10 @@ type Request struct {
// This stat will be incremented on request completion.
Stat *stats.Val
+ // Options needed by runtest.
+ BinaryFile string // If set, it's executed instead of Prog.
+ Repeat int // Repeats in addition to the first run.
+
// The callback will be called on request completion in the LIFO order.
// If it returns false, all further processing will be stopped.
// It allows wrappers to intercept Done() requests.
@@ -123,7 +127,7 @@ type Result struct {
Info *ipc.ProgInfo
Output []byte
Status Status
- Error string // More details in case of ExecFailure.
+ Err error // More details in case of ExecFailure.
}
func (r *Result) clone() *Result {
diff --git a/pkg/runtest/run.go b/pkg/runtest/run.go
index d32e9f8d3..dc8dcd2c4 100644
--- a/pkg/runtest/run.go
+++ b/pkg/runtest/run.go
@@ -13,6 +13,7 @@ package runtest
import (
"bufio"
"bytes"
+ "context"
"fmt"
"os"
"path/filepath"
@@ -24,27 +25,23 @@ import (
"github.com/google/syzkaller/pkg/csource"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/prog"
"github.com/google/syzkaller/sys/targets"
)
-type RunRequest struct {
- Bin string
- P *prog.Prog
- Opts ipc.ExecOpts
- Repeat int
-
- Done chan struct{}
- Output []byte
- Info ipc.ProgInfo
- Err error
+type runRequest struct {
+ *queue.Request
+ err error
finished chan struct{}
- results *ipc.ProgInfo
- name string
- broken string
- skip string
+ result *queue.Result
+ results *ipc.ProgInfo // the expected results
+
+ name string
+ broken string
+ skip string
}
type Context struct {
@@ -52,12 +49,13 @@ type Context struct {
Target *prog.Target
Features flatrpc.Feature
EnabledCalls map[string]map[*prog.Syscall]bool
- Requests chan *RunRequest
LogFunc func(text string)
Retries int // max number of test retries to deal with flaky tests
Verbose bool
Debug bool
Tests string // prefix to match test file names
+
+ executor queue.PlainQueue
}
func (ctx *Context) log(msg string, args ...interface{}) {
@@ -65,17 +63,16 @@ func (ctx *Context) log(msg string, args ...interface{}) {
}
func (ctx *Context) Run() error {
- defer close(ctx.Requests)
if ctx.Retries%2 == 0 {
ctx.Retries++
}
- progs := make(chan *RunRequest, 1000+2*cap(ctx.Requests))
+ progs := make(chan *runRequest, 1000)
errc := make(chan error, 1)
go func() {
defer close(progs)
errc <- ctx.generatePrograms(progs)
}()
- var requests []*RunRequest
+ var requests []*runRequest
for req := range progs {
req := req
requests = append(requests, req)
@@ -94,12 +91,10 @@ func (ctx *Context) Run() error {
// In the best case this allows to get off with just 1 test run.
var resultErr error
for try, failed := 0, 0; try < ctx.Retries; try++ {
- req.Output = nil
- req.Info = ipc.ProgInfo{}
- req.Done = make(chan struct{})
- ctx.Requests <- req
- <-req.Done
- if req.Err != nil {
+ ctx.executor.Submit(req.Request)
+ req.result = req.Request.Wait(context.Background())
+ if req.result.Err != nil {
+ resultErr = req.result.Err
break
}
err := checkResult(req)
@@ -112,9 +107,7 @@ func (ctx *Context) Run() error {
break
}
}
- if req.Err == nil {
- req.Err = resultErr
- }
+ req.err = resultErr
close(req.finished)
}()
}
@@ -132,13 +125,14 @@ func (ctx *Context) Run() error {
verbose = true
} else {
<-req.finished
- if req.Err != nil {
+ if req.err != nil {
fail++
result = fmt.Sprintf("FAIL: %v",
- strings.Replace(req.Err.Error(), "\n", "\n\t", -1))
- if len(req.Output) != 0 {
+ strings.Replace(req.err.Error(), "\n", "\n\t", -1))
+ res := req.result
+ if len(res.Output) != 0 {
result += fmt.Sprintf("\n\t%s",
- strings.Replace(string(req.Output), "\n", "\n\t", -1))
+ strings.Replace(string(res.Output), "\n", "\n\t", -1))
}
} else {
ok++
@@ -148,8 +142,8 @@ func (ctx *Context) Run() error {
if !verbose || ctx.Verbose {
ctx.log("%-38v: %v", req.name, result)
}
- if req.Bin != "" {
- os.Remove(req.Bin)
+ if req.Request != nil && req.Request.BinaryFile != "" {
+ os.Remove(req.BinaryFile)
}
}
if err := <-errc; err != nil {
@@ -162,7 +156,11 @@ func (ctx *Context) Run() error {
return nil
}
-func (ctx *Context) generatePrograms(progs chan *RunRequest) error {
+func (ctx *Context) Next() *queue.Request {
+ return ctx.executor.Next()
+}
+
+func (ctx *Context) generatePrograms(progs chan *runRequest) error {
cover := []bool{false}
if ctx.Features&flatrpc.FeatureCoverage != 0 {
cover = append(cover, true)
@@ -201,7 +199,7 @@ func progFileList(dir, filter string) ([]string, error) {
return res, nil
}
-func (ctx *Context) generateFile(progs chan *RunRequest, sandboxes []string, cover []bool, filename string) error {
+func (ctx *Context) generateFile(progs chan *runRequest, sandboxes []string, cover []bool, filename string) error {
p, requires, results, err := parseProg(ctx.Target, ctx.Dir, filename)
if err != nil {
return err
@@ -215,7 +213,7 @@ nextSandbox:
name := fmt.Sprintf("%v %v", filename, sandbox)
for _, call := range p.Calls {
if !ctx.EnabledCalls[sandbox][call.Meta] {
- progs <- &RunRequest{
+ progs <- &runRequest{
name: name,
skip: fmt.Sprintf("unsupported call %v", call.Meta.Name),
}
@@ -267,7 +265,7 @@ nextSandbox:
name += " C"
if !sysTarget.ExecutorUsesForkServer && times > 1 {
// Non-fork loop implementation does not support repetition.
- progs <- &RunRequest{
+ progs <- &runRequest{
name: name,
broken: "non-forking loop",
}
@@ -378,7 +376,7 @@ func checkArch(requires map[string]bool, arch string) bool {
return true
}
-func (ctx *Context) produceTest(progs chan *RunRequest, req *RunRequest, name string,
+func (ctx *Context) produceTest(progs chan *runRequest, req *runRequest, name string,
properties, requires map[string]bool, results *ipc.ProgInfo) {
req.name = name
req.results = results
@@ -409,7 +407,7 @@ func match(props, requires map[string]bool) bool {
return true
}
-func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bool, times int) (*RunRequest, error) {
+func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bool, times int) (*runRequest, error) {
var opts ipc.ExecOpts
sandboxFlags, err := ipc.SandboxToFlags(sandbox)
if err != nil {
@@ -428,15 +426,17 @@ func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bo
if ctx.Debug {
opts.EnvFlags |= ipc.FlagDebug
}
- req := &RunRequest{
- P: p,
- Opts: opts,
- Repeat: times,
+ req := &runRequest{
+ Request: &queue.Request{
+ Prog: p,
+ ExecOpts: &opts,
+ Repeat: times,
+ },
}
return req, nil
}
-func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, times int) (*RunRequest, error) {
+func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, times int) (*runRequest, error) {
opts := csource.Options{
Threaded: threaded,
Repeat: times > 1,
@@ -479,29 +479,34 @@ func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, tim
if threaded {
ipcFlags |= ipc.FlagThreaded
}
- req := &RunRequest{
- P: p,
- Bin: bin,
- Opts: ipc.ExecOpts{
- ExecFlags: ipcFlags,
+ req := &runRequest{
+ Request: &queue.Request{
+ Prog: p,
+ BinaryFile: bin,
+ ExecOpts: &ipc.ExecOpts{
+ ExecFlags: ipcFlags,
+ },
+ Repeat: times,
},
- Repeat: times,
}
return req, nil
}
-func checkResult(req *RunRequest) error {
+func checkResult(req *runRequest) error {
+ if req.result.Status != queue.Success {
+ return fmt.Errorf("non-successful result status (%v)", req.result.Status)
+ }
var infos []ipc.ProgInfo
- isC := req.Bin != ""
+ isC := req.BinaryFile != ""
if isC {
var err error
if infos, err = parseBinOutput(req); err != nil {
return err
}
} else {
- raw := req.Info
+ raw := req.result.Info
for len(raw.Calls) != 0 {
- ncalls := min(len(raw.Calls), len(req.P.Calls))
+ ncalls := min(len(raw.Calls), len(req.Prog.Calls))
infos = append(infos, ipc.ProgInfo{
Extra: raw.Extra,
Calls: raw.Calls[:ncalls],
@@ -511,7 +516,7 @@ func checkResult(req *RunRequest) error {
}
if req.Repeat != len(infos) {
return fmt.Errorf("should repeat %v times, but repeated %v, prog calls %v, info calls %v\n%s",
- req.Repeat, len(infos), req.P.Calls, len(req.Info.Calls), req.Output)
+ req.Repeat, len(infos), req.Prog.Calls, len(req.result.Info.Calls), req.result.Output)
}
calls := make(map[string]bool)
for run, info := range infos {
@@ -524,7 +529,7 @@ func checkResult(req *RunRequest) error {
return nil
}
-func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo, calls map[string]bool) error {
+func checkCallResult(req *runRequest, isC bool, run, call int, info ipc.ProgInfo, calls map[string]bool) error {
inf := info.Calls[call]
want := req.results.Calls[call]
for flag, what := range map[ipc.CallFlags]string{
@@ -537,7 +542,7 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo
// C code does not detect blocked/non-finished calls.
continue
}
- if req.Opts.ExecFlags&ipc.FlagThreaded == 0 {
+ if req.ExecOpts.ExecFlags&ipc.FlagThreaded == 0 {
// In non-threaded mode blocked syscalls will block main thread
// and we won't detect blocked/unfinished syscalls.
continue
@@ -563,12 +568,12 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo
if isC || inf.Flags&ipc.CallExecuted == 0 {
return nil
}
- if req.Opts.EnvFlags&ipc.FlagSignal != 0 {
+ if req.ExecOpts.EnvFlags&ipc.FlagSignal != 0 {
// Signal is always deduplicated, so we may not get any signal
// on a second invocation of the same syscall.
// For calls that are not meant to collect synchronous coverage we
// allow the signal to be empty as long as the extra signal is not.
- callName := req.P.Calls[call].Meta.CallName
+ callName := req.Prog.Calls[call].Meta.CallName
if len(inf.Signal) < 2 && !calls[callName] && len(info.Extra.Signal) == 0 {
return fmt.Errorf("run %v: call %v: no signal", run, call)
}
@@ -586,13 +591,13 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo
return nil
}
-func parseBinOutput(req *RunRequest) ([]ipc.ProgInfo, error) {
+func parseBinOutput(req *runRequest) ([]ipc.ProgInfo, error) {
var infos []ipc.ProgInfo
- s := bufio.NewScanner(bytes.NewReader(req.Output))
+ s := bufio.NewScanner(bytes.NewReader(req.result.Output))
re := regexp.MustCompile("^### call=([0-9]+) errno=([0-9]+)$")
for s.Scan() {
if s.Text() == "### start" {
- infos = append(infos, ipc.ProgInfo{Calls: make([]ipc.CallInfo, len(req.P.Calls))})
+ infos = append(infos, ipc.ProgInfo{Calls: make([]ipc.CallInfo, len(req.Prog.Calls))})
}
match := re.FindSubmatch(s.Bytes())
if match == nil {
diff --git a/pkg/runtest/run_test.go b/pkg/runtest/run_test.go
index f5798a9b4..09b714e1e 100644
--- a/pkg/runtest/run_test.go
+++ b/pkg/runtest/run_test.go
@@ -4,6 +4,7 @@
package runtest
import (
+ "context"
"errors"
"flag"
"fmt"
@@ -14,6 +15,7 @@ import (
"time"
"github.com/google/syzkaller/pkg/csource"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/ipc"
"github.com/google/syzkaller/pkg/osutil"
"github.com/google/syzkaller/pkg/testutil"
@@ -62,24 +64,12 @@ func test(t *testing.T, sysTarget *targets.Target) {
"": calls,
"none": calls,
}
- requests := make(chan *RunRequest, 2*runtime.GOMAXPROCS(0))
- go func() {
- for req := range requests {
- if req.Bin != "" {
- runTestC(req)
- } else {
- runTest(req, executor)
- }
- close(req.Done)
- }
- }()
ctx := &Context{
Dir: filepath.Join("..", "..", "sys", target.OS, targets.TestOS),
Target: target,
Tests: *flagFilter,
Features: 0,
EnabledCalls: enabledCalls,
- Requests: requests,
LogFunc: func(text string) {
t.Helper()
t.Logf(text)
@@ -88,66 +78,99 @@ func test(t *testing.T, sysTarget *targets.Target) {
Verbose: true,
Debug: *flagDebug,
}
+
+ executorCtx, cancel := context.WithCancel(context.Background())
+ t.Cleanup(cancel)
+ go func() {
+ for {
+ select {
+ case <-time.After(time.Millisecond):
+ case <-executorCtx.Done():
+ return
+ }
+ req := ctx.Next()
+ if req == nil {
+ continue
+ }
+ if req.BinaryFile != "" {
+ req.Done(runTestC(req))
+ } else {
+ req.Done(runTest(req, executor))
+ }
+ }
+ }()
if err := ctx.Run(); err != nil {
t.Fatal(err)
}
}
-func runTest(req *RunRequest, executor string) {
+func runTest(req *queue.Request, executor string) *queue.Result {
cfg := new(ipc.Config)
- sysTarget := targets.Get(req.P.Target.OS, req.P.Target.Arch)
+ sysTarget := targets.Get(req.Prog.Target.OS, req.Prog.Target.Arch)
cfg.UseShmem = sysTarget.ExecutorUsesShmem
cfg.UseForkServer = sysTarget.ExecutorUsesForkServer
cfg.Timeouts = sysTarget.Timeouts(1)
cfg.Executor = executor
env, err := ipc.MakeEnv(cfg, 0)
if err != nil {
- req.Err = fmt.Errorf("failed to create ipc env: %w", err)
- return
+ return &queue.Result{
+ Status: queue.ExecFailure,
+ Err: fmt.Errorf("failed to create ipc env: %w", err),
+ }
}
defer env.Close()
+ ret := &queue.Result{Status: queue.Success}
for run := 0; run < req.Repeat; run++ {
if run%2 == 0 {
// Recreate Env every few iterations, this allows to cover more paths.
env.ForceRestart()
}
- output, info, hanged, err := env.Exec(&req.Opts, req.P)
- req.Output = append(req.Output, output...)
+ output, info, hanged, err := env.Exec(req.ExecOpts, req.Prog)
+ ret.Output = append(ret.Output, output...)
if err != nil {
- req.Err = fmt.Errorf("run %v: failed to run: %w", run, err)
- return
+ return &queue.Result{
+ Status: queue.ExecFailure,
+ Err: fmt.Errorf("run %v: failed to run: %w", run, err),
+ }
}
if hanged {
- req.Err = fmt.Errorf("run %v: hanged", run)
- return
+ return &queue.Result{
+ Status: queue.ExecFailure,
+ Err: fmt.Errorf("run %v: hanged", run),
+ }
}
if run == 0 {
- req.Info = *info
+ ret.Info = info
} else {
- req.Info.Calls = append(req.Info.Calls, info.Calls...)
+ ret.Info.Calls = append(ret.Info.Calls, info.Calls...)
}
}
+ return ret
}
-func runTestC(req *RunRequest) {
+func runTestC(req *queue.Request) *queue.Result {
tmpDir, err := os.MkdirTemp("", "syz-runtest")
if err != nil {
- req.Err = fmt.Errorf("failed to create temp dir: %w", err)
- return
+ return &queue.Result{
+ Status: queue.ExecFailure,
+ Err: fmt.Errorf("failed to create temp dir: %w", err),
+ }
}
defer os.RemoveAll(tmpDir)
- cmd := osutil.Command(req.Bin)
+ cmd := osutil.Command(req.BinaryFile)
cmd.Dir = tmpDir
// Tell ASAN to not mess with our NONFAILING.
cmd.Env = append(append([]string{}, os.Environ()...), "ASAN_OPTIONS=handle_segv=0 allow_user_segv_handler=1")
- req.Output, req.Err = osutil.Run(20*time.Second, cmd)
+ res := &queue.Result{}
+ res.Output, res.Err = osutil.Run(20*time.Second, cmd)
var verr *osutil.VerboseError
- if errors.As(req.Err, &verr) {
+ if errors.As(res.Err, &verr) {
// The process can legitimately do something like exit_group(1).
// So we ignore the error and rely on the rest of the checks (e.g. syscall return values).
- req.Err = nil
- req.Output = verr.Output
+ res.Err = nil
+ res.Output = verr.Output
}
+ return res
}
func TestParsing(t *testing.T) {
diff --git a/pkg/vminfo/features.go b/pkg/vminfo/features.go
index ab71e1168..fb443774c 100644
--- a/pkg/vminfo/features.go
+++ b/pkg/vminfo/features.go
@@ -176,7 +176,10 @@ func (ctx *checkContext) featureSucceeded(feat flatrpc.Feature, testProg *prog.P
if len(res.Output) != 0 {
return string(res.Output)
}
- return res.Error
+ if res.Err != nil {
+ return res.Err.Error()
+ }
+ return "test program execution failed"
}
if len(res.Info.Calls) != len(testProg.Calls) {
return fmt.Sprintf("only %v calls are executed out of %v",
diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go
index 4fda07f07..92b2f8b1e 100644
--- a/syz-manager/rpc.go
+++ b/syz-manager/rpc.go
@@ -509,10 +509,14 @@ func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult)
}
info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover)
info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal)
+ var err error
+ if resp.Error != "" {
+ err = fmt.Errorf("%s", resp.Error)
+ }
req.req.Done(&queue.Result{
Info: info,
Output: resp.Output,
- Error: resp.Error,
+ Err: err,
})
}
diff --git a/tools/syz-execprog/execprog.go b/tools/syz-execprog/execprog.go
index b246c4263..ad75909eb 100644
--- a/tools/syz-execprog/execprog.go
+++ b/tools/syz-execprog/execprog.go
@@ -484,14 +484,14 @@ func checkerExecutor(ctx context.Context, source queue.Source, config *ipc.Confi
Status: queue.Success,
Info: info,
Output: output,
+ Err: err,
}
if err != nil {
res.Status = queue.ExecFailure
- res.Error = err.Error()
}
if hanged && err == nil {
res.Status = queue.ExecFailure
- res.Error = "hanged"
+ res.Err = fmt.Errorf("hanged")
}
req.Done(res)
}
diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go
index 6f71519c7..5605c3616 100644
--- a/tools/syz-runtest/runtest.go
+++ b/tools/syz-runtest/runtest.go
@@ -14,11 +14,13 @@ import (
"net"
"os"
"path/filepath"
+ "slices"
"sync"
"sync/atomic"
"time"
"github.com/google/syzkaller/pkg/flatrpc"
+ "github.com/google/syzkaller/pkg/fuzzer/queue"
"github.com/google/syzkaller/pkg/instance"
"github.com/google/syzkaller/pkg/mgrconfig"
"github.com/google/syzkaller/pkg/osutil"
@@ -53,20 +55,17 @@ func main() {
}
osutil.MkdirAll(cfg.Workdir)
mgr := &Manager{
- cfg: cfg,
- vmPool: vmPool,
- checker: vminfo.New(cfg),
- reporter: reporter,
- debug: *flagDebug,
- requests: make(chan *runtest.RunRequest, 4*vmPool.Count()*cfg.Procs),
- checkResultC: make(chan *rpctype.CheckArgs, 1),
- checkProgsDone: make(chan bool),
- vmStop: make(chan bool),
- reqMap: make(map[int64]*runtest.RunRequest),
- pending: make(map[string]map[int64]bool),
+ cfg: cfg,
+ vmPool: vmPool,
+ checker: vminfo.New(cfg),
+ reporter: reporter,
+ debug: *flagDebug,
+ checkResultC: make(chan *rpctype.CheckArgs, 1),
+ vmStop: make(chan bool),
+ reqMap: make(map[int64]*queue.Request),
+ pending: make(map[string]map[int64]bool),
}
- mgr.checkFiles, mgr.checkProgs = mgr.checker.StartCheck()
- mgr.needCheckResults = len(mgr.checkProgs)
+ mgr.checkFiles = mgr.checker.RequiredFiles()
s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr)
if err != nil {
log.Fatalf("failed to create rpc server: %v", err)
@@ -97,8 +96,8 @@ func main() {
}()
}
checkResult := <-mgr.checkResultC
- <-mgr.checkProgsDone
- calls, _, features, err := mgr.checker.FinishCheck(checkResult.Files, mgr.checkResults, checkResult.Features)
+ mgr.source.Store(mgr.checker)
+ calls, _, features, err := mgr.checker.Run(checkResult.Files, checkResult.Features)
if err != nil {
log.Fatalf("failed to detect enabled syscalls: %v", err)
}
@@ -122,12 +121,12 @@ func main() {
Target: cfg.Target,
Features: features.Enabled(),
EnabledCalls: enabledCalls,
- Requests: mgr.requests,
LogFunc: func(text string) { fmt.Println(text) },
- Verbose: false,
+ Verbose: true,
Debug: *flagDebug,
Tests: *flagTests,
}
+ mgr.source.Store(ctx)
err = ctx.Run()
close(vm.Shutdown)
wg.Wait()
@@ -138,24 +137,20 @@ func main() {
}
type Manager struct {
- cfg *mgrconfig.Config
- vmPool *vm.Pool
- checker *vminfo.Checker
- checkFiles []string
- checkProgs []rpctype.ExecutionRequest
- checkResults []rpctype.ExecutionResult
- needCheckResults int
- checkProgsDone chan bool
- reporter *report.Reporter
- requests chan *runtest.RunRequest
- checkResultC chan *rpctype.CheckArgs
- vmStop chan bool
- port int
- debug bool
+ cfg *mgrconfig.Config
+ vmPool *vm.Pool
+ checker *vminfo.Checker
+ checkFiles []string
+ reporter *report.Reporter
+ checkResultC chan *rpctype.CheckArgs
+ vmStop chan bool
+ port int
+ debug bool
+ source queue.DynamicSource
reqMu sync.Mutex
reqSeq int64
- reqMap map[int64]*runtest.RunRequest
+ reqMap map[int64]*queue.Request
pending map[string]map[int64]bool
}
@@ -220,12 +215,15 @@ func (mgr *Manager) finishRequests(name string, rep *report.Report) error {
return fmt.Errorf("vm crash: %v\n%s\n%s", rep.Title, rep.Report, rep.Output)
}
delete(mgr.reqMap, id)
- req.Err = fmt.Errorf("%v", rep.Title)
- req.Output = rep.Report
- if len(req.Output) == 0 {
- req.Output = rep.Output
+ output := rep.Report
+ if len(output) == 0 {
+ output = rep.Output
}
- close(req.Done)
+ req.Done(&queue.Result{
+ Status: queue.Crashed,
+ Err: fmt.Errorf("%v", rep.Title),
+ Output: slices.Clone(output),
+ })
}
delete(mgr.pending, name)
return nil
@@ -254,25 +252,11 @@ func (mgr *Manager) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error {
func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error {
mgr.reqMu.Lock()
defer mgr.reqMu.Unlock()
-
- select {
- case <-mgr.checkProgsDone:
- default:
- mgr.checkResults = append(mgr.checkResults, a.Results...)
- if len(mgr.checkResults) < mgr.needCheckResults {
- numRequests := min(len(mgr.checkProgs), a.NeedProgs)
- r.Requests = mgr.checkProgs[:numRequests]
- mgr.checkProgs = mgr.checkProgs[numRequests:]
- } else {
- close(mgr.checkProgsDone)
- }
- return nil
- }
-
if mgr.pending[a.Name] == nil {
mgr.pending[a.Name] = make(map[int64]bool)
}
- for _, res := range a.Results {
+ for i := range a.Results {
+ res := a.Results[i]
if !mgr.pending[a.Name][res.ID] {
log.Fatalf("runner %v wasn't executing request %v", a.Name, res.ID)
}
@@ -285,19 +269,19 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch
if req == nil {
log.Fatalf("got done request for unknown id %v", res.ID)
}
- req.Output = res.Output
- req.Info = res.Info
+ result := &queue.Result{
+ Status: queue.Success,
+ Info: &res.Info,
+ Output: res.Output,
+ }
if res.Error != "" {
- req.Err = errors.New(res.Error)
+ result.Status = queue.ExecFailure
+ result.Err = errors.New(res.Error)
}
- close(req.Done)
+ req.Done(result)
}
for i := 0; i < a.NeedProgs; i++ {
- var req *runtest.RunRequest
- select {
- case req = <-mgr.requests:
- default:
- }
+ req := mgr.source.Next()
if req == nil {
break
}
@@ -306,10 +290,10 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch
mgr.pending[a.Name][mgr.reqSeq] = true
var progData []byte
var err error
- if req.Bin != "" {
- progData, err = os.ReadFile(req.Bin)
+ if req.BinaryFile != "" {
+ progData, err = os.ReadFile(req.BinaryFile)
} else {
- progData, err = req.P.SerializeForExec()
+ progData, err = req.Prog.SerializeForExec()
}
if err != nil {
log.Fatal(err)
@@ -317,9 +301,9 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch
r.Requests = append(r.Requests, rpctype.ExecutionRequest{
ID: mgr.reqSeq,
ProgData: progData,
- ExecOpts: req.Opts,
- IsBinary: req.Bin != "",
- ResetState: req.Bin == "",
+ ExecOpts: *req.ExecOpts,
+ IsBinary: req.BinaryFile != "",
+ ResetState: req.BinaryFile == "",
ReturnOutput: true,
ReturnError: true,
Repeat: req.Repeat,