aboutsummaryrefslogtreecommitdiffstats
path: root/vendor/github.com/GoogleCloudPlatform
diff options
context:
space:
mode:
authorTaras Madan <tarasmadan@google.com>2024-09-10 12:16:33 +0200
committerTaras Madan <tarasmadan@google.com>2024-09-10 14:05:26 +0000
commitc97c816133b42257d0bcf1ee4bd178bb2a7a2b9e (patch)
tree0bcbc2e540bbf8f62f6c17887cdd53b8c2cee637 /vendor/github.com/GoogleCloudPlatform
parent54e657429ab892ad06c90cd7c1a4eb33ba93a3dc (diff)
vendor: update
Diffstat (limited to 'vendor/github.com/GoogleCloudPlatform')
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE202
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md18
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go129
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go576
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go130
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go98
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go408
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go276
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh6
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go638
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto129
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh4
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go54
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go306
-rw-r--r--vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json29
15 files changed, 3003 insertions, 0 deletions
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE
new file mode 100644
index 000000000..d64569567
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE
@@ -0,0 +1,202 @@
+
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md
new file mode 100644
index 000000000..47d809846
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md
@@ -0,0 +1,18 @@
+
+## How to test Spanner integration
+
+1. Set GCP project id with GCP_PROJECT_ID environment variable.
+
+ export GCP_PROJECT_ID=test-project
+
+1. Set service key credentials file using GOOGLE_APPLICATION_CREDENTIALS env variable.
+
+ export GOOGLE_APPLICATION_CREDENTIALS=/service/account/credentials.json
+
+1. Run the tests.
+
+ go test -v
+
+To skip Spanner setup run
+
+ SKIP_SPANNER=true go test -v
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go
new file mode 100644
index 000000000..aae869fb5
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+Package grpcgcp provides grpc supports for Google Cloud APIs.
+For now it provides connection management with affinity support.
+
+Note: "channel" is analagous to "connection" in our context.
+
+Usage:
+
+1. First, initialize the api configuration. There are two ways:
+
+ 1a. Create a json file defining the configuration and read it.
+
+ // Create some_api_config.json
+ {
+ "channelPool": {
+ "maxSize": 4,
+ "maxConcurrentStreamsLowWatermark": 50
+ },
+ "method": [
+ {
+ "name": [ "/some.api.v1/Method1" ],
+ "affinity": {
+ "command": "BIND",
+ "affinityKey": "key1"
+ }
+ },
+ {
+ "name": [ "/some.api.v1/Method2" ],
+ "affinity": {
+ "command": "BOUND",
+ "affinityKey": "key2"
+ }
+ },
+ {
+ "name": [ "/some.api.v1/Method3" ],
+ "affinity": {
+ "command": "UNBIND",
+ "affinityKey": "key3"
+ }
+ }
+ ]
+ }
+
+ jsonFile, err := ioutil.ReadFile("some_api_config.json")
+ if err != nil {
+ t.Fatalf("Failed to read config file: %v", err)
+ }
+ jsonCfg := string(jsonFile)
+
+ 1b. Create apiConfig directly and convert it to json.
+
+ // import (
+ // configpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
+ // )
+
+ apiConfig := &configpb.ApiConfig{
+ ChannelPool: &configpb.ChannelPoolConfig{
+ MaxSize: 4,
+ MaxConcurrentStreamsLowWatermark: 50,
+ },
+ Method: []*configpb.MethodConfig{
+ &configpb.MethodConfig{
+ Name: []string{"/some.api.v1/Method1"},
+ Affinity: &configpb.AffinityConfig{
+ Command: configpb.AffinityConfig_BIND,
+ AffinityKey: "key1",
+ },
+ },
+ &configpb.MethodConfig{
+ Name: []string{"/some.api.v1/Method2"},
+ Affinity: &configpb.AffinityConfig{
+ Command: configpb.AffinityConfig_BOUND,
+ AffinityKey: "key2",
+ },
+ },
+ &configpb.MethodConfig{
+ Name: []string{"/some.api.v1/Method3"},
+ Affinity: &configpb.AffinityConfig{
+ Command: configpb.AffinityConfig_UNBIND,
+ AffinityKey: "key3",
+ },
+ },
+ },
+ }
+
+ c, err := protojson.Marshal(apiConfig)
+ if err != nil {
+ t.Fatalf("cannot json encode config: %v", err)
+ }
+ jsonCfg := string(c)
+
+2. Make ClientConn with specific DialOptions to enable grpc_gcp load balancer
+with provided configuration. And specify gRPC-GCP interceptors.
+
+ conn, err := grpc.Dial(
+ target,
+ // Register and specify the grpc-gcp load balancer.
+ grpc.WithDisableServiceConfig(),
+ grpc.WithDefaultServiceConfig(
+ fmt.Sprintf(
+ `{"loadBalancingConfig": [{"%s":%s}]}`,
+ grpcgcp.Name,
+ jsonCfg,
+ ),
+ ),
+ // Set grpcgcp interceptors.
+ grpc.WithUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor),
+ grpc.WithStreamInterceptor(grpcgcp.GCPStreamClientInterceptor),
+ )
+*/
+package grpcgcp // import "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp"
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go
new file mode 100644
index 000000000..6fdac53d6
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go
@@ -0,0 +1,576 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpcgcp
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/resolver"
+ "google.golang.org/grpc/serviceconfig"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
+
+ pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
+)
+
+var _ balancer.Balancer = (*gcpBalancer)(nil) // Ensure gcpBalancer implements Balancer
+
+const (
+ // Name is the name of grpc_gcp balancer.
+ Name = "grpc_gcp"
+
+ healthCheckEnabled = true
+ defaultMinSize = 1
+ defaultMaxSize = 4
+ defaultMaxStreams = 100
+)
+
+func init() {
+ balancer.Register(newBuilder())
+}
+
+type gcpBalancerBuilder struct {
+ balancer.ConfigParser
+}
+
+type GCPBalancerConfig struct {
+ serviceconfig.LoadBalancingConfig
+ *pb.ApiConfig
+}
+
+func (bb *gcpBalancerBuilder) Build(
+ cc balancer.ClientConn,
+ opt balancer.BuildOptions,
+) balancer.Balancer {
+ gb := &gcpBalancer{
+ cc: cc,
+ methodCfg: make(map[string]*pb.AffinityConfig),
+ affinityMap: make(map[string]balancer.SubConn),
+ fallbackMap: make(map[string]balancer.SubConn),
+ scRefs: make(map[balancer.SubConn]*subConnRef),
+ scStates: make(map[balancer.SubConn]connectivity.State),
+ refreshingScRefs: make(map[balancer.SubConn]*subConnRef),
+ scRefList: []*subConnRef{},
+ rrRefId: ^uint32(0),
+ csEvltr: &connectivityStateEvaluator{},
+ // Initialize picker to a picker that always return
+ // ErrNoSubConnAvailable, because when state of a SubConn changes, we
+ // may call UpdateBalancerState with this picker.
+ picker: newErrPicker(balancer.ErrNoSubConnAvailable),
+ }
+ gb.log = NewGCPLogger(compLogger, fmt.Sprintf("[gcpBalancer %p]", gb))
+ return gb
+}
+
+func (*gcpBalancerBuilder) Name() string {
+ return Name
+}
+
+// ParseConfig converts raw json config into GCPBalancerConfig.
+// This is called by ClientConn on any load balancer config update.
+// After parsing the config, ClientConn calls UpdateClientConnState passing the config.
+func (*gcpBalancerBuilder) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+ c := &GCPBalancerConfig{
+ ApiConfig: &pb.ApiConfig{},
+ }
+ err := protojson.Unmarshal(j, c)
+ return c, err
+}
+
+// newBuilder creates a new grpcgcp balancer builder.
+func newBuilder() balancer.Builder {
+ return &gcpBalancerBuilder{}
+}
+
+// connectivityStateEvaluator gets updated by addrConns when their
+// states transition, based on which it evaluates the state of
+// ClientConn.
+type connectivityStateEvaluator struct {
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
+
+// recordTransition records state change happening in every subConn and based on
+// that it evaluates what aggregated state should be.
+// It can only transition between Ready, Connecting and TransientFailure. Other states,
+// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection
+// before any subConn is created ClientConn is in idle state. In the end when ClientConn
+// closes it is in Shutdown state.
+//
+// recordTransition should only be called synchronously from the same goroutine.
+func (cse *connectivityStateEvaluator) recordTransition(
+ oldState,
+ newState connectivity.State,
+) connectivity.State {
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ }
+ }
+
+ // Evaluate.
+ if cse.numReady > 0 {
+ return connectivity.Ready
+ }
+ if cse.numConnecting > 0 {
+ return connectivity.Connecting
+ }
+ return connectivity.TransientFailure
+}
+
+// subConnRef keeps reference to the real SubConn with its
+// connectivity state, affinity count and streams count.
+type subConnRef struct {
+ subConn balancer.SubConn
+ stateSignal chan struct{} // This channel is closed and re-created when subConn or its state changes.
+ affinityCnt int32 // Keeps track of the number of keys bound to the subConn.
+ streamsCnt int32 // Keeps track of the number of streams opened on the subConn.
+ lastResp time.Time // Timestamp of the last response from the server.
+ deCalls uint32 // Keeps track of deadline exceeded calls since last response.
+ refreshing bool // If this subconn is in the process of refreshing.
+ refreshCnt uint32 // Number of refreshes since last response.
+}
+
+func (ref *subConnRef) getAffinityCnt() int32 {
+ return atomic.LoadInt32(&ref.affinityCnt)
+}
+
+func (ref *subConnRef) getStreamsCnt() int32 {
+ return atomic.LoadInt32(&ref.streamsCnt)
+}
+
+func (ref *subConnRef) affinityIncr() {
+ atomic.AddInt32(&ref.affinityCnt, 1)
+}
+
+func (ref *subConnRef) affinityDecr() {
+ atomic.AddInt32(&ref.affinityCnt, -1)
+}
+
+func (ref *subConnRef) streamsIncr() {
+ atomic.AddInt32(&ref.streamsCnt, 1)
+}
+
+func (ref *subConnRef) streamsDecr() {
+ atomic.AddInt32(&ref.streamsCnt, -1)
+}
+
+func (ref *subConnRef) deCallsInc() uint32 {
+ return atomic.AddUint32(&ref.deCalls, 1)
+}
+
+func (ref *subConnRef) gotResp() {
+ ref.lastResp = time.Now()
+ atomic.StoreUint32(&ref.deCalls, 0)
+ ref.refreshCnt = 0
+}
+
+type gcpBalancer struct {
+ cfg *GCPBalancerConfig
+ methodCfg map[string]*pb.AffinityConfig
+
+ addrs []resolver.Address
+ cc balancer.ClientConn
+ csEvltr *connectivityStateEvaluator
+ state connectivity.State
+
+ mu sync.RWMutex
+ affinityMap map[string]balancer.SubConn
+ fallbackMap map[string]balancer.SubConn
+ scStates map[balancer.SubConn]connectivity.State
+ scRefs map[balancer.SubConn]*subConnRef
+ scRefList []*subConnRef
+ rrRefId uint32
+
+ // Map from a fresh SubConn to the subConnRef where we want to refresh subConn.
+ refreshingScRefs map[balancer.SubConn]*subConnRef
+ // Unresponsive detection enabled flag.
+ unresponsiveDetection bool
+
+ picker balancer.Picker
+ log grpclog.LoggerV2
+}
+
+func (gb *gcpBalancer) initializeConfig(cfg *GCPBalancerConfig) {
+ gb.cfg = &GCPBalancerConfig{
+ ApiConfig: &pb.ApiConfig{
+ ChannelPool: &pb.ChannelPoolConfig{},
+ },
+ }
+ if cfg != nil && cfg.ApiConfig != nil {
+ gb.cfg = &GCPBalancerConfig{
+ ApiConfig: proto.Clone(cfg.ApiConfig).(*pb.ApiConfig),
+ }
+ }
+
+ if gb.cfg.GetChannelPool() == nil {
+ gb.cfg.ChannelPool = &pb.ChannelPoolConfig{}
+ }
+ cp := gb.cfg.GetChannelPool()
+ if cp.GetMinSize() == 0 {
+ cp.MinSize = defaultMinSize
+ }
+ if cp.GetMaxSize() == 0 {
+ cp.MaxSize = defaultMaxSize
+ }
+ if cp.GetMaxConcurrentStreamsLowWatermark() == 0 {
+ cp.MaxConcurrentStreamsLowWatermark = defaultMaxStreams
+ }
+ mp := make(map[string]*pb.AffinityConfig)
+ methodCfgs := gb.cfg.GetMethod()
+ for _, methodCfg := range methodCfgs {
+ methodNames := methodCfg.GetName()
+ affinityCfg := methodCfg.GetAffinity()
+ if methodNames != nil && affinityCfg != nil {
+ for _, method := range methodNames {
+ mp[method] = affinityCfg
+ }
+ }
+ }
+ gb.methodCfg = mp
+ gb.unresponsiveDetection = cp.GetUnresponsiveCalls() > 0 && cp.GetUnresponsiveDetectionMs() > 0
+ gb.enforceMinSize()
+}
+
+func (gb *gcpBalancer) enforceMinSize() {
+ for len(gb.scRefs) < int(gb.cfg.GetChannelPool().GetMinSize()) {
+ gb.addSubConn()
+ }
+}
+
+func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ addrs := ccs.ResolverState.Addresses
+ if gb.log.V(FINE) {
+ gb.log.Infoln("got new resolved addresses: ", addrs, " and balancer config: ", ccs.BalancerConfig)
+ }
+ gb.addrs = addrs
+ if gb.cfg == nil {
+ cfg, ok := ccs.BalancerConfig.(*GCPBalancerConfig)
+ if !ok && ccs.BalancerConfig != nil {
+ return fmt.Errorf("provided config is not GCPBalancerConfig: %v", ccs.BalancerConfig)
+ }
+ gb.initializeConfig(cfg)
+ }
+
+ if len(gb.scRefs) == 0 {
+ gb.newSubConn()
+ return nil
+ }
+
+ for _, scRef := range gb.scRefs {
+ // TODO(weiranf): update streams count when new addrs resolved?
+ scRef.subConn.UpdateAddresses(addrs)
+ scRef.subConn.Connect()
+ }
+
+ return nil
+}
+
+func (gb *gcpBalancer) ResolverError(err error) {
+ gb.log.Warningf("ResolverError: %v", err)
+}
+
+// check current connection pool size
+func (gb *gcpBalancer) getConnectionPoolSize() int {
+ // TODO(golobokov): replace this with locked increase of subconns.
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ return len(gb.scRefs)
+}
+
+// newSubConn creates a new SubConn using cc.NewSubConn and initialize the subConnRef
+// if none of the subconns are in the Connecting state.
+func (gb *gcpBalancer) newSubConn() {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+
+ // there are chances the newly created subconns are still connecting,
+ // we can wait on those new subconns.
+ for _, scState := range gb.scStates {
+ if scState == connectivity.Connecting || scState == connectivity.Idle {
+ return
+ }
+ }
+ gb.addSubConn()
+}
+
+// addSubConn creates a new SubConn using cc.NewSubConn and initialize the subConnRef.
+// Must be called holding the mutex lock.
+func (gb *gcpBalancer) addSubConn() {
+ sc, err := gb.cc.NewSubConn(
+ gb.addrs,
+ balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
+ )
+ if err != nil {
+ gb.log.Errorf("failed to NewSubConn: %v", err)
+ return
+ }
+ gb.scRefs[sc] = &subConnRef{
+ subConn: sc,
+ stateSignal: make(chan struct{}),
+ lastResp: time.Now(),
+ }
+ gb.scStates[sc] = connectivity.Idle
+ gb.scRefList = append(gb.scRefList, gb.scRefs[sc])
+ sc.Connect()
+}
+
+// getReadySubConnRef returns a subConnRef and a bool. The bool indicates whether
+// the boundKey exists in the affinityMap. If returned subConnRef is a nil, it
+// means the underlying subconn is not READY yet.
+func (gb *gcpBalancer) getReadySubConnRef(boundKey string) (*subConnRef, bool) {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+
+ if sc, ok := gb.affinityMap[boundKey]; ok {
+ if gb.scStates[sc] != connectivity.Ready {
+ // It's possible that the bound subconn is not in the readySubConns list,
+ // If it's not ready, we throw ErrNoSubConnAvailable or
+ // fallback to a previously mapped ready subconn or the least busy.
+ if gb.cfg.GetChannelPool().GetFallbackToReady() {
+ if sc, ok := gb.fallbackMap[boundKey]; ok {
+ return gb.scRefs[sc], true
+ }
+ // Try to create fallback mapping.
+ if scRef, err := gb.picker.(*gcpPicker).getLeastBusySubConnRef(); err == nil {
+ gb.fallbackMap[boundKey] = scRef.subConn
+ return scRef, true
+ }
+ }
+ return nil, true
+ }
+ return gb.scRefs[sc], true
+ }
+ return nil, false
+}
+
+func (gb *gcpBalancer) getSubConnRoundRobin(ctx context.Context) *subConnRef {
+ if len(gb.scRefList) == 0 {
+ gb.newSubConn()
+ }
+ scRef := gb.scRefList[atomic.AddUint32(&gb.rrRefId, 1)%uint32(len(gb.scRefList))]
+
+ gb.mu.RLock()
+ if state := gb.scStates[scRef.subConn]; state == connectivity.Ready {
+ gb.mu.RUnlock()
+ return scRef
+ } else {
+ grpclog.Infof("grpcgcp.gcpBalancer: scRef is not ready: %v", state)
+ }
+
+ ticker := time.NewTicker(time.Millisecond * 100)
+ defer ticker.Stop()
+
+ // Wait until SubConn is ready or call context is done.
+ for gb.scStates[scRef.subConn] != connectivity.Ready {
+ sigChan := scRef.stateSignal
+ gb.mu.RUnlock()
+ select {
+ case <-ctx.Done():
+ return scRef
+ case <-ticker.C:
+ case <-sigChan:
+ }
+ gb.mu.RLock()
+ }
+ gb.mu.RUnlock()
+
+ return scRef
+}
+
+// bindSubConn binds the given affinity key to an existing subConnRef.
+func (gb *gcpBalancer) bindSubConn(bindKey string, sc balancer.SubConn) {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ _, ok := gb.affinityMap[bindKey]
+ if !ok {
+ gb.affinityMap[bindKey] = sc
+ }
+ gb.scRefs[sc].affinityIncr()
+}
+
+// unbindSubConn removes the existing binding associated with the key.
+func (gb *gcpBalancer) unbindSubConn(boundKey string) {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ boundSC, ok := gb.affinityMap[boundKey]
+ if ok {
+ gb.scRefs[boundSC].affinityDecr()
+ delete(gb.affinityMap, boundKey)
+ }
+}
+
+// regeneratePicker takes a snapshot of the balancer, and generates a picker
+// from it. The picker is
+// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
+// - built by the pickerBuilder with all READY SubConns otherwise.
+func (gb *gcpBalancer) regeneratePicker() {
+ if gb.state == connectivity.TransientFailure {
+ gb.picker = newErrPicker(balancer.ErrTransientFailure)
+ return
+ }
+ readyRefs := []*subConnRef{}
+
+ // Select ready subConns from subConn map.
+ for sc, scState := range gb.scStates {
+ if scState == connectivity.Ready {
+ readyRefs = append(readyRefs, gb.scRefs[sc])
+ }
+ }
+ gb.picker = newGCPPicker(readyRefs, gb)
+}
+
+func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ s := scs.ConnectivityState
+
+ if scRef, found := gb.refreshingScRefs[sc]; found {
+ if gb.log.V(FINE) {
+ gb.log.Infof("handle replacement SubConn state change: %p, %v", sc, s)
+ }
+ if s != connectivity.Ready {
+ // Ignore the replacement sc until it's ready.
+ return
+ }
+
+ // Replace SubConn of the scRef with the fresh SubConn (sc) concluding
+ // the refresh process initiated by refresh(*subConnRef).
+ oldSc := scRef.subConn
+ gb.scStates[sc] = gb.scStates[oldSc]
+ delete(gb.refreshingScRefs, sc)
+ delete(gb.scRefs, oldSc)
+ delete(gb.scStates, oldSc)
+ gb.scRefs[sc] = scRef
+ scRef.subConn = sc
+ scRef.deCalls = 0
+ scRef.lastResp = time.Now()
+ scRef.refreshing = false
+ scRef.refreshCnt++
+ gb.cc.RemoveSubConn(oldSc)
+ }
+
+ if gb.log.V(FINE) {
+ gb.log.Infof("handle SubConn state change: %p, %v", sc, s)
+ }
+
+ oldS, ok := gb.scStates[sc]
+ if !ok {
+ if gb.log.V(FINE) {
+ gb.log.Infof(
+ "got state changes for an unknown/replaced SubConn: %p, %v",
+ sc,
+ s,
+ )
+ }
+ return
+ }
+ gb.scStates[sc] = s
+ switch s {
+ case connectivity.Idle:
+ sc.Connect()
+ case connectivity.Shutdown:
+ delete(gb.scRefs, sc)
+ delete(gb.scStates, sc)
+ }
+ if oldS == connectivity.Ready && s != oldS {
+ // Subconn is broken. Remove fallback mapping to this subconn.
+ for k, v := range gb.fallbackMap {
+ if v == sc {
+ delete(gb.fallbackMap, k)
+ }
+ }
+ }
+ if oldS != connectivity.Ready && s == connectivity.Ready {
+ // Remove fallback mapping for the keys of recovered subconn.
+ for k := range gb.fallbackMap {
+ if gb.affinityMap[k] == sc {
+ delete(gb.fallbackMap, k)
+ }
+ }
+ }
+
+ oldAggrState := gb.state
+ gb.state = gb.csEvltr.recordTransition(oldS, s)
+
+ // Regenerate picker when one of the following happens:
+ // - this sc became ready from not-ready
+ // - this sc became not-ready from ready
+ // - the aggregated state of balancer became TransientFailure from non-TransientFailure
+ // - the aggregated state of balancer became non-TransientFailure from TransientFailure
+ if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
+ (gb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
+ gb.regeneratePicker()
+ gb.cc.UpdateState(balancer.State{
+ ConnectivityState: gb.state,
+ Picker: gb.picker,
+ })
+ }
+
+ if scRef := gb.scRefs[sc]; scRef != nil {
+ // Inform of the state change.
+ close(scRef.stateSignal)
+ scRef.stateSignal = make(chan struct{})
+ }
+}
+
+// refresh initiates a new SubConn for a specific subConnRef and starts connecting.
+// If the refresh is already initiated for the ref, then this is a no-op.
+func (gb *gcpBalancer) refresh(ref *subConnRef) {
+ if ref.refreshing {
+ return
+ }
+ gb.mu.Lock()
+ defer gb.mu.Unlock()
+ if ref.refreshing {
+ return
+ }
+ ref.refreshing = true
+ sc, err := gb.cc.NewSubConn(
+ gb.addrs,
+ balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled},
+ )
+ if err != nil {
+ gb.log.Errorf("failed to create a replacement SubConn with NewSubConn: %v", err)
+ return
+ }
+ gb.refreshingScRefs[sc] = ref
+ sc.Connect()
+}
+
+func (gb *gcpBalancer) Close() {
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go
new file mode 100644
index 000000000..c0d77f1ce
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go
@@ -0,0 +1,130 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpcgcp
+
+import (
+ "context"
+ "sync"
+
+ "google.golang.org/grpc"
+)
+
// key is an unexported type for context keys defined in this package,
// preventing collisions with keys defined elsewhere.
type key int

// gcpKey is the context key under which a *gcpContext is stored by the
// interceptors and read back by the picker.
var gcpKey key

// gcpContext carries the request/response messages of an affinity call
// through the context so the picker can extract affinity keys.
type gcpContext struct {
	// request message used for pre-process of an affinity call
	reqMsg interface{}
	// response message used for post-process of an affinity call
	replyMsg interface{}
}
+
+// GCPUnaryClientInterceptor intercepts the execution of a unary RPC
+// and injects necessary information to be used by the picker.
+func GCPUnaryClientInterceptor(
+ ctx context.Context,
+ method string,
+ req interface{},
+ reply interface{},
+ cc *grpc.ClientConn,
+ invoker grpc.UnaryInvoker,
+ opts ...grpc.CallOption,
+) error {
+ gcpCtx := &gcpContext{
+ reqMsg: req,
+ replyMsg: reply,
+ }
+ ctx = context.WithValue(ctx, gcpKey, gcpCtx)
+
+ return invoker(ctx, method, req, reply, cc, opts...)
+}
+
+// GCPStreamClientInterceptor intercepts the execution of a client streaming RPC
+// and injects necessary information to be used by the picker.
+func GCPStreamClientInterceptor(
+ ctx context.Context,
+ desc *grpc.StreamDesc,
+ cc *grpc.ClientConn,
+ method string,
+ streamer grpc.Streamer,
+ opts ...grpc.CallOption,
+) (grpc.ClientStream, error) {
+ // This constructor does not create a real ClientStream,
+ // it only stores all parameters and let SendMsg() to create ClientStream.
+ cs := &gcpClientStream{
+ ctx: ctx,
+ desc: desc,
+ cc: cc,
+ method: method,
+ streamer: streamer,
+ opts: opts,
+ }
+ cs.cond = sync.NewCond(cs)
+ return cs, nil
+}
+
// gcpClientStream defers creation of the underlying grpc.ClientStream until
// the first SendMsg call so the picker can see the first request message.
// The embedded Mutex guards lazy initialization; cond (whose Locker is this
// struct) wakes RecvMsg callers waiting for initialization to finish.
type gcpClientStream struct {
	sync.Mutex
	grpc.ClientStream

	// cond signals waiters once ClientStream is set or initStreamErr is recorded.
	cond *sync.Cond
	// initStreamErr holds the error from a failed stream initialization.
	initStreamErr error

	// Captured arguments used by SendMsg to create the real ClientStream.
	ctx context.Context
	desc *grpc.StreamDesc
	cc *grpc.ClientConn
	method string
	streamer grpc.Streamer
	opts []grpc.CallOption
}
+
// SendMsg lazily creates the underlying ClientStream on the first call —
// exposing the first request message to the picker via the context — then
// forwards m to the stream.
func (cs *gcpClientStream) SendMsg(m interface{}) error {
	cs.Lock()
	// Initialize underlying ClientStream when getting the first request.
	if cs.ClientStream == nil {
		ctx := context.WithValue(cs.ctx, gcpKey, &gcpContext{reqMsg: m})
		realCS, err := cs.streamer(ctx, cs.desc, cs.cc, cs.method, cs.opts...)
		if err != nil {
			cs.initStreamErr = err
			cs.Unlock()
			// Wake any RecvMsg callers blocked on initialization so they can
			// observe initStreamErr.
			cs.cond.Broadcast()
			return err
		}
		cs.ClientStream = realCS
	}
	cs.Unlock()
	cs.cond.Broadcast()
	return cs.ClientStream.SendMsg(m)
}
+
// RecvMsg waits, if necessary, until SendMsg has created the underlying
// ClientStream (or recorded a failure), then delegates to it.
func (cs *gcpClientStream) RecvMsg(m interface{}) error {
	// If RecvMsg is called before SendMsg, it should wait until cs.ClientStream
	// is initialized or the initialization failed.
	cs.Lock()
	for cs.initStreamErr == nil && cs.ClientStream == nil {
		// cond's Locker is cs itself, so Wait releases the lock while
		// blocked and reacquires it before returning.
		cs.cond.Wait()
	}
	if cs.initStreamErr != nil {
		cs.Unlock()
		return cs.initStreamErr
	}
	cs.Unlock()
	return cs.ClientStream.RecvMsg(m)
}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go
new file mode 100644
index 000000000..cd1caf301
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go
@@ -0,0 +1,98 @@
+package grpcgcp
+
+import (
+ "strings"
+
+ "google.golang.org/grpc/grpclog"
+)
+
const (
	// FINE is the verbosity level for high-level debug logging (used with V()).
	FINE = 90
	// FINEST is the verbosity level for the most detailed debug logging.
	FINEST = 99
)

// compLogger is the shared component logger for the grpcgcp package.
var compLogger = grpclog.Component("grpcgcp")

// gcpLogger decorates a grpclog.LoggerV2 by prepending a fixed prefix to
// every log entry.
type gcpLogger struct {
	logger grpclog.LoggerV2
	prefix string
}

// Make sure gcpLogger implements grpclog.LoggerV2.
var _ grpclog.LoggerV2 = (*gcpLogger)(nil)
+
+func NewGCPLogger(logger grpclog.LoggerV2, prefix string) *gcpLogger {
+ p := prefix
+ if !strings.HasSuffix(p, " ") {
+ p = p + " "
+ }
+ return &gcpLogger{
+ logger: logger,
+ prefix: p,
+ }
+}
+
+// Error implements grpclog.LoggerV2.
+func (l *gcpLogger) Error(args ...interface{}) {
+ l.logger.Error(append([]interface{}{l.prefix}, args)...)
+}
+
+// Errorf implements grpclog.LoggerV2.
+func (l *gcpLogger) Errorf(format string, args ...interface{}) {
+ l.logger.Errorf(l.prefix+format, args...)
+}
+
+// Errorln implements grpclog.LoggerV2.
+func (l *gcpLogger) Errorln(args ...interface{}) {
+ l.logger.Errorln(append([]interface{}{l.prefix}, args)...)
+}
+
+// Fatal implements grpclog.LoggerV2.
+func (l *gcpLogger) Fatal(args ...interface{}) {
+ l.logger.Fatal(append([]interface{}{l.prefix}, args)...)
+}
+
+// Fatalf implements grpclog.LoggerV2.
+func (l *gcpLogger) Fatalf(format string, args ...interface{}) {
+ l.logger.Fatalf(l.prefix+format, args...)
+}
+
+// Fatalln implements grpclog.LoggerV2.
+func (l *gcpLogger) Fatalln(args ...interface{}) {
+ l.Fatalln(append([]interface{}{l.prefix}, args)...)
+}
+
+// Info implements grpclog.LoggerV2.
+func (l *gcpLogger) Info(args ...interface{}) {
+ l.logger.Info(append([]interface{}{l.prefix}, args)...)
+}
+
+// Infof implements grpclog.LoggerV2.
+func (l *gcpLogger) Infof(format string, args ...interface{}) {
+ l.logger.Infof(l.prefix+format, args...)
+}
+
+// Infoln implements grpclog.LoggerV2.
+func (l *gcpLogger) Infoln(args ...interface{}) {
+ l.logger.Infoln(append([]interface{}{l.prefix}, args)...)
+}
+
+// V implements grpclog.LoggerV2.
+func (l *gcpLogger) V(level int) bool {
+ return l.logger.V(level)
+}
+
+// Warning implements grpclog.LoggerV2.
+func (l *gcpLogger) Warning(args ...interface{}) {
+ l.logger.Warning(append([]interface{}{l.prefix}, args)...)
+}
+
+// Warningf implements grpclog.LoggerV2.
+func (l *gcpLogger) Warningf(format string, args ...interface{}) {
+ l.logger.Warningf(l.prefix+format, args...)
+}
+
+// Warningln implements grpclog.LoggerV2.
+func (l *gcpLogger) Warningln(args ...interface{}) {
+ l.logger.Warningln(append([]interface{}{l.prefix}, args)...)
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go
new file mode 100644
index 000000000..9ee507437
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go
@@ -0,0 +1,408 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpcgcp
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+
+ "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/connectivity"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/protobuf/encoding/protojson"
+ "google.golang.org/protobuf/proto"
+
+ pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
+)
+
+var gmeCounter uint32
+
+type contextMEKey int
+
+var meKey contextMEKey
+
+// NewMEContext returns a new Context that carries Multiendpoint name.
+func NewMEContext(ctx context.Context, name string) context.Context {
+ return context.WithValue(ctx, meKey, name)
+}
+
+// FromMEContext returns the MultiEndpoint name stored in ctx, if any.
+func FromMEContext(ctx context.Context) (string, bool) {
+ name, ok := ctx.Value(meKey).(string)
+ return name, ok
+}
+
+// GCPMultiEndpoint holds the state of MultiEndpoints-enabled gRPC client connection.
+//
+// The purposes of GCPMultiEndpoint are:
+//
+// - Fallback to an alternative endpoint (host:port) of a gRPC service when the original
+// endpoint is completely unavailable.
+// - Be able to route an RPC call to a specific group of endpoints.
+// - Be able to reconfigure endpoints in runtime.
+//
+// A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints
+// where priority is defined by the position in the list with the first endpoint having top
+// priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an
+// RPC call, it picks the top priority endpoint that is currently available. More information on the
+// [multiendpoint.MultiEndpoint].
+//
+// GCPMultiEndpoint can have one or more MultiEndpoint identified by its name -- arbitrary
+// string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name
+// can be used to route an RPC call to this MultiEndpoint by using the [NewMEContext].
+//
+// GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration.
+// An updated configuration can be provided at any time later using [UpdateMultiEndpoints].
+//
+// Example:
+//
+// Let's assume we have a service with read and write operations and the following backends:
+//
+// - service.example.com -- the main set of backends supporting all operations
+// - service-fallback.example.com -- read-write replica supporting all operations
+// - ro-service.example.com -- read-only replica supporting only read operations
+//
+// Example configuration:
+//
+// - MultiEndpoint named "default" with endpoints:
+//
+// 1. service.example.com:443
+//
+// 2. service-fallback.example.com:443
+//
+// - MultiEndpoint named "read" with endpoints:
+//
+// 1. ro-service.example.com:443
+//
+// 2. service-fallback.example.com:443
+//
+// 3. service.example.com:443
+//
+// With the configuration above GCPMultiEndpoint will use the "default" MultiEndpoint by
+// default. It means that RPC calls by default will use the main endpoint and if it is not available
+// then the read-write replica.
+//
+// To offload some read calls to the read-only replica we can specify "read" MultiEndpoint in the
+// context. Then these calls will use the read-only replica endpoint and if it is not available
+// then the read-write replica and if it is also not available then the main endpoint.
+//
+// GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique
+// endpoint. For the example above three connection pools will be created.
+//
+// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
+// as a [grpc.ClientConn] when creating gRPC clients.
type GCPMultiEndpoint struct {
	// mu guards defaultName, mes, and pools against concurrent reconfiguration.
	mu sync.RWMutex

	// defaultName names the MultiEndpoint used when the context selects none.
	defaultName string
	// mes maps MultiEndpoint name to its MultiEndpoint.
	mes map[string]multiendpoint.MultiEndpoint
	// pools maps endpoint address to its monitored connection pool.
	pools map[string]*monitoredConn
	// opts are the dial options applied to every endpoint's connection pool.
	opts []grpc.DialOption
	// gcpConfig is a copy of the gRPC-GCP config applied to every endpoint.
	gcpConfig *pb.ApiConfig
	// dialFunc creates the ClientConn for an endpoint (defaults to grpc.Dial).
	dialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
	log grpclog.LoggerV2

	grpc.ClientConnInterface
}

// Make sure GCPMultiEndpoint implements grpc.ClientConnInterface.
var _ grpc.ClientConnInterface = (*GCPMultiEndpoint)(nil)
+
// Invoke implements grpc.ClientConnInterface. It routes the unary RPC to the
// connection pool of the MultiEndpoint selected via the context.
func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
	return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...)
}

// NewStream implements grpc.ClientConnInterface. It routes the streaming RPC
// to the connection pool of the MultiEndpoint selected via the context.
func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...)
}
+
+func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn {
+ name, ok := FromMEContext(ctx)
+ me, ook := gme.mes[name]
+ if !ok || !ook {
+ me = gme.mes[gme.defaultName]
+ }
+ return gme.pools[me.Current()].conn
+}
+
// Close stops endpoint monitoring and closes every connection pool,
// combining any close errors into a single error (nil when none occurred).
func (gme *GCPMultiEndpoint) Close() error {
	var errs multiError
	for e, mc := range gme.pools {
		// Stop the monitor goroutine before closing its connection so it
		// does not observe the shutdown.
		mc.stopMonitoring()
		if err := mc.conn.Close(); err != nil {
			errs = append(errs, err)
			gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
		}
		if gme.log.V(FINE) {
			gme.log.Infof("closed channel pool for %q endpoint.", e)
		}
	}
	return errs.Combine()
}
+
// GCPConfig returns a deep copy (proto.Clone) of the gRPC-GCP ApiConfig this
// GCPMultiEndpoint was created with, so callers cannot mutate internal state.
func (gme *GCPMultiEndpoint) GCPConfig() *pb.ApiConfig {
	return proto.Clone(gme.gcpConfig).(*pb.ApiConfig)
}
+
// GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client
// connection.
type GCPMultiEndpointOptions struct {
	// Regular gRPC-GCP configuration to be applied to every endpoint.
	GRPCgcpConfig *pb.ApiConfig
	// Map of MultiEndpoints where key is the MultiEndpoint name.
	MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions
	// Name of the default MultiEndpoint. Must be a key of MultiEndpoints.
	Default string
	// Func to dial grpc ClientConn. Optional; defaults to grpc.Dial.
	DialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error)
}
+
// NewGcpMultiEndpoint creates a new [GCPMultiEndpoint] -- MultiEndpoints-enabled
// gRPC client connection.
//
// Deprecated: use NewGCPMultiEndpoint.
func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) {
	return NewGCPMultiEndpoint(meOpts, opts...)
}
+
+// NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client
+// connection.
+//
+// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used
+// as a [grpc.ClientConn] when creating gRPC clients.
+func NewGCPMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) {
+ // Read config, create multiendpoints and pools.
+ o, err := makeOpts(meOpts, opts)
+ if err != nil {
+ return nil, err
+ }
+ gme := &GCPMultiEndpoint{
+ mes: make(map[string]multiendpoint.MultiEndpoint),
+ pools: make(map[string]*monitoredConn),
+ defaultName: meOpts.Default,
+ opts: o,
+ gcpConfig: proto.Clone(meOpts.GRPCgcpConfig).(*pb.ApiConfig),
+ dialFunc: meOpts.DialFunc,
+ log: NewGCPLogger(compLogger, fmt.Sprintf("[GCPMultiEndpoint #%d]", atomic.AddUint32(&gmeCounter, 1))),
+ }
+ if gme.dialFunc == nil {
+ gme.dialFunc = func(_ context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
+ return grpc.Dial(target, opts...)
+ }
+ }
+ if err := gme.UpdateMultiEndpoints(meOpts); err != nil {
+ return nil, err
+ }
+ return gme, nil
+}
+
+func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) {
+ grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig)
+ if err != nil {
+ return nil, err
+ }
+ o := append([]grpc.DialOption{}, opts...)
+ o = append(o, []grpc.DialOption{
+ grpc.WithDisableServiceConfig(),
+ grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))),
+ grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor),
+ grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor),
+ }...)
+
+ return o, nil
+}
+
// monitoredConn couples an endpoint's ClientConn with a goroutine that
// watches its connectivity state and reports changes to the MultiEndpoints.
type monitoredConn struct {
	endpoint string
	conn *grpc.ClientConn
	gme *GCPMultiEndpoint
	// cancel stops the monitoring goroutine started by newMonitoredConn.
	cancel context.CancelFunc
}
+
+func newMonitoredConn(endpoint string, conn *grpc.ClientConn, gme *GCPMultiEndpoint) (mc *monitoredConn) {
+ ctx, cancel := context.WithCancel(context.Background())
+ mc = &monitoredConn{
+ endpoint: endpoint,
+ conn: conn,
+ gme: gme,
+ cancel: cancel,
+ }
+ go mc.monitor(ctx)
+ return
+}
+
// notify reports the endpoint's connectivity state to every MultiEndpoint so
// they can fail over or recover; only READY counts as available.
func (mc *monitoredConn) notify(state connectivity.State) {
	if mc.gme.log.V(FINE) {
		mc.gme.log.Infof("%q endpoint state changed to %v", mc.endpoint, state)
	}
	// Inform all multiendpoints.
	mc.gme.mu.RLock()
	for _, me := range mc.gme.mes {
		me.SetEndpointAvailability(mc.endpoint, state == connectivity.Ready)
	}
	mc.gme.mu.RUnlock()
}
+
// monitor loops forwarding the connection's connectivity state to the
// MultiEndpoints until ctx is canceled (see stopMonitoring).
func (mc *monitoredConn) monitor(ctx context.Context) {
	for {
		currentState := mc.conn.GetState()
		mc.notify(currentState)
		// WaitForStateChange returns false when ctx is done.
		if !mc.conn.WaitForStateChange(ctx, currentState) {
			break
		}
	}
}
+
// stopMonitoring terminates the monitor goroutine started by newMonitoredConn.
func (mc *monitoredConn) stopMonitoring() {
	mc.cancel()
}
+
+// UpdateMultiEndpoints reconfigures MultiEndpoints.
+//
+// MultiEndpoints are matched with the current ones by name.
+//
+// - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be
+// removed.
+// - A new MultiEndpoint will be created for every new name in the list.
+// - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout
+// change).
+//
+// Endpoints are matched by the endpoint address (usually in the form of address:port).
+//
+// - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the
+// connection poll for this endpoint will be shutdown.
+// - A connection pool will be created for every new endpoint.
+// - For an existing endpoint nothing will change (the connection pool will not be re-created,
+// thus no connection credentials change, nor connection configuration change).
+func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error {
+ gme.mu.Lock()
+ defer gme.mu.Unlock()
+ if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok {
+ return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default)
+ }
+
+ validPools := make(map[string]bool)
+ for _, meo := range meOpts.MultiEndpoints {
+ for _, e := range meo.Endpoints {
+ validPools[e] = true
+ }
+ }
+
+ // Add missing pools.
+ for e := range validPools {
+ if _, ok := gme.pools[e]; !ok {
+ // This creates a ClientConn with the gRPC-GCP balancer managing connection pool.
+ conn, err := gme.dialFunc(context.Background(), e, gme.opts...)
+ if err != nil {
+ return err
+ }
+ if gme.log.V(FINE) {
+ gme.log.Infof("created new channel pool for %q endpoint.", e)
+ }
+ gme.pools[e] = newMonitoredConn(e, conn, gme)
+ }
+ }
+
+ // Add new multi-endpoints and update existing.
+ for name, meo := range meOpts.MultiEndpoints {
+ if me, ok := gme.mes[name]; ok {
+ // Updating existing MultiEndpoint.
+ me.SetEndpoints(meo.Endpoints)
+ continue
+ }
+
+ // Add new MultiEndpoint.
+ if gme.log.V(FINE) {
+ gme.log.Infof("creating new %q multiendpoint.", name)
+ }
+ me, err := multiendpoint.NewMultiEndpoint(meo)
+ if err != nil {
+ return err
+ }
+ gme.mes[name] = me
+ }
+ gme.defaultName = meOpts.Default
+
+ // Remove obsolete MultiEndpoints.
+ for name := range gme.mes {
+ if _, ok := meOpts.MultiEndpoints[name]; !ok {
+ delete(gme.mes, name)
+ if gme.log.V(FINE) {
+ gme.log.Infof("removed obsolete %q multiendpoint.", name)
+ }
+ }
+ }
+
+ // Remove obsolete pools.
+ for e, mc := range gme.pools {
+ if _, ok := validPools[e]; !ok {
+ if err := mc.conn.Close(); err != nil {
+ gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err)
+ }
+ if gme.log.V(FINE) {
+ gme.log.Infof("closed channel pool for %q endpoint.", e)
+ }
+ mc.stopMonitoring()
+ delete(gme.pools, e)
+ }
+ }
+
+ // Trigger status update.
+ for e, mc := range gme.pools {
+ s := mc.conn.GetState()
+ for _, me := range gme.mes {
+ me.SetEndpointAvailability(e, s == connectivity.Ready)
+ }
+ }
+ return nil
+}
+
// multiError aggregates several errors into a single error value.
type multiError []error

// Error returns the first non-nil error's message, annotated with the number
// of additional non-nil errors; "(0 errors)" when none are non-nil.
func (m multiError) Error() string {
	var first string
	count := 0
	for _, err := range m {
		if err == nil {
			continue
		}
		if count == 0 {
			first = err.Error()
		}
		count++
	}
	switch count {
	case 0:
		return "(0 errors)"
	case 1:
		return first
	case 2:
		return first + " (and 1 other error)"
	default:
		return fmt.Sprintf("%s (and %d other errors)", first, count-1)
	}
}

// Combine returns nil when no errors were collected, otherwise m itself.
func (m multiError) Combine() error {
	if len(m) == 0 {
		return nil
	}
	return m
}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go
new file mode 100644
index 000000000..522323625
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go
@@ -0,0 +1,276 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package grpcgcp
+
+import (
+ "context"
+ "fmt"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp"
+ "google.golang.org/grpc/balancer"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/grpclog"
+ "google.golang.org/grpc/status"
+)
+
// deErr is the gRPC error produced when the client-side context reaches its
// deadline; detectUnresponsive compares RPC errors against it to tell
// client-caused deadline expiry apart from server responses.
var deErr = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
+
+func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker {
+ gp := &gcpPicker{
+ gb: gb,
+ scRefs: readySCRefs,
+ }
+ gp.log = NewGCPLogger(gb.log, fmt.Sprintf("[gcpPicker %p]", gp))
+ return gp
+}
+
// gcpPicker selects among the subconn refs captured at its creation,
// preferring the SubConn bound to a call's affinity key, otherwise the
// least busy one.
type gcpPicker struct {
	gb *gcpBalancer
	// mu serializes subconn selection and stream accounting.
	mu sync.Mutex
	// scRefs holds the refs that were READY when this picker was built.
	scRefs []*subConnRef
	log grpclog.LoggerV2
}
+
+func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
+ if len(p.scRefs) <= 0 {
+ if p.log.V(FINEST) {
+ p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.")
+ }
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+ }
+
+ ctx := info.Ctx
+ gcpCtx, hasGCPCtx := ctx.Value(gcpKey).(*gcpContext)
+ boundKey := ""
+ locator := ""
+ var cmd grpc_gcp.AffinityConfig_Command
+
+ if mcfg, ok := p.gb.methodCfg[info.FullMethodName]; ok {
+ locator = mcfg.GetAffinityKey()
+ cmd = mcfg.GetCommand()
+ if hasGCPCtx && (cmd == grpc_gcp.AffinityConfig_BOUND || cmd == grpc_gcp.AffinityConfig_UNBIND) {
+ a, err := getAffinityKeysFromMessage(locator, gcpCtx.reqMsg)
+ if err != nil {
+ return balancer.PickResult{}, fmt.Errorf(
+ "failed to retrieve affinity key from request message: %v", err)
+ }
+ boundKey = a[0]
+ }
+ }
+
+ scRef, err := p.getAndIncrementSubConnRef(info.Ctx, boundKey, cmd)
+ if err != nil {
+ return balancer.PickResult{}, err
+ }
+ if scRef == nil {
+ if p.log.V(FINEST) {
+ p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.")
+ }
+ return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
+ }
+
+ callStarted := time.Now()
+ // define callback for post process once call is done
+ callback := func(info balancer.DoneInfo) {
+ scRef.streamsDecr()
+ p.detectUnresponsive(ctx, scRef, callStarted, info.Err)
+ if info.Err != nil {
+ return
+ }
+
+ switch cmd {
+ case grpc_gcp.AffinityConfig_BIND:
+ bindKeys, err := getAffinityKeysFromMessage(locator, gcpCtx.replyMsg)
+ if err == nil {
+ for _, bk := range bindKeys {
+ p.gb.bindSubConn(bk, scRef.subConn)
+ }
+ }
+ case grpc_gcp.AffinityConfig_UNBIND:
+ p.gb.unbindSubConn(boundKey)
+ }
+ }
+
+ if p.log.V(FINEST) {
+ p.log.Infof("picked SubConn: %p", scRef.subConn)
+ }
+ return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil
+}
+
+// unresponsiveWindow returns channel pool's unresponsiveDetectionMs multiplied
+// by 2^(refresh count since last response) as a time.Duration. This provides
+// exponential backoff when RPCs keep deadline exceeded after consecutive reconnections.
+func (p *gcpPicker) unresponsiveWindow(scRef *subConnRef) time.Duration {
+ factor := uint32(1 << scRef.refreshCnt)
+ return time.Millisecond * time.Duration(factor*p.gb.cfg.GetChannelPool().GetUnresponsiveDetectionMs())
+}
+
// detectUnresponsive tracks deadline-exceeded results for scRef and asks the
// balancer to refresh (reconnect) the SubConn when enough consecutive
// client-side deadline failures accumulate without any server response.
func (p *gcpPicker) detectUnresponsive(ctx context.Context, scRef *subConnRef, callStarted time.Time, rpcErr error) {
	if !p.gb.unresponsiveDetection {
		return
	}

	// Treat as a response from the server if deadline exceeded was not caused by client side context reached deadline.
	if dl, ok := ctx.Deadline(); rpcErr == nil || status.Code(rpcErr) != codes.DeadlineExceeded ||
		rpcErr.Error() != deErr.Error() || !ok || dl.After(time.Now()) {
		scRef.gotResp()
		return
	}

	// A response arrived after this call started, so the connection is not
	// unresponsive; ignore this deadline-exceeded result.
	if callStarted.Before(scRef.lastResp) {
		return
	}

	// Increment deadline exceeded calls and check if there were enough deadline
	// exceeded calls and enough time passed since last response to trigger refresh.
	if scRef.deCallsInc() >= p.gb.cfg.GetChannelPool().GetUnresponsiveCalls() &&
		scRef.lastResp.Before(time.Now().Add(-p.unresponsiveWindow(scRef))) {
		p.gb.refresh(scRef)
	}
}
+
// getAndIncrementSubConnRef picks a subConnRef for the call and increments
// its active-stream counter. BIND calls under the ROUND_ROBIN strategy
// bypass the least-busy selection (and the picker mutex).
func (p *gcpPicker) getAndIncrementSubConnRef(ctx context.Context, boundKey string, cmd grpc_gcp.AffinityConfig_Command) (*subConnRef, error) {
	if cmd == grpc_gcp.AffinityConfig_BIND && p.gb.cfg.GetChannelPool().GetBindPickStrategy() == grpc_gcp.ChannelPoolConfig_ROUND_ROBIN {
		scRef := p.gb.getSubConnRoundRobin(ctx)
		if p.log.V(FINEST) {
			p.log.Infof("picking SubConn for round-robin bind: %p", scRef.subConn)
		}
		scRef.streamsIncr()
		return scRef, nil
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	scRef, err := p.getSubConnRef(boundKey)
	if err != nil {
		return nil, err
	}
	// Only count an active stream when a SubConn was actually picked.
	if scRef != nil {
		scRef.streamsIncr()
	}
	return scRef, nil
}
+
+// getSubConnRef returns the subConnRef object that contains the subconn
+// ready to be used by picker.
+// Must be called holding the picker mutex lock.
+func (p *gcpPicker) getSubConnRef(boundKey string) (*subConnRef, error) {
+ if boundKey != "" {
+ if ref, ok := p.gb.getReadySubConnRef(boundKey); ok {
+ return ref, nil
+ }
+ }
+
+ return p.getLeastBusySubConnRef()
+}
+
// getLeastBusySubConnRef returns the ref with the fewest active streams.
// When every SubConn is at or above the low watermark it may grow the pool
// (returning ErrNoSubConnAvailable while the new SubConn connects), or, at
// max pool size, return the least busy ref anyway.
// Must be called holding the picker mutex lock.
func (p *gcpPicker) getLeastBusySubConnRef() (*subConnRef, error) {
	minScRef := p.scRefs[0]
	minStreamsCnt := minScRef.getStreamsCnt()
	for _, scRef := range p.scRefs {
		if scRef.getStreamsCnt() < minStreamsCnt {
			minStreamsCnt = scRef.getStreamsCnt()
			minScRef = scRef
		}
	}

	// If the least busy connection still has capacity, use it
	if minStreamsCnt < int32(p.gb.cfg.GetChannelPool().GetMaxConcurrentStreamsLowWatermark()) {
		return minScRef, nil
	}

	if p.gb.cfg.GetChannelPool().GetMaxSize() == 0 || p.gb.getConnectionPoolSize() < int(p.gb.cfg.GetChannelPool().GetMaxSize()) {
		// Ask balancer to create new subconn when all current subconns are busy and
		// the connection pool still has capacity (either unlimited or maxSize is not reached).
		p.gb.newSubConn()

		// Let this picker return ErrNoSubConnAvailable because it needs some time
		// for the subconn to be READY.
		return nil, balancer.ErrNoSubConnAvailable
	}

	// If there is no capacity left in the pool and every connection reaches
	// the soft limit, then pick the least busy one anyway.
	return minScRef, nil
}
+
// keysFromMessage walks the exported fields of val following path[start:],
// collecting the string value(s) at the end of the path. Slice fields fan
// out: every element contributes its key(s).
func keysFromMessage(val reflect.Value, path []string, start int) ([]string, error) {
	if val.Kind() == reflect.Pointer || val.Kind() == reflect.Interface {
		val = val.Elem()
	}

	// End of the path: the current value must be the string key itself.
	if len(path) == start {
		if val.Kind() != reflect.String {
			return nil, fmt.Errorf("cannot get string value from %q which is %q", strings.Join(path, "."), val.Kind())
		}
		return []string{val.String()}, nil
	}

	if val.Kind() != reflect.Struct {
		return nil, fmt.Errorf("path %q traversal error: cannot lookup field %q (index %d in the path) in a %q value", strings.Join(path, "."), path[start], start, val.Kind())
	}
	// NOTE(review): strings.Title is deprecated, but its behavior around
	// separator characters (e.g. "_") differs from a plain first-rune
	// uppercase, so it is kept to preserve existing locator semantics.
	valField := val.FieldByName(strings.Title(path[start]))

	if valField.Kind() != reflect.Slice {
		return keysFromMessage(valField, path, start+1)
	}

	// Fan out over repeated fields, collecting keys from every element.
	keys := []string{}
	for i := 0; i < valField.Len(); i++ {
		kk, err := keysFromMessage(valField.Index(i), path, start+1)
		if err != nil {
			return keys, err
		}
		keys = append(keys, kk...)
	}
	return keys, nil
}

// getAffinityKeysFromMessage retrieves the affinity key(s) from proto message using
// the key locator defined in the affinity config.
func getAffinityKeysFromMessage(
	locator string,
	msg interface{},
) (affinityKeys []string, err error) {
	// strings.Split never returns an empty slice, so the original
	// len(names) == 0 check was dead code; reject an empty locator directly.
	if locator == "" {
		return nil, fmt.Errorf("empty affinityKey locator")
	}
	names := strings.Split(locator, ".")
	return keysFromMessage(reflect.ValueOf(msg), names, 0)
}
+
// newErrPicker returns a picker that always returns err on Pick().
func newErrPicker(err error) balancer.Picker {
	return &errPicker{err: err}
}

// errPicker is a balancer.Picker that fails every Pick with a fixed error.
type errPicker struct {
	err error // Pick() always returns this err.
}

// Pick implements balancer.Picker.
func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, p.err
}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh
new file mode 100644
index 000000000..334a718cc
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
# Regenerate grpc_gcp.pb.go from grpc_gcp.proto.
# Fail fast on any error, unset variable, or pipeline failure.
set -euo pipefail
cd "$(dirname "$0")"

# -f: do not fail when the generated file does not exist yet (fresh checkout).
rm -f grpc_gcp.pb.go
protoc --plugin="$(go env GOPATH)/bin/protoc-gen-go" --proto_path=./ --go_out=.. ./grpc_gcp.proto
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go
new file mode 100644
index 000000000..51dd4b27a
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go
@@ -0,0 +1,638 @@
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.27.1
+// protoc v3.12.4
+// source: grpc_gcp.proto
+
+package grpc_gcp
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// A selection of strategies for picking a channel for a call with BIND command.
+type ChannelPoolConfig_BindPickStrategy int32
+
+const (
+ // No preference -- picking a channel for a BIND call will be no different
+ // than for any other calls.
+ ChannelPoolConfig_UNSPECIFIED ChannelPoolConfig_BindPickStrategy = 0
+ // A channel with the least active streams at the moment of a BIND call
+ // initiation will be picked.
+ ChannelPoolConfig_LEAST_ACTIVE_STREAMS ChannelPoolConfig_BindPickStrategy = 1
+	// Cycle through channels created by the BIND call initiation. I.e. pick
+ // a channel in a round-robin manner. Note that some channels may be
+ // skipped during channel pool resize.
+ ChannelPoolConfig_ROUND_ROBIN ChannelPoolConfig_BindPickStrategy = 2
+)
+
+// Enum value maps for ChannelPoolConfig_BindPickStrategy.
+var (
+ ChannelPoolConfig_BindPickStrategy_name = map[int32]string{
+ 0: "UNSPECIFIED",
+ 1: "LEAST_ACTIVE_STREAMS",
+ 2: "ROUND_ROBIN",
+ }
+ ChannelPoolConfig_BindPickStrategy_value = map[string]int32{
+ "UNSPECIFIED": 0,
+ "LEAST_ACTIVE_STREAMS": 1,
+ "ROUND_ROBIN": 2,
+ }
+)
+
+func (x ChannelPoolConfig_BindPickStrategy) Enum() *ChannelPoolConfig_BindPickStrategy {
+ p := new(ChannelPoolConfig_BindPickStrategy)
+ *p = x
+ return p
+}
+
+func (x ChannelPoolConfig_BindPickStrategy) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (ChannelPoolConfig_BindPickStrategy) Descriptor() protoreflect.EnumDescriptor {
+ return file_grpc_gcp_proto_enumTypes[0].Descriptor()
+}
+
+func (ChannelPoolConfig_BindPickStrategy) Type() protoreflect.EnumType {
+ return &file_grpc_gcp_proto_enumTypes[0]
+}
+
+func (x ChannelPoolConfig_BindPickStrategy) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use ChannelPoolConfig_BindPickStrategy.Descriptor instead.
+func (ChannelPoolConfig_BindPickStrategy) EnumDescriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{1, 0}
+}
+
+type AffinityConfig_Command int32
+
+const (
+ // The annotated method will be required to be bound to an existing session
+ // to execute the RPC. The corresponding <affinity_key_field_path> will be
+ // used to find the affinity key from the request message.
+ AffinityConfig_BOUND AffinityConfig_Command = 0
+ // The annotated method will establish the channel affinity with the
+ // channel which is used to execute the RPC. The corresponding
+ // <affinity_key_field_path> will be used to find the affinity key from the
+ // response message.
+ AffinityConfig_BIND AffinityConfig_Command = 1
+ // The annotated method will remove the channel affinity with the
+ // channel which is used to execute the RPC. The corresponding
+ // <affinity_key_field_path> will be used to find the affinity key from the
+ // request message.
+ AffinityConfig_UNBIND AffinityConfig_Command = 2
+)
+
+// Enum value maps for AffinityConfig_Command.
+var (
+ AffinityConfig_Command_name = map[int32]string{
+ 0: "BOUND",
+ 1: "BIND",
+ 2: "UNBIND",
+ }
+ AffinityConfig_Command_value = map[string]int32{
+ "BOUND": 0,
+ "BIND": 1,
+ "UNBIND": 2,
+ }
+)
+
+func (x AffinityConfig_Command) Enum() *AffinityConfig_Command {
+ p := new(AffinityConfig_Command)
+ *p = x
+ return p
+}
+
+func (x AffinityConfig_Command) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (AffinityConfig_Command) Descriptor() protoreflect.EnumDescriptor {
+ return file_grpc_gcp_proto_enumTypes[1].Descriptor()
+}
+
+func (AffinityConfig_Command) Type() protoreflect.EnumType {
+ return &file_grpc_gcp_proto_enumTypes[1]
+}
+
+func (x AffinityConfig_Command) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use AffinityConfig_Command.Descriptor instead.
+func (AffinityConfig_Command) EnumDescriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{3, 0}
+}
+
+type ApiConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The channel pool configurations.
+ ChannelPool *ChannelPoolConfig `protobuf:"bytes,2,opt,name=channel_pool,json=channelPool,proto3" json:"channel_pool,omitempty"`
+ // The method configurations.
+ Method []*MethodConfig `protobuf:"bytes,1001,rep,name=method,proto3" json:"method,omitempty"`
+}
+
+func (x *ApiConfig) Reset() {
+ *x = ApiConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_gcp_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ApiConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ApiConfig) ProtoMessage() {}
+
+func (x *ApiConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_gcp_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ApiConfig.ProtoReflect.Descriptor instead.
+func (*ApiConfig) Descriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *ApiConfig) GetChannelPool() *ChannelPoolConfig {
+ if x != nil {
+ return x.ChannelPool
+ }
+ return nil
+}
+
+func (x *ApiConfig) GetMethod() []*MethodConfig {
+ if x != nil {
+ return x.Method
+ }
+ return nil
+}
+
+// ChannelPoolConfig are options for configuring the channel pool.
+// RPCs will be scheduled onto existing channels in the pool until all channels
+// have <max_concurrent_streams_low_watermark> number of streams. At this point
+// a new channel is spun out. Once <max_size> channels have been spun out and
+// each has <max_concurrent_streams_low_watermark> streams, subsequent RPCs will
+// hang until any of the in-flight RPCs is finished, freeing up a channel.
+type ChannelPoolConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The max number of channels in the pool.
+ // Default value is 0, meaning 'unlimited' size.
+ MaxSize uint32 `protobuf:"varint,1,opt,name=max_size,json=maxSize,proto3" json:"max_size,omitempty"`
+ // The idle timeout (seconds) of channels without bound affinity sessions.
+ IdleTimeout uint64 `protobuf:"varint,2,opt,name=idle_timeout,json=idleTimeout,proto3" json:"idle_timeout,omitempty"`
+ // The low watermark of max number of concurrent streams in a channel.
+	// New channel will be created once it gets hit, until we reach the max size of the channel pool.
+ // Default value is 100. The valid range is [1, 100]. Any value outside the range will be ignored and the default value will be used.
+ // Note: It is not recommended that users adjust this value, since a single channel should generally have no trouble managing the default (maximum) number of streams.
+ MaxConcurrentStreamsLowWatermark uint32 `protobuf:"varint,3,opt,name=max_concurrent_streams_low_watermark,json=maxConcurrentStreamsLowWatermark,proto3" json:"max_concurrent_streams_low_watermark,omitempty"`
+ // The minimum number of channels in the pool.
+ MinSize uint32 `protobuf:"varint,4,opt,name=min_size,json=minSize,proto3" json:"min_size,omitempty"`
+ // If a channel mapped to an affinity key is not ready, temporarily fallback
+ // to another ready channel.
+ // Enabling this fallback is beneficial in scenarios with short RPC timeouts
+ // and rather slow connection establishing or during incidents when new
+ // connections fail but existing connections still operate.
+ FallbackToReady bool `protobuf:"varint,5,opt,name=fallback_to_ready,json=fallbackToReady,proto3" json:"fallback_to_ready,omitempty"`
+ // Enables per channel unresponsive connection detection if > 0 and unresponsive_calls > 0.
+ // If enabled and more than unresponsive_detection_ms passed since the last response from the server,
+ // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side,
+ // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for
+ // that channel and after the new connection is ready it will replace the old connection. The calls on the old
+ // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh
+ // if no response from the server is received.
+ UnresponsiveDetectionMs uint32 `protobuf:"varint,6,opt,name=unresponsive_detection_ms,json=unresponsiveDetectionMs,proto3" json:"unresponsive_detection_ms,omitempty"`
+ // Enables per channel unresponsive connection detection if > 0 and unresponsive_detection_ms > 0.
+ // If enabled and more than unresponsive_detection_ms passed since the last response from the server,
+ // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side,
+ // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for
+ // that channel and after the new connection is ready it will replace the old connection. The calls on the old
+ // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh
+ // if no response from the server is received.
+ UnresponsiveCalls uint32 `protobuf:"varint,7,opt,name=unresponsive_calls,json=unresponsiveCalls,proto3" json:"unresponsive_calls,omitempty"`
+ // The strategy for picking a channel for a call with BIND command.
+ BindPickStrategy ChannelPoolConfig_BindPickStrategy `protobuf:"varint,8,opt,name=bind_pick_strategy,json=bindPickStrategy,proto3,enum=grpc.gcp.ChannelPoolConfig_BindPickStrategy" json:"bind_pick_strategy,omitempty"`
+}
+
+func (x *ChannelPoolConfig) Reset() {
+ *x = ChannelPoolConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_gcp_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *ChannelPoolConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ChannelPoolConfig) ProtoMessage() {}
+
+func (x *ChannelPoolConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_gcp_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use ChannelPoolConfig.ProtoReflect.Descriptor instead.
+func (*ChannelPoolConfig) Descriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *ChannelPoolConfig) GetMaxSize() uint32 {
+ if x != nil {
+ return x.MaxSize
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetIdleTimeout() uint64 {
+ if x != nil {
+ return x.IdleTimeout
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetMaxConcurrentStreamsLowWatermark() uint32 {
+ if x != nil {
+ return x.MaxConcurrentStreamsLowWatermark
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetMinSize() uint32 {
+ if x != nil {
+ return x.MinSize
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetFallbackToReady() bool {
+ if x != nil {
+ return x.FallbackToReady
+ }
+ return false
+}
+
+func (x *ChannelPoolConfig) GetUnresponsiveDetectionMs() uint32 {
+ if x != nil {
+ return x.UnresponsiveDetectionMs
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetUnresponsiveCalls() uint32 {
+ if x != nil {
+ return x.UnresponsiveCalls
+ }
+ return 0
+}
+
+func (x *ChannelPoolConfig) GetBindPickStrategy() ChannelPoolConfig_BindPickStrategy {
+ if x != nil {
+ return x.BindPickStrategy
+ }
+ return ChannelPoolConfig_UNSPECIFIED
+}
+
+type MethodConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // A fully qualified name of a gRPC method, or a wildcard pattern ending
+ // with .*, such as foo.bar.A, foo.bar.*. Method configs are evaluated
+ // sequentially, and the first one takes precedence.
+ Name []string `protobuf:"bytes,1,rep,name=name,proto3" json:"name,omitempty"`
+ // The channel affinity configurations.
+ Affinity *AffinityConfig `protobuf:"bytes,1001,opt,name=affinity,proto3" json:"affinity,omitempty"`
+}
+
+func (x *MethodConfig) Reset() {
+ *x = MethodConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_gcp_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *MethodConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MethodConfig) ProtoMessage() {}
+
+func (x *MethodConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_gcp_proto_msgTypes[2]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use MethodConfig.ProtoReflect.Descriptor instead.
+func (*MethodConfig) Descriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *MethodConfig) GetName() []string {
+ if x != nil {
+ return x.Name
+ }
+ return nil
+}
+
+func (x *MethodConfig) GetAffinity() *AffinityConfig {
+ if x != nil {
+ return x.Affinity
+ }
+ return nil
+}
+
+type AffinityConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // The affinity command applies on the selected gRPC methods.
+ Command AffinityConfig_Command `protobuf:"varint,2,opt,name=command,proto3,enum=grpc.gcp.AffinityConfig_Command" json:"command,omitempty"`
+ // The field path of the affinity key in the request/response message.
+ // For example: "f.a", "f.b.d", etc.
+ AffinityKey string `protobuf:"bytes,3,opt,name=affinity_key,json=affinityKey,proto3" json:"affinity_key,omitempty"`
+}
+
+func (x *AffinityConfig) Reset() {
+ *x = AffinityConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_grpc_gcp_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *AffinityConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*AffinityConfig) ProtoMessage() {}
+
+func (x *AffinityConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_grpc_gcp_proto_msgTypes[3]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use AffinityConfig.ProtoReflect.Descriptor instead.
+func (*AffinityConfig) Descriptor() ([]byte, []int) {
+ return file_grpc_gcp_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *AffinityConfig) GetCommand() AffinityConfig_Command {
+ if x != nil {
+ return x.Command
+ }
+ return AffinityConfig_BOUND
+}
+
+func (x *AffinityConfig) GetAffinityKey() string {
+ if x != nil {
+ return x.AffinityKey
+ }
+ return ""
+}
+
+var File_grpc_gcp_proto protoreflect.FileDescriptor
+
+var file_grpc_gcp_proto_rawDesc = []byte{
+ 0x0a, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x67, 0x63, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+ 0x12, 0x08, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x22, 0x7c, 0x0a, 0x09, 0x41, 0x70,
+ 0x69, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x6e, 0x6e,
+ 0x65, 0x6c, 0x5f, 0x70, 0x6f, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e,
+ 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c,
+ 0x50, 0x6f, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x63, 0x68, 0x61, 0x6e,
+ 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f,
+ 0x64, 0x18, 0xe9, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e,
+ 0x67, 0x63, 0x70, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
+ 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x22, 0xff, 0x03, 0x0a, 0x11, 0x43, 0x68, 0x61,
+ 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x19,
+ 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d,
+ 0x52, 0x07, 0x6d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x64, 0x6c,
+ 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52,
+ 0x0b, 0x69, 0x64, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x4e, 0x0a, 0x24,
+ 0x6d, 0x61, 0x78, 0x5f, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73,
+ 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5f, 0x6c, 0x6f, 0x77, 0x5f, 0x77, 0x61, 0x74, 0x65, 0x72,
+ 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x20, 0x6d, 0x61, 0x78, 0x43,
+ 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73,
+ 0x4c, 0x6f, 0x77, 0x57, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x19, 0x0a, 0x08,
+ 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07,
+ 0x6d, 0x69, 0x6e, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x2a, 0x0a, 0x11, 0x66, 0x61, 0x6c, 0x6c, 0x62,
+ 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x6f, 0x5f, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x05, 0x20, 0x01,
+ 0x28, 0x08, 0x52, 0x0f, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x54, 0x6f, 0x52, 0x65,
+ 0x61, 0x64, 0x79, 0x12, 0x3a, 0x0a, 0x19, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
+ 0x69, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6d, 0x73,
+ 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x17, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e,
+ 0x73, 0x69, 0x76, 0x65, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x73, 0x12,
+ 0x2d, 0x0a, 0x12, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x69, 0x76, 0x65, 0x5f,
+ 0x63, 0x61, 0x6c, 0x6c, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x75, 0x6e, 0x72,
+ 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x73, 0x12, 0x5a,
+ 0x0a, 0x12, 0x62, 0x69, 0x6e, 0x64, 0x5f, 0x70, 0x69, 0x63, 0x6b, 0x5f, 0x73, 0x74, 0x72, 0x61,
+ 0x74, 0x65, 0x67, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2c, 0x2e, 0x67, 0x72, 0x70,
+ 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f,
+ 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x42, 0x69, 0x6e, 0x64, 0x50, 0x69, 0x63, 0x6b,
+ 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x10, 0x62, 0x69, 0x6e, 0x64, 0x50, 0x69,
+ 0x63, 0x6b, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x4e, 0x0a, 0x10, 0x42, 0x69,
+ 0x6e, 0x64, 0x50, 0x69, 0x63, 0x6b, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x0f,
+ 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12,
+ 0x18, 0x0a, 0x14, 0x4c, 0x45, 0x41, 0x53, 0x54, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x5f,
+ 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x53, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x52, 0x4f, 0x55,
+ 0x4e, 0x44, 0x5f, 0x52, 0x4f, 0x42, 0x49, 0x4e, 0x10, 0x02, 0x22, 0x59, 0x0a, 0x0c, 0x4d, 0x65,
+ 0x74, 0x68, 0x6f, 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61,
+ 0x6d, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35,
+ 0x0a, 0x08, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x18, 0xe9, 0x07, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x41, 0x66, 0x66,
+ 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x08, 0x61, 0x66, 0x66,
+ 0x69, 0x6e, 0x69, 0x74, 0x79, 0x22, 0x9b, 0x01, 0x0a, 0x0e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69,
+ 0x74, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3a, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d,
+ 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x67, 0x72, 0x70, 0x63,
+ 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x07, 0x63, 0x6f, 0x6d,
+ 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79,
+ 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x66, 0x66, 0x69,
+ 0x6e, 0x69, 0x74, 0x79, 0x4b, 0x65, 0x79, 0x22, 0x2a, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
+ 0x6e, 0x64, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a,
+ 0x04, 0x42, 0x49, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x42, 0x49, 0x4e,
+ 0x44, 0x10, 0x02, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x67, 0x63,
+ 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_grpc_gcp_proto_rawDescOnce sync.Once
+ file_grpc_gcp_proto_rawDescData = file_grpc_gcp_proto_rawDesc
+)
+
+func file_grpc_gcp_proto_rawDescGZIP() []byte {
+ file_grpc_gcp_proto_rawDescOnce.Do(func() {
+ file_grpc_gcp_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_gcp_proto_rawDescData)
+ })
+ return file_grpc_gcp_proto_rawDescData
+}
+
+var file_grpc_gcp_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
+var file_grpc_gcp_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_grpc_gcp_proto_goTypes = []interface{}{
+ (ChannelPoolConfig_BindPickStrategy)(0), // 0: grpc.gcp.ChannelPoolConfig.BindPickStrategy
+ (AffinityConfig_Command)(0), // 1: grpc.gcp.AffinityConfig.Command
+ (*ApiConfig)(nil), // 2: grpc.gcp.ApiConfig
+ (*ChannelPoolConfig)(nil), // 3: grpc.gcp.ChannelPoolConfig
+ (*MethodConfig)(nil), // 4: grpc.gcp.MethodConfig
+ (*AffinityConfig)(nil), // 5: grpc.gcp.AffinityConfig
+}
+var file_grpc_gcp_proto_depIdxs = []int32{
+ 3, // 0: grpc.gcp.ApiConfig.channel_pool:type_name -> grpc.gcp.ChannelPoolConfig
+ 4, // 1: grpc.gcp.ApiConfig.method:type_name -> grpc.gcp.MethodConfig
+ 0, // 2: grpc.gcp.ChannelPoolConfig.bind_pick_strategy:type_name -> grpc.gcp.ChannelPoolConfig.BindPickStrategy
+ 5, // 3: grpc.gcp.MethodConfig.affinity:type_name -> grpc.gcp.AffinityConfig
+ 1, // 4: grpc.gcp.AffinityConfig.command:type_name -> grpc.gcp.AffinityConfig.Command
+ 5, // [5:5] is the sub-list for method output_type
+ 5, // [5:5] is the sub-list for method input_type
+ 5, // [5:5] is the sub-list for extension type_name
+ 5, // [5:5] is the sub-list for extension extendee
+ 0, // [0:5] is the sub-list for field type_name
+}
+
+func init() { file_grpc_gcp_proto_init() }
+func file_grpc_gcp_proto_init() {
+ if File_grpc_gcp_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_grpc_gcp_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ApiConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_grpc_gcp_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*ChannelPoolConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_grpc_gcp_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*MethodConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_grpc_gcp_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*AffinityConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_grpc_gcp_proto_rawDesc,
+ NumEnums: 2,
+ NumMessages: 4,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_grpc_gcp_proto_goTypes,
+ DependencyIndexes: file_grpc_gcp_proto_depIdxs,
+ EnumInfos: file_grpc_gcp_proto_enumTypes,
+ MessageInfos: file_grpc_gcp_proto_msgTypes,
+ }.Build()
+ File_grpc_gcp_proto = out.File
+ file_grpc_gcp_proto_rawDesc = nil
+ file_grpc_gcp_proto_goTypes = nil
+ file_grpc_gcp_proto_depIdxs = nil
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto
new file mode 100644
index 000000000..47eb8b87d
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto
@@ -0,0 +1,129 @@
+// Copyright 2018 gRPC authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+option go_package = "./grpc_gcp";
+
+package grpc.gcp;
+
+message ApiConfig {
+ // The channel pool configurations.
+ ChannelPoolConfig channel_pool = 2;
+
+ // The method configurations.
+ repeated MethodConfig method = 1001;
+}
+
+// ChannelPoolConfig are options for configuring the channel pool.
+// RPCs will be scheduled onto existing channels in the pool until all channels
+// have <max_concurrent_streams_low_watermark> number of streams. At this point
+// a new channel is spun out. Once <max_size> channels have been spun out and
+// each has <max_concurrent_streams_low_watermark> streams, subsequent RPCs will
+// hang until any of the in-flight RPCs is finished, freeing up a channel.
+message ChannelPoolConfig {
+ // The max number of channels in the pool.
+ // Default value is 0, meaning 'unlimited' size.
+ uint32 max_size = 1;
+
+ // The idle timeout (seconds) of channels without bound affinity sessions.
+ uint64 idle_timeout = 2;
+
+ // The low watermark of max number of concurrent streams in a channel.
+  // New channel will be created once it gets hit, until we reach the max size of the channel pool.
+ // Default value is 100. The valid range is [1, 100]. Any value outside the range will be ignored and the default value will be used.
+ // Note: It is not recommended that users adjust this value, since a single channel should generally have no trouble managing the default (maximum) number of streams.
+ uint32 max_concurrent_streams_low_watermark = 3;
+
+ // The minimum number of channels in the pool.
+ uint32 min_size = 4;
+
+ // If a channel mapped to an affinity key is not ready, temporarily fallback
+ // to another ready channel.
+ // Enabling this fallback is beneficial in scenarios with short RPC timeouts
+ // and rather slow connection establishing or during incidents when new
+ // connections fail but existing connections still operate.
+ bool fallback_to_ready = 5;
+
+ // Enables per channel unresponsive connection detection if > 0 and unresponsive_calls > 0.
+ // If enabled and more than unresponsive_detection_ms passed since the last response from the server,
+ // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side,
+ // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for
+ // that channel and after the new connection is ready it will replace the old connection. The calls on the old
+ // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh
+ // if no response from the server is received.
+ uint32 unresponsive_detection_ms = 6;
+
+ // Enables per channel unresponsive connection detection if > 0 and unresponsive_detection_ms > 0.
+ // If enabled and more than unresponsive_detection_ms passed since the last response from the server,
+ // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side,
+ // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for
+ // that channel and after the new connection is ready it will replace the old connection. The calls on the old
+ // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh
+ // if no response from the server is received.
+ uint32 unresponsive_calls = 7;
+
+ // A selection of strategies for picking a channel for a call with BIND command.
+ enum BindPickStrategy {
+ // No preference -- picking a channel for a BIND call will be no different
+ // than for any other calls.
+ UNSPECIFIED = 0;
+
+ // A channel with the least active streams at the moment of a BIND call
+ // initiation will be picked.
+ LEAST_ACTIVE_STREAMS = 1;
+
+  // Cycle through channels created by the BIND call initiation. I.e. pick
+ // a channel in a round-robin manner. Note that some channels may be
+ // skipped during channel pool resize.
+ ROUND_ROBIN = 2;
+ }
+
+ // The strategy for picking a channel for a call with BIND command.
+ BindPickStrategy bind_pick_strategy = 8;
+}
+
+message MethodConfig {
+ // A fully qualified name of a gRPC method, or a wildcard pattern ending
+ // with .*, such as foo.bar.A, foo.bar.*. Method configs are evaluated
+ // sequentially, and the first one takes precedence.
+ repeated string name = 1;
+
+ // The channel affinity configurations.
+ AffinityConfig affinity = 1001;
+}
+
+message AffinityConfig {
+ enum Command {
+ // The annotated method will be required to be bound to an existing session
+ // to execute the RPC. The corresponding <affinity_key_field_path> will be
+ // used to find the affinity key from the request message.
+ BOUND = 0;
+ // The annotated method will establish the channel affinity with the
+ // channel which is used to execute the RPC. The corresponding
+ // <affinity_key_field_path> will be used to find the affinity key from the
+ // response message.
+ BIND = 1;
+ // The annotated method will remove the channel affinity with the
+ // channel which is used to execute the RPC. The corresponding
+ // <affinity_key_field_path> will be used to find the affinity key from the
+ // request message.
+ UNBIND = 2;
+ }
+ // The affinity command applies on the selected gRPC methods.
+ Command command = 2;
+ // The field path of the affinity key in the request/response message.
+ // For example: "f.a", "f.b.d", etc.
+ string affinity_key = 3;
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh
new file mode 100644
index 000000000..ff4de3390
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+# Regenerates the gomock mocks used by the grpcgcp tests:
+#   mocks/mock_balancer.go - mocks for balancer.ClientConn and balancer.SubConn
+#   mocks/mock_stream.go   - mock for grpc.ClientStream
+# The leading cd makes the -destination paths resolve relative to this script.
+cd "$(dirname "$0")"
+mockgen -destination=mocks/mock_balancer.go -package=mocks google.golang.org/grpc/balancer ClientConn,SubConn
+mockgen -destination=mocks/mock_stream.go -package=mocks google.golang.org/grpc ClientStream
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go
new file mode 100644
index 000000000..2a9c52eba
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package multiendpoint
+
+import (
+ "fmt"
+ "time"
+)
+
+// status is the availability state of an endpoint.
+type status int
+
+// Status of an endpoint. An endpoint starts as unavailable (or recovering,
+// when a recovery timeout is configured), becomes available when reported
+// available, and while recovering it is granted the recovery timeout to come
+// back before being marked unavailable.
+const (
+ unavailable status = iota
+ available
+ recovering
+)
+
+// String returns a human-readable representation of the status, falling back
+// to the numeric value for an unknown status.
+func (s status) String() string {
+ switch s {
+ case unavailable:
+ return "Unavailable"
+ case available:
+ return "Available"
+ case recovering:
+ return "Recovering"
+ default:
+ return fmt.Sprintf("%d", s)
+ }
+}
+
+// endpoint holds the mutable state MultiEndpoint tracks for one endpoint.
+type endpoint struct {
+ id string // the endpoint string as provided in MultiEndpointOptions.Endpoints
+ priority int // position in the endpoints list; lower value means higher priority
+ status status
+ lastChange time.Time // time of the last status change; used to detect stale timers
+ futureChange timerAlike // pending timer that will flip a recovering endpoint to unavailable
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go
new file mode 100644
index 000000000..7e70a6ea5
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go
@@ -0,0 +1,306 @@
+/*
+ *
+ * Copyright 2023 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+// Package multiendpoint implements multiendpoint feature. See [MultiEndpoint]
+package multiendpoint
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+)
+
+// timerAlike is the subset of *time.Timer used by this package, abstracted so
+// tests can substitute a fake timer.
+type timerAlike interface {
+ Reset(time.Duration) bool
+ Stop() bool
+}
+
+// To be redefined in tests.
+var (
+ // timeNow reports the current time (wraps time.Now).
+ timeNow = func() time.Time {
+ return time.Now()
+ }
+ // timeAfterFunc runs f after duration d (wraps time.AfterFunc).
+ timeAfterFunc = func(d time.Duration, f func()) timerAlike {
+ return time.AfterFunc(d, f)
+ }
+)
+
+// MultiEndpoint holds a list of endpoints, tracks their availability and defines the current
+// endpoint. An endpoint has a priority defined by its position in the list (first item has top
+// priority).
+//
+// The current endpoint is the highest available endpoint in the list. If no endpoint is available,
+// MultiEndpoint sticks to the previously current endpoint.
+//
+// Sometimes switching between endpoints can be costly, and it is worth waiting for some time
+// after current endpoint becomes unavailable. For this case, use
+// [MultiEndpointOptions.RecoveryTimeout] to set the recovery timeout. MultiEndpoint will keep the
+// current endpoint for up to recovery timeout after it became unavailable to give it some time to
+// recover.
+//
+// The list of endpoints can be changed at any time with [MultiEndpoint.SetEndpoints] function.
+// MultiEndpoint will:
+// - remove obsolete endpoints;
+// - preserve remaining endpoints and their states;
+// - add new endpoints;
+// - update all endpoints priority according to the new order;
+// - change current endpoint if necessary.
+//
+// After updating the list of endpoints, MultiEndpoint will switch the current endpoint to the
+// highest available endpoint in the list. If you have many processes using MultiEndpoint, this may
+// lead to immediate shift of all traffic which may be undesired. To smooth this transfer, use
+// [MultiEndpointOptions.SwitchingDelay] with randomized value to introduce a jitter. Each
+// MultiEndpoint will delay switching from an available endpoint to another endpoint for this amount
+// of time. This delay is only applicable when switching from a lower priority available endpoint to
+// a higher priority available endpoint.
+type MultiEndpoint interface {
+ // Current returns current endpoint.
+ //
+ // Note that even though the read itself is synchronized, the returned
+ // endpoint may already be outdated by the time the caller uses it.
+ Current() string
+
+ // SetEndpointAvailability informs MultiEndpoint when an endpoint becomes available or unavailable.
+ // This may change the current endpoint.
+ SetEndpointAvailability(e string, avail bool)
+
+ // SetEndpoints updates a list of endpoints:
+ // - remove obsolete endpoints
+ // - preserve remaining endpoints and their states
+ // - add new endpoints
+ // - update all endpoints priority according to the new order
+ // This may change the current endpoint.
+ // Returns an error if the endpoints list is empty.
+ SetEndpoints(endpoints []string) error
+}
+
+// MultiEndpointOptions is used for configuring [MultiEndpoint].
+type MultiEndpointOptions struct {
+ // A list of endpoints ordered by priority (first endpoint has top priority).
+ // Must not be empty.
+ Endpoints []string
+ // RecoveryTimeout sets the amount of time MultiEndpoint keeps endpoint as current after it
+ // became unavailable. A zero value disables the recovery period: endpoints
+ // become unavailable immediately.
+ RecoveryTimeout time.Duration
+ // When switching from a lower priority available endpoint to a higher priority available
+ // endpoint the MultiEndpoint will delay the switch for this duration. A zero
+ // value makes the switch immediate.
+ SwitchingDelay time.Duration
+}
+
+// NewMultiEndpoint validates options and creates a new [MultiEndpoint].
+//
+// The first endpoint in b.Endpoints becomes the initial current endpoint.
+// Returns an error if b.Endpoints is empty.
+func NewMultiEndpoint(b *MultiEndpointOptions) (MultiEndpoint, error) {
+ if len(b.Endpoints) == 0 {
+ return nil, fmt.Errorf("endpoints list cannot be empty")
+ }
+
+ me := &multiEndpoint{
+ recoveryTimeout: b.RecoveryTimeout,
+ switchingDelay: b.SwitchingDelay,
+ current: b.Endpoints[0],
+ }
+ // Each endpoint's priority is its index in the provided list.
+ eMap := make(map[string]*endpoint)
+ for i, e := range b.Endpoints {
+ eMap[e] = me.newEndpoint(e, i)
+ }
+ me.endpoints = eMap
+ return me, nil
+}
+
+// multiEndpoint implements [MultiEndpoint]. The embedded RWMutex guards all
+// fields below.
+type multiEndpoint struct {
+ sync.RWMutex
+
+ endpoints map[string]*endpoint // endpoint states keyed by endpoint id
+ recoveryTimeout time.Duration
+ switchingDelay time.Duration
+ current string // id of the current endpoint
+ future string // id of the endpoint scheduled to become current after switchingDelay
+}
+
+// Current returns the current endpoint under a read lock.
+func (me *multiEndpoint) Current() string {
+ me.RLock()
+ defer me.RUnlock()
+ return me.current
+}
+
+// SetEndpoints updates endpoints list:
+// - remove obsolete endpoints;
+// - preserve remaining endpoints and their states;
+// - add new endpoints;
+// - update all endpoints priority according to the new order;
+// - change current endpoint if necessary.
+//
+// Returns an error if endpoints is empty.
+func (me *multiEndpoint) SetEndpoints(endpoints []string) error {
+ me.Lock()
+ defer me.Unlock()
+ if len(endpoints) == 0 {
+ return errors.New("endpoints list cannot be empty")
+ }
+ // Build a set of the new endpoints for O(1) membership checks below.
+ newEndpoints := make(map[string]struct{})
+ for _, v := range endpoints {
+ newEndpoints[v] = struct{}{}
+ }
+ // Remove obsolete endpoints.
+ for e := range me.endpoints {
+ if _, ok := newEndpoints[e]; !ok {
+ delete(me.endpoints, e)
+ }
+ }
+ // Add new endpoints and update priority (priority = index in the new list).
+ for i, e := range endpoints {
+ if _, ok := me.endpoints[e]; !ok {
+ me.endpoints[e] = me.newEndpoint(e, i)
+ } else {
+ me.endpoints[e].priority = i
+ }
+ }
+
+ me.maybeUpdateCurrent()
+ return nil
+}
+
+// Updates current to the top-priority available endpoint unless the current endpoint is
+// recovering.
+//
+// Must be run under me.Lock.
+func (me *multiEndpoint) maybeUpdateCurrent() {
+ c, exists := me.endpoints[me.current]
+ // topA is the highest-priority available endpoint; top is the
+ // highest-priority endpoint regardless of status (lower number = higher).
+ var topA *endpoint
+ var top *endpoint
+ for _, e := range me.endpoints {
+ if e.status == available && (topA == nil || topA.priority > e.priority) {
+ topA = e
+ }
+ if top == nil || top.priority > e.priority {
+ top = e
+ }
+ }
+
+ if exists && c.status == recovering && (topA == nil || topA.priority > c.priority) {
+ // Let current endpoint recover while no higher priority endpoints available.
+ return
+ }
+
+ // Always prefer top available endpoint.
+ if topA != nil {
+ me.switchFromTo(c, topA)
+ return
+ }
+
+ // If no current endpoint exists, resort to the top priority endpoint immediately.
+ if !exists {
+ me.current = top.id
+ }
+}
+
+// newEndpoint creates an endpoint with the given id and priority. When a
+// recovery timeout is configured the endpoint starts in the recovering state,
+// giving it recoveryTimeout to become available before being declared
+// unavailable; otherwise it starts unavailable.
+func (me *multiEndpoint) newEndpoint(id string, priority int) *endpoint {
+ s := unavailable
+ if me.recoveryTimeout > 0 {
+ s = recovering
+ }
+ e := &endpoint{
+ id: id,
+ priority: priority,
+ status: s,
+ }
+ if e.status == recovering {
+ me.scheduleUnavailable(e)
+ }
+ return e
+}
+
+// Changes or schedules a change of current to the endpoint t.
+//
+// Must be run under me.Lock.
+func (me *multiEndpoint) switchFromTo(f, t *endpoint) {
+ if me.current == t.id {
+ return
+ }
+
+ if me.switchingDelay == 0 || f == nil || f.status == unavailable {
+ // Switching immediately if no delay or no current or current is unavailable.
+ me.current = t.id
+ return
+ }
+
+ // Delayed switch: remember the target and switch only if it is still
+ // available when the delay expires. Note the callback reads me.future at
+ // fire time, so a later scheduled switch supersedes this one.
+ me.future = t.id
+ timeAfterFunc(me.switchingDelay, func() {
+ me.Lock()
+ defer me.Unlock()
+ if e, ok := me.endpoints[me.future]; ok && e.status == available {
+ me.current = e.id
+ }
+ })
+}
+
+// SetEndpointAvailability updates the state of an endpoint. Unknown endpoints
+// are ignored. This may change the current endpoint.
+func (me *multiEndpoint) SetEndpointAvailability(e string, avail bool) {
+ me.Lock()
+ defer me.Unlock()
+ me.setEndpointAvailability(e, avail)
+ me.maybeUpdateCurrent()
+}
+
+// setEndpointAvailability applies an availability report to endpoint e,
+// ignoring unknown endpoints. It does not update the current endpoint; the
+// caller is expected to follow up with maybeUpdateCurrent.
+//
+// Must be run under me.Lock.
+func (me *multiEndpoint) setEndpointAvailability(e string, avail bool) {
+ ee, ok := me.endpoints[e]
+ if !ok {
+ return
+ }
+
+ if avail {
+ setState(ee, available)
+ return
+ }
+
+ // Becoming unavailable only matters from the available state; a recovering
+ // endpoint already has a pending transition to unavailable scheduled.
+ if ee.status != available {
+ return
+ }
+
+ if me.recoveryTimeout == 0 {
+ setState(ee, unavailable)
+ return
+ }
+
+ // Give the endpoint recoveryTimeout to come back before declaring it
+ // unavailable.
+ setState(ee, recovering)
+ me.scheduleUnavailable(ee)
+}
+
+// Change the state of endpoint e to state s.
+//
+// Stops any pending scheduled transition and records the change time, which
+// invalidates outstanding timers (checked in scheduleUnavailable's callback).
+//
+// Must be run under me.Lock.
+func setState(e *endpoint, s status) {
+ if e.futureChange != nil {
+ e.futureChange.Stop()
+ }
+ e.status = s
+ e.lastChange = timeNow()
+}
+
+// Schedule endpoint e to become unavailable after recoveryTimeout.
+//
+// The captured lastChange value lets the callback detect that the endpoint's
+// state changed again after scheduling (setState's Stop may lose the race
+// with an already-firing timer), in which case the timer is stale and is a
+// no-op.
+func (me *multiEndpoint) scheduleUnavailable(e *endpoint) {
+ stateChange := e.lastChange
+ e.futureChange = timeAfterFunc(me.recoveryTimeout, func() {
+ me.Lock()
+ defer me.Unlock()
+ if e.lastChange != stateChange {
+ // This timer is outdated.
+ return
+ }
+ setState(e, unavailable)
+ me.maybeUpdateCurrent()
+ })
+}
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json
new file mode 100644
index 000000000..6291ae9cb
--- /dev/null
+++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json
@@ -0,0 +1,29 @@
+{
+ "channelPool": {
+ "maxSize": 10,
+ "maxConcurrentStreamsLowWatermark": 10
+ },
+ "method": [
+ {
+ "name": [ "method1" ],
+ "affinity": {
+ "command": "BIND",
+ "affinityKey": "key1"
+ }
+ },
+ {
+ "name": [ "method2" ],
+ "affinity": {
+ "command": "BOUND",
+ "affinityKey": "key2"
+ }
+ },
+ {
+ "name": [ "method3" ],
+ "affinity": {
+ "command": "UNBIND",
+ "affinityKey": "key3"
+ }
+ }
+ ]
+} \ No newline at end of file