From e8e63830a60fc048df976188bf726abc32045162 Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Sat, 17 Jun 2017 12:47:35 +0200 Subject: pkg/ipc: move from ipc --- pkg/ipc/gate.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 pkg/ipc/gate.go (limited to 'pkg/ipc/gate.go') diff --git a/pkg/ipc/gate.go b/pkg/ipc/gate.go new file mode 100644 index 000000000..b1b1f1fc8 --- /dev/null +++ b/pkg/ipc/gate.go @@ -0,0 +1,76 @@ +// Copyright 2015 syzkaller project authors. All rights reserved. +// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file. + +package ipc + +import ( + "sync" +) + +// Gate limits concurrency level and window to the given value. +// Limitation of concurrency window means that if a very old activity is still +// running it will not let new activities to start even if concurrency level is low. +type Gate struct { + cv *sync.Cond + busy []bool + pos int + running int + stop bool + f func() +} + +// If f is not nil, it will be called after each batch of c activities. +func NewGate(c int, f func()) *Gate { + return &Gate{ + cv: sync.NewCond(new(sync.Mutex)), + busy: make([]bool, c), + f: f, + } +} + +func (g *Gate) Enter() int { + g.cv.L.Lock() + for g.busy[g.pos] || g.stop { + g.cv.Wait() + } + idx := g.pos + g.pos++ + if g.pos >= len(g.busy) { + g.pos = 0 + } + g.busy[idx] = true + g.running++ + if g.running > len(g.busy) { + panic("broken gate") + } + g.cv.L.Unlock() + return idx +} + +func (g *Gate) Leave(idx int) { + g.cv.L.Lock() + if !g.busy[idx] { + panic("broken gate") + } + g.busy[idx] = false + g.running-- + if g.running < 0 { + panic("broken gate") + } + if idx == 0 && g.f != nil { + if g.stop { + panic("broken gate") + } + g.stop = true + for g.running != 0 { + g.cv.Wait() + } + g.stop = false + g.f() + g.cv.Broadcast() + } + if idx == g.pos && !g.stop || g.running == 0 && g.stop { + g.cv.Broadcast() + } + g.cv.L.Unlock() +} -- cgit mrf-deployment