feat(spanner): add dynamic channel pool#14611
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a dynamic gRPC channel pool (DCP) for the Spanner client, allowing it to scale the number of gRPC channels dynamically based on RPC load. The implementation includes a new dynamicChannelPool type with support for power-of-two least-busy and round-robin selection strategies, along with background workers for scaling up (event-driven) and scaling down (periodic). The PR also refactors internal interfaces to use a requestIDHeaderProvider instead of concrete gRPC clients to accommodate the dynamic nature of the pool. Integration points are added in the session manager and client configuration to enable this opt-in feature. I have no feedback to provide as there were no review comments to assess.
| // environment variable has been set or client has passed the opt-in | ||
| // option in ClientConfig. | ||
| endToEndTracingEnvironmentVariable := os.Getenv("SPANNER_ENABLE_END_TO_END_TRACING") | ||
| if config.EnableEndToEndTracing || endToEndTracingEnvironmentVariable == "true" { |
There was a problem hiding this comment.
nit: to be consistent with how other env vars are being processed, this should also use a case-insensitive comparison
| if cfg.DCPPrimeMaxAttempts == 0 { | ||
| cfg.DCPPrimeMaxAttempts = def.DCPPrimeMaxAttempts | ||
| } | ||
| switch { |
There was a problem hiding this comment.
Can we add a check here for DCPScaleDownCheckInterval <= 0?
|
|
||
| dcpEnabled := config.DynamicChannelPoolConfig.DCPEnabled && gme == nil && !isExperimentalLocationAPIEnabledForConfig(config) && os.Getenv("SPANNER_EMULATOR_HOST") == "" | ||
| if dcpEnabled { | ||
| reqIDInjector := new(requestIDHeaderInjector) |
There was a problem hiding this comment.
The newClientWithConfig(..) method is getting very long. Can we separate out this block into a separate method for initializing DCP? And maybe also put the other large branch of this if statement in a separate method (the one for else if isFallbackEnabled && isDirectPathEnabled)
| dial := func(dialCtx context.Context) (gtransport.ConnPool, error) { | ||
| return gtransport.DialPool(dialCtx, allClientOpts(1, config.Compression, config.EnableDirectAccess, dcpOpts...)...) | ||
| } | ||
| dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, 0, dial) |
There was a problem hiding this comment.
If I understand it correctly, the 0 here is for initial (as in: initial number of channels). But that is also already included in the config, and the initial argument does not appear to be used anywhere. Can we remove it?
| return | ||
| default: | ||
| } | ||
| p.dialMu.Lock() |
There was a problem hiding this comment.
This lock seems to be held during the entire scale-up, including creating the connection and priming the channel (including any retries of the priming). Would it be possible to release the lock at least during the priming?
| type dcpStreamRef struct { | ||
| once sync.Once | ||
| finish func(error) | ||
| closed chan struct{} |
There was a problem hiding this comment.
This does not appear to be used. Is it something that is needed in a follow-up PR? If not, can we remove it?
|
|
||
| type dcpStreamRef struct { | ||
| once sync.Once | ||
| finish func(error) |
There was a problem hiding this comment.
This does not appear to be used. Is it something that is needed in a follow-up PR? If not, can we remove it?
| cancel context.CancelFunc | ||
| sc *sessionClient | ||
| database string | ||
| disableRouteToLeader bool |
There was a problem hiding this comment.
I don't think you need this field. You can just read it from sessionClient.
| return ref | ||
| } | ||
|
|
||
| func (c *dcpSpannerClient) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest, opts ...gax.CallOption) (*spannerpb.Session, error) { |
There was a problem hiding this comment.
Should we add a TODO / issue to investigate if we can refactor this into using something like a gRPC interceptor or some other more generic solution, than writing a wrapper for each RPC?
| return gsc.generateRequestIDHeaderInjector(), nil | ||
| } | ||
|
|
||
| func (c *dcpResolvingSpannerClient) CreateSession(ctx context.Context, req *spannerpb.CreateSessionRequest, opts ...gax.CallOption) (*spannerpb.Session, error) { |
There was a problem hiding this comment.
Here also in combination with the comment above (not for this pull request, but as a potential improvement in the future): use interceptors instead of repeating each RPC.

Split of #14604
Internal reference: go/go-dcp-design