Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spanner/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R
nil,
t.setTimestamp,
t.release,
asGRPCSpannerClient(client),
requestIDHeaderProviderFromSpannerClient(client),
true,
false,
)
Expand Down
103 changes: 71 additions & 32 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ const (

// MinSessions for Experimental Host connection
experimentalHostMinSessions = 0

// DirectPath fallback policy used by both non-DCP grpc-gcp fallback and the
// DCP DirectPath/CloudPath wrapper.
directPathFallbackErrorRateThreshold = float32(1)
directPathFallbackMinFailedCalls = 1
directPathFallbackPeriod = time.Minute * 3
)

const (
Expand Down Expand Up @@ -285,6 +291,9 @@ type ClientConfig struct {
// SessionPoolConfig is the configuration for session pool.
SessionPoolConfig

// DynamicChannelPoolConfig is the opt-in configuration for dynamic gRPC channel pooling.
DynamicChannelPoolConfig DynamicChannelPoolConfig

// SessionLabels for the sessions created by this client.
// See https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#session
// for more info.
Expand Down Expand Up @@ -527,14 +536,69 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf

var pool gtransport.ConnPool
var endpointClientOpts []option.ClientOption
var sc *sessionClient

isFallbackEnabled := true
if val, ok := os.LookupEnv("GOOGLE_SPANNER_ENABLE_GCP_FALLBACK"); ok {
if b, err := strconv.ParseBool(val); err == nil {
isFallbackEnabled = b
}
}
if gme != nil {

// TODO(loite): Remove as the original map cannot be changed by the user
// anyways, and the client library is also not changing it.
// Make a copy of labels.
sessionLabels := make(map[string]string)
for k, v := range config.SessionLabels {
sessionLabels[k] = v
}

md := metadata.Pairs(resourcePrefixHeader, database)
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
// Append end to end tracing header if SPANNER_ENABLE_END_TO_END_TRACING
// environment variable has been set or client has passed the opt-in
// option in ClientConfig.
endToEndTracingEnvironmentVariable := os.Getenv("SPANNER_ENABLE_END_TO_END_TRACING")
if config.EnableEndToEndTracing || endToEndTracingEnvironmentVariable == "true" {
md.Append(endToEndTracingHeader, "true")
}

if isAFEBuiltInMetricEnabled {
md.Append(afeMetricHeader, "true")
}
if config.BatchTimeout == 0 {
config.BatchTimeout = time.Minute
}

dcpEnabled := config.DynamicChannelPoolConfig.DCPEnabled && gme == nil && !isExperimentalLocationAPIEnabledForConfig(config) && os.Getenv("SPANNER_EMULATOR_HOST") == ""
if dcpEnabled {
reqIDInjector := new(requestIDHeaderInjector)
dcpOpts := append([]option.ClientOption{}, opts...)
dcpOpts = append(dcpOpts,
option.WithGRPCDialOption(grpc.WithChainStreamInterceptor(reqIDInjector.interceptStream)),
option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(reqIDInjector.interceptUnary)),
)
sc = newSessionClient(nil, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)
sc.metricsTracerFactory = metricsTracerFactory
dial := func(dialCtx context.Context) (gtransport.ConnPool, error) {
return gtransport.DialPool(dialCtx, allClientOpts(1, config.Compression, config.EnableDirectAccess, dcpOpts...)...)
}
var fallbackDial func(context.Context) (gtransport.ConnPool, error)
if isFallbackEnabled && isDirectPathEnabled {
fallbackDial = func(dialCtx context.Context) (gtransport.ConnPool, error) {
return gtransport.DialPool(dialCtx, append(allClientOpts(1, config.Compression, config.EnableDirectAccess, dcpOpts...), internaloption.EnableDirectPath(false))...)
}
}
dcp, err := newDynamicChannelPool(ctx, sc, config.DynamicChannelPoolConfig, 0, dial, fallbackDial)
if err != nil {
return nil, err
}
pool = dcp
sc.connPool = pool
sc.dynamicPool = dcp
} else if gme != nil {
// Use GCPMultiEndpoint if provided.
pool = &gmeWrapper{gme}
endpointClientOpts = append(endpointClientOpts, opts...)
Expand Down Expand Up @@ -568,9 +632,9 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf

fbOpts := grpcgcp.NewGCPFallbackOptions()
fbOpts.EnableFallback = true
fbOpts.ErrorRateThreshold = 1
fbOpts.MinFailedCalls = 1
fbOpts.Period = time.Minute * 3
fbOpts.ErrorRateThreshold = directPathFallbackErrorRateThreshold
fbOpts.MinFailedCalls = directPathFallbackMinFailedCalls
fbOpts.Period = directPathFallbackPeriod

if metricsTracerFactory != nil && metricsTracerFactory.meterProvider != nil {
fbOpts.MeterProvider = metricsTracerFactory.meterProvider
Expand Down Expand Up @@ -608,45 +672,20 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
}
}

// TODO(loite): Remove as the original map cannot be changed by the user
// anyways, and the client library is also not changing it.
// Make a copy of labels.
sessionLabels := make(map[string]string)
for k, v := range config.SessionLabels {
sessionLabels[k] = v
}

// Default configs for session pool.
if config.MaxOpened == 0 {
config.MaxOpened = uint64(pool.Num() * 100)
}
if config.MaxBurst == 0 {
config.MaxBurst = DefaultSessionPoolConfig.MaxBurst
}
if config.BatchTimeout == 0 {
config.BatchTimeout = time.Minute
}

md := metadata.Pairs(resourcePrefixHeader, database)
if config.Compression == gzip.Name {
md.Append(requestsCompressionHeader, gzip.Name)
}
// Append end to end tracing header if SPANNER_ENABLE_END_TO_END_TRACING
// environment variable has been set or client has passed the opt-in
// option in ClientConfig.
endToEndTracingEnvironmentVariable := os.Getenv("SPANNER_ENABLE_END_TO_END_TRACING")
if config.EnableEndToEndTracing || endToEndTracingEnvironmentVariable == "true" {
md.Append(endToEndTracingHeader, "true")
}

if isAFEBuiltInMetricEnabled {
md.Append(afeMetricHeader, "true")
}

// Multiplexed sessions are always enabled as the session pool has been removed.

// Create a session client.
sc := newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)
if sc == nil {
sc = newSessionClient(pool, database, config.UserAgent, sessionLabels, config.DatabaseRole, config.DisableRouteToLeader, md, config.BatchTimeout, config.Logger, config.CallOptions)
}

// Create an OpenTelemetry configuration
otConfig, err := createOpenTelemetryConfig(ctx, config.OpenTelemetryMeterProvider, config.Logger, sc.id, database)
Expand Down
Loading