diff options
| author | Taras Madan <tarasmadan@google.com> | 2024-09-10 12:16:33 +0200 |
|---|---|---|
| committer | Taras Madan <tarasmadan@google.com> | 2024-09-10 14:05:26 +0000 |
| commit | c97c816133b42257d0bcf1ee4bd178bb2a7a2b9e (patch) | |
| tree | 0bcbc2e540bbf8f62f6c17887cdd53b8c2cee637 /vendor/github.com/GoogleCloudPlatform | |
| parent | 54e657429ab892ad06c90cd7c1a4eb33ba93a3dc (diff) | |
vendor: update
Diffstat (limited to 'vendor/github.com/GoogleCloudPlatform')
15 files changed, 3003 insertions, 0 deletions
diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE new file mode 100644 index 000000000..d64569567 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md new file mode 100644 index 000000000..47d809846 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/README.md @@ -0,0 +1,18 @@ + +## How to test Spanner integration + +1. Set GCP project id with GCP_PROJECT_ID environment variable. + + export GCP_PROJECT_ID=test-project + +1. Set service key credentials file using GOOGLE_APPLICATION_CREDENTIALS env variable. + + export GOOGLE_APPLICATION_CREDENTIALS=/service/account/credentials.json + +1. Run the tests. + + go test -v + +To skip Spanner setup run + + SKIP_SPANNER=true go test -v diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go new file mode 100644 index 000000000..aae869fb5 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/doc.go @@ -0,0 +1,129 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* +Package grpcgcp provides grpc supports for Google Cloud APIs. +For now it provides connection management with affinity support. + +Note: "channel" is analagous to "connection" in our context. + +Usage: + +1. First, initialize the api configuration. There are two ways: + + 1a. Create a json file defining the configuration and read it. + + // Create some_api_config.json + { + "channelPool": { + "maxSize": 4, + "maxConcurrentStreamsLowWatermark": 50 + }, + "method": [ + { + "name": [ "/some.api.v1/Method1" ], + "affinity": { + "command": "BIND", + "affinityKey": "key1" + } + }, + { + "name": [ "/some.api.v1/Method2" ], + "affinity": { + "command": "BOUND", + "affinityKey": "key2" + } + }, + { + "name": [ "/some.api.v1/Method3" ], + "affinity": { + "command": "UNBIND", + "affinityKey": "key3" + } + } + ] + } + + jsonFile, err := ioutil.ReadFile("some_api_config.json") + if err != nil { + t.Fatalf("Failed to read config file: %v", err) + } + jsonCfg := string(jsonFile) + + 1b. Create apiConfig directly and convert it to json. + + // import ( + // configpb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" + // ) + + apiConfig := &configpb.ApiConfig{ + ChannelPool: &configpb.ChannelPoolConfig{ + MaxSize: 4, + MaxConcurrentStreamsLowWatermark: 50, + }, + Method: []*configpb.MethodConfig{ + &configpb.MethodConfig{ + Name: []string{"/some.api.v1/Method1"}, + Affinity: &configpb.AffinityConfig{ + Command: configpb.AffinityConfig_BIND, + AffinityKey: "key1", + }, + }, + &configpb.MethodConfig{ + Name: []string{"/some.api.v1/Method2"}, + Affinity: &configpb.AffinityConfig{ + Command: configpb.AffinityConfig_BOUND, + AffinityKey: "key2", + }, + }, + &configpb.MethodConfig{ + Name: []string{"/some.api.v1/Method3"}, + Affinity: &configpb.AffinityConfig{ + Command: configpb.AffinityConfig_UNBIND, + AffinityKey: "key3", + }, + }, + }, + } + + c, err := protojson.Marshal(apiConfig) + if err != nil { + t.Fatalf("cannot json encode config: %v", err) + } + jsonCfg := string(c) + +2. Make ClientConn with specific DialOptions to enable grpc_gcp load balancer +with provided configuration. And specify gRPC-GCP interceptors. + + conn, err := grpc.Dial( + target, + // Register and specify the grpc-gcp load balancer. + grpc.WithDisableServiceConfig(), + grpc.WithDefaultServiceConfig( + fmt.Sprintf( + `{"loadBalancingConfig": [{"%s":%s}]}`, + grpcgcp.Name, + jsonCfg, + ), + ), + // Set grpcgcp interceptors. + grpc.WithUnaryInterceptor(grpcgcp.GCPUnaryClientInterceptor), + grpc.WithStreamInterceptor(grpcgcp.GCPStreamClientInterceptor), + ) +*/ +package grpcgcp // import "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp" diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go new file mode 100644 index 000000000..6fdac53d6 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_balancer.go @@ -0,0 +1,576 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcgcp + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" +) + +var _ balancer.Balancer = (*gcpBalancer)(nil) // Ensure gcpBalancer implements Balancer + +const ( + // Name is the name of grpc_gcp balancer. + Name = "grpc_gcp" + + healthCheckEnabled = true + defaultMinSize = 1 + defaultMaxSize = 4 + defaultMaxStreams = 100 +) + +func init() { + balancer.Register(newBuilder()) +} + +type gcpBalancerBuilder struct { + balancer.ConfigParser +} + +type GCPBalancerConfig struct { + serviceconfig.LoadBalancingConfig + *pb.ApiConfig +} + +func (bb *gcpBalancerBuilder) Build( + cc balancer.ClientConn, + opt balancer.BuildOptions, +) balancer.Balancer { + gb := &gcpBalancer{ + cc: cc, + methodCfg: make(map[string]*pb.AffinityConfig), + affinityMap: make(map[string]balancer.SubConn), + fallbackMap: make(map[string]balancer.SubConn), + scRefs: make(map[balancer.SubConn]*subConnRef), + scStates: make(map[balancer.SubConn]connectivity.State), + refreshingScRefs: make(map[balancer.SubConn]*subConnRef), + scRefList: []*subConnRef{}, + rrRefId: ^uint32(0), + csEvltr: &connectivityStateEvaluator{}, + // Initialize picker to a picker that always return + // ErrNoSubConnAvailable, because when state of a SubConn changes, we + // may call UpdateBalancerState with this picker. + picker: newErrPicker(balancer.ErrNoSubConnAvailable), + } + gb.log = NewGCPLogger(compLogger, fmt.Sprintf("[gcpBalancer %p]", gb)) + return gb +} + +func (*gcpBalancerBuilder) Name() string { + return Name +} + +// ParseConfig converts raw json config into GCPBalancerConfig. +// This is called by ClientConn on any load balancer config update. +// After parsing the config, ClientConn calls UpdateClientConnState passing the config. +func (*gcpBalancerBuilder) ParseConfig(j json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + c := &GCPBalancerConfig{ + ApiConfig: &pb.ApiConfig{}, + } + err := protojson.Unmarshal(j, c) + return c, err +} + +// newBuilder creates a new grpcgcp balancer builder. +func newBuilder() balancer.Builder { + return &gcpBalancerBuilder{} +} + +// connectivityStateEvaluator gets updated by addrConns when their +// states transition, based on which it evaluates the state of +// ClientConn. +type connectivityStateEvaluator struct { + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transientFailure. +} + +// recordTransition records state change happening in every subConn and based on +// that it evaluates what aggregated state should be. +// It can only transition between Ready, Connecting and TransientFailure. Other states, +// Idle and Shutdown are transitioned into by ClientConn; in the beginning of the connection +// before any subConn is created ClientConn is in idle state. In the end when ClientConn +// closes it is in Shutdown state. +// +// recordTransition should only be called synchronously from the same goroutine. +func (cse *connectivityStateEvaluator) recordTransition( + oldState, + newState connectivity.State, +) connectivity.State { + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + switch state { + case connectivity.Ready: + cse.numReady += updateVal + case connectivity.Connecting: + cse.numConnecting += updateVal + case connectivity.TransientFailure: + cse.numTransientFailure += updateVal + } + } + + // Evaluate. + if cse.numReady > 0 { + return connectivity.Ready + } + if cse.numConnecting > 0 { + return connectivity.Connecting + } + return connectivity.TransientFailure +} + +// subConnRef keeps reference to the real SubConn with its +// connectivity state, affinity count and streams count. +type subConnRef struct { + subConn balancer.SubConn + stateSignal chan struct{} // This channel is closed and re-created when subConn or its state changes. + affinityCnt int32 // Keeps track of the number of keys bound to the subConn. + streamsCnt int32 // Keeps track of the number of streams opened on the subConn. + lastResp time.Time // Timestamp of the last response from the server. + deCalls uint32 // Keeps track of deadline exceeded calls since last response. + refreshing bool // If this subconn is in the process of refreshing. + refreshCnt uint32 // Number of refreshes since last response. +} + +func (ref *subConnRef) getAffinityCnt() int32 { + return atomic.LoadInt32(&ref.affinityCnt) +} + +func (ref *subConnRef) getStreamsCnt() int32 { + return atomic.LoadInt32(&ref.streamsCnt) +} + +func (ref *subConnRef) affinityIncr() { + atomic.AddInt32(&ref.affinityCnt, 1) +} + +func (ref *subConnRef) affinityDecr() { + atomic.AddInt32(&ref.affinityCnt, -1) +} + +func (ref *subConnRef) streamsIncr() { + atomic.AddInt32(&ref.streamsCnt, 1) +} + +func (ref *subConnRef) streamsDecr() { + atomic.AddInt32(&ref.streamsCnt, -1) +} + +func (ref *subConnRef) deCallsInc() uint32 { + return atomic.AddUint32(&ref.deCalls, 1) +} + +func (ref *subConnRef) gotResp() { + ref.lastResp = time.Now() + atomic.StoreUint32(&ref.deCalls, 0) + ref.refreshCnt = 0 +} + +type gcpBalancer struct { + cfg *GCPBalancerConfig + methodCfg map[string]*pb.AffinityConfig + + addrs []resolver.Address + cc balancer.ClientConn + csEvltr *connectivityStateEvaluator + state connectivity.State + + mu sync.RWMutex + affinityMap map[string]balancer.SubConn + fallbackMap map[string]balancer.SubConn + scStates map[balancer.SubConn]connectivity.State + scRefs map[balancer.SubConn]*subConnRef + scRefList []*subConnRef + rrRefId uint32 + + // Map from a fresh SubConn to the subConnRef where we want to refresh subConn. + refreshingScRefs map[balancer.SubConn]*subConnRef + // Unresponsive detection enabled flag. + unresponsiveDetection bool + + picker balancer.Picker + log grpclog.LoggerV2 +} + +func (gb *gcpBalancer) initializeConfig(cfg *GCPBalancerConfig) { + gb.cfg = &GCPBalancerConfig{ + ApiConfig: &pb.ApiConfig{ + ChannelPool: &pb.ChannelPoolConfig{}, + }, + } + if cfg != nil && cfg.ApiConfig != nil { + gb.cfg = &GCPBalancerConfig{ + ApiConfig: proto.Clone(cfg.ApiConfig).(*pb.ApiConfig), + } + } + + if gb.cfg.GetChannelPool() == nil { + gb.cfg.ChannelPool = &pb.ChannelPoolConfig{} + } + cp := gb.cfg.GetChannelPool() + if cp.GetMinSize() == 0 { + cp.MinSize = defaultMinSize + } + if cp.GetMaxSize() == 0 { + cp.MaxSize = defaultMaxSize + } + if cp.GetMaxConcurrentStreamsLowWatermark() == 0 { + cp.MaxConcurrentStreamsLowWatermark = defaultMaxStreams + } + mp := make(map[string]*pb.AffinityConfig) + methodCfgs := gb.cfg.GetMethod() + for _, methodCfg := range methodCfgs { + methodNames := methodCfg.GetName() + affinityCfg := methodCfg.GetAffinity() + if methodNames != nil && affinityCfg != nil { + for _, method := range methodNames { + mp[method] = affinityCfg + } + } + } + gb.methodCfg = mp + gb.unresponsiveDetection = cp.GetUnresponsiveCalls() > 0 && cp.GetUnresponsiveDetectionMs() > 0 + gb.enforceMinSize() +} + +func (gb *gcpBalancer) enforceMinSize() { + for len(gb.scRefs) < int(gb.cfg.GetChannelPool().GetMinSize()) { + gb.addSubConn() + } +} + +func (gb *gcpBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + gb.mu.Lock() + defer gb.mu.Unlock() + addrs := ccs.ResolverState.Addresses + if gb.log.V(FINE) { + gb.log.Infoln("got new resolved addresses: ", addrs, " and balancer config: ", ccs.BalancerConfig) + } + gb.addrs = addrs + if gb.cfg == nil { + cfg, ok := ccs.BalancerConfig.(*GCPBalancerConfig) + if !ok && ccs.BalancerConfig != nil { + return fmt.Errorf("provided config is not GCPBalancerConfig: %v", ccs.BalancerConfig) + } + gb.initializeConfig(cfg) + } + + if len(gb.scRefs) == 0 { + gb.newSubConn() + return nil + } + + for _, scRef := range gb.scRefs { + // TODO(weiranf): update streams count when new addrs resolved? + scRef.subConn.UpdateAddresses(addrs) + scRef.subConn.Connect() + } + + return nil +} + +func (gb *gcpBalancer) ResolverError(err error) { + gb.log.Warningf("ResolverError: %v", err) +} + +// check current connection pool size +func (gb *gcpBalancer) getConnectionPoolSize() int { + // TODO(golobokov): replace this with locked increase of subconns. + gb.mu.Lock() + defer gb.mu.Unlock() + return len(gb.scRefs) +} + +// newSubConn creates a new SubConn using cc.NewSubConn and initialize the subConnRef +// if none of the subconns are in the Connecting state. +func (gb *gcpBalancer) newSubConn() { + gb.mu.Lock() + defer gb.mu.Unlock() + + // there are chances the newly created subconns are still connecting, + // we can wait on those new subconns. + for _, scState := range gb.scStates { + if scState == connectivity.Connecting || scState == connectivity.Idle { + return + } + } + gb.addSubConn() +} + +// addSubConn creates a new SubConn using cc.NewSubConn and initialize the subConnRef. +// Must be called holding the mutex lock. +func (gb *gcpBalancer) addSubConn() { + sc, err := gb.cc.NewSubConn( + gb.addrs, + balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled}, + ) + if err != nil { + gb.log.Errorf("failed to NewSubConn: %v", err) + return + } + gb.scRefs[sc] = &subConnRef{ + subConn: sc, + stateSignal: make(chan struct{}), + lastResp: time.Now(), + } + gb.scStates[sc] = connectivity.Idle + gb.scRefList = append(gb.scRefList, gb.scRefs[sc]) + sc.Connect() +} + +// getReadySubConnRef returns a subConnRef and a bool. The bool indicates whether +// the boundKey exists in the affinityMap. If returned subConnRef is a nil, it +// means the underlying subconn is not READY yet. +func (gb *gcpBalancer) getReadySubConnRef(boundKey string) (*subConnRef, bool) { + gb.mu.Lock() + defer gb.mu.Unlock() + + if sc, ok := gb.affinityMap[boundKey]; ok { + if gb.scStates[sc] != connectivity.Ready { + // It's possible that the bound subconn is not in the readySubConns list, + // If it's not ready, we throw ErrNoSubConnAvailable or + // fallback to a previously mapped ready subconn or the least busy. + if gb.cfg.GetChannelPool().GetFallbackToReady() { + if sc, ok := gb.fallbackMap[boundKey]; ok { + return gb.scRefs[sc], true + } + // Try to create fallback mapping. + if scRef, err := gb.picker.(*gcpPicker).getLeastBusySubConnRef(); err == nil { + gb.fallbackMap[boundKey] = scRef.subConn + return scRef, true + } + } + return nil, true + } + return gb.scRefs[sc], true + } + return nil, false +} + +func (gb *gcpBalancer) getSubConnRoundRobin(ctx context.Context) *subConnRef { + if len(gb.scRefList) == 0 { + gb.newSubConn() + } + scRef := gb.scRefList[atomic.AddUint32(&gb.rrRefId, 1)%uint32(len(gb.scRefList))] + + gb.mu.RLock() + if state := gb.scStates[scRef.subConn]; state == connectivity.Ready { + gb.mu.RUnlock() + return scRef + } else { + grpclog.Infof("grpcgcp.gcpBalancer: scRef is not ready: %v", state) + } + + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + + // Wait until SubConn is ready or call context is done. + for gb.scStates[scRef.subConn] != connectivity.Ready { + sigChan := scRef.stateSignal + gb.mu.RUnlock() + select { + case <-ctx.Done(): + return scRef + case <-ticker.C: + case <-sigChan: + } + gb.mu.RLock() + } + gb.mu.RUnlock() + + return scRef +} + +// bindSubConn binds the given affinity key to an existing subConnRef. +func (gb *gcpBalancer) bindSubConn(bindKey string, sc balancer.SubConn) { + gb.mu.Lock() + defer gb.mu.Unlock() + _, ok := gb.affinityMap[bindKey] + if !ok { + gb.affinityMap[bindKey] = sc + } + gb.scRefs[sc].affinityIncr() +} + +// unbindSubConn removes the existing binding associated with the key. +func (gb *gcpBalancer) unbindSubConn(boundKey string) { + gb.mu.Lock() + defer gb.mu.Unlock() + boundSC, ok := gb.affinityMap[boundKey] + if ok { + gb.scRefs[boundSC].affinityDecr() + delete(gb.affinityMap, boundKey) + } +} + +// regeneratePicker takes a snapshot of the balancer, and generates a picker +// from it. The picker is +// - errPicker with ErrTransientFailure if the balancer is in TransientFailure, +// - built by the pickerBuilder with all READY SubConns otherwise. +func (gb *gcpBalancer) regeneratePicker() { + if gb.state == connectivity.TransientFailure { + gb.picker = newErrPicker(balancer.ErrTransientFailure) + return + } + readyRefs := []*subConnRef{} + + // Select ready subConns from subConn map. + for sc, scState := range gb.scStates { + if scState == connectivity.Ready { + readyRefs = append(readyRefs, gb.scRefs[sc]) + } + } + gb.picker = newGCPPicker(readyRefs, gb) +} + +func (gb *gcpBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { + gb.mu.Lock() + defer gb.mu.Unlock() + s := scs.ConnectivityState + + if scRef, found := gb.refreshingScRefs[sc]; found { + if gb.log.V(FINE) { + gb.log.Infof("handle replacement SubConn state change: %p, %v", sc, s) + } + if s != connectivity.Ready { + // Ignore the replacement sc until it's ready. + return + } + + // Replace SubConn of the scRef with the fresh SubConn (sc) concluding + // the refresh process initiated by refresh(*subConnRef). + oldSc := scRef.subConn + gb.scStates[sc] = gb.scStates[oldSc] + delete(gb.refreshingScRefs, sc) + delete(gb.scRefs, oldSc) + delete(gb.scStates, oldSc) + gb.scRefs[sc] = scRef + scRef.subConn = sc + scRef.deCalls = 0 + scRef.lastResp = time.Now() + scRef.refreshing = false + scRef.refreshCnt++ + gb.cc.RemoveSubConn(oldSc) + } + + if gb.log.V(FINE) { + gb.log.Infof("handle SubConn state change: %p, %v", sc, s) + } + + oldS, ok := gb.scStates[sc] + if !ok { + if gb.log.V(FINE) { + gb.log.Infof( + "got state changes for an unknown/replaced SubConn: %p, %v", + sc, + s, + ) + } + return + } + gb.scStates[sc] = s + switch s { + case connectivity.Idle: + sc.Connect() + case connectivity.Shutdown: + delete(gb.scRefs, sc) + delete(gb.scStates, sc) + } + if oldS == connectivity.Ready && s != oldS { + // Subconn is broken. Remove fallback mapping to this subconn. + for k, v := range gb.fallbackMap { + if v == sc { + delete(gb.fallbackMap, k) + } + } + } + if oldS != connectivity.Ready && s == connectivity.Ready { + // Remove fallback mapping for the keys of recovered subconn. + for k := range gb.fallbackMap { + if gb.affinityMap[k] == sc { + delete(gb.fallbackMap, k) + } + } + } + + oldAggrState := gb.state + gb.state = gb.csEvltr.recordTransition(oldS, s) + + // Regenerate picker when one of the following happens: + // - this sc became ready from not-ready + // - this sc became not-ready from ready + // - the aggregated state of balancer became TransientFailure from non-TransientFailure + // - the aggregated state of balancer became non-TransientFailure from TransientFailure + if (s == connectivity.Ready) != (oldS == connectivity.Ready) || + (gb.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) { + gb.regeneratePicker() + gb.cc.UpdateState(balancer.State{ + ConnectivityState: gb.state, + Picker: gb.picker, + }) + } + + if scRef := gb.scRefs[sc]; scRef != nil { + // Inform of the state change. + close(scRef.stateSignal) + scRef.stateSignal = make(chan struct{}) + } +} + +// refresh initiates a new SubConn for a specific subConnRef and starts connecting. +// If the refresh is already initiated for the ref, then this is a no-op. +func (gb *gcpBalancer) refresh(ref *subConnRef) { + if ref.refreshing { + return + } + gb.mu.Lock() + defer gb.mu.Unlock() + if ref.refreshing { + return + } + ref.refreshing = true + sc, err := gb.cc.NewSubConn( + gb.addrs, + balancer.NewSubConnOptions{HealthCheckEnabled: healthCheckEnabled}, + ) + if err != nil { + gb.log.Errorf("failed to create a replacement SubConn with NewSubConn: %v", err) + return + } + gb.refreshingScRefs[sc] = ref + sc.Connect() +} + +func (gb *gcpBalancer) Close() { +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go new file mode 100644 index 000000000..c0d77f1ce --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_interceptor.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcgcp + +import ( + "context" + "sync" + + "google.golang.org/grpc" +) + +type key int + +var gcpKey key + +type gcpContext struct { + // request message used for pre-process of an affinity call + reqMsg interface{} + // response message used for post-process of an affinity call + replyMsg interface{} +} + +// GCPUnaryClientInterceptor intercepts the execution of a unary RPC +// and injects necessary information to be used by the picker. +func GCPUnaryClientInterceptor( + ctx context.Context, + method string, + req interface{}, + reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + gcpCtx := &gcpContext{ + reqMsg: req, + replyMsg: reply, + } + ctx = context.WithValue(ctx, gcpKey, gcpCtx) + + return invoker(ctx, method, req, reply, cc, opts...) +} + +// GCPStreamClientInterceptor intercepts the execution of a client streaming RPC +// and injects necessary information to be used by the picker. +func GCPStreamClientInterceptor( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption, +) (grpc.ClientStream, error) { + // This constructor does not create a real ClientStream, + // it only stores all parameters and let SendMsg() to create ClientStream. + cs := &gcpClientStream{ + ctx: ctx, + desc: desc, + cc: cc, + method: method, + streamer: streamer, + opts: opts, + } + cs.cond = sync.NewCond(cs) + return cs, nil +} + +type gcpClientStream struct { + sync.Mutex + grpc.ClientStream + + cond *sync.Cond + initStreamErr error + + ctx context.Context + desc *grpc.StreamDesc + cc *grpc.ClientConn + method string + streamer grpc.Streamer + opts []grpc.CallOption +} + +func (cs *gcpClientStream) SendMsg(m interface{}) error { + cs.Lock() + // Initialize underlying ClientStream when getting the first request. + if cs.ClientStream == nil { + ctx := context.WithValue(cs.ctx, gcpKey, &gcpContext{reqMsg: m}) + realCS, err := cs.streamer(ctx, cs.desc, cs.cc, cs.method, cs.opts...) + if err != nil { + cs.initStreamErr = err + cs.Unlock() + cs.cond.Broadcast() + return err + } + cs.ClientStream = realCS + } + cs.Unlock() + cs.cond.Broadcast() + return cs.ClientStream.SendMsg(m) +} + +func (cs *gcpClientStream) RecvMsg(m interface{}) error { + // If RecvMsg is called before SendMsg, it should wait until cs.ClientStream + // is initialized or the initialization failed. + cs.Lock() + for cs.initStreamErr == nil && cs.ClientStream == nil { + cs.cond.Wait() + } + if cs.initStreamErr != nil { + cs.Unlock() + return cs.initStreamErr + } + cs.Unlock() + return cs.ClientStream.RecvMsg(m) +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go new file mode 100644 index 000000000..cd1caf301 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_logger.go @@ -0,0 +1,98 @@ +package grpcgcp + +import ( + "strings" + + "google.golang.org/grpc/grpclog" +) + +const ( + FINE = 90 + FINEST = 99 +) + +var compLogger = grpclog.Component("grpcgcp") + +type gcpLogger struct { + logger grpclog.LoggerV2 + prefix string +} + +// Make sure gcpLogger implements grpclog.LoggerV2. +var _ grpclog.LoggerV2 = (*gcpLogger)(nil) + +func NewGCPLogger(logger grpclog.LoggerV2, prefix string) *gcpLogger { + p := prefix + if !strings.HasSuffix(p, " ") { + p = p + " " + } + return &gcpLogger{ + logger: logger, + prefix: p, + } +} + +// Error implements grpclog.LoggerV2. +func (l *gcpLogger) Error(args ...interface{}) { + l.logger.Error(append([]interface{}{l.prefix}, args)...) +} + +// Errorf implements grpclog.LoggerV2. +func (l *gcpLogger) Errorf(format string, args ...interface{}) { + l.logger.Errorf(l.prefix+format, args...) +} + +// Errorln implements grpclog.LoggerV2. +func (l *gcpLogger) Errorln(args ...interface{}) { + l.logger.Errorln(append([]interface{}{l.prefix}, args)...) +} + +// Fatal implements grpclog.LoggerV2. +func (l *gcpLogger) Fatal(args ...interface{}) { + l.logger.Fatal(append([]interface{}{l.prefix}, args)...) +} + +// Fatalf implements grpclog.LoggerV2. +func (l *gcpLogger) Fatalf(format string, args ...interface{}) { + l.logger.Fatalf(l.prefix+format, args...) +} + +// Fatalln implements grpclog.LoggerV2. +func (l *gcpLogger) Fatalln(args ...interface{}) { + l.Fatalln(append([]interface{}{l.prefix}, args)...) +} + +// Info implements grpclog.LoggerV2. +func (l *gcpLogger) Info(args ...interface{}) { + l.logger.Info(append([]interface{}{l.prefix}, args)...) +} + +// Infof implements grpclog.LoggerV2. +func (l *gcpLogger) Infof(format string, args ...interface{}) { + l.logger.Infof(l.prefix+format, args...) +} + +// Infoln implements grpclog.LoggerV2. +func (l *gcpLogger) Infoln(args ...interface{}) { + l.logger.Infoln(append([]interface{}{l.prefix}, args)...) +} + +// V implements grpclog.LoggerV2. +func (l *gcpLogger) V(level int) bool { + return l.logger.V(level) +} + +// Warning implements grpclog.LoggerV2. +func (l *gcpLogger) Warning(args ...interface{}) { + l.logger.Warning(append([]interface{}{l.prefix}, args)...) +} + +// Warningf implements grpclog.LoggerV2. +func (l *gcpLogger) Warningf(format string, args ...interface{}) { + l.logger.Warningf(l.prefix+format, args...) +} + +// Warningln implements grpclog.LoggerV2. +func (l *gcpLogger) Warningln(args ...interface{}) { + l.logger.Warningln(append([]interface{}{l.prefix}, args)...) +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go new file mode 100644 index 000000000..9ee507437 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_multiendpoint.go @@ -0,0 +1,408 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcgcp + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + + pb "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" +) + +var gmeCounter uint32 + +type contextMEKey int + +var meKey contextMEKey + +// NewMEContext returns a new Context that carries Multiendpoint name. +func NewMEContext(ctx context.Context, name string) context.Context { + return context.WithValue(ctx, meKey, name) +} + +// FromMEContext returns the MultiEndpoint name stored in ctx, if any. +func FromMEContext(ctx context.Context) (string, bool) { + name, ok := ctx.Value(meKey).(string) + return name, ok +} + +// GCPMultiEndpoint holds the state of MultiEndpoints-enabled gRPC client connection. +// +// The purposes of GCPMultiEndpoint are: +// +// - Fallback to an alternative endpoint (host:port) of a gRPC service when the original +// endpoint is completely unavailable. +// - Be able to route an RPC call to a specific group of endpoints. +// - Be able to reconfigure endpoints in runtime. +// +// A group of endpoints is called a [multiendpoint.MultiEndpoint] and is essentially a list of endpoints +// where priority is defined by the position in the list with the first endpoint having top +// priority. A MultiEndpoint tracks endpoints' availability. When a MultiEndpoint is picked for an +// RPC call, it picks the top priority endpoint that is currently available. More information on the +// [multiendpoint.MultiEndpoint]. +// +// GCPMultiEndpoint can have one or more MultiEndpoint identified by its name -- arbitrary +// string provided in the [GCPMultiEndpointOptions] when configuring MultiEndpoints. This name +// can be used to route an RPC call to this MultiEndpoint by using the [NewMEContext]. +// +// GCPMultiEndpoint uses [GCPMultiEndpointOptions] for initial configuration. +// An updated configuration can be provided at any time later using [UpdateMultiEndpoints]. +// +// Example: +// +// Let's assume we have a service with read and write operations and the following backends: +// +// - service.example.com -- the main set of backends supporting all operations +// - service-fallback.example.com -- read-write replica supporting all operations +// - ro-service.example.com -- read-only replica supporting only read operations +// +// Example configuration: +// +// - MultiEndpoint named "default" with endpoints: +// +// 1. service.example.com:443 +// +// 2. service-fallback.example.com:443 +// +// - MultiEndpoint named "read" with endpoints: +// +// 1. ro-service.example.com:443 +// +// 2. service-fallback.example.com:443 +// +// 3. service.example.com:443 +// +// With the configuration above GCPMultiEndpoint will use the "default" MultiEndpoint by +// default. It means that RPC calls by default will use the main endpoint and if it is not available +// then the read-write replica. +// +// To offload some read calls to the read-only replica we can specify "read" MultiEndpoint in the +// context. Then these calls will use the read-only replica endpoint and if it is not available +// then the read-write replica and if it is also not available then the main endpoint. +// +// GCPMultiEndpoint creates a [grpcgcp] connection pool for every unique +// endpoint. For the example above three connection pools will be created. +// +// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used +// as a [grpc.ClientConn] when creating gRPC clients. +type GCPMultiEndpoint struct { + mu sync.RWMutex + + defaultName string + mes map[string]multiendpoint.MultiEndpoint + pools map[string]*monitoredConn + opts []grpc.DialOption + gcpConfig *pb.ApiConfig + dialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) + log grpclog.LoggerV2 + + grpc.ClientConnInterface +} + +// Make sure GCPMultiEndpoint implements grpc.ClientConnInterface. +var _ grpc.ClientConnInterface = (*GCPMultiEndpoint)(nil) + +func (gme *GCPMultiEndpoint) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error { + return gme.pickConn(ctx).Invoke(ctx, method, args, reply, opts...) +} + +func (gme *GCPMultiEndpoint) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return gme.pickConn(ctx).NewStream(ctx, desc, method, opts...) +} + +func (gme *GCPMultiEndpoint) pickConn(ctx context.Context) *grpc.ClientConn { + name, ok := FromMEContext(ctx) + me, ook := gme.mes[name] + if !ok || !ook { + me = gme.mes[gme.defaultName] + } + return gme.pools[me.Current()].conn +} + +func (gme *GCPMultiEndpoint) Close() error { + var errs multiError + for e, mc := range gme.pools { + mc.stopMonitoring() + if err := mc.conn.Close(); err != nil { + errs = append(errs, err) + gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err) + } + if gme.log.V(FINE) { + gme.log.Infof("closed channel pool for %q endpoint.", e) + } + } + return errs.Combine() +} + +func (gme *GCPMultiEndpoint) GCPConfig() *pb.ApiConfig { + return proto.Clone(gme.gcpConfig).(*pb.ApiConfig) +} + +// GCPMultiEndpointOptions holds options to construct a MultiEndpoints-enabled gRPC client +// connection. +type GCPMultiEndpointOptions struct { + // Regular gRPC-GCP configuration to be applied to every endpoint. + GRPCgcpConfig *pb.ApiConfig + // Map of MultiEndpoints where key is the MultiEndpoint name. + MultiEndpoints map[string]*multiendpoint.MultiEndpointOptions + // Name of the default MultiEndpoint. + Default string + // Func to dial grpc ClientConn. + DialFunc func(ctx context.Context, target string, dopts ...grpc.DialOption) (*grpc.ClientConn, error) +} + +// NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client +// connection. +// +// Deprecated: use NewGCPMultiEndpoint. +func NewGcpMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) { + return NewGCPMultiEndpoint(meOpts, opts...) +} + +// NewGCPMultiEndpoint creates new [GCPMultiEndpoint] -- MultiEndpoints-enabled gRPC client +// connection. +// +// [GCPMultiEndpoint] implements [grpc.ClientConnInterface] and can be used +// as a [grpc.ClientConn] when creating gRPC clients. +func NewGCPMultiEndpoint(meOpts *GCPMultiEndpointOptions, opts ...grpc.DialOption) (*GCPMultiEndpoint, error) { + // Read config, create multiendpoints and pools. + o, err := makeOpts(meOpts, opts) + if err != nil { + return nil, err + } + gme := &GCPMultiEndpoint{ + mes: make(map[string]multiendpoint.MultiEndpoint), + pools: make(map[string]*monitoredConn), + defaultName: meOpts.Default, + opts: o, + gcpConfig: proto.Clone(meOpts.GRPCgcpConfig).(*pb.ApiConfig), + dialFunc: meOpts.DialFunc, + log: NewGCPLogger(compLogger, fmt.Sprintf("[GCPMultiEndpoint #%d]", atomic.AddUint32(&gmeCounter, 1))), + } + if gme.dialFunc == nil { + gme.dialFunc = func(_ context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + return grpc.Dial(target, opts...) + } + } + if err := gme.UpdateMultiEndpoints(meOpts); err != nil { + return nil, err + } + return gme, nil +} + +func makeOpts(meOpts *GCPMultiEndpointOptions, opts []grpc.DialOption) ([]grpc.DialOption, error) { + grpcGCPjsonConfig, err := protojson.Marshal(meOpts.GRPCgcpConfig) + if err != nil { + return nil, err + } + o := append([]grpc.DialOption{}, opts...) + o = append(o, []grpc.DialOption{ + grpc.WithDisableServiceConfig(), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":%s}]}`, Name, string(grpcGCPjsonConfig))), + grpc.WithChainUnaryInterceptor(GCPUnaryClientInterceptor), + grpc.WithChainStreamInterceptor(GCPStreamClientInterceptor), + }...) + + return o, nil +} + +type monitoredConn struct { + endpoint string + conn *grpc.ClientConn + gme *GCPMultiEndpoint + cancel context.CancelFunc +} + +func newMonitoredConn(endpoint string, conn *grpc.ClientConn, gme *GCPMultiEndpoint) (mc *monitoredConn) { + ctx, cancel := context.WithCancel(context.Background()) + mc = &monitoredConn{ + endpoint: endpoint, + conn: conn, + gme: gme, + cancel: cancel, + } + go mc.monitor(ctx) + return +} + +func (mc *monitoredConn) notify(state connectivity.State) { + if mc.gme.log.V(FINE) { + mc.gme.log.Infof("%q endpoint state changed to %v", mc.endpoint, state) + } + // Inform all multiendpoints. + mc.gme.mu.RLock() + for _, me := range mc.gme.mes { + me.SetEndpointAvailability(mc.endpoint, state == connectivity.Ready) + } + mc.gme.mu.RUnlock() +} + +func (mc *monitoredConn) monitor(ctx context.Context) { + for { + currentState := mc.conn.GetState() + mc.notify(currentState) + if !mc.conn.WaitForStateChange(ctx, currentState) { + break + } + } +} + +func (mc *monitoredConn) stopMonitoring() { + mc.cancel() +} + +// UpdateMultiEndpoints reconfigures MultiEndpoints. +// +// MultiEndpoints are matched with the current ones by name. +// +// - If a current MultiEndpoint is missing in the updated list, the MultiEndpoint will be +// removed. +// - A new MultiEndpoint will be created for every new name in the list. +// - For an existing MultiEndpoint only its endpoints will be updated (no recovery timeout +// change). +// +// Endpoints are matched by the endpoint address (usually in the form of address:port). +// +// - If an existing endpoint is not used by any MultiEndpoint in the updated list, then the +// connection poll for this endpoint will be shutdown. +// - A connection pool will be created for every new endpoint. +// - For an existing endpoint nothing will change (the connection pool will not be re-created, +// thus no connection credentials change, nor connection configuration change). +func (gme *GCPMultiEndpoint) UpdateMultiEndpoints(meOpts *GCPMultiEndpointOptions) error { + gme.mu.Lock() + defer gme.mu.Unlock() + if _, ok := meOpts.MultiEndpoints[meOpts.Default]; !ok { + return fmt.Errorf("default MultiEndpoint %q missing options", meOpts.Default) + } + + validPools := make(map[string]bool) + for _, meo := range meOpts.MultiEndpoints { + for _, e := range meo.Endpoints { + validPools[e] = true + } + } + + // Add missing pools. + for e := range validPools { + if _, ok := gme.pools[e]; !ok { + // This creates a ClientConn with the gRPC-GCP balancer managing connection pool. + conn, err := gme.dialFunc(context.Background(), e, gme.opts...) + if err != nil { + return err + } + if gme.log.V(FINE) { + gme.log.Infof("created new channel pool for %q endpoint.", e) + } + gme.pools[e] = newMonitoredConn(e, conn, gme) + } + } + + // Add new multi-endpoints and update existing. + for name, meo := range meOpts.MultiEndpoints { + if me, ok := gme.mes[name]; ok { + // Updating existing MultiEndpoint. + me.SetEndpoints(meo.Endpoints) + continue + } + + // Add new MultiEndpoint. + if gme.log.V(FINE) { + gme.log.Infof("creating new %q multiendpoint.", name) + } + me, err := multiendpoint.NewMultiEndpoint(meo) + if err != nil { + return err + } + gme.mes[name] = me + } + gme.defaultName = meOpts.Default + + // Remove obsolete MultiEndpoints. + for name := range gme.mes { + if _, ok := meOpts.MultiEndpoints[name]; !ok { + delete(gme.mes, name) + if gme.log.V(FINE) { + gme.log.Infof("removed obsolete %q multiendpoint.", name) + } + } + } + + // Remove obsolete pools. + for e, mc := range gme.pools { + if _, ok := validPools[e]; !ok { + if err := mc.conn.Close(); err != nil { + gme.log.Errorf("error while closing the pool for %q endpoint: %v", e, err) + } + if gme.log.V(FINE) { + gme.log.Infof("closed channel pool for %q endpoint.", e) + } + mc.stopMonitoring() + delete(gme.pools, e) + } + } + + // Trigger status update. + for e, mc := range gme.pools { + s := mc.conn.GetState() + for _, me := range gme.mes { + me.SetEndpointAvailability(e, s == connectivity.Ready) + } + } + return nil +} + +type multiError []error + +func (m multiError) Error() string { + s, n := "", 0 + for _, e := range m { + if e != nil { + if n == 0 { + s = e.Error() + } + n++ + } + } + switch n { + case 0: + return "(0 errors)" + case 1: + return s + case 2: + return s + " (and 1 other error)" + } + return fmt.Sprintf("%s (and %d other errors)", s, n-1) +} + +func (m multiError) Combine() error { + if len(m) == 0 { + return nil + } + + return m +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go new file mode 100644 index 000000000..522323625 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/gcp_picker.go @@ -0,0 +1,276 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpcgcp + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" +) + +// Deadline exceeded gRPC error caused by client-side context reached deadline. +var deErr = status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()) + +func newGCPPicker(readySCRefs []*subConnRef, gb *gcpBalancer) balancer.Picker { + gp := &gcpPicker{ + gb: gb, + scRefs: readySCRefs, + } + gp.log = NewGCPLogger(gb.log, fmt.Sprintf("[gcpPicker %p]", gp)) + return gp +} + +type gcpPicker struct { + gb *gcpBalancer + mu sync.Mutex + scRefs []*subConnRef + log grpclog.LoggerV2 +} + +func (p *gcpPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + if len(p.scRefs) <= 0 { + if p.log.V(FINEST) { + p.log.Info("returning balancer.ErrNoSubConnAvailable as no subconns are available.") + } + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } + + ctx := info.Ctx + gcpCtx, hasGCPCtx := ctx.Value(gcpKey).(*gcpContext) + boundKey := "" + locator := "" + var cmd grpc_gcp.AffinityConfig_Command + + if mcfg, ok := p.gb.methodCfg[info.FullMethodName]; ok { + locator = mcfg.GetAffinityKey() + cmd = mcfg.GetCommand() + if hasGCPCtx && (cmd == grpc_gcp.AffinityConfig_BOUND || cmd == grpc_gcp.AffinityConfig_UNBIND) { + a, err := getAffinityKeysFromMessage(locator, gcpCtx.reqMsg) + if err != nil { + return balancer.PickResult{}, fmt.Errorf( + "failed to retrieve affinity key from request message: %v", err) + } + boundKey = a[0] + } + } + + scRef, err := p.getAndIncrementSubConnRef(info.Ctx, boundKey, cmd) + if err != nil { + return balancer.PickResult{}, err + } + if scRef == nil { + if p.log.V(FINEST) { + p.log.Info("returning balancer.ErrNoSubConnAvailable as no SubConn was picked.") + } + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable + } + + callStarted := time.Now() + // define callback for post process once call is done + callback := func(info balancer.DoneInfo) { + scRef.streamsDecr() + p.detectUnresponsive(ctx, scRef, callStarted, info.Err) + if info.Err != nil { + return + } + + switch cmd { + case grpc_gcp.AffinityConfig_BIND: + bindKeys, err := getAffinityKeysFromMessage(locator, gcpCtx.replyMsg) + if err == nil { + for _, bk := range bindKeys { + p.gb.bindSubConn(bk, scRef.subConn) + } + } + case grpc_gcp.AffinityConfig_UNBIND: + p.gb.unbindSubConn(boundKey) + } + } + + if p.log.V(FINEST) { + p.log.Infof("picked SubConn: %p", scRef.subConn) + } + return balancer.PickResult{SubConn: scRef.subConn, Done: callback}, nil +} + +// unresponsiveWindow returns channel pool's unresponsiveDetectionMs multiplied +// by 2^(refresh count since last response) as a time.Duration. This provides +// exponential backoff when RPCs keep deadline exceeded after consecutive reconnections. +func (p *gcpPicker) unresponsiveWindow(scRef *subConnRef) time.Duration { + factor := uint32(1 << scRef.refreshCnt) + return time.Millisecond * time.Duration(factor*p.gb.cfg.GetChannelPool().GetUnresponsiveDetectionMs()) +} + +func (p *gcpPicker) detectUnresponsive(ctx context.Context, scRef *subConnRef, callStarted time.Time, rpcErr error) { + if !p.gb.unresponsiveDetection { + return + } + + // Treat as a response from the server if deadline exceeded was not caused by client side context reached deadline. + if dl, ok := ctx.Deadline(); rpcErr == nil || status.Code(rpcErr) != codes.DeadlineExceeded || + rpcErr.Error() != deErr.Error() || !ok || dl.After(time.Now()) { + scRef.gotResp() + return + } + + if callStarted.Before(scRef.lastResp) { + return + } + + // Increment deadline exceeded calls and check if there were enough deadline + // exceeded calls and enough time passed since last response to trigger refresh. + if scRef.deCallsInc() >= p.gb.cfg.GetChannelPool().GetUnresponsiveCalls() && + scRef.lastResp.Before(time.Now().Add(-p.unresponsiveWindow(scRef))) { + p.gb.refresh(scRef) + } +} + +func (p *gcpPicker) getAndIncrementSubConnRef(ctx context.Context, boundKey string, cmd grpc_gcp.AffinityConfig_Command) (*subConnRef, error) { + if cmd == grpc_gcp.AffinityConfig_BIND && p.gb.cfg.GetChannelPool().GetBindPickStrategy() == grpc_gcp.ChannelPoolConfig_ROUND_ROBIN { + scRef := p.gb.getSubConnRoundRobin(ctx) + if p.log.V(FINEST) { + p.log.Infof("picking SubConn for round-robin bind: %p", scRef.subConn) + } + scRef.streamsIncr() + return scRef, nil + } + + p.mu.Lock() + defer p.mu.Unlock() + scRef, err := p.getSubConnRef(boundKey) + if err != nil { + return nil, err + } + if scRef != nil { + scRef.streamsIncr() + } + return scRef, nil +} + +// getSubConnRef returns the subConnRef object that contains the subconn +// ready to be used by picker. +// Must be called holding the picker mutex lock. +func (p *gcpPicker) getSubConnRef(boundKey string) (*subConnRef, error) { + if boundKey != "" { + if ref, ok := p.gb.getReadySubConnRef(boundKey); ok { + return ref, nil + } + } + + return p.getLeastBusySubConnRef() +} + +// Must be called holding the picker mutex lock. +func (p *gcpPicker) getLeastBusySubConnRef() (*subConnRef, error) { + minScRef := p.scRefs[0] + minStreamsCnt := minScRef.getStreamsCnt() + for _, scRef := range p.scRefs { + if scRef.getStreamsCnt() < minStreamsCnt { + minStreamsCnt = scRef.getStreamsCnt() + minScRef = scRef + } + } + + // If the least busy connection still has capacity, use it + if minStreamsCnt < int32(p.gb.cfg.GetChannelPool().GetMaxConcurrentStreamsLowWatermark()) { + return minScRef, nil + } + + if p.gb.cfg.GetChannelPool().GetMaxSize() == 0 || p.gb.getConnectionPoolSize() < int(p.gb.cfg.GetChannelPool().GetMaxSize()) { + // Ask balancer to create new subconn when all current subconns are busy and + // the connection pool still has capacity (either unlimited or maxSize is not reached). + p.gb.newSubConn() + + // Let this picker return ErrNoSubConnAvailable because it needs some time + // for the subconn to be READY. + return nil, balancer.ErrNoSubConnAvailable + } + + // If no capacity for the pool size and every connection reachs the soft limit, + // Then picks the least busy one anyway. + return minScRef, nil +} + +func keysFromMessage(val reflect.Value, path []string, start int) ([]string, error) { + if val.Kind() == reflect.Pointer || val.Kind() == reflect.Interface { + val = val.Elem() + } + + if len(path) == start { + if val.Kind() != reflect.String { + return nil, fmt.Errorf("cannot get string value from %q which is %q", strings.Join(path, "."), val.Kind()) + } + return []string{val.String()}, nil + } + + if val.Kind() != reflect.Struct { + return nil, fmt.Errorf("path %q traversal error: cannot lookup field %q (index %d in the path) in a %q value", strings.Join(path, "."), path[start], start, val.Kind()) + } + valField := val.FieldByName(strings.Title(path[start])) + + if valField.Kind() != reflect.Slice { + return keysFromMessage(valField, path, start+1) + } + + keys := []string{} + for i := 0; i < valField.Len(); i++ { + kk, err := keysFromMessage(valField.Index(i), path, start+1) + if err != nil { + return keys, err + } + keys = append(keys, kk...) + } + return keys, nil +} + +// getAffinityKeysFromMessage retrieves the affinity key(s) from proto message using +// the key locator defined in the affinity config. +func getAffinityKeysFromMessage( + locator string, + msg interface{}, +) (affinityKeys []string, err error) { + names := strings.Split(locator, ".") + if len(names) == 0 { + return nil, fmt.Errorf("empty affinityKey locator") + } + + return keysFromMessage(reflect.ValueOf(msg), names, 0) +} + +// NewErrPicker returns a picker that always returns err on Pick(). +func newErrPicker(err error) balancer.Picker { + return &errPicker{err: err} +} + +type errPicker struct { + err error // Pick() always returns this err. +} + +func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{}, p.err +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh new file mode 100644 index 000000000..334a718cc --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/codegen.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +cd "$(dirname "$0")" + +rm grpc_gcp.pb.go +protoc --plugin=$(go env GOPATH)/bin/protoc-gen-go --proto_path=./ --go_out=.. ./grpc_gcp.proto + diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go new file mode 100644 index 000000000..51dd4b27a --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.pb.go @@ -0,0 +1,638 @@ +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.12.4 +// source: grpc_gcp.proto + +package grpc_gcp + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// A selection of strategies for picking a channel for a call with BIND command. +type ChannelPoolConfig_BindPickStrategy int32 + +const ( + // No preference -- picking a channel for a BIND call will be no different + // than for any other calls. + ChannelPoolConfig_UNSPECIFIED ChannelPoolConfig_BindPickStrategy = 0 + // A channel with the least active streams at the moment of a BIND call + // initiation will be picked. + ChannelPoolConfig_LEAST_ACTIVE_STREAMS ChannelPoolConfig_BindPickStrategy = 1 + // Cycle through channels created by the BIND call initiation. I. e. pick + // a channel in a round-robin manner. Note that some channels may be + // skipped during channel pool resize. + ChannelPoolConfig_ROUND_ROBIN ChannelPoolConfig_BindPickStrategy = 2 +) + +// Enum value maps for ChannelPoolConfig_BindPickStrategy. +var ( + ChannelPoolConfig_BindPickStrategy_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "LEAST_ACTIVE_STREAMS", + 2: "ROUND_ROBIN", + } + ChannelPoolConfig_BindPickStrategy_value = map[string]int32{ + "UNSPECIFIED": 0, + "LEAST_ACTIVE_STREAMS": 1, + "ROUND_ROBIN": 2, + } +) + +func (x ChannelPoolConfig_BindPickStrategy) Enum() *ChannelPoolConfig_BindPickStrategy { + p := new(ChannelPoolConfig_BindPickStrategy) + *p = x + return p +} + +func (x ChannelPoolConfig_BindPickStrategy) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ChannelPoolConfig_BindPickStrategy) Descriptor() protoreflect.EnumDescriptor { + return file_grpc_gcp_proto_enumTypes[0].Descriptor() +} + +func (ChannelPoolConfig_BindPickStrategy) Type() protoreflect.EnumType { + return &file_grpc_gcp_proto_enumTypes[0] +} + +func (x ChannelPoolConfig_BindPickStrategy) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ChannelPoolConfig_BindPickStrategy.Descriptor instead. +func (ChannelPoolConfig_BindPickStrategy) EnumDescriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{1, 0} +} + +type AffinityConfig_Command int32 + +const ( + // The annotated method will be required to be bound to an existing session + // to execute the RPC. The corresponding <affinity_key_field_path> will be + // used to find the affinity key from the request message. + AffinityConfig_BOUND AffinityConfig_Command = 0 + // The annotated method will establish the channel affinity with the + // channel which is used to execute the RPC. The corresponding + // <affinity_key_field_path> will be used to find the affinity key from the + // response message. + AffinityConfig_BIND AffinityConfig_Command = 1 + // The annotated method will remove the channel affinity with the + // channel which is used to execute the RPC. The corresponding + // <affinity_key_field_path> will be used to find the affinity key from the + // request message. + AffinityConfig_UNBIND AffinityConfig_Command = 2 +) + +// Enum value maps for AffinityConfig_Command. +var ( + AffinityConfig_Command_name = map[int32]string{ + 0: "BOUND", + 1: "BIND", + 2: "UNBIND", + } + AffinityConfig_Command_value = map[string]int32{ + "BOUND": 0, + "BIND": 1, + "UNBIND": 2, + } +) + +func (x AffinityConfig_Command) Enum() *AffinityConfig_Command { + p := new(AffinityConfig_Command) + *p = x + return p +} + +func (x AffinityConfig_Command) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (AffinityConfig_Command) Descriptor() protoreflect.EnumDescriptor { + return file_grpc_gcp_proto_enumTypes[1].Descriptor() +} + +func (AffinityConfig_Command) Type() protoreflect.EnumType { + return &file_grpc_gcp_proto_enumTypes[1] +} + +func (x AffinityConfig_Command) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use AffinityConfig_Command.Descriptor instead. +func (AffinityConfig_Command) EnumDescriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{3, 0} +} + +type ApiConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The channel pool configurations. + ChannelPool *ChannelPoolConfig `protobuf:"bytes,2,opt,name=channel_pool,json=channelPool,proto3" json:"channel_pool,omitempty"` + // The method configurations. + Method []*MethodConfig `protobuf:"bytes,1001,rep,name=method,proto3" json:"method,omitempty"` +} + +func (x *ApiConfig) Reset() { + *x = ApiConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_gcp_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ApiConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ApiConfig) ProtoMessage() {} + +func (x *ApiConfig) ProtoReflect() protoreflect.Message { + mi := &file_grpc_gcp_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ApiConfig.ProtoReflect.Descriptor instead. +func (*ApiConfig) Descriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{0} +} + +func (x *ApiConfig) GetChannelPool() *ChannelPoolConfig { + if x != nil { + return x.ChannelPool + } + return nil +} + +func (x *ApiConfig) GetMethod() []*MethodConfig { + if x != nil { + return x.Method + } + return nil +} + +// ChannelPoolConfig are options for configuring the channel pool. +// RPCs will be scheduled onto existing channels in the pool until all channels +// have <max_concurrent_streams_low_watermark> number of streams. At this point +// a new channel is spun out. Once <max_size> channels have been spun out and +// each has <max_concurrent_streams_low_watermark> streams, subsequent RPCs will +// hang until any of the in-flight RPCs is finished, freeing up a channel. +type ChannelPoolConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The max number of channels in the pool. + // Default value is 0, meaning 'unlimited' size. + MaxSize uint32 `protobuf:"varint,1,opt,name=max_size,json=maxSize,proto3" json:"max_size,omitempty"` + // The idle timeout (seconds) of channels without bound affinity sessions. + IdleTimeout uint64 `protobuf:"varint,2,opt,name=idle_timeout,json=idleTimeout,proto3" json:"idle_timeout,omitempty"` + // The low watermark of max number of concurrent streams in a channel. + // New channel will be created once it get hit, until we reach the max size of the channel pool. + // Default value is 100. The valid range is [1, 100]. Any value outside the range will be ignored and the default value will be used. + // Note: It is not recommended that users adjust this value, since a single channel should generally have no trouble managing the default (maximum) number of streams. + MaxConcurrentStreamsLowWatermark uint32 `protobuf:"varint,3,opt,name=max_concurrent_streams_low_watermark,json=maxConcurrentStreamsLowWatermark,proto3" json:"max_concurrent_streams_low_watermark,omitempty"` + // The minimum number of channels in the pool. + MinSize uint32 `protobuf:"varint,4,opt,name=min_size,json=minSize,proto3" json:"min_size,omitempty"` + // If a channel mapped to an affinity key is not ready, temporarily fallback + // to another ready channel. + // Enabling this fallback is beneficial in scenarios with short RPC timeouts + // and rather slow connection establishing or during incidents when new + // connections fail but existing connections still operate. + FallbackToReady bool `protobuf:"varint,5,opt,name=fallback_to_ready,json=fallbackToReady,proto3" json:"fallback_to_ready,omitempty"` + // Enables per channel unresponsive connection detection if > 0 and unresponsive_calls > 0. + // If enabled and more than unresponsive_detection_ms passed since the last response from the server, + // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side, + // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for + // that channel and after the new connection is ready it will replace the old connection. The calls on the old + // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh + // if no response from the server is received. + UnresponsiveDetectionMs uint32 `protobuf:"varint,6,opt,name=unresponsive_detection_ms,json=unresponsiveDetectionMs,proto3" json:"unresponsive_detection_ms,omitempty"` + // Enables per channel unresponsive connection detection if > 0 and unresponsive_detection_ms > 0. + // If enabled and more than unresponsive_detection_ms passed since the last response from the server, + // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side, + // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for + // that channel and after the new connection is ready it will replace the old connection. The calls on the old + // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh + // if no response from the server is received. + UnresponsiveCalls uint32 `protobuf:"varint,7,opt,name=unresponsive_calls,json=unresponsiveCalls,proto3" json:"unresponsive_calls,omitempty"` + // The strategy for picking a channel for a call with BIND command. + BindPickStrategy ChannelPoolConfig_BindPickStrategy `protobuf:"varint,8,opt,name=bind_pick_strategy,json=bindPickStrategy,proto3,enum=grpc.gcp.ChannelPoolConfig_BindPickStrategy" json:"bind_pick_strategy,omitempty"` +} + +func (x *ChannelPoolConfig) Reset() { + *x = ChannelPoolConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_gcp_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChannelPoolConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChannelPoolConfig) ProtoMessage() {} + +func (x *ChannelPoolConfig) ProtoReflect() protoreflect.Message { + mi := &file_grpc_gcp_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChannelPoolConfig.ProtoReflect.Descriptor instead. +func (*ChannelPoolConfig) Descriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{1} +} + +func (x *ChannelPoolConfig) GetMaxSize() uint32 { + if x != nil { + return x.MaxSize + } + return 0 +} + +func (x *ChannelPoolConfig) GetIdleTimeout() uint64 { + if x != nil { + return x.IdleTimeout + } + return 0 +} + +func (x *ChannelPoolConfig) GetMaxConcurrentStreamsLowWatermark() uint32 { + if x != nil { + return x.MaxConcurrentStreamsLowWatermark + } + return 0 +} + +func (x *ChannelPoolConfig) GetMinSize() uint32 { + if x != nil { + return x.MinSize + } + return 0 +} + +func (x *ChannelPoolConfig) GetFallbackToReady() bool { + if x != nil { + return x.FallbackToReady + } + return false +} + +func (x *ChannelPoolConfig) GetUnresponsiveDetectionMs() uint32 { + if x != nil { + return x.UnresponsiveDetectionMs + } + return 0 +} + +func (x *ChannelPoolConfig) GetUnresponsiveCalls() uint32 { + if x != nil { + return x.UnresponsiveCalls + } + return 0 +} + +func (x *ChannelPoolConfig) GetBindPickStrategy() ChannelPoolConfig_BindPickStrategy { + if x != nil { + return x.BindPickStrategy + } + return ChannelPoolConfig_UNSPECIFIED +} + +type MethodConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // A fully qualified name of a gRPC method, or a wildcard pattern ending + // with .*, such as foo.bar.A, foo.bar.*. Method configs are evaluated + // sequentially, and the first one takes precedence. + Name []string `protobuf:"bytes,1,rep,name=name,proto3" json:"name,omitempty"` + // The channel affinity configurations. + Affinity *AffinityConfig `protobuf:"bytes,1001,opt,name=affinity,proto3" json:"affinity,omitempty"` +} + +func (x *MethodConfig) Reset() { + *x = MethodConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_gcp_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *MethodConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MethodConfig) ProtoMessage() {} + +func (x *MethodConfig) ProtoReflect() protoreflect.Message { + mi := &file_grpc_gcp_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MethodConfig.ProtoReflect.Descriptor instead. +func (*MethodConfig) Descriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{2} +} + +func (x *MethodConfig) GetName() []string { + if x != nil { + return x.Name + } + return nil +} + +func (x *MethodConfig) GetAffinity() *AffinityConfig { + if x != nil { + return x.Affinity + } + return nil +} + +type AffinityConfig struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The affinity command applies on the selected gRPC methods. + Command AffinityConfig_Command `protobuf:"varint,2,opt,name=command,proto3,enum=grpc.gcp.AffinityConfig_Command" json:"command,omitempty"` + // The field path of the affinity key in the request/response message. + // For example: "f.a", "f.b.d", etc. + AffinityKey string `protobuf:"bytes,3,opt,name=affinity_key,json=affinityKey,proto3" json:"affinity_key,omitempty"` +} + +func (x *AffinityConfig) Reset() { + *x = AffinityConfig{} + if protoimpl.UnsafeEnabled { + mi := &file_grpc_gcp_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *AffinityConfig) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AffinityConfig) ProtoMessage() {} + +func (x *AffinityConfig) ProtoReflect() protoreflect.Message { + mi := &file_grpc_gcp_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AffinityConfig.ProtoReflect.Descriptor instead. +func (*AffinityConfig) Descriptor() ([]byte, []int) { + return file_grpc_gcp_proto_rawDescGZIP(), []int{3} +} + +func (x *AffinityConfig) GetCommand() AffinityConfig_Command { + if x != nil { + return x.Command + } + return AffinityConfig_BOUND +} + +func (x *AffinityConfig) GetAffinityKey() string { + if x != nil { + return x.AffinityKey + } + return "" +} + +var File_grpc_gcp_proto protoreflect.FileDescriptor + +var file_grpc_gcp_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x67, 0x63, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x22, 0x7c, 0x0a, 0x09, 0x41, 0x70, + 0x69, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3e, 0x0a, 0x0c, 0x63, 0x68, 0x61, 0x6e, 0x6e, + 0x65, 0x6c, 0x5f, 0x70, 0x6f, 0x6f, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, + 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, + 0x50, 0x6f, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b, 0x63, 0x68, 0x61, 0x6e, + 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x12, 0x2f, 0x0a, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, + 0x64, 0x18, 0xe9, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, + 0x67, 0x63, 0x70, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x52, 0x06, 0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x22, 0xff, 0x03, 0x0a, 0x11, 0x43, 0x68, 0x61, + 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x19, + 0x0a, 0x08, 0x6d, 0x61, 0x78, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x07, 0x6d, 0x61, 0x78, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x69, 0x64, 0x6c, + 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, + 0x0b, 0x69, 0x64, 0x6c, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x12, 0x4e, 0x0a, 0x24, + 0x6d, 0x61, 0x78, 0x5f, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x5f, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x5f, 0x6c, 0x6f, 0x77, 0x5f, 0x77, 0x61, 0x74, 0x65, 0x72, + 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x20, 0x6d, 0x61, 0x78, 0x43, + 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, + 0x4c, 0x6f, 0x77, 0x57, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x19, 0x0a, 0x08, + 0x6d, 0x69, 0x6e, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x07, + 0x6d, 0x69, 0x6e, 0x53, 0x69, 0x7a, 0x65, 0x12, 0x2a, 0x0a, 0x11, 0x66, 0x61, 0x6c, 0x6c, 0x62, + 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x6f, 0x5f, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x05, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x0f, 0x66, 0x61, 0x6c, 0x6c, 0x62, 0x61, 0x63, 0x6b, 0x54, 0x6f, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x12, 0x3a, 0x0a, 0x19, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x69, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x6d, 0x73, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x17, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x69, 0x76, 0x65, 0x44, 0x65, 0x74, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x73, 0x12, + 0x2d, 0x0a, 0x12, 0x75, 0x6e, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x69, 0x76, 0x65, 0x5f, + 0x63, 0x61, 0x6c, 0x6c, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x11, 0x75, 0x6e, 0x72, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x69, 0x76, 0x65, 0x43, 0x61, 0x6c, 0x6c, 0x73, 0x12, 0x5a, + 0x0a, 0x12, 0x62, 0x69, 0x6e, 0x64, 0x5f, 0x70, 0x69, 0x63, 0x6b, 0x5f, 0x73, 0x74, 0x72, 0x61, + 0x74, 0x65, 0x67, 0x79, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2c, 0x2e, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x50, 0x6f, 0x6f, + 0x6c, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x42, 0x69, 0x6e, 0x64, 0x50, 0x69, 0x63, 0x6b, + 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x10, 0x62, 0x69, 0x6e, 0x64, 0x50, 0x69, + 0x63, 0x6b, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x4e, 0x0a, 0x10, 0x42, 0x69, + 0x6e, 0x64, 0x50, 0x69, 0x63, 0x6b, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x0f, + 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, + 0x18, 0x0a, 0x14, 0x4c, 0x45, 0x41, 0x53, 0x54, 0x5f, 0x41, 0x43, 0x54, 0x49, 0x56, 0x45, 0x5f, + 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x53, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x52, 0x4f, 0x55, + 0x4e, 0x44, 0x5f, 0x52, 0x4f, 0x42, 0x49, 0x4e, 0x10, 0x02, 0x22, 0x59, 0x0a, 0x0c, 0x4d, 0x65, + 0x74, 0x68, 0x6f, 0x64, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x35, + 0x0a, 0x08, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x18, 0xe9, 0x07, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x18, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x41, 0x66, 0x66, + 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x08, 0x61, 0x66, 0x66, + 0x69, 0x6e, 0x69, 0x74, 0x79, 0x22, 0x9b, 0x01, 0x0a, 0x0e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, + 0x74, 0x79, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3a, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x20, 0x2e, 0x67, 0x72, 0x70, 0x63, + 0x2e, 0x67, 0x63, 0x70, 0x2e, 0x41, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, 0x43, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x07, 0x63, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x66, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x79, + 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x66, 0x66, 0x69, + 0x6e, 0x69, 0x74, 0x79, 0x4b, 0x65, 0x79, 0x22, 0x2a, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x12, 0x09, 0x0a, 0x05, 0x42, 0x4f, 0x55, 0x4e, 0x44, 0x10, 0x00, 0x12, 0x08, 0x0a, + 0x04, 0x42, 0x49, 0x4e, 0x44, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x55, 0x4e, 0x42, 0x49, 0x4e, + 0x44, 0x10, 0x02, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x5f, 0x67, 0x63, + 0x70, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_grpc_gcp_proto_rawDescOnce sync.Once + file_grpc_gcp_proto_rawDescData = file_grpc_gcp_proto_rawDesc +) + +func file_grpc_gcp_proto_rawDescGZIP() []byte { + file_grpc_gcp_proto_rawDescOnce.Do(func() { + file_grpc_gcp_proto_rawDescData = protoimpl.X.CompressGZIP(file_grpc_gcp_proto_rawDescData) + }) + return file_grpc_gcp_proto_rawDescData +} + +var file_grpc_gcp_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_grpc_gcp_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_grpc_gcp_proto_goTypes = []interface{}{ + (ChannelPoolConfig_BindPickStrategy)(0), // 0: grpc.gcp.ChannelPoolConfig.BindPickStrategy + (AffinityConfig_Command)(0), // 1: grpc.gcp.AffinityConfig.Command + (*ApiConfig)(nil), // 2: grpc.gcp.ApiConfig + (*ChannelPoolConfig)(nil), // 3: grpc.gcp.ChannelPoolConfig + (*MethodConfig)(nil), // 4: grpc.gcp.MethodConfig + (*AffinityConfig)(nil), // 5: grpc.gcp.AffinityConfig +} +var file_grpc_gcp_proto_depIdxs = []int32{ + 3, // 0: grpc.gcp.ApiConfig.channel_pool:type_name -> grpc.gcp.ChannelPoolConfig + 4, // 1: grpc.gcp.ApiConfig.method:type_name -> grpc.gcp.MethodConfig + 0, // 2: grpc.gcp.ChannelPoolConfig.bind_pick_strategy:type_name -> grpc.gcp.ChannelPoolConfig.BindPickStrategy + 5, // 3: grpc.gcp.MethodConfig.affinity:type_name -> grpc.gcp.AffinityConfig + 1, // 4: grpc.gcp.AffinityConfig.command:type_name -> grpc.gcp.AffinityConfig.Command + 5, // [5:5] is the sub-list for method output_type + 5, // [5:5] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_grpc_gcp_proto_init() } +func file_grpc_gcp_proto_init() { + if File_grpc_gcp_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_grpc_gcp_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ApiConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_gcp_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChannelPoolConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_gcp_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*MethodConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_grpc_gcp_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AffinityConfig); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_grpc_gcp_proto_rawDesc, + NumEnums: 2, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_grpc_gcp_proto_goTypes, + DependencyIndexes: file_grpc_gcp_proto_depIdxs, + EnumInfos: file_grpc_gcp_proto_enumTypes, + MessageInfos: file_grpc_gcp_proto_msgTypes, + }.Build() + File_grpc_gcp_proto = out.File + file_grpc_gcp_proto_rawDesc = nil + file_grpc_gcp_proto_goTypes = nil + file_grpc_gcp_proto_depIdxs = nil +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto new file mode 100644 index 000000000..47eb8b87d --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/grpc_gcp/grpc_gcp.proto @@ -0,0 +1,129 @@ +// Copyright 2018 gRPC authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +option go_package = "./grpc_gcp"; + +package grpc.gcp; + +message ApiConfig { + // The channel pool configurations. + ChannelPoolConfig channel_pool = 2; + + // The method configurations. + repeated MethodConfig method = 1001; +} + +// ChannelPoolConfig are options for configuring the channel pool. +// RPCs will be scheduled onto existing channels in the pool until all channels +// have <max_concurrent_streams_low_watermark> number of streams. At this point +// a new channel is spun out. Once <max_size> channels have been spun out and +// each has <max_concurrent_streams_low_watermark> streams, subsequent RPCs will +// hang until any of the in-flight RPCs is finished, freeing up a channel. +message ChannelPoolConfig { + // The max number of channels in the pool. + // Default value is 0, meaning 'unlimited' size. + uint32 max_size = 1; + + // The idle timeout (seconds) of channels without bound affinity sessions. + uint64 idle_timeout = 2; + + // The low watermark of max number of concurrent streams in a channel. + // New channel will be created once it get hit, until we reach the max size of the channel pool. + // Default value is 100. The valid range is [1, 100]. Any value outside the range will be ignored and the default value will be used. + // Note: It is not recommended that users adjust this value, since a single channel should generally have no trouble managing the default (maximum) number of streams. + uint32 max_concurrent_streams_low_watermark = 3; + + // The minimum number of channels in the pool. + uint32 min_size = 4; + + // If a channel mapped to an affinity key is not ready, temporarily fallback + // to another ready channel. + // Enabling this fallback is beneficial in scenarios with short RPC timeouts + // and rather slow connection establishing or during incidents when new + // connections fail but existing connections still operate. + bool fallback_to_ready = 5; + + // Enables per channel unresponsive connection detection if > 0 and unresponsive_calls > 0. + // If enabled and more than unresponsive_detection_ms passed since the last response from the server, + // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side, + // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for + // that channel and after the new connection is ready it will replace the old connection. The calls on the old + // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh + // if no response from the server is received. + uint32 unresponsive_detection_ms = 6; + + // Enables per channel unresponsive connection detection if > 0 and unresponsive_detection_ms > 0. + // If enabled and more than unresponsive_detection_ms passed since the last response from the server, + // and >= unresponsive_calls RPC calls (started after last response from the server) timed-out on the client side, + // then the connection of that channel will be gracefully refreshed. I.e., a new connection will be created for + // that channel and after the new connection is ready it will replace the old connection. The calls on the old + // connection will not be interrupted. The unresponsive_detection_ms will be doubled every consecutive refresh + // if no response from the server is received. + uint32 unresponsive_calls = 7; + + // A selection of strategies for picking a channel for a call with BIND command. + enum BindPickStrategy { + // No preference -- picking a channel for a BIND call will be no different + // than for any other calls. + UNSPECIFIED = 0; + + // A channel with the least active streams at the moment of a BIND call + // initiation will be picked. + LEAST_ACTIVE_STREAMS = 1; + + // Cycle through channels created by the BIND call initiation. I. e. pick + // a channel in a round-robin manner. Note that some channels may be + // skipped during channel pool resize. + ROUND_ROBIN = 2; + } + + // The strategy for picking a channel for a call with BIND command. + BindPickStrategy bind_pick_strategy = 8; +} + +message MethodConfig { + // A fully qualified name of a gRPC method, or a wildcard pattern ending + // with .*, such as foo.bar.A, foo.bar.*. Method configs are evaluated + // sequentially, and the first one takes precedence. + repeated string name = 1; + + // The channel affinity configurations. + AffinityConfig affinity = 1001; +} + +message AffinityConfig { + enum Command { + // The annotated method will be required to be bound to an existing session + // to execute the RPC. The corresponding <affinity_key_field_path> will be + // used to find the affinity key from the request message. + BOUND = 0; + // The annotated method will establish the channel affinity with the + // channel which is used to execute the RPC. The corresponding + // <affinity_key_field_path> will be used to find the affinity key from the + // response message. + BIND = 1; + // The annotated method will remove the channel affinity with the + // channel which is used to execute the RPC. The corresponding + // <affinity_key_field_path> will be used to find the affinity key from the + // request message. + UNBIND = 2; + } + // The affinity command applies on the selected gRPC methods. + Command command = 2; + // The field path of the affinity key in the request/response message. + // For example: "f.a", "f.b.d", etc. + string affinity_key = 3; +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh new file mode 100644 index 000000000..ff4de3390 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/mockgen.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +cd "$(dirname "$0")" +mockgen -destination=mocks/mock_balancer.go -package=mocks google.golang.org/grpc/balancer ClientConn,SubConn +mockgen -destination=mocks/mock_stream.go -package=mocks google.golang.org/grpc ClientStream diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go new file mode 100644 index 000000000..2a9c52eba --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/endpoint.go @@ -0,0 +1,54 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package multiendpoint + +import ( + "fmt" + "time" +) + +type status int + +// Status of an endpoint. +const ( + unavailable status = iota + available + recovering +) + +func (s status) String() string { + switch s { + case unavailable: + return "Unavailable" + case available: + return "Available" + case recovering: + return "Recovering" + default: + return fmt.Sprintf("%d", s) + } +} + +type endpoint struct { + id string + priority int + status status + lastChange time.Time + futureChange timerAlike +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go new file mode 100644 index 000000000..7e70a6ea5 --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/multiendpoint/multiendpoint.go @@ -0,0 +1,306 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package multiendpoint implements multiendpoint feature. See [MultiEndpoint] +package multiendpoint + +import ( + "errors" + "fmt" + "sync" + "time" +) + +type timerAlike interface { + Reset(time.Duration) bool + Stop() bool +} + +// To be redefined in tests. +var ( + timeNow = func() time.Time { + return time.Now() + } + timeAfterFunc = func(d time.Duration, f func()) timerAlike { + return time.AfterFunc(d, f) + } +) + +// MultiEndpoint holds a list of endpoints, tracks their availability and defines the current +// endpoint. An endpoint has a priority defined by its position in the list (first item has top +// priority). +// +// The current endpoint is the highest available endpoint in the list. If no endpoint is available, +// MultiEndpoint sticks to the previously current endpoint. +// +// Sometimes switching between endpoints can be costly, and it is worth waiting for some time +// after current endpoint becomes unavailable. For this case, use +// [MultiEndpointOptions.RecoveryTimeout] to set the recovery timeout. MultiEndpoint will keep the +// current endpoint for up to recovery timeout after it became unavailable to give it some time to +// recover. +// +// The list of endpoints can be changed at any time with [MultiEndpoint.SetEndpoints] function. +// MultiEndpoint will: +// - remove obsolete endpoints; +// - preserve remaining endpoints and their states; +// - add new endpoints; +// - update all endpoints priority according to the new order; +// - change current endpoint if necessary. +// +// After updating the list of endpoints, MultiEndpoint will switch the current endpoint to the +// highest available endpoint in the list. If you have many processes using MultiEndpoint, this may +// lead to immediate shift of all traffic which may be undesired. To smooth this transfer, use +// [MultiEndpointOptions.SwitchingDelay] with randomized value to introduce a jitter. Each +// MultiEndpoint will delay switching from an available endpoint to another endpoint for this amount +// of time. This delay is only applicable when switching from a lower priority available endpoint to +// a higher priority available endpoint. +type MultiEndpoint interface { + // Current returns current endpoint. + // + // Note that the read is not synchronized and in case of a race condition there is a chance of + // getting an outdated current endpoint. + Current() string + + // SetEndpointAvailability informs MultiEndpoint when an endpoint becomes available or unavailable. + // This may change the current endpoint. + SetEndpointAvailability(e string, avail bool) + + // SetEndpoints updates a list of endpoints: + // - remove obsolete endpoints + // - preserve remaining endpoints and their states + // - add new endpoints + // - update all endpoints priority according to the new order + // This may change the current endpoint. + SetEndpoints(endpoints []string) error +} + +// MultiEndpointOptions is used for configuring [MultiEndpoint]. +type MultiEndpointOptions struct { + // A list of endpoints ordered by priority (first endpoint has top priority). + Endpoints []string + // RecoveryTimeout sets the amount of time MultiEndpoint keeps endpoint as current after it + // became unavailable. + RecoveryTimeout time.Duration + // When switching from a lower priority available endpoint to a higher priority available + // endpoint the MultiEndpoint will delay the switch for this duration. + SwitchingDelay time.Duration +} + +// NewMultiEndpoint validates options and creates a new [MultiEndpoint]. +func NewMultiEndpoint(b *MultiEndpointOptions) (MultiEndpoint, error) { + if len(b.Endpoints) == 0 { + return nil, fmt.Errorf("endpoints list cannot be empty") + } + + me := &multiEndpoint{ + recoveryTimeout: b.RecoveryTimeout, + switchingDelay: b.SwitchingDelay, + current: b.Endpoints[0], + } + eMap := make(map[string]*endpoint) + for i, e := range b.Endpoints { + eMap[e] = me.newEndpoint(e, i) + } + me.endpoints = eMap + return me, nil +} + +type multiEndpoint struct { + sync.RWMutex + + endpoints map[string]*endpoint + recoveryTimeout time.Duration + switchingDelay time.Duration + current string + future string +} + +// Current returns current endpoint. +func (me *multiEndpoint) Current() string { + me.RLock() + defer me.RUnlock() + return me.current +} + +// SetEndpoints updates endpoints list: +// - remove obsolete endpoints; +// - preserve remaining endpoints and their states; +// - add new endpoints; +// - update all endpoints priority according to the new order; +// - change current endpoint if necessary. +func (me *multiEndpoint) SetEndpoints(endpoints []string) error { + me.Lock() + defer me.Unlock() + if len(endpoints) == 0 { + return errors.New("endpoints list cannot be empty") + } + newEndpoints := make(map[string]struct{}) + for _, v := range endpoints { + newEndpoints[v] = struct{}{} + } + // Remove obsolete endpoints. + for e := range me.endpoints { + if _, ok := newEndpoints[e]; !ok { + delete(me.endpoints, e) + } + } + // Add new endpoints and update priority. + for i, e := range endpoints { + if _, ok := me.endpoints[e]; !ok { + me.endpoints[e] = me.newEndpoint(e, i) + } else { + me.endpoints[e].priority = i + } + } + + me.maybeUpdateCurrent() + return nil +} + +// Updates current to the top-priority available endpoint unless the current endpoint is +// recovering. +// +// Must be run under me.Lock. +func (me *multiEndpoint) maybeUpdateCurrent() { + c, exists := me.endpoints[me.current] + var topA *endpoint + var top *endpoint + for _, e := range me.endpoints { + if e.status == available && (topA == nil || topA.priority > e.priority) { + topA = e + } + if top == nil || top.priority > e.priority { + top = e + } + } + + if exists && c.status == recovering && (topA == nil || topA.priority > c.priority) { + // Let current endpoint recover while no higher priority endpoints available. + return + } + + // Always prefer top available endpoint. + if topA != nil { + me.switchFromTo(c, topA) + return + } + + // If no current endpoint exists, resort to the top priority endpoint immediately. + if !exists { + me.current = top.id + } +} + +func (me *multiEndpoint) newEndpoint(id string, priority int) *endpoint { + s := unavailable + if me.recoveryTimeout > 0 { + s = recovering + } + e := &endpoint{ + id: id, + priority: priority, + status: s, + } + if e.status == recovering { + me.scheduleUnavailable(e) + } + return e +} + +// Changes or schedules a change of current to the endpoint t. +// +// Must be run under me.Lock. +func (me *multiEndpoint) switchFromTo(f, t *endpoint) { + if me.current == t.id { + return + } + + if me.switchingDelay == 0 || f == nil || f.status == unavailable { + // Switching immediately if no delay or no current or current is unavailable. + me.current = t.id + return + } + + me.future = t.id + timeAfterFunc(me.switchingDelay, func() { + me.Lock() + defer me.Unlock() + if e, ok := me.endpoints[me.future]; ok && e.status == available { + me.current = e.id + } + }) +} + +// SetEndpointAvailability updates the state of an endpoint. +func (me *multiEndpoint) SetEndpointAvailability(e string, avail bool) { + me.Lock() + defer me.Unlock() + me.setEndpointAvailability(e, avail) + me.maybeUpdateCurrent() +} + +// Must be run under me.Lock. +func (me *multiEndpoint) setEndpointAvailability(e string, avail bool) { + ee, ok := me.endpoints[e] + if !ok { + return + } + + if avail { + setState(ee, available) + return + } + + if ee.status != available { + return + } + + if me.recoveryTimeout == 0 { + setState(ee, unavailable) + return + } + + setState(ee, recovering) + me.scheduleUnavailable(ee) +} + +// Change the state of endpoint e to state s. +// +// Must be run under me.Lock. +func setState(e *endpoint, s status) { + if e.futureChange != nil { + e.futureChange.Stop() + } + e.status = s + e.lastChange = timeNow() +} + +// Schedule endpoint e to become unavailable after recoveryTimeout. +func (me *multiEndpoint) scheduleUnavailable(e *endpoint) { + stateChange := e.lastChange + e.futureChange = timeAfterFunc(me.recoveryTimeout, func() { + me.Lock() + defer me.Unlock() + if e.lastChange != stateChange { + // This timer is outdated. + return + } + setState(e, unavailable) + me.maybeUpdateCurrent() + }) +} diff --git a/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json new file mode 100644 index 000000000..6291ae9cb --- /dev/null +++ b/vendor/github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp/test_config.json @@ -0,0 +1,29 @@ +{ + "channelPool": { + "maxSize": 10, + "maxConcurrentStreamsLowWatermark": 10 + }, + "method": [ + { + "name": [ "method1" ], + "affinity": { + "command": "BIND", + "affinityKey": "key1" + } + }, + { + "name": [ "method2" ], + "affinity": { + "command": "BOUND", + "affinityKey": "key2" + } + }, + { + "name": [ "method3" ], + "affinity": { + "command": "UNBIND", + "affinityKey": "key3" + } + } + ] +}
\ No newline at end of file |
