diff options
| author | dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> | 2023-07-25 07:50:36 +0000 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2023-07-25 08:27:48 +0000 |
| commit | b423bd03401d00e754d5e5c0236feda4dfb02e28 (patch) | |
| tree | b4c5cb53485b00adb2877b7fa27b9b6e5f02552a /vendor/cloud.google.com/go/pubsub/iterator.go | |
| parent | e06c669f49a06146914b04a1fbbdd21a0bf1d7b1 (diff) | |
mod: do: bump golang.org/x/oauth2 from 0.5.0 to 0.10.0
Bumps [golang.org/x/oauth2](https://github.com/golang/oauth2) from 0.5.0 to 0.10.0.
- [Commits](https://github.com/golang/oauth2/compare/v0.5.0...v0.10.0)
---
updated-dependencies:
- dependency-name: golang.org/x/oauth2
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <support@github.com>
Diffstat (limited to 'vendor/cloud.google.com/go/pubsub/iterator.go')
| -rw-r--r-- | vendor/cloud.google.com/go/pubsub/iterator.go | 86 |
1 files changed, 63 insertions, 23 deletions
diff --git a/vendor/cloud.google.com/go/pubsub/iterator.go b/vendor/cloud.google.com/go/pubsub/iterator.go index 870e34dd7..f45f1b995 100644 --- a/vendor/cloud.google.com/go/pubsub/iterator.go +++ b/vendor/cloud.google.com/go/pubsub/iterator.go @@ -255,7 +255,17 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // do a receipt mod-ack when streaming. maxExt := time.Now().Add(it.po.maxExtension) ackIDs := map[string]*AckResult{} + it.eoMu.RLock() + exactlyOnceDelivery := it.enableExactlyOnceDelivery + it.eoMu.RUnlock() it.mu.Lock() + + // pendingMessages maps ackID -> message, and is used + // only when exactly once delivery is enabled. + // At first, all messages are pending, and they + // are removed if the modack call fails. All other + // messages are returned to the client for processing. + pendingMessages := make(map[string]*ipubsub.Message) for _, m := range msgs { ackID := msgAckID(m) addRecv(m.ID, ackID, now) @@ -264,22 +274,52 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { // possible if there are retries. if _, ok := it.pendingNacks[ackID]; !ok { // Don't use the message's AckResult here since these are only for receipt modacks. - // ModAckResults are transparent to the user anyway so these can automatically succeed. + // modack results are transparent to the user so these can automatically succeed unless + // exactly once is enabled. // We can't use an empty AckResult here either since SetAckResult will try to // close the channel without checking if it exists. - ackIDs[ackID] = newSuccessAckResult() + if !exactlyOnceDelivery { + ackIDs[ackID] = newSuccessAckResult() + } else { + ackIDs[ackID] = ipubsub.NewAckResult() + pendingMessages[ackID] = m + } } } deadline := it.ackDeadline() it.mu.Unlock() - go func() { - if len(ackIDs) > 0 { - // Don't check the return value of this since modacks are fire and forget, - // meaning errors should not be propagated to the client. - it.sendModAck(ackIDs, deadline) + + if len(ackIDs) > 0 { + // When exactly once delivery is not enabled, modacks are fire and forget. + if !exactlyOnceDelivery { + go func() { + it.sendModAck(ackIDs, deadline, false) + }() + return msgs, nil } - }() - return msgs, nil + + // If exactly once is enabled, we should wait until modack responses are successes + // before attempting to process messages. + it.sendModAck(ackIDs, deadline, false) + for ackID, ar := range ackIDs { + ctx := context.Background() + _, err := ar.Get(ctx) + if err != nil { + delete(pendingMessages, ackID) + it.mu.Lock() + // Remove the message from lease management if modack fails here. + delete(it.keepAliveDeadlines, ackID) + it.mu.Unlock() + } + } + // Only return for processing messages that were successfully modack'ed. + v := make([]*ipubsub.Message, 0, len(pendingMessages)) + for _, m := range pendingMessages { + v = append(v, m) + } + return v, nil + } + return nil, nil } // Get messages using the Pull RPC. @@ -399,10 +439,10 @@ func (it *messageIterator) sender() { } if sendNacks { // Nack indicated by modifying the deadline to zero. - it.sendModAck(nacks, 0) + it.sendModAck(nacks, 0, false) } if sendModAcks { - it.sendModAck(modAcks, dl) + it.sendModAck(modAcks, dl, true) } if sendPing { it.pingStream() @@ -479,7 +519,7 @@ func (it *messageIterator) sendAck(m map[string]*AckResult) { // percentile in order to capture the highest amount of time necessary without // considering 1% outliers. If the ModAck RPC fails and exactly once delivery is // enabled, we retry it in a separate goroutine for a short duration. -func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration) { +func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) { deadlineSec := int32(deadline / time.Second) ackIDs := make([]string, 0, len(m)) for k := range m { @@ -517,7 +557,7 @@ func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Dur if len(toRetry) > 0 { // Retry modacks/nacks in a separate goroutine. go func() { - it.retryModAcks(toRetry, deadlineSec) + it.retryModAcks(toRetry, deadlineSec, logOnInvalid) }() } } @@ -563,29 +603,29 @@ func (it *messageIterator) retryAcks(m map[string]*AckResult) { // in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times // since after that, the message will have expired. Nacks are retried up until the default // deadline of 10 minutes. -func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32) { +func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) { bo := newExactlyOnceBackoff() retryCount := 0 ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline) defer cancel() for { - // If context is done, complete all remaining Nacks with DeadlineExceeded - // ModAcks are not exposed to the user so these don't need to be modified. + // If context is done, complete all AckResults with errors. if ctx.Err() != nil { - if deadlineSec == 0 { - for _, r := range m { - ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) - } + for _, r := range m { + ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err()) } return } // Only retry modack requests up to 3 times. if deadlineSec != 0 && retryCount > 3 { ackIDs := make([]string, 0, len(m)) - for k := range m { + for k, ar := range m { ackIDs = append(ackIDs, k) + ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed")) + } + if logOnInvalid { + log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) } - log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs) return } // Don't need to split map since this is the retry function and @@ -723,7 +763,7 @@ func extractMetadata(err error) (*status.Status, map[string]string) { return nil, nil } -// processResults processes AckResults by referring to errorStatus and errorsMap. +// processResults processes AckResults by referring to errorStatus and errorsByAckID. // The errors returned by the server in `errorStatus` or in `errorsByAckID` // are used to complete the AckResults in `ackResMap` (with a success // or error) or to return requests for further retries. |
