diff options
Diffstat (limited to 'vm/vmimpl')
| -rw-r--r-- | vm/vmimpl/merger.go | 14 |
1 files changed, 14 insertions, 0 deletions
diff --git a/vm/vmimpl/merger.go b/vm/vmimpl/merger.go index 17b837602..b189a40f0 100644 --- a/vm/vmimpl/merger.go +++ b/vm/vmimpl/merger.go @@ -32,13 +32,27 @@ func (merger *OutputMerger) Wait() { } func (merger *OutputMerger) Add(name string, r io.ReadCloser) { + merger.AddDecoder(name, r, nil) +} + +func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser, + decoder func(data []byte) (start, size int, decoded []byte)) { merger.wg.Add(1) go func() { var pending []byte + var proto []byte var buf [4 << 10]byte for { n, err := r.Read(buf[:]) if n != 0 { + if decoder != nil { + proto = append(proto, buf[:n]...) + start, size, decoded := decoder(proto) + proto = proto[start+size:] + if len(decoded) != 0 { + merger.Output <- decoded // note: this can block + } + } pending = append(pending, buf[:n]...) if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 { out := pending[:pos+1] |
