diff options
| author | Taras Madan <tarasmadan@gmail.com> | 2021-10-14 14:14:27 +0000 |
|---|---|---|
| committer | tarasmadan <89859571+tarasmadan@users.noreply.github.com> | 2021-10-15 09:52:17 +0200 |
| commit | fc538d9dc19d1d8d0c0301ca73f269e71d978c85 (patch) | |
| tree | 9ae1d8d6a010dcb91aca731d7822870cc1e5d3ac /syz-verifier | |
| parent | 0bd8b1dd5124f72465a85feea4044d98b2571d6d (diff) | |
syz-verifier: add rpcserver.go and move all the related stuff there
Diffstat (limited to 'syz-verifier')
| -rwxr-xr-x | syz-verifier/main.go | 204 | ||||
| -rw-r--r-- | syz-verifier/rpcserver.go | 212 |
2 files changed, 214 insertions, 202 deletions
diff --git a/syz-verifier/main.go b/syz-verifier/main.go index b7b11b5c6..737d336b6 100755 --- a/syz-verifier/main.go +++ b/syz-verifier/main.go @@ -11,13 +11,11 @@ import ( "fmt" "io" "math/rand" - "net" "os" "os/signal" "path/filepath" "strconv" "strings" - "sync" "time" "github.com/google/syzkaller/pkg/instance" @@ -25,7 +23,6 @@ import ( "github.com/google/syzkaller/pkg/mgrconfig" "github.com/google/syzkaller/pkg/osutil" "github.com/google/syzkaller/pkg/report" - "github.com/google/syzkaller/pkg/rpctype" "github.com/google/syzkaller/pkg/tool" "github.com/google/syzkaller/prog" "github.com/google/syzkaller/vm" @@ -65,18 +62,7 @@ type Verifier struct { reruns int } -// RPCServer is a wrapper around the rpc.Server. It communicates with Runners, -// generates programs and sends complete Results for verification. -type RPCServer struct { - vrf *Verifier - port int - mu sync.Mutex - cond *sync.Cond - pools map[int]*poolInfo - progs map[int]*progInfo - notChecked int - rerunsAvailable *sync.Cond -} + // poolInfo contains kernel-specific information for spawning virtual machines // and reporting crashes. It also keeps track of the Runners executing on @@ -313,72 +299,6 @@ func (vrf *Verifier) createAndManageInstance(pi *poolInfo, idx int) { log.Logf(0, "reboot the VM in pool %d", idx) } -func startRPCServer(vrf *Verifier) (*RPCServer, error) { - srv := &RPCServer{ - vrf: vrf, - pools: vrf.pools, - progs: make(map[int]*progInfo), - notChecked: len(vrf.pools), - } - srv.cond = sync.NewCond(&srv.mu) - srv.rerunsAvailable = sync.NewCond(&srv.mu) - - s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv) - if err != nil { - return nil, err - } - - log.Logf(0, "serving rpc on tcp://%v", s.Addr()) - srv.port = s.Addr().(*net.TCPAddr).Port - - go s.Serve() - return srv, nil -} - -// Connect notifies the RPCServer that a new Runner was started. -func (srv *RPCServer) Connect(a *rpctype.RunnerConnectArgs, r *rpctype.RunnerConnectRes) error { - srv.mu.Lock() - defer srv.mu.Unlock() - pool, vm := a.Pool, a.VM - srv.pools[pool].runners[vm] = make(runnerProgs) - r.CheckUnsupportedCalls = !srv.pools[pool].checked - return nil -} - -// UpdateUnsupported communicates to the server the list of system calls not -// supported by the kernel corresponding to this pool and updates the list of -// enabled system calls. This function is called once for each kernel. -// When all kernels have reported the list of unsupported system calls, the -// choice table will be created using only the system calls supported by all -// kernels. -func (srv *RPCServer) UpdateUnsupported(a *rpctype.UpdateUnsupportedArgs, r *int) error { - srv.mu.Lock() - defer srv.mu.Unlock() - if srv.pools[a.Pool].checked { - return nil - } - srv.pools[a.Pool].checked = true - vrf := srv.vrf - - for _, unsupported := range a.UnsupportedCalls { - if c := vrf.target.Syscalls[unsupported.ID]; vrf.calls[c] { - vrf.reasons[c] = unsupported.Reason - } - } - - srv.notChecked-- - if srv.notChecked == 0 { - vrf.finalizeCallSet(os.Stdout) - - vrf.stats = InitStats(vrf.calls) - vrf.SetPrintStatAtSIGINT() - - vrf.choiceTable = vrf.target.BuildChoiceTable(nil, vrf.calls) - srv.cond.Signal() - } - return nil -} - // finalizeCallSet removes the system calls that are not supported from the set // of enabled system calls and reports the reason to the io.Writer (either // because the call is not supported by one of the kernels or because the call @@ -412,69 +332,6 @@ func (vrf *Verifier) finalizeCallSet(w io.Writer) { } } -// NextExchange is called when a Runner requests a new program to execute and, -// potentially, wants to send a new Result to the RPCServer. -func (srv *RPCServer) NextExchange(a *rpctype.NextExchangeArgs, r *rpctype.NextExchangeRes) error { - srv.mu.Lock() - defer srv.mu.Unlock() - - var res *Result - var prog *progInfo - if a.Info.Calls != nil { - res = &Result{ - Pool: a.Pool, - Hanged: a.Hanged, - Info: a.Info, - RunIdx: a.RunIdx, - } - - prog = srv.progs[a.ProgIdx] - if prog == nil { - // This case can happen if both conditions are true: - // 1. a Runner calls Verifier.NextExchange, then crashes, - // its corresponding Pool being the only one that hasn't - // sent results for the program yet - // 2.the cleanup call for the crash got the server's mutex before - // the NextExchange call. - // As the pool was the only one that hasn't sent the result, the - // cleanup call has already removed prog from srv.progs by the time - // the NextExchange call gets the server's mutex, which is why the - // variable is nil. As the results for this program have already - // been sent for verification, we discard this one. - return nil - } - - delete(srv.pools[a.Pool].runners[a.VM], prog.idx) - if srv.newResult(res, prog) { - if srv.vrf.processResults(prog) { - delete(srv.progs, prog.idx) - } - } - } - - if srv.notChecked > 0 { - // Runner is blocked until the choice table is created. - srv.cond.Wait() - } - - newProg, pi, ri := srv.newProgram(a.Pool, a.VM) - r.RPCProg = rpctype.RPCProg{Prog: newProg, ProgIdx: pi, RunIdx: ri} - return nil -} - -// newResult is called when a Runner sends a new Result. It returns true if all -// Results from the corresponding programs have been received and they can be -// sent for verification. Otherwise, it returns false. -func (srv *RPCServer) newResult(res *Result, prog *progInfo) bool { - ri := prog.runIdx - if prog.res[ri][res.Pool] != nil { - return false - } - prog.res[ri][res.Pool] = res - prog.received++ - return prog.received == len(srv.pools) -} - // processResults will send a set of complete results for verification and, in // case differences are found, it will start the rerun process for the program // (if reruns are enabled). If every rerun produces the same results, the result @@ -547,15 +404,6 @@ func (vrf *Verifier) processResults(prog *progInfo) bool { return true } -func (srv *RPCServer) newRun(p *progInfo) { - p.runIdx++ - p.received = 0 - p.res[p.runIdx] = make([]*Result, len(srv.pools)) - for _, pool := range srv.pools { - pool.toRerun = append(pool.toRerun, p) - } -} - func createReport(rr *ResultReport, pools int) []byte { calls := strings.Split(rr.Prog, "\n") calls = calls[:len(calls)-1] @@ -580,56 +428,8 @@ func createReport(rr *ResultReport, pools int) []byte { return []byte(data) } -// newProgram returns a new program for the Runner identified by poolIdx and -// vmIdx and the program's index. -func (srv *RPCServer) newProgram(poolIdx, vmIdx int) ([]byte, int, int) { - pool := srv.pools[poolIdx] - - if len(pool.toRerun) != 0 { - p := pool.toRerun[0] - pool.runners[vmIdx][p.idx] = p - pool.toRerun = pool.toRerun[1:] - return p.serialized, p.idx, p.runIdx - } - - if len(pool.progs) == 0 { - prog, progIdx := srv.vrf.generate() - pi := &progInfo{ - prog: prog, - idx: progIdx, - serialized: prog.Serialize(), - res: make([][]*Result, srv.vrf.reruns), - } - pi.res[0] = make([]*Result, len(srv.pools)) - for _, pool := range srv.pools { - pool.progs = append(pool.progs, pi) - } - srv.progs[progIdx] = pi - } - p := pool.progs[0] - pool.runners[vmIdx][p.idx] = p - pool.progs = pool.progs[1:] - return p.serialized, p.idx, p.runIdx -} - // generate will return a newly generated program and its index. func (vrf *Verifier) generate() (*prog.Prog, int) { vrf.progIdx++ return vrf.target.Generate(vrf.rnd, prog.RecommendedCalls, vrf.choiceTable), vrf.progIdx -} - -// cleanup is called when a vm.Instance crashes. -func (srv *RPCServer) cleanup(poolIdx, vmIdx int) { - srv.mu.Lock() - defer srv.mu.Unlock() - progs := srv.pools[poolIdx].runners[vmIdx] - - for _, prog := range progs { - if srv.newResult(&Result{Pool: poolIdx, Crashed: true}, prog) { - srv.vrf.processResults(prog) - delete(srv.progs, prog.idx) - delete(srv.pools[poolIdx].runners[vmIdx], prog.idx) - continue - } - } -} +}
\ No newline at end of file diff --git a/syz-verifier/rpcserver.go b/syz-verifier/rpcserver.go new file mode 100644 index 000000000..f4956da88 --- /dev/null +++ b/syz-verifier/rpcserver.go @@ -0,0 +1,212 @@ +// Copyright 2021 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package main + +import ( + "net" + "os" + "sync" + + "github.com/google/syzkaller/pkg/log" + "github.com/google/syzkaller/pkg/rpctype" +) + +// RPCServer is a wrapper around the rpc.Server. It communicates with Runners, +// generates programs and sends complete Results for verification. +type RPCServer struct { + vrf *Verifier + port int + mu sync.Mutex + cond *sync.Cond + pools map[int]*poolInfo + progs map[int]*progInfo + notChecked int + rerunsAvailable *sync.Cond +} + +func startRPCServer(vrf *Verifier) (*RPCServer, error) { + srv := &RPCServer{ + vrf: vrf, + pools: vrf.pools, + progs: make(map[int]*progInfo), + notChecked: len(vrf.pools), + } + srv.cond = sync.NewCond(&srv.mu) + srv.rerunsAvailable = sync.NewCond(&srv.mu) + + s, err := rpctype.NewRPCServer(vrf.addr, "Verifier", srv) + if err != nil { + return nil, err + } + + log.Logf(0, "serving rpc on tcp://%v", s.Addr()) + srv.port = s.Addr().(*net.TCPAddr).Port + + go s.Serve() + return srv, nil +} + +// Connect notifies the RPCServer that a new Runner was started. +func (srv *RPCServer) Connect(a *rpctype.RunnerConnectArgs, r *rpctype.RunnerConnectRes) error { + srv.mu.Lock() + defer srv.mu.Unlock() + pool, vm := a.Pool, a.VM + srv.pools[pool].runners[vm] = make(runnerProgs) + r.CheckUnsupportedCalls = !srv.pools[pool].checked + return nil +} + +// UpdateUnsupported communicates to the server the list of system calls not +// supported by the kernel corresponding to this pool and updates the list of +// enabled system calls. This function is called once for each kernel. +// When all kernels have reported the list of unsupported system calls, the +// choice table will be created using only the system calls supported by all +// kernels. +func (srv *RPCServer) UpdateUnsupported(a *rpctype.UpdateUnsupportedArgs, r *int) error { + srv.mu.Lock() + defer srv.mu.Unlock() + if srv.pools[a.Pool].checked { + return nil + } + srv.pools[a.Pool].checked = true + vrf := srv.vrf + + for _, unsupported := range a.UnsupportedCalls { + if c := vrf.target.Syscalls[unsupported.ID]; vrf.calls[c] { + vrf.reasons[c] = unsupported.Reason + } + } + + srv.notChecked-- + if srv.notChecked == 0 { + vrf.finalizeCallSet(os.Stdout) + + vrf.stats = InitStats(vrf.calls) + vrf.SetPrintStatAtSIGINT() + + vrf.choiceTable = vrf.target.BuildChoiceTable(nil, vrf.calls) + srv.cond.Signal() + } + return nil +} + +// NextExchange is called when a Runner requests a new program to execute and, +// potentially, wants to send a new Result to the RPCServer. +func (srv *RPCServer) NextExchange(a *rpctype.NextExchangeArgs, r *rpctype.NextExchangeRes) error { + srv.mu.Lock() + defer srv.mu.Unlock() + + var res *Result + var prog *progInfo + if a.Info.Calls != nil { + res = &Result{ + Pool: a.Pool, + Hanged: a.Hanged, + Info: a.Info, + RunIdx: a.RunIdx, + } + + prog = srv.progs[a.ProgIdx] + if prog == nil { + // This case can happen if both conditions are true: + // 1. a Runner calls Verifier.NextExchange, then crashes, + // its corresponding Pool being the only one that hasn't + // sent results for the program yet + // 2.the cleanup call for the crash got the server's mutex before + // the NextExchange call. + // As the pool was the only one that hasn't sent the result, the + // cleanup call has already removed prog from srv.progs by the time + // the NextExchange call gets the server's mutex, which is why the + // variable is nil. As the results for this program have already + // been sent for verification, we discard this one. + return nil + } + + delete(srv.pools[a.Pool].runners[a.VM], prog.idx) + if srv.newResult(res, prog) { + if srv.vrf.processResults(prog) { + delete(srv.progs, prog.idx) + } + } + } + + if srv.notChecked > 0 { + // Runner is blocked until the choice table is created. + srv.cond.Wait() + } + + newProg, pi, ri := srv.newProgram(a.Pool, a.VM) + r.RPCProg = rpctype.RPCProg{Prog: newProg, ProgIdx: pi, RunIdx: ri} + return nil +} + +// newResult is called when a Runner sends a new Result. It returns true if all +// Results from the corresponding programs have been received and they can be +// sent for verification. Otherwise, it returns false. +func (srv *RPCServer) newResult(res *Result, prog *progInfo) bool { + ri := prog.runIdx + if prog.res[ri][res.Pool] != nil { + return false + } + prog.res[ri][res.Pool] = res + prog.received++ + return prog.received == len(srv.pools) +} + +func (srv *RPCServer) newRun(p *progInfo) { + p.runIdx++ + p.received = 0 + p.res[p.runIdx] = make([]*Result, len(srv.pools)) + for _, pool := range srv.pools { + pool.toRerun = append(pool.toRerun, p) + } +} + +// newProgram returns a new program for the Runner identified by poolIdx and +// vmIdx and the program's index. +func (srv *RPCServer) newProgram(poolIdx, vmIdx int) ([]byte, int, int) { + pool := srv.pools[poolIdx] + + if len(pool.toRerun) != 0 { + p := pool.toRerun[0] + pool.runners[vmIdx][p.idx] = p + pool.toRerun = pool.toRerun[1:] + return p.serialized, p.idx, p.runIdx + } + + if len(pool.progs) == 0 { + prog, progIdx := srv.vrf.generate() + pi := &progInfo{ + prog: prog, + idx: progIdx, + serialized: prog.Serialize(), + res: make([][]*Result, srv.vrf.reruns), + } + pi.res[0] = make([]*Result, len(srv.pools)) + for _, pool := range srv.pools { + pool.progs = append(pool.progs, pi) + } + srv.progs[progIdx] = pi + } + p := pool.progs[0] + pool.runners[vmIdx][p.idx] = p + pool.progs = pool.progs[1:] + return p.serialized, p.idx, p.runIdx +} + +// cleanup is called when a vm.Instance crashes. +func (srv *RPCServer) cleanup(poolIdx, vmIdx int) { + srv.mu.Lock() + defer srv.mu.Unlock() + progs := srv.pools[poolIdx].runners[vmIdx] + + for _, prog := range progs { + if srv.newResult(&Result{Pool: poolIdx, Crashed: true}, prog) { + srv.vrf.processResults(prog) + delete(srv.progs, prog.idx) + delete(srv.pools[poolIdx].runners[vmIdx], prog.idx) + continue + } + } +} |
