Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions spanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,11 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf
sc.otConfig = otConfig
sc.metricsTracerFactory = metricsTracerFactory
sc.mu.Unlock()
if sc.dynamicPool != nil {
if err := registerDynamicChannelPoolOTMetrics(sc.dynamicPool); err != nil {
logf(config.Logger, "Error registering DCP metrics in OpenTelemetry: %v", err)
}
}

var locationRouter *locationRouter
var sharedLocationAwareState *locationAwareState
Expand Down
220 changes: 206 additions & 14 deletions spanner/dynamic_channel_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import (

vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/apiv1/spannerpb"
"cloud.google.com/go/spanner/internal"
"github.com/googleapis/gax-go/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
gtransport "google.golang.org/api/transport/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -204,8 +207,30 @@ type dynamicChannelPool struct {
primeSession atomic.Value // string

drainingCount atomic.Int64

otMetrics atomic.Pointer[dynamicChannelPoolOTMetrics]
otConfig atomic.Pointer[openTelemetryConfig]
otRegistration metric.Registration
}

// dynamicChannelPoolOTMetrics contains OTel-only DCP instruments.
type dynamicChannelPoolOTMetrics struct {
activeChannelCount metric.Int64ObservableGauge
drainingChannelCount metric.Int64ObservableGauge
channelUnaryLoad metric.Int64ObservableGauge
channelStreamLoad metric.Int64ObservableGauge
scaleUpCount metric.Int64Counter
scaleUpAddedChannels metric.Int64Counter
scaleDownCount metric.Int64Counter
scaleDownDrainingChannels metric.Int64Counter
drainWaitDuration metric.Int64Histogram
selectionCount metric.Int64Counter
primeSuccessCount metric.Int64Counter
primeFailureCount metric.Int64Counter
}

var attributeKeyDCPChannelSlot = attribute.Key("channel_slot")

// dcpEntry represents one logical DCP slot. In DirectPath fallback mode the
// entry pool is a wrapper containing one DirectPath channel and one CloudPath
// fallback channel.
Expand All @@ -216,6 +241,7 @@ type dcpEntry struct {
delegate spannerClient
client spannerClient
parent *dynamicChannelPool
otAttributes atomic.Pointer[[]attribute.KeyValue]
unaryLoad atomic.Int32
streamLoad atomic.Int32
errorCount atomic.Int64 // errors since process start; used for debug/diagnostics
Expand Down Expand Up @@ -319,6 +345,11 @@ func (p *dynamicChannelPool) Close() error {
p.stopOnce.Do(func() { p.cancel(); close(p.done) })
p.dialMu.Lock()
defer p.dialMu.Unlock()
if p.otRegistration != nil {
if err := p.otRegistration.Unregister(); err != nil {
logf(p.sc.logger, "spanner_dcp: failed to unregister OpenTelemetry metrics: %v", err)
}
}
entries := p.getEntries()
p.entries.Store(&[]*dcpEntry{})
var errs []error
Expand Down Expand Up @@ -413,6 +444,7 @@ func (p *dynamicChannelPool) newEntry(ctx context.Context, prime bool) (*dcpEntr
entryPool = &dcpFallbackSlot{id: id, direct: primary, cloud: fallback, state: p.fallbackState}
}
e := &dcpEntry{id: id, metricSlot: metricSlot, pool: entryPool, parent: p}
p.setEntryOTAttributes(e)
now := time.Now().UnixNano()
e.createdAt.Store(now)
e.lastActivity.Store(now)
Expand Down Expand Up @@ -1009,6 +1041,180 @@ func isDCPFallbackFailure(err error) bool {
return c == codes.Unavailable
}

func (p *dynamicChannelPool) recordScaleUp(added int) {
if added <= 0 {
return
}
m := p.otMetrics.Load()
if m == nil {
return
}
ctx := context.Background()
attr := p.dcpOTAttributes()
if m.scaleUpCount != nil {
m.scaleUpCount.Add(ctx, 1, metric.WithAttributes(attr...))
}
if m.scaleUpAddedChannels != nil {
m.scaleUpAddedChannels.Add(ctx, int64(added), metric.WithAttributes(attr...))
}
}

func (p *dynamicChannelPool) recordScaleDown(draining int) {
if draining <= 0 {
return
}
m := p.otMetrics.Load()
if m == nil {
return
}
ctx := context.Background()
attr := p.dcpOTAttributes()
if m.scaleDownCount != nil {
m.scaleDownCount.Add(ctx, 1, metric.WithAttributes(attr...))
}
if m.scaleDownDrainingChannels != nil {
m.scaleDownDrainingChannels.Add(ctx, int64(draining), metric.WithAttributes(attr...))
}
}

func (p *dynamicChannelPool) recordDrainWait(d time.Duration) {
if m := p.otMetrics.Load(); m != nil && m.drainWaitDuration != nil {
m.drainWaitDuration.Record(context.Background(), d.Milliseconds(), metric.WithAttributes(p.dcpOTAttributes()...))
}
}

func (p *dynamicChannelPool) recordSelection(ctx context.Context, e *dcpEntry) {
if m := p.otMetrics.Load(); m != nil && m.selectionCount != nil && e != nil {
m.selectionCount.Add(metricContext(ctx), 1, metric.WithAttributes(p.dcpOTAttributesForEntry(e)...))
}
}

func metricContext(ctx context.Context) context.Context {
if ctx == nil {
return context.Background()
}
return ctx
}

func (p *dynamicChannelPool) recordPrimeSuccess() {
if m := p.otMetrics.Load(); m != nil && m.primeSuccessCount != nil {
m.primeSuccessCount.Add(context.Background(), 1, metric.WithAttributes(p.dcpOTAttributes()...))
}
}

func (p *dynamicChannelPool) recordPrimeFailure() {
if m := p.otMetrics.Load(); m != nil && m.primeFailureCount != nil {
m.primeFailureCount.Add(context.Background(), 1, metric.WithAttributes(p.dcpOTAttributes()...))
}
}

func (p *dynamicChannelPool) dcpOTAttributes() []attribute.KeyValue {
otConfig := p.otConfig.Load()
if otConfig == nil || len(otConfig.attributeMap) == 0 {
return nil
}
attr := make([]attribute.KeyValue, len(otConfig.attributeMap))
copy(attr, otConfig.attributeMap)
return attr
}
Comment on lines +1111 to +1119
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of dcpOTAttributes returns nil if attributeMap is empty and performs an unnecessary shallow copy of the attributes. If OpenTelemetry is enabled, it should return the base attributes (even if empty) so that entry-specific attributes (like channel_slot) can be correctly appended in setEntryOTAttributes. Also, since the attributes are not modified by the internal callers, the copy can be avoided to improve efficiency.

func (p *dynamicChannelPool) dcpOTAttributes() []attribute.KeyValue {
	otConfig := p.otConfig.Load()
	if otConfig == nil {
		return nil
	}
	return otConfig.attributeMap
}
References
  1. In-place modification within a getter method is permissible if the data is immutable post-initialization (e.g., loaded from a configuration file), as it avoids unnecessary allocations for copies.


func (p *dynamicChannelPool) dcpOTAttributesForEntry(e *dcpEntry) []attribute.KeyValue {
if e == nil {
return p.dcpOTAttributes()
}
if ptr := e.otAttributes.Load(); ptr != nil {
return *ptr
}
return p.setEntryOTAttributes(e)
}

func (p *dynamicChannelPool) setEntryOTAttributes(e *dcpEntry) []attribute.KeyValue {
if e == nil {
return nil
}
attr := p.dcpOTAttributes()
if attr == nil {
return nil
}
attr = append(attr,
attributeKeyDCPChannelSlot.Int64(e.metricSlot),
)
e.otAttributes.Store(&attr)
return attr
}
Comment on lines +1131 to +1144
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a bug where the channel_slot attribute is not recorded if attributeMap is empty. This happens because p.dcpOTAttributes() returns nil when the map is empty, causing setEntryOTAttributes to return early. The check should be on whether OpenTelemetry is enabled (otConfig != nil), and it should always append the channel_slot attribute for entries. Additionally, the implementation should ensure it doesn't modify the original attributeMap by creating a new slice with sufficient capacity.

func (p *dynamicChannelPool) setEntryOTAttributes(e *dcpEntry) []attribute.KeyValue {
	if e == nil {
		return nil
	}
	otConfig := p.otConfig.Load()
	if otConfig == nil {
		return nil
	}
	attr := make([]attribute.KeyValue, 0, len(otConfig.attributeMap)+1)
	attr = append(attr, otConfig.attributeMap...)
	attr = append(attr, 
		attributeKeyDCPChannelSlot.Int64(e.metricSlot),
	)
	e.otAttributes.Store(&attr)
	return attr
}


func registerDynamicChannelPoolOTMetrics(p *dynamicChannelPool) error {
if p == nil || p.sc == nil {
return nil
}
p.sc.mu.Lock()
otConfig := p.sc.otConfig
p.sc.mu.Unlock()
if otConfig == nil || !otConfig.enabled {
return nil
}
p.otConfig.Store(otConfig)
for _, e := range p.getEntries() {
p.setEntryOTAttributes(e)
}
meter := otConfig.meterProvider.Meter(OtInstrumentationScope, metric.WithInstrumentationVersion(internal.Version))
m := &dynamicChannelPoolOTMetrics{}
var err error
if m.activeChannelCount, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/active_channel_count", metric.WithDescription("Number of DCP channels currently active."), metric.WithUnit("1")); err != nil {
return err
}
if m.drainingChannelCount, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/draining_channel_count", metric.WithDescription("Number of DCP channels currently draining."), metric.WithUnit("1")); err != nil {
return err
}
if m.channelUnaryLoad, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/channel_unary_load", metric.WithDescription("DCP active unary RPC load per channel."), metric.WithUnit("1")); err != nil {
return err
}
if m.channelStreamLoad, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/channel_stream_load", metric.WithDescription("DCP active streaming RPC load per channel."), metric.WithUnit("1")); err != nil {
return err
}
if m.scaleUpCount, err = meter.Int64Counter(metricsPrefix+"dcp/scale_up_count", metric.WithDescription("Number of DCP scale-up events."), metric.WithUnit("1")); err != nil {
return err
}
if m.scaleUpAddedChannels, err = meter.Int64Counter(metricsPrefix+"dcp/scale_up_added_channels", metric.WithDescription("Number of DCP channels added by scale-up."), metric.WithUnit("1")); err != nil {
return err
}
if m.scaleDownCount, err = meter.Int64Counter(metricsPrefix+"dcp/scale_down_count", metric.WithDescription("Number of DCP scale-down events."), metric.WithUnit("1")); err != nil {
return err
}
if m.scaleDownDrainingChannels, err = meter.Int64Counter(metricsPrefix+"dcp/scale_down_draining_channels", metric.WithDescription("Number of DCP channels marked draining by scale-down."), metric.WithUnit("1")); err != nil {
return err
}
if m.drainWaitDuration, err = meter.Int64Histogram(metricsPrefix+"dcp/drain_wait_duration", metric.WithDescription("Time DCP waits before closing a draining channel."), metric.WithUnit("ms")); err != nil {
return err
}
if m.selectionCount, err = meter.Int64Counter(metricsPrefix+"dcp/selection_count", metric.WithDescription("Number of DCP selections by channel."), metric.WithUnit("1")); err != nil {
return err
}
if m.primeSuccessCount, err = meter.Int64Counter(metricsPrefix+"dcp/prime_success_count", metric.WithDescription("Number of DCP channel priming successes."), metric.WithUnit("1")); err != nil {
return err
}
if m.primeFailureCount, err = meter.Int64Counter(metricsPrefix+"dcp/prime_failure_count", metric.WithDescription("Number of DCP channel priming failures."), metric.WithUnit("1")); err != nil {
return err
}
reg, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
entries := p.getEntries()
for _, e := range entries {
entryAttr := metric.WithAttributes(p.dcpOTAttributesForEntry(e)...)
o.ObserveInt64(m.channelUnaryLoad, int64(e.unaryLoad.Load()), entryAttr)
o.ObserveInt64(m.channelStreamLoad, int64(e.streamLoad.Load()), entryAttr)
}
o.ObserveInt64(m.activeChannelCount, int64(len(entries)), metric.WithAttributes(p.dcpOTAttributes()...))
o.ObserveInt64(m.drainingChannelCount, p.drainingCount.Load(), metric.WithAttributes(p.dcpOTAttributes()...))
return nil
}, m.activeChannelCount, m.drainingChannelCount, m.channelUnaryLoad, m.channelStreamLoad)
if err != nil {
return err
}
p.otMetrics.Store(m)
p.otRegistration = reg
return nil
}

type dcpFallbackMonitoredStream struct {
grpc.ClientStream
once sync.Once
Expand All @@ -1033,20 +1239,6 @@ func (s *dcpFallbackMonitoredStream) CloseSend() error {
return s.ClientStream.CloseSend()
}

func (p *dynamicChannelPool) recordScaleUp(added int) {}

func (p *dynamicChannelPool) recordScaleDown(draining int) {}

func (p *dynamicChannelPool) recordDrainWait(d time.Duration) {}

func (p *dynamicChannelPool) recordSelection(ctx context.Context, e *dcpEntry) {}

func (p *dynamicChannelPool) recordErrorPenalty(ctx context.Context) {}

func (p *dynamicChannelPool) recordPrimeSuccess() {}

func (p *dynamicChannelPool) recordPrimeFailure() {}

type dcpSpannerClient struct {
entry *dcpEntry
delegate spannerClient
Expand Down
2 changes: 1 addition & 1 deletion spanner/test/opentelemetry/test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
golang.org/x/sync v0.20.0
google.golang.org/api v0.279.0
google.golang.org/grpc v1.81.1
google.golang.org/protobuf v1.36.11
Expand Down Expand Up @@ -53,7 +54,6 @@ require (
golang.org/x/crypto v0.50.0 // indirect
golang.org/x/net v0.53.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.43.0 // indirect
golang.org/x/text v0.36.0 // indirect
golang.org/x/time v0.15.0 // indirect
Expand Down
Loading