diff options
| author | Aleksandr Nogikh <nogikh@google.com> | 2024-05-07 19:11:42 +0200 |
|---|---|---|
| committer | Dmitry Vyukov <dvyukov@google.com> | 2024-05-16 15:38:27 +0000 |
| commit | a10940b81777718365ee7f71495abdc69a32285b (patch) | |
| tree | 415a9d17232b2f7c09c85e7ddffc107500aeee97 | |
| parent | b6954dce2f21b8feb1448edaaeeefc22f5ff4944 (diff) | |
pkg/runtest: use queue.Request and queue.Result
There's no need to duplicate the execution mechanisms.
| -rw-r--r-- | pkg/fuzzer/queue/queue.go | 6 | ||||
| -rw-r--r-- | pkg/runtest/run.go | 129 | ||||
| -rw-r--r-- | pkg/runtest/run_test.go | 87 | ||||
| -rw-r--r-- | pkg/vminfo/features.go | 5 | ||||
| -rw-r--r-- | syz-manager/rpc.go | 6 | ||||
| -rw-r--r-- | tools/syz-execprog/execprog.go | 4 | ||||
| -rw-r--r-- | tools/syz-runtest/runtest.go | 120 |
7 files changed, 190 insertions, 167 deletions
diff --git a/pkg/fuzzer/queue/queue.go b/pkg/fuzzer/queue/queue.go index 0c1735243..14d4ace78 100644 --- a/pkg/fuzzer/queue/queue.go +++ b/pkg/fuzzer/queue/queue.go @@ -36,6 +36,10 @@ type Request struct { // This stat will be incremented on request completion. Stat *stats.Val + // Options needed by runtest. + BinaryFile string // If set, it's executed instead of Prog. + Repeat int // Repeats in addition to the first run. + // The callback will be called on request completion in the LIFO order. // If it returns false, all further processing will be stopped. // It allows wrappers to intercept Done() requests. @@ -123,7 +127,7 @@ type Result struct { Info *ipc.ProgInfo Output []byte Status Status - Error string // More details in case of ExecFailure. + Err error // More details in case of ExecFailure. } func (r *Result) clone() *Result { diff --git a/pkg/runtest/run.go b/pkg/runtest/run.go index d32e9f8d3..dc8dcd2c4 100644 --- a/pkg/runtest/run.go +++ b/pkg/runtest/run.go @@ -13,6 +13,7 @@ package runtest import ( "bufio" "bytes" + "context" "fmt" "os" "path/filepath" @@ -24,27 +25,23 @@ import ( "github.com/google/syzkaller/pkg/csource" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/sys/targets" ) -type RunRequest struct { - Bin string - P *prog.Prog - Opts ipc.ExecOpts - Repeat int - - Done chan struct{} - Output []byte - Info ipc.ProgInfo - Err error +type runRequest struct { + *queue.Request + err error finished chan struct{} - results *ipc.ProgInfo - name string - broken string - skip string + result *queue.Result + results *ipc.ProgInfo // the expected results + + name string + broken string + skip string } type Context struct { @@ -52,12 +49,13 @@ type Context struct { Target *prog.Target Features flatrpc.Feature EnabledCalls map[string]map[*prog.Syscall]bool - Requests chan *RunRequest LogFunc func(text string) Retries int // max number of test retries to deal with flaky tests Verbose bool Debug bool Tests string // prefix to match test file names + + executor queue.PlainQueue } func (ctx *Context) log(msg string, args ...interface{}) { @@ -65,17 +63,16 @@ func (ctx *Context) log(msg string, args ...interface{}) { } func (ctx *Context) Run() error { - defer close(ctx.Requests) if ctx.Retries%2 == 0 { ctx.Retries++ } - progs := make(chan *RunRequest, 1000+2*cap(ctx.Requests)) + progs := make(chan *runRequest, 1000) errc := make(chan error, 1) go func() { defer close(progs) errc <- ctx.generatePrograms(progs) }() - var requests []*RunRequest + var requests []*runRequest for req := range progs { req := req requests = append(requests, req) @@ -94,12 +91,10 @@ func (ctx *Context) Run() error { // In the best case this allows to get off with just 1 test run. var resultErr error for try, failed := 0, 0; try < ctx.Retries; try++ { - req.Output = nil - req.Info = ipc.ProgInfo{} - req.Done = make(chan struct{}) - ctx.Requests <- req - <-req.Done - if req.Err != nil { + ctx.executor.Submit(req.Request) + req.result = req.Request.Wait(context.Background()) + if req.result.Err != nil { + resultErr = req.result.Err break } err := checkResult(req) @@ -112,9 +107,7 @@ func (ctx *Context) Run() error { break } } - if req.Err == nil { - req.Err = resultErr - } + req.err = resultErr close(req.finished) }() } @@ -132,13 +125,14 @@ func (ctx *Context) Run() error { verbose = true } else { <-req.finished - if req.Err != nil { + if req.err != nil { fail++ result = fmt.Sprintf("FAIL: %v", - strings.Replace(req.Err.Error(), "\n", "\n\t", -1)) - if len(req.Output) != 0 { + strings.Replace(req.err.Error(), "\n", "\n\t", -1)) + res := req.result + if len(res.Output) != 0 { result += fmt.Sprintf("\n\t%s", - strings.Replace(string(req.Output), "\n", "\n\t", -1)) + strings.Replace(string(res.Output), "\n", "\n\t", -1)) } } else { ok++ @@ -148,8 +142,8 @@ func (ctx *Context) Run() error { if !verbose || ctx.Verbose { ctx.log("%-38v: %v", req.name, result) } - if req.Bin != "" { - os.Remove(req.Bin) + if req.Request != nil && req.Request.BinaryFile != "" { + os.Remove(req.BinaryFile) } } if err := <-errc; err != nil { @@ -162,7 +156,11 @@ func (ctx *Context) Run() error { return nil } -func (ctx *Context) generatePrograms(progs chan *RunRequest) error { +func (ctx *Context) Next() *queue.Request { + return ctx.executor.Next() +} + +func (ctx *Context) generatePrograms(progs chan *runRequest) error { cover := []bool{false} if ctx.Features&flatrpc.FeatureCoverage != 0 { cover = append(cover, true) @@ -201,7 +199,7 @@ func progFileList(dir, filter string) ([]string, error) { return res, nil } -func (ctx *Context) generateFile(progs chan *RunRequest, sandboxes []string, cover []bool, filename string) error { +func (ctx *Context) generateFile(progs chan *runRequest, sandboxes []string, cover []bool, filename string) error { p, requires, results, err := parseProg(ctx.Target, ctx.Dir, filename) if err != nil { return err @@ -215,7 +213,7 @@ nextSandbox: name := fmt.Sprintf("%v %v", filename, sandbox) for _, call := range p.Calls { if !ctx.EnabledCalls[sandbox][call.Meta] { - progs <- &RunRequest{ + progs <- &runRequest{ name: name, skip: fmt.Sprintf("unsupported call %v", call.Meta.Name), } @@ -267,7 +265,7 @@ nextSandbox: name += " C" if !sysTarget.ExecutorUsesForkServer && times > 1 { // Non-fork loop implementation does not support repetition. - progs <- &RunRequest{ + progs <- &runRequest{ name: name, broken: "non-forking loop", } @@ -378,7 +376,7 @@ func checkArch(requires map[string]bool, arch string) bool { return true } -func (ctx *Context) produceTest(progs chan *RunRequest, req *RunRequest, name string, +func (ctx *Context) produceTest(progs chan *runRequest, req *runRequest, name string, properties, requires map[string]bool, results *ipc.ProgInfo) { req.name = name req.results = results @@ -409,7 +407,7 @@ func match(props, requires map[string]bool) bool { return true } -func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bool, times int) (*RunRequest, error) { +func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bool, times int) (*runRequest, error) { var opts ipc.ExecOpts sandboxFlags, err := ipc.SandboxToFlags(sandbox) if err != nil { @@ -428,15 +426,17 @@ func (ctx *Context) createSyzTest(p *prog.Prog, sandbox string, threaded, cov bo if ctx.Debug { opts.EnvFlags |= ipc.FlagDebug } - req := &RunRequest{ - P: p, - Opts: opts, - Repeat: times, + req := &runRequest{ + Request: &queue.Request{ + Prog: p, + ExecOpts: &opts, + Repeat: times, + }, } return req, nil } -func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, times int) (*RunRequest, error) { +func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, times int) (*runRequest, error) { opts := csource.Options{ Threaded: threaded, Repeat: times > 1, @@ -479,29 +479,34 @@ func (ctx *Context) createCTest(p *prog.Prog, sandbox string, threaded bool, tim if threaded { ipcFlags |= ipc.FlagThreaded } - req := &RunRequest{ - P: p, - Bin: bin, - Opts: ipc.ExecOpts{ - ExecFlags: ipcFlags, + req := &runRequest{ + Request: &queue.Request{ + Prog: p, + BinaryFile: bin, + ExecOpts: &ipc.ExecOpts{ + ExecFlags: ipcFlags, + }, + Repeat: times, }, - Repeat: times, } return req, nil } -func checkResult(req *RunRequest) error { +func checkResult(req *runRequest) error { + if req.result.Status != queue.Success { + return fmt.Errorf("non-successful result status (%v)", req.result.Status) + } var infos []ipc.ProgInfo - isC := req.Bin != "" + isC := req.BinaryFile != "" if isC { var err error if infos, err = parseBinOutput(req); err != nil { return err } } else { - raw := req.Info + raw := req.result.Info for len(raw.Calls) != 0 { - ncalls := min(len(raw.Calls), len(req.P.Calls)) + ncalls := min(len(raw.Calls), len(req.Prog.Calls)) infos = append(infos, ipc.ProgInfo{ Extra: raw.Extra, Calls: raw.Calls[:ncalls], @@ -511,7 +516,7 @@ func checkResult(req *RunRequest) error { } if req.Repeat != len(infos) { return fmt.Errorf("should repeat %v times, but repeated %v, prog calls %v, info calls %v\n%s", - req.Repeat, len(infos), req.P.Calls, len(req.Info.Calls), req.Output) + req.Repeat, len(infos), req.Prog.Calls, len(req.result.Info.Calls), req.result.Output) } calls := make(map[string]bool) for run, info := range infos { @@ -524,7 +529,7 @@ func checkResult(req *RunRequest) error { return nil } -func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo, calls map[string]bool) error { +func checkCallResult(req *runRequest, isC bool, run, call int, info ipc.ProgInfo, calls map[string]bool) error { inf := info.Calls[call] want := req.results.Calls[call] for flag, what := range map[ipc.CallFlags]string{ @@ -537,7 +542,7 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo // C code does not detect blocked/non-finished calls. continue } - if req.Opts.ExecFlags&ipc.FlagThreaded == 0 { + if req.ExecOpts.ExecFlags&ipc.FlagThreaded == 0 { // In non-threaded mode blocked syscalls will block main thread // and we won't detect blocked/unfinished syscalls. continue @@ -563,12 +568,12 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo if isC || inf.Flags&ipc.CallExecuted == 0 { return nil } - if req.Opts.EnvFlags&ipc.FlagSignal != 0 { + if req.ExecOpts.EnvFlags&ipc.FlagSignal != 0 { // Signal is always deduplicated, so we may not get any signal // on a second invocation of the same syscall. // For calls that are not meant to collect synchronous coverage we // allow the signal to be empty as long as the extra signal is not. - callName := req.P.Calls[call].Meta.CallName + callName := req.Prog.Calls[call].Meta.CallName if len(inf.Signal) < 2 && !calls[callName] && len(info.Extra.Signal) == 0 { return fmt.Errorf("run %v: call %v: no signal", run, call) } @@ -586,13 +591,13 @@ func checkCallResult(req *RunRequest, isC bool, run, call int, info ipc.ProgInfo return nil } -func parseBinOutput(req *RunRequest) ([]ipc.ProgInfo, error) { +func parseBinOutput(req *runRequest) ([]ipc.ProgInfo, error) { var infos []ipc.ProgInfo - s := bufio.NewScanner(bytes.NewReader(req.Output)) + s := bufio.NewScanner(bytes.NewReader(req.result.Output)) re := regexp.MustCompile("^### call=([0-9]+) errno=([0-9]+)$") for s.Scan() { if s.Text() == "### start" { - infos = append(infos, ipc.ProgInfo{Calls: make([]ipc.CallInfo, len(req.P.Calls))}) + infos = append(infos, ipc.ProgInfo{Calls: make([]ipc.CallInfo, len(req.Prog.Calls))}) } match := re.FindSubmatch(s.Bytes()) if match == nil { diff --git a/pkg/runtest/run_test.go b/pkg/runtest/run_test.go index f5798a9b4..09b714e1e 100644 --- a/pkg/runtest/run_test.go +++ b/pkg/runtest/run_test.go @@ -4,6 +4,7 @@ package runtest import ( + "context" "errors" "flag" "fmt" @@ -14,6 +15,7 @@ import ( "time" "github.com/google/syzkaller/pkg/csource" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/ipc" "github.com/google/syzkaller/pkg/osutil" "github.com/google/syzkaller/pkg/testutil" @@ -62,24 +64,12 @@ func test(t *testing.T, sysTarget *targets.Target) { "": calls, "none": calls, } - requests := make(chan *RunRequest, 2*runtime.GOMAXPROCS(0)) - go func() { - for req := range requests { - if req.Bin != "" { - runTestC(req) - } else { - runTest(req, executor) - } - close(req.Done) - } - }() ctx := &Context{ Dir: filepath.Join("..", "..", "sys", target.OS, targets.TestOS), Target: target, Tests: *flagFilter, Features: 0, EnabledCalls: enabledCalls, - Requests: requests, LogFunc: func(text string) { t.Helper() t.Logf(text) @@ -88,66 +78,99 @@ func test(t *testing.T, sysTarget *targets.Target) { Verbose: true, Debug: *flagDebug, } + + executorCtx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go func() { + for { + select { + case <-time.After(time.Millisecond): + case <-executorCtx.Done(): + return + } + req := ctx.Next() + if req == nil { + continue + } + if req.BinaryFile != "" { + req.Done(runTestC(req)) + } else { + req.Done(runTest(req, executor)) + } + } + }() if err := ctx.Run(); err != nil { t.Fatal(err) } } -func runTest(req *RunRequest, executor string) { +func runTest(req *queue.Request, executor string) *queue.Result { cfg := new(ipc.Config) - sysTarget := targets.Get(req.P.Target.OS, req.P.Target.Arch) + sysTarget := targets.Get(req.Prog.Target.OS, req.Prog.Target.Arch) cfg.UseShmem = sysTarget.ExecutorUsesShmem cfg.UseForkServer = sysTarget.ExecutorUsesForkServer cfg.Timeouts = sysTarget.Timeouts(1) cfg.Executor = executor env, err := ipc.MakeEnv(cfg, 0) if err != nil { - req.Err = fmt.Errorf("failed to create ipc env: %w", err) - return + return &queue.Result{ + Status: queue.ExecFailure, + Err: fmt.Errorf("failed to create ipc env: %w", err), + } } defer env.Close() + ret := &queue.Result{Status: queue.Success} for run := 0; run < req.Repeat; run++ { if run%2 == 0 { // Recreate Env every few iterations, this allows to cover more paths. env.ForceRestart() } - output, info, hanged, err := env.Exec(&req.Opts, req.P) - req.Output = append(req.Output, output...) + output, info, hanged, err := env.Exec(req.ExecOpts, req.Prog) + ret.Output = append(ret.Output, output...) if err != nil { - req.Err = fmt.Errorf("run %v: failed to run: %w", run, err) - return + return &queue.Result{ + Status: queue.ExecFailure, + Err: fmt.Errorf("run %v: failed to run: %w", run, err), + } } if hanged { - req.Err = fmt.Errorf("run %v: hanged", run) - return + return &queue.Result{ + Status: queue.ExecFailure, + Err: fmt.Errorf("run %v: hanged", run), + } } if run == 0 { - req.Info = *info + ret.Info = info } else { - req.Info.Calls = append(req.Info.Calls, info.Calls...) + ret.Info.Calls = append(ret.Info.Calls, info.Calls...) } } + return ret } -func runTestC(req *RunRequest) { +func runTestC(req *queue.Request) *queue.Result { tmpDir, err := os.MkdirTemp("", "syz-runtest") if err != nil { - req.Err = fmt.Errorf("failed to create temp dir: %w", err) - return + return &queue.Result{ + Status: queue.ExecFailure, + Err: fmt.Errorf("failed to create temp dir: %w", err), + } } defer os.RemoveAll(tmpDir) - cmd := osutil.Command(req.Bin) + cmd := osutil.Command(req.BinaryFile) cmd.Dir = tmpDir // Tell ASAN to not mess with our NONFAILING. cmd.Env = append(append([]string{}, os.Environ()...), "ASAN_OPTIONS=handle_segv=0 allow_user_segv_handler=1") - req.Output, req.Err = osutil.Run(20*time.Second, cmd) + res := &queue.Result{} + res.Output, res.Err = osutil.Run(20*time.Second, cmd) var verr *osutil.VerboseError - if errors.As(req.Err, &verr) { + if errors.As(res.Err, &verr) { // The process can legitimately do something like exit_group(1). // So we ignore the error and rely on the rest of the checks (e.g. syscall return values). - req.Err = nil - req.Output = verr.Output + res.Err = nil + res.Output = verr.Output } + return res } func TestParsing(t *testing.T) { diff --git a/pkg/vminfo/features.go b/pkg/vminfo/features.go index ab71e1168..fb443774c 100644 --- a/pkg/vminfo/features.go +++ b/pkg/vminfo/features.go @@ -176,7 +176,10 @@ func (ctx *checkContext) featureSucceeded(feat flatrpc.Feature, testProg *prog.P if len(res.Output) != 0 { return string(res.Output) } - return res.Error + if res.Err != nil { + return res.Err.Error() + } + return "test program execution failed" } if len(res.Info.Calls) != len(testProg.Calls) { return fmt.Sprintf("only %v calls are executed out of %v", diff --git a/syz-manager/rpc.go b/syz-manager/rpc.go index 4fda07f07..92b2f8b1e 100644 --- a/syz-manager/rpc.go +++ b/syz-manager/rpc.go @@ -509,10 +509,14 @@ func (serv *RPCServer) doneRequest(runner *Runner, resp rpctype.ExecutionResult) } info.Extra.Cover = runner.instModules.Canonicalize(info.Extra.Cover) info.Extra.Signal = runner.instModules.Canonicalize(info.Extra.Signal) + var err error + if resp.Error != "" { + err = fmt.Errorf("%s", resp.Error) + } req.req.Done(&queue.Result{ Info: info, Output: resp.Output, - Error: resp.Error, + Err: err, }) } diff --git a/tools/syz-execprog/execprog.go b/tools/syz-execprog/execprog.go index b246c4263..ad75909eb 100644 --- a/tools/syz-execprog/execprog.go +++ b/tools/syz-execprog/execprog.go @@ -484,14 +484,14 @@ func checkerExecutor(ctx context.Context, source queue.Source, config *ipc.Confi Status: queue.Success, Info: info, Output: output, + Err: err, } if err != nil { res.Status = queue.ExecFailure - res.Error = err.Error() } if hanged && err == nil { res.Status = queue.ExecFailure - res.Error = "hanged" + res.Err = fmt.Errorf("hanged") } req.Done(res) } diff --git a/tools/syz-runtest/runtest.go b/tools/syz-runtest/runtest.go index 6f71519c7..5605c3616 100644 --- a/tools/syz-runtest/runtest.go +++ b/tools/syz-runtest/runtest.go @@ -14,11 +14,13 @@ import ( "net" "os" "path/filepath" + "slices" "sync" "sync/atomic" "time" "github.com/google/syzkaller/pkg/flatrpc" + "github.com/google/syzkaller/pkg/fuzzer/queue" "github.com/google/syzkaller/pkg/instance" "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/osutil" @@ -53,20 +55,17 @@ func main() { } osutil.MkdirAll(cfg.Workdir) mgr := &Manager{ - cfg: cfg, - vmPool: vmPool, - checker: vminfo.New(cfg), - reporter: reporter, - debug: *flagDebug, - requests: make(chan *runtest.RunRequest, 4*vmPool.Count()*cfg.Procs), - checkResultC: make(chan *rpctype.CheckArgs, 1), - checkProgsDone: make(chan bool), - vmStop: make(chan bool), - reqMap: make(map[int64]*runtest.RunRequest), - pending: make(map[string]map[int64]bool), + cfg: cfg, + vmPool: vmPool, + checker: vminfo.New(cfg), + reporter: reporter, + debug: *flagDebug, + checkResultC: make(chan *rpctype.CheckArgs, 1), + vmStop: make(chan bool), + reqMap: make(map[int64]*queue.Request), + pending: make(map[string]map[int64]bool), } - mgr.checkFiles, mgr.checkProgs = mgr.checker.StartCheck() - mgr.needCheckResults = len(mgr.checkProgs) + mgr.checkFiles = mgr.checker.RequiredFiles() s, err := rpctype.NewRPCServer(cfg.RPC, "Manager", mgr) if err != nil { log.Fatalf("failed to create rpc server: %v", err) @@ -97,8 +96,8 @@ func main() { }() } checkResult := <-mgr.checkResultC - <-mgr.checkProgsDone - calls, _, features, err := mgr.checker.FinishCheck(checkResult.Files, mgr.checkResults, checkResult.Features) + mgr.source.Store(mgr.checker) + calls, _, features, err := mgr.checker.Run(checkResult.Files, checkResult.Features) if err != nil { log.Fatalf("failed to detect enabled syscalls: %v", err) } @@ -122,12 +121,12 @@ func main() { Target: cfg.Target, Features: features.Enabled(), EnabledCalls: enabledCalls, - Requests: mgr.requests, LogFunc: func(text string) { fmt.Println(text) }, - Verbose: false, + Verbose: true, Debug: *flagDebug, Tests: *flagTests, } + mgr.source.Store(ctx) err = ctx.Run() close(vm.Shutdown) wg.Wait() @@ -138,24 +137,20 @@ func main() { } type Manager struct { - cfg *mgrconfig.Config - vmPool *vm.Pool - checker *vminfo.Checker - checkFiles []string - checkProgs []rpctype.ExecutionRequest - checkResults []rpctype.ExecutionResult - needCheckResults int - checkProgsDone chan bool - reporter *report.Reporter - requests chan *runtest.RunRequest - checkResultC chan *rpctype.CheckArgs - vmStop chan bool - port int - debug bool + cfg *mgrconfig.Config + vmPool *vm.Pool + checker *vminfo.Checker + checkFiles []string + reporter *report.Reporter + checkResultC chan *rpctype.CheckArgs + vmStop chan bool + port int + debug bool + source queue.DynamicSource reqMu sync.Mutex reqSeq int64 - reqMap map[int64]*runtest.RunRequest + reqMap map[int64]*queue.Request pending map[string]map[int64]bool } @@ -220,12 +215,15 @@ func (mgr *Manager) finishRequests(name string, rep *report.Report) error { return fmt.Errorf("vm crash: %v\n%s\n%s", rep.Title, rep.Report, rep.Output) } delete(mgr.reqMap, id) - req.Err = fmt.Errorf("%v", rep.Title) - req.Output = rep.Report - if len(req.Output) == 0 { - req.Output = rep.Output + output := rep.Report + if len(output) == 0 { + output = rep.Output } - close(req.Done) + req.Done(&queue.Result{ + Status: queue.Crashed, + Err: fmt.Errorf("%v", rep.Title), + Output: slices.Clone(output), + }) } delete(mgr.pending, name) return nil @@ -254,25 +252,11 @@ func (mgr *Manager) Check(a *rpctype.CheckArgs, r *rpctype.CheckRes) error { func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.ExchangeInfoReply) error { mgr.reqMu.Lock() defer mgr.reqMu.Unlock() - - select { - case <-mgr.checkProgsDone: - default: - mgr.checkResults = append(mgr.checkResults, a.Results...) - if len(mgr.checkResults) < mgr.needCheckResults { - numRequests := min(len(mgr.checkProgs), a.NeedProgs) - r.Requests = mgr.checkProgs[:numRequests] - mgr.checkProgs = mgr.checkProgs[numRequests:] - } else { - close(mgr.checkProgsDone) - } - return nil - } - if mgr.pending[a.Name] == nil { mgr.pending[a.Name] = make(map[int64]bool) } - for _, res := range a.Results { + for i := range a.Results { + res := a.Results[i] if !mgr.pending[a.Name][res.ID] { log.Fatalf("runner %v wasn't executing request %v", a.Name, res.ID) } @@ -285,19 +269,19 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch if req == nil { log.Fatalf("got done request for unknown id %v", res.ID) } - req.Output = res.Output - req.Info = res.Info + result := &queue.Result{ + Status: queue.Success, + Info: &res.Info, + Output: res.Output, + } if res.Error != "" { - req.Err = errors.New(res.Error) + result.Status = queue.ExecFailure + result.Err = errors.New(res.Error) } - close(req.Done) + req.Done(result) } for i := 0; i < a.NeedProgs; i++ { - var req *runtest.RunRequest - select { - case req = <-mgr.requests: - default: - } + req := mgr.source.Next() if req == nil { break } @@ -306,10 +290,10 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch mgr.pending[a.Name][mgr.reqSeq] = true var progData []byte var err error - if req.Bin != "" { - progData, err = os.ReadFile(req.Bin) + if req.BinaryFile != "" { + progData, err = os.ReadFile(req.BinaryFile) } else { - progData, err = req.P.SerializeForExec() + progData, err = req.Prog.SerializeForExec() } if err != nil { log.Fatal(err) @@ -317,9 +301,9 @@ func (mgr *Manager) ExchangeInfo(a *rpctype.ExchangeInfoRequest, r *rpctype.Exch r.Requests = append(r.Requests, rpctype.ExecutionRequest{ ID: mgr.reqSeq, ProgData: progData, - ExecOpts: req.Opts, - IsBinary: req.Bin != "", - ResetState: req.Bin == "", + ExecOpts: *req.ExecOpts, + IsBinary: req.BinaryFile != "", + ResetState: req.BinaryFile == "", ReturnOutput: true, ReturnError: true, Repeat: req.Repeat, |
