From b3e0b2be97a8a30d4f6f6dac069e4cfd1b4c38cc Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Sat, 13 Aug 2016 14:44:46 -0700 Subject: ipc: read out executor output If executor constantly prints something, it will eventually hang since the pipe will overflow. Read out executor output to prevent the hang. --- ipc/ipc.go | 52 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/ipc/ipc.go b/ipc/ipc.go index 2d4686bfc..41d22c705 100644 --- a/ipc/ipc.go +++ b/ipc/ipc.go @@ -296,13 +296,13 @@ func closeMapping(f *os.File, mem []byte) error { } type command struct { - timeout time.Duration - cmd *exec.Cmd - flags uint64 - dir string - rp *os.File - inrp *os.File - outwp *os.File + timeout time.Duration + cmd *exec.Cmd + flags uint64 + dir string + readDone chan []byte + inrp *os.File + outwp *os.File } func makeCommand(bin []string, timeout time.Duration, flags uint64, inFile *os.File, outFile *os.File) (*command, error) { @@ -330,7 +330,6 @@ func makeCommand(bin []string, timeout time.Duration, flags uint64, inFile *os.F return nil, fmt.Errorf("failed to create pipe: %v", err) } defer wp.Close() - c.rp = rp // Input command pipe. inrp, inwp, err := os.Pipe() @@ -348,6 +347,8 @@ func makeCommand(bin []string, timeout time.Duration, flags uint64, inFile *os.F defer outrp.Close() c.outwp = outwp + c.readDone = make(chan []byte, 1) + cmd := exec.Command(bin[0], bin[1:]...) cmd.ExtraFiles = []*os.File{inFile, outFile, outrp, inwp} cmd.Env = []string{} @@ -355,6 +356,28 @@ func makeCommand(bin []string, timeout time.Duration, flags uint64, inFile *os.F if flags&FlagDebug == 0 { cmd.Stdout = wp cmd.Stderr = wp + go func(c *command) { + // Read out output in case executor constantly prints something. + const BufSize = 128 << 10 + output := make([]byte, BufSize) + size := 0 + for { + n, err := rp.Read(output[size:]) + if n > 0 { + size += n + if size >= BufSize*3/4 { + copy(output, output[size-BufSize/2:size]) + size = BufSize / 2 + } + } + if err != nil { + rp.Close() + c.readDone <- output[:size] + close(c.readDone) + return + } + } + }(c) } else { cmd.Stdout = os.Stdout cmd.Stderr = os.Stdout @@ -379,9 +402,6 @@ func (c *command) close() { } fileutil.UmountAll(c.dir) os.RemoveAll(c.dir) - if c.rp != nil { - c.rp.Close() - } if c.inrp != nil { c.inrp.Close() } @@ -415,7 +435,7 @@ func (c *command) kill() { func (c *command) exec() (output []byte, failed, hanged, restart bool, err0 error) { var tmp [1]byte if _, err := c.outwp.Write(tmp[:]); err != nil { - output, _ = ioutil.ReadAll(c.rp) + output = <-c.readDone err0 = fmt.Errorf("failed to write control pipe: %v", err) return } @@ -432,7 +452,6 @@ func (c *command) exec() (output []byte, failed, hanged, restart bool, err0 erro hang <- false } }() - //!!! handle c.rp overflow _, readErr := c.inrp.Read(tmp[:]) close(done) if readErr == nil { @@ -441,12 +460,9 @@ func (c *command) exec() (output []byte, failed, hanged, restart bool, err0 erro } err0 = fmt.Errorf("executor did not answer") c.kill() - var err error - output, err = ioutil.ReadAll(c.rp) - if err = c.cmd.Wait(); <-hang && err != nil { + output = <-c.readDone + if err := c.cmd.Wait(); <-hang && err != nil { hanged = true - } - if err != nil { output = append(output, []byte(err.Error())...) output = append(output, '\n') } -- cgit mrf-deployment