feat(bigtable): refactor channel_pool_factory as we need to maintain two channel factory one for session and another for unary#14652
Conversation
There was a problem hiding this comment.
Code Review
This pull request refactors the Bigtable client initialization by extracting the connection pool creation and lifecycle management into a new managedChannelPool struct and helper functions in channel_pool_factory.go. Feedback on the changes highlights a regression where the direct access dialer is unconditionally configured instead of respecting the allowDirectAccess configuration, and points out a leftover debug print statement that should be removed.
| directAccessDialerOptions := make([]option.ClientOption, len(o)) | ||
| copy(directAccessDialerOptions, o) | ||
| directAccessDialerOptions = append(directAccessDialerOptions, directPathOptions...) | ||
| directAccessDialerOptions = append(directAccessDialerOptions, internaloption.AllowHardBoundTokens("ALTS")) | ||
|
|
||
| directAccessDialer := func() (*btransport.BigtableConn, error) { | ||
| grpcConn, err := gtransport.Dial(ctx, directAccessDialerOptions...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return btransport.NewBigtableConn(grpcConn), nil | ||
| } | ||
|
|
||
| return btransport.NewBigtableChannelPool(ctx, | ||
| connPoolSize, | ||
| btopt.BigtableLoadBalancingStrategy(), | ||
| func() (*btransport.BigtableConn, error) { | ||
| grpcConn, err := gtransport.Dial(ctx, o...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return btransport.NewBigtableConn(grpcConn), nil | ||
| }, | ||
| clientCreationTimestamp, | ||
| btransport.WithInstanceName(fullInstanceName), | ||
| btransport.WithAppProfile(config.AppProfile), | ||
| btransport.WithFeatureFlagsMetadata(directAccessMD), | ||
| btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), | ||
| btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider), | ||
| btransport.WithDirectAccessFeatureFlagsMetadata(directAccessMD), | ||
| btransport.WithDirectAccessDialer(directAccessDialer), | ||
| ) |
There was a problem hiding this comment.
The direct access dialer is currently being set up and passed unconditionally. In the original implementation, this was only done if allowDirectAccess (i.e., isDirectAccessEnabled(config)) was true. We should preserve this behavior to avoid unnecessary connection setups and potential errors when direct access is disabled.
poolOpts := []btransport.BigtableChannelPoolOption{
btransport.WithInstanceName(fullInstanceName),
btransport.WithAppProfile(config.AppProfile),
btransport.WithFeatureFlagsMetadata(directAccessMD),
btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()),
btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider),
btransport.WithDirectAccessFeatureFlagsMetadata(directAccessMD),
}
if isDirectAccessEnabled(config) {
directAccessDialerOptions := make([]option.ClientOption, len(o))
copy(directAccessDialerOptions, o)
directAccessDialerOptions = append(directAccessDialerOptions, directPathOptions...)
directAccessDialerOptions = append(directAccessDialerOptions, internaloption.AllowHardBoundTokens("ALTS"))
directAccessDialer := func() (*btransport.BigtableConn, error) {
grpcConn, err := gtransport.Dial(ctx, directAccessDialerOptions...)
if err != nil {
return nil, err
}
return btransport.NewBigtableConn(grpcConn), nil
}
poolOpts = append(poolOpts, btransport.WithDirectAccessDialer(directAccessDialer))
}
return btransport.NewBigtableChannelPool(ctx,
connPoolSize,
btopt.BigtableLoadBalancingStrategy(),
func() (*btransport.BigtableConn, error) {
grpcConn, err := gtransport.Dial(ctx, o...)
if err != nil {
return nil, err
}
return btransport.NewBigtableConn(grpcConn), nil
},
clientCreationTimestamp,
poolOpts...,
)There was a problem hiding this comment.
Same question here. How do we disable DP?
| directAccessMD metadata.MD, | ||
| clientCreationTimestamp time.Time, | ||
| ) (*btransport.BigtableChannelPool, error) { | ||
| fmt.Printf(">>> createBigtableChannelPool called for project=%s, instance=%s <<<\n", project, instance) |
| project, instance string, | ||
| config ClientConfig, | ||
| metricsTracerFactory *builtinMetricsTracerFactory, | ||
| o []option.ClientOption, |
There was a problem hiding this comment.
what's the difference between o and directPathOpetions? aren't they both option.ClientOption?
| m.pool = pool | ||
|
|
||
| // Validate dynamic config early if enabled | ||
| if !config.DisableDynamicChannelPool { |
There was a problem hiding this comment.
nit: can we rename this variable to EnableDynamicChannelPool? ! config.Disable feels harder to understand :/
There was a problem hiding this comment.
Oh, I guess this was added before..
| } | ||
|
|
||
| // connection recycler | ||
| if !config.DisableConnectionRecycler { |
There was a problem hiding this comment.
same nit here, can we rename it to EnableConnectionRecycler instead?
| directAccessDialerOptions := make([]option.ClientOption, len(o)) | ||
| copy(directAccessDialerOptions, o) | ||
| directAccessDialerOptions = append(directAccessDialerOptions, directPathOptions...) | ||
| directAccessDialerOptions = append(directAccessDialerOptions, internaloption.AllowHardBoundTokens("ALTS")) | ||
|
|
||
| directAccessDialer := func() (*btransport.BigtableConn, error) { | ||
| grpcConn, err := gtransport.Dial(ctx, directAccessDialerOptions...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return btransport.NewBigtableConn(grpcConn), nil | ||
| } | ||
|
|
||
| return btransport.NewBigtableChannelPool(ctx, | ||
| connPoolSize, | ||
| btopt.BigtableLoadBalancingStrategy(), | ||
| func() (*btransport.BigtableConn, error) { | ||
| grpcConn, err := gtransport.Dial(ctx, o...) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| return btransport.NewBigtableConn(grpcConn), nil | ||
| }, | ||
| clientCreationTimestamp, | ||
| btransport.WithInstanceName(fullInstanceName), | ||
| btransport.WithAppProfile(config.AppProfile), | ||
| btransport.WithFeatureFlagsMetadata(directAccessMD), | ||
| btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), | ||
| btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider), | ||
| btransport.WithDirectAccessFeatureFlagsMetadata(directAccessMD), | ||
| btransport.WithDirectAccessDialer(directAccessDialer), | ||
| ) |
There was a problem hiding this comment.
Same question here. How do we disable DP?
| clientCreationTimestamp, | ||
| btransport.WithInstanceName(fullInstanceName), | ||
| btransport.WithAppProfile(config.AppProfile), | ||
| btransport.WithFeatureFlagsMetadata(directAccessMD), |
There was a problem hiding this comment.
nit: should we rename directAccessMD to featureFlagsMD? directAccessMD suggests that it's always using direct access which may not be the case?
| btransport.WithFeatureFlagsMetadata(directAccessMD), | ||
| btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()), | ||
| btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider), | ||
| btransport.WithDirectAccessFeatureFlagsMetadata(directAccessMD), |
There was a problem hiding this comment.
what's the difference between this and WithFeatureFlagsMetadata?
| }, | ||
| metricsTracerFactory, | ||
| o, | ||
| nil, |
There was a problem hiding this comment.
the direct access options in this test is nil, do we need to verify it's not going through DP or something?
| // Use the regular ConnPool | ||
| // For regular ConnPool the Direct Access is off by default so we need to check the env var again. | ||
| if enabled, _ := strconv.ParseBool(os.Getenv(directpathEnvVar)); enabled { | ||
| o = append(o, directPathOptions...) |
There was a problem hiding this comment.
Hmm, not sure if I understand the logic here. directpath options is appended to the options only when we're not enabling the bigtable channel pool? Should this if statement be outside of if !enableBigtableConnPool?
| config, | ||
| metricsTracerFactory, | ||
| o, | ||
| directPathOptions, |
There was a problem hiding this comment.
if directPathOptions is appended to o, why do we need to pass it in again?
No description provided.