diff options
| author | Taras Madan <tarasmadan@google.com> | 2025-01-22 16:07:17 +0100 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2025-01-23 10:42:36 +0000 |
| commit | 7b4377ad9d8a7205416df8d6217ef2b010f89481 (patch) | |
| tree | e6fec4fd12ff807a16d847923f501075bf71d16c /vendor/cloud.google.com/go/pubsub/pullstream.go | |
| parent | 475a4c203afb8b7d3af51c4fd32bb170ff32a45e (diff) | |
vendor: delete
Diffstat (limited to 'vendor/cloud.google.com/go/pubsub/pullstream.go')
| -rw-r--r-- | vendor/cloud.google.com/go/pubsub/pullstream.go | 211 |
1 files changed, 0 insertions, 211 deletions
diff --git a/vendor/cloud.google.com/go/pubsub/pullstream.go b/vendor/cloud.google.com/go/pubsub/pullstream.go deleted file mode 100644 index 231e5a64a..000000000 --- a/vendor/cloud.google.com/go/pubsub/pullstream.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2018 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pubsub - -import ( - "context" - "fmt" - "io" - "net/url" - "sync" - "time" - - pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" - gax "github.com/googleapis/gax-go/v2" - "google.golang.org/grpc" -) - -// A pullStream supports the methods of a StreamingPullClient, but re-opens -// the stream on a retryable error. -type pullStream struct { - ctx context.Context - cancel context.CancelFunc // cancel function of the context above - open func() (pb.Subscriber_StreamingPullClient, context.CancelFunc, error) - close context.CancelFunc // cancel function to close down the currently open stream - - mu sync.Mutex - spc *pb.Subscriber_StreamingPullClient - err error // permanent error -} - -// for testing -type streamingPullFunc func(context.Context, ...gax.CallOption) (pb.Subscriber_StreamingPullClient, error) - -func newPullStream(ctx context.Context, streamingPull streamingPullFunc, subName, clientID string, maxOutstandingMessages, maxOutstandingBytes int, maxDurationPerLeaseExtension time.Duration) *pullStream { - ctx = withSubscriptionKey(ctx, subName) - hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(subName))} - ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...) - ctx, cancel := context.WithCancel(ctx) - return &pullStream{ - ctx: ctx, - cancel: cancel, - open: func() (pb.Subscriber_StreamingPullClient, context.CancelFunc, error) { - sctx, close := context.WithCancel(ctx) - spc, err := streamingPull(sctx, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes))) - if err == nil { - recordStat(ctx, StreamRequestCount, 1) - streamAckDeadline := int32(maxDurationPerLeaseExtension / time.Second) - // By default, maxDurationPerLeaseExtension, aka MaxExtensionPeriod, is disabled, - // so in these cases, use a healthy default of 60 seconds. - if streamAckDeadline <= 0 { - streamAckDeadline = 60 - } - err = spc.Send(&pb.StreamingPullRequest{ - Subscription: subName, - ClientId: clientID, - StreamAckDeadlineSeconds: streamAckDeadline, - MaxOutstandingMessages: int64(maxOutstandingMessages), - MaxOutstandingBytes: int64(maxOutstandingBytes), - }) - } - if err != nil { - close() - return nil, nil, err - } - return spc, close, nil - }, - } -} - -// get returns either a valid *StreamingPullClient (SPC), or a permanent error. -// If the argument is nil, this is the first call for an RPC, and the current -// SPC will be returned (or a new one will be opened). Otherwise, this call is a -// request to re-open the stream because of a retryable error, and the argument -// is a pointer to the SPC that returned the error. -func (s *pullStream) get(spc *pb.Subscriber_StreamingPullClient) (*pb.Subscriber_StreamingPullClient, error) { - s.mu.Lock() - defer s.mu.Unlock() - // A stored error is permanent. - if s.err != nil { - return nil, s.err - } - // If the context is done, so are we. - s.err = s.ctx.Err() - if s.err != nil { - return nil, s.err - } - - // If the current and argument SPCs differ, return the current one. This subsumes two cases: - // 1. We have an SPC and the caller is getting the stream for the first time. - // 2. The caller wants to retry, but they have an older SPC; we've already retried. - if spc != s.spc { - return s.spc, nil - } - // we are about to open a new stream: if necessary, make sure the previous one is closed - if s.close != nil { - s.close() - } - // Either this is the very first call on this stream (s.spc == nil), or we have a valid - // retry request. Either way, open a new stream. - // The lock is held here for a long time, but it doesn't matter because no callers could get - // anything done anyway. - s.spc = new(pb.Subscriber_StreamingPullClient) - *s.spc, s.close, s.err = s.openWithRetry() // Any error from openWithRetry is permanent. - return s.spc, s.err -} - -func (s *pullStream) openWithRetry() (pb.Subscriber_StreamingPullClient, context.CancelFunc, error) { - r := defaultRetryer{} - for { - recordStat(s.ctx, StreamOpenCount, 1) - spc, close, err := s.open() - bo, shouldRetry := r.Retry(err) - if err != nil && shouldRetry { - recordStat(s.ctx, StreamRetryCount, 1) - if err := gax.Sleep(s.ctx, bo); err != nil { - return nil, nil, err - } - continue - } - return spc, close, err - } -} - -func (s *pullStream) call(f func(pb.Subscriber_StreamingPullClient) error, opts ...gax.CallOption) error { - var settings gax.CallSettings - for _, opt := range opts { - opt.Resolve(&settings) - } - var r gax.Retryer = &defaultRetryer{} - if settings.Retry != nil { - r = settings.Retry() - } - - var ( - spc *pb.Subscriber_StreamingPullClient - err error - ) - for { - spc, err = s.get(spc) - if err != nil { - return err - } - start := time.Now() - err = f(*spc) - if err != nil { - bo, shouldRetry := r.Retry(err) - if shouldRetry { - recordStat(s.ctx, StreamRetryCount, 1) - if time.Since(start) < 30*time.Second { // don't sleep if we've been blocked for a while - if err := gax.Sleep(s.ctx, bo); err != nil { - return err - } - } - continue - } - s.mu.Lock() - s.err = err - s.mu.Unlock() - } - return err - } -} - -func (s *pullStream) Send(req *pb.StreamingPullRequest) error { - return s.call(func(spc pb.Subscriber_StreamingPullClient) error { - recordStat(s.ctx, AckCount, int64(len(req.AckIds))) - zeroes := 0 - for _, mds := range req.ModifyDeadlineSeconds { - if mds == 0 { - zeroes++ - } - } - recordStat(s.ctx, NackCount, int64(zeroes)) - recordStat(s.ctx, ModAckCount, int64(len(req.ModifyDeadlineSeconds)-zeroes)) - recordStat(s.ctx, StreamRequestCount, 1) - return spc.Send(req) - }) -} - -func (s *pullStream) Recv() (*pb.StreamingPullResponse, error) { - var res *pb.StreamingPullResponse - err := s.call(func(spc pb.Subscriber_StreamingPullClient) error { - var err error - recordStat(s.ctx, StreamResponseCount, 1) - res, err = spc.Recv() - return err - }, gax.WithRetry(func() gax.Retryer { return &streamingPullRetryer{defaultRetryer: &defaultRetryer{}} })) - return res, err -} - -func (s *pullStream) CloseSend() error { - err := s.call(func(spc pb.Subscriber_StreamingPullClient) error { - return spc.CloseSend() - }) - s.mu.Lock() - s.err = io.EOF // should not be retried - s.mu.Unlock() - return err -} |
