aboutsummaryrefslogtreecommitdiffstats
path: root/vm/vmimpl
diff options
context:
space:
mode:
Diffstat (limited to 'vm/vmimpl')
-rw-r--r--vm/vmimpl/merger.go14
1 files changed, 14 insertions, 0 deletions
diff --git a/vm/vmimpl/merger.go b/vm/vmimpl/merger.go
index 17b837602..b189a40f0 100644
--- a/vm/vmimpl/merger.go
+++ b/vm/vmimpl/merger.go
@@ -32,13 +32,27 @@ func (merger *OutputMerger) Wait() {
}
func (merger *OutputMerger) Add(name string, r io.ReadCloser) {
+ merger.AddDecoder(name, r, nil)
+}
+
+func (merger *OutputMerger) AddDecoder(name string, r io.ReadCloser,
+ decoder func(data []byte) (start, size int, decoded []byte)) {
merger.wg.Add(1)
go func() {
var pending []byte
+ var proto []byte
var buf [4 << 10]byte
for {
n, err := r.Read(buf[:])
if n != 0 {
+ if decoder != nil {
+ proto = append(proto, buf[:n]...)
+ start, size, decoded := decoder(proto)
+ proto = proto[start+size:]
+ if len(decoded) != 0 {
+ merger.Output <- decoded // note: this can block
+ }
+ }
pending = append(pending, buf[:n]...)
if pos := bytes.LastIndexByte(pending, '\n'); pos != -1 {
out := pending[:pos+1]