From b41e96b421acda1761e9ba2ee17da16efad436cd Mon Sep 17 00:00:00 2001 From: Dmitry Vyukov Date: Tue, 13 Jun 2017 20:23:21 +0200 Subject: vendor: switch from dep to godep dep tool vendored too much code (100MB) including tests and unused packages. godep vendored significantly less (12MB) without tests and unused packages. The main advantage is that pre-Go1.9 toolchain does not run tests of all vendor packages now. --- vendor/cloud.google.com/go/pubsub/topic.go | 358 ----------------------------- 1 file changed, 358 deletions(-) delete mode 100644 vendor/cloud.google.com/go/pubsub/topic.go (limited to 'vendor/cloud.google.com/go/pubsub/topic.go') diff --git a/vendor/cloud.google.com/go/pubsub/topic.go b/vendor/cloud.google.com/go/pubsub/topic.go deleted file mode 100644 index 9e3c2afff..000000000 --- a/vendor/cloud.google.com/go/pubsub/topic.go +++ /dev/null @@ -1,358 +0,0 @@ -// Copyright 2016 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "errors" - "fmt" - "runtime" - "strings" - "sync" - "time" - - "cloud.google.com/go/iam" - "github.com/golang/protobuf/proto" - "golang.org/x/net/context" - "google.golang.org/api/support/bundler" - pb "google.golang.org/genproto/googleapis/pubsub/v1" -) - -const ( - // The maximum number of messages that can be in a single publish request, as - // determined by the PubSub service. - MaxPublishRequestCount = 1000 - - // The maximum size of a single publish request in bytes, as determined by the PubSub service. - MaxPublishRequestBytes = 1e7 - - maxInt = int(^uint(0) >> 1) -) - -// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes. -var ErrOversizedMessage = bundler.ErrOversizedItem - -// Topic is a reference to a PubSub topic. -// -// The methods of Topic are safe for use by multiple goroutines. -type Topic struct { - s service - // The fully qualified identifier for the topic, in the format "projects//topics/" - name string - - // Settings for publishing messages. All changes must be made before the - // first call to Publish. The default is DefaultPublishSettings. - PublishSettings PublishSettings - - mu sync.RWMutex - stopped bool - bundler *bundler.Bundler - - wg sync.WaitGroup - - // Channel for message bundles to be published. Close to indicate that Stop was called. - bundlec chan []*bundledMessage -} - -// PublishSettings control the bundling of published messages. -type PublishSettings struct { - - // Publish a non-empty batch after this delay has passed. - DelayThreshold time.Duration - - // Publish a batch when it has this many messages. The maximum is - // MaxPublishRequestCount. - CountThreshold int - - // Publish a batch when its size in bytes reaches this value. - ByteThreshold int - - // The number of goroutines that invoke the Publish RPC concurrently. - // Defaults to a multiple of GOMAXPROCS. - NumGoroutines int - - // The maximum time that the client will attempt to publish a bundle of messages. - Timeout time.Duration -} - -// DefaultPublishSettings holds the default values for topics' PublishSettings. -var DefaultPublishSettings = PublishSettings{ - DelayThreshold: 1 * time.Millisecond, - CountThreshold: 100, - ByteThreshold: 1e6, - Timeout: 60 * time.Second, -} - -// CreateTopic creates a new topic. -// The specified topic ID must start with a letter, and contain only letters -// ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), -// tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 -// characters in length, and must not start with "goog". -// If the topic already exists an error will be returned. -func (c *Client) CreateTopic(ctx context.Context, id string) (*Topic, error) { - t := c.Topic(id) - err := c.s.createTopic(ctx, t.name) - return t, err -} - -// Topic creates a reference to a topic. -// -// If a Topic's Publish method is called, it has background goroutines -// associated with it. Clean them up by calling Topic.Stop. -// -// Avoid creating many Topic instances if you use them to publish. -func (c *Client) Topic(id string) *Topic { - return newTopic(c.s, fmt.Sprintf("projects/%s/topics/%s", c.projectID, id)) -} - -func newTopic(s service, name string) *Topic { - // bundlec is unbuffered. A buffer would occupy memory not - // accounted for by the bundler, so BufferedByteLimit would be a lie: - // the actual memory consumed would be higher. - return &Topic{ - s: s, - name: name, - PublishSettings: DefaultPublishSettings, - bundlec: make(chan []*bundledMessage), - } -} - -// Topics returns an iterator which returns all of the topics for the client's project. -func (c *Client) Topics(ctx context.Context) *TopicIterator { - return &TopicIterator{ - s: c.s, - next: c.s.listProjectTopics(ctx, c.fullyQualifiedProjectName()), - } -} - -// TopicIterator is an iterator that returns a series of topics. -type TopicIterator struct { - s service - next nextStringFunc -} - -// Next returns the next topic. If there are no more topics, iterator.Done will be returned. -func (tps *TopicIterator) Next() (*Topic, error) { - topicName, err := tps.next() - if err != nil { - return nil, err - } - return newTopic(tps.s, topicName), nil -} - -// ID returns the unique idenfier of the topic within its project. -func (t *Topic) ID() string { - slash := strings.LastIndex(t.name, "/") - if slash == -1 { - // name is not a fully-qualified name. - panic("bad topic name") - } - return t.name[slash+1:] -} - -// String returns the printable globally unique name for the topic. -func (t *Topic) String() string { - return t.name -} - -// Delete deletes the topic. -func (t *Topic) Delete(ctx context.Context) error { - return t.s.deleteTopic(ctx, t.name) -} - -// Exists reports whether the topic exists on the server. -func (t *Topic) Exists(ctx context.Context) (bool, error) { - if t.name == "_deleted-topic_" { - return false, nil - } - - return t.s.topicExists(ctx, t.name) -} - -func (t *Topic) IAM() *iam.Handle { - return t.s.iamHandle(t.name) -} - -// Subscriptions returns an iterator which returns the subscriptions for this topic. -func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { - // NOTE: zero or more Subscriptions that are ultimately returned by this - // Subscriptions iterator may belong to a different project to t. - return &SubscriptionIterator{ - s: t.s, - next: t.s.listTopicSubscriptions(ctx, t.name), - } -} - -var errTopicStopped = errors.New("pubsub: Stop has been called for this topic") - -// Publish publishes msg to the topic asynchronously. Messages are batched and -// sent according to the topic's PublishSettings. Publish never blocks. -// -// Publish returns a non-nil PublishResult which will be ready when the -// message has been sent (or has failed to be sent) to the server. -// -// Publish creates goroutines for batching and sending messages. These goroutines -// need to be stopped by calling t.Stop(). Once stopped, future calls to Publish -// will immediately return a PublishResult with an error. -func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { - // TODO(jba): if this turns out to take significant time, try to approximate it. - // Or, convert the messages to protos in Publish, instead of in the service. - msg.size = proto.Size(&pb.PubsubMessage{ - Data: msg.Data, - Attributes: msg.Attributes, - }) - r := &PublishResult{ready: make(chan struct{})} - t.initBundler() - t.mu.RLock() - defer t.mu.RUnlock() - // TODO(aboulhosn) [from bcmills] consider changing the semantics of bundler to perform this logic so we don't have to do it here - if t.stopped { - r.set("", errTopicStopped) - return r - } - - // TODO(jba) [from bcmills] consider using a shared channel per bundle - // (requires Bundler API changes; would reduce allocations) - // The call to Add should never return an error because the bundler's - // BufferedByteLimit is set to maxInt; we do not perform any flow - // control in the client. - err := t.bundler.Add(&bundledMessage{msg, r}, msg.size) - if err != nil { - r.set("", err) - } - return r -} - -// Send all remaining published messages and stop goroutines created for handling -// publishing. Returns once all outstanding messages have been sent or have -// failed to be sent. -func (t *Topic) Stop() { - t.mu.Lock() - noop := t.stopped || t.bundler == nil - t.stopped = true - t.mu.Unlock() - if noop { - return - } - t.bundler.Flush() - // At this point, all pending bundles have been published and the bundler's - // goroutines have exited, so it is OK for this goroutine to close bundlec. - close(t.bundlec) - t.wg.Wait() -} - -// A PublishResult holds the result from a call to Publish. -type PublishResult struct { - ready chan struct{} - serverID string - err error -} - -// Ready returns a channel that is closed when the result is ready. -// When the Ready channel is closed, Get is guaranteed not to block. -func (r *PublishResult) Ready() <-chan struct{} { return r.ready } - -// Get returns the server-generated message ID and/or error result of a Publish call. -// Get blocks until the Publish call completes or the context is done. -func (r *PublishResult) Get(ctx context.Context) (serverID string, err error) { - // If the result is already ready, return it even if the context is done. - select { - case <-r.Ready(): - return r.serverID, r.err - default: - } - select { - case <-ctx.Done(): - return "", ctx.Err() - case <-r.Ready(): - return r.serverID, r.err - } -} - -func (r *PublishResult) set(sid string, err error) { - r.serverID = sid - r.err = err - close(r.ready) -} - -type bundledMessage struct { - msg *Message - res *PublishResult -} - -func (t *Topic) initBundler() { - t.mu.RLock() - noop := t.stopped || t.bundler != nil - t.mu.RUnlock() - if noop { - return - } - t.mu.Lock() - defer t.mu.Unlock() - // Must re-check, since we released the lock. - if t.stopped || t.bundler != nil { - return - } - - // TODO(jba): use a context detached from the one passed to NewClient. - ctx := context.TODO() - // Unless overridden, run several goroutines per CPU to call the Publish RPC. - n := t.PublishSettings.NumGoroutines - if n <= 0 { - n = 25 * runtime.GOMAXPROCS(0) - } - timeout := t.PublishSettings.Timeout - t.wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer t.wg.Done() - for b := range t.bundlec { - bctx := ctx - cancel := func() {} - if timeout != 0 { - bctx, cancel = context.WithTimeout(ctx, timeout) - } - t.publishMessageBundle(bctx, b) - cancel() - } - }() - } - t.bundler = bundler.NewBundler(&bundledMessage{}, func(items interface{}) { - t.bundlec <- items.([]*bundledMessage) - - }) - t.bundler.DelayThreshold = t.PublishSettings.DelayThreshold - t.bundler.BundleCountThreshold = t.PublishSettings.CountThreshold - if t.bundler.BundleCountThreshold > MaxPublishRequestCount { - t.bundler.BundleCountThreshold = MaxPublishRequestCount - } - t.bundler.BundleByteThreshold = t.PublishSettings.ByteThreshold - t.bundler.BufferedByteLimit = maxInt - t.bundler.BundleByteLimit = MaxPublishRequestBytes -} - -func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { - msgs := make([]*Message, len(bms)) - for i, bm := range bms { - msgs[i], bm.msg = bm.msg, nil // release bm.msg for GC - } - ids, err := t.s.publishMessages(ctx, t.name, msgs) - for i, bm := range bms { - if err != nil { - bm.res.set("", err) - } else { - bm.res.set(ids[i], nil) - } - } -} -- cgit mrf-deployment