aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/cloud.google.com/go/pubsub/iterator.go
diff options
context:
space:
mode:
authordependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>2023-07-25 07:50:36 +0000
committerTaras Madan <tarasmadan@google.com>2023-07-25 08:27:48 +0000
commitb423bd03401d00e754d5e5c0236feda4dfb02e28 (patch)
treeb4c5cb53485b00adb2877b7fa27b9b6e5f02552a /vendor/cloud.google.com/go/pubsub/iterator.go
parente06c669f49a06146914b04a1fbbdd21a0bf1d7b1 (diff)
mod: do: bump golang.org/x/oauth2 from 0.5.0 to 0.10.0
Bumps [golang.org/x/oauth2](https://github.com/golang/oauth2) from 0.5.0 to 0.10.0. - [Commits](https://github.com/golang/oauth2/compare/v0.5.0...v0.10.0) --- updated-dependencies: - dependency-name: golang.org/x/oauth2 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
Diffstat (limited to 'vendor/cloud.google.com/go/pubsub/iterator.go')
-rw-r--r--vendor/cloud.google.com/go/pubsub/iterator.go86
1 file changed, 63 insertions, 23 deletions
diff --git a/vendor/cloud.google.com/go/pubsub/iterator.go b/vendor/cloud.google.com/go/pubsub/iterator.go
index 870e34dd7..f45f1b995 100644
--- a/vendor/cloud.google.com/go/pubsub/iterator.go
+++ b/vendor/cloud.google.com/go/pubsub/iterator.go
@@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// do a receipt mod-ack when streaming.
maxExt := time.Now().Add(it.po.maxExtension)
ackIDs := map[string]*AckResult{}
+ it.eoMu.RLock()
+ exactlyOnceDelivery := it.enableExactlyOnceDelivery
+ it.eoMu.RUnlock()
it.mu.Lock()
+
+ // pendingMessages maps ackID -> message, and is used
+ // only when exactly once delivery is enabled.
+ // At first, all messages are pending, and they
+ // are removed if the modack call fails. All other
+ // messages are returned to the client for processing.
+ pendingMessages := make(map[string]*ipubsub.Message)
for _, m := range msgs {
ackID := msgAckID(m)
addRecv(m.ID, ackID, now)
@@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
// possible if there are retries.
if _, ok := it.pendingNacks[ackID]; !ok {
// Don't use the message's AckResult here since these are only for receipt modacks.
- // ModAckResults are transparent to the user anyway so these can automatically succeed.
+ // modack results are transparent to the user so these can automatically succeed unless
+ // exactly once is enabled.
// We can't use an empty AckResult here either since SetAckResult will try to
// close the channel without checking if it exists.
- ackIDs[ackID] = newSuccessAckResult()
+ if !exactlyOnceDelivery {
+ ackIDs[ackID] = newSuccessAckResult()
+ } else {
+ ackIDs[ackID] = ipubsub.NewAckResult()
+ pendingMessages[ackID] = m
+ }
}
}
deadline := it.ackDeadline()
it.mu.Unlock()
- go func() {
- if len(ackIDs) > 0 {
- // Don't check the return value of this since modacks are fire and forget,
- // meaning errors should not be propagated to the client.
- it.sendModAck(ackIDs, deadline)
+
+ if len(ackIDs) > 0 {
+ // When exactly once delivery is not enabled, modacks are fire and forget.
+ if !exactlyOnceDelivery {
+ go func() {
+ it.sendModAck(ackIDs, deadline, false)
+ }()
+ return msgs, nil
}
- }()
- return msgs, nil
+
+ // If exactly once is enabled, we should wait until modack responses are successes
+ // before attempting to process messages.
+ it.sendModAck(ackIDs, deadline, false)
+ for ackID, ar := range ackIDs {
+ ctx := context.Background()
+ _, err := ar.Get(ctx)
+ if err != nil {
+ delete(pendingMessages, ackID)
+ it.mu.Lock()
+ // Remove the message from lease management if modack fails here.
+ delete(it.keepAliveDeadlines, ackID)
+ it.mu.Unlock()
+ }
+ }
+ // Only return for processing messages that were successfully modack'ed.
+ v := make([]*ipubsub.Message, 0, len(pendingMessages))
+ for _, m := range pendingMessages {
+ v = append(v, m)
+ }
+ return v, nil
+ }
+ return nil, nil
}
// Get messages using the Pull RPC.
@@ -399,10 +439,10 @@ func (it *messageIterator) sender() {
}
if sendNacks {
// Nack indicated by modifying the deadline to zero.
- it.sendModAck(nacks, 0)
+ it.sendModAck(nacks, 0, false)
}
if sendModAcks {
- it.sendModAck(modAcks, dl)
+ it.sendModAck(modAcks, dl, true)
}
if sendPing {
it.pingStream()
@@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) {
// percentile in order to capture the highest amount of time necessary without
// considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
// enabled, we retry it in a separate goroutine for a short duration.
-func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) {
+func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
deadlineSec := int32(deadline / time.Second)
ackIDs := make([]string, 0, len(m))
for k := range m {
@@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur
if len(toRetry) > 0 {
// Retry modacks/nacks in a separate goroutine.
go func() {
- it.retryModAcks(toRetry, deadlineSec)
+ it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
}()
}
}
@@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) {
// in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times
// since after that, the message will have expired. Nacks are retried up until the default
// deadline of 10 minutes.
-func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) {
+func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
bo := newExactlyOnceBackoff()
retryCount := 0
ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
defer cancel()
for {
- // If context is done, complete all remaining Nacks with DeadlineExceeded
- // ModAcks are not exposed to the user so these don't need to be modified.
+ // If context is done, complete all AckResults with errors.
if ctx.Err() != nil {
- if deadlineSec == 0 {
- for _, r := range m {
- ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
- }
+ for _, r := range m {
+ ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
}
return
}
// Only retry modack requests up to 3 times.
if deadlineSec != 0 && retryCount > 3 {
ackIDs := make([]string, 0, len(m))
- for k := range m {
+ for k, ar := range m {
ackIDs = append(ackIDs, k)
+ ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
+ }
+ if logOnInvalid {
+ log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
}
- log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
return
}
// Don't need to split map since this is the retry function and
@@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) {
return nil, nil
}
-// processResults processes AckResults by referring to errorStatus and errorsMap.
+// processResults processes AckResults by referring to errorStatus and errorsByAckID.
// The errors returned by the server in `errorStatus` or in `errorsByAckID`
// are used to complete the AckResults in `ackResMap` (with a success
// or error) or to return requests for further retries.