diff --git a/spanner/client.go b/spanner/client.go index 59fb9f5abee7..8eaca141fb39 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -698,6 +698,11 @@ func newClientWithConfig(ctx context.Context, database string, config ClientConf sc.otConfig = otConfig sc.metricsTracerFactory = metricsTracerFactory sc.mu.Unlock() + if sc.dynamicPool != nil { + if err := registerDynamicChannelPoolOTMetrics(sc.dynamicPool); err != nil { + logf(config.Logger, "Error registering DCP metrics in OpenTelemetry: %v", err) + } + } var locationRouter *locationRouter var sharedLocationAwareState *locationAwareState diff --git a/spanner/dynamic_channel_pool.go b/spanner/dynamic_channel_pool.go index 05e8cee2ed39..e6000b76cc39 100644 --- a/spanner/dynamic_channel_pool.go +++ b/spanner/dynamic_channel_pool.go @@ -28,7 +28,10 @@ import ( vkit "cloud.google.com/go/spanner/apiv1" "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/internal" "github.com/googleapis/gax-go/v2" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" gtransport "google.golang.org/api/transport/grpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -204,8 +207,30 @@ type dynamicChannelPool struct { primeSession atomic.Value // string drainingCount atomic.Int64 + + otMetrics atomic.Pointer[dynamicChannelPoolOTMetrics] + otConfig atomic.Pointer[openTelemetryConfig] + otRegistration metric.Registration +} + +// dynamicChannelPoolOTMetrics contains OTel-only DCP instruments. +type dynamicChannelPoolOTMetrics struct { + activeChannelCount metric.Int64ObservableGauge + drainingChannelCount metric.Int64ObservableGauge + channelUnaryLoad metric.Int64ObservableGauge + channelStreamLoad metric.Int64ObservableGauge + scaleUpCount metric.Int64Counter + scaleUpAddedChannels metric.Int64Counter + scaleDownCount metric.Int64Counter + scaleDownDrainingChannels metric.Int64Counter + drainWaitDuration metric.Int64Histogram + selectionCount metric.Int64Counter + primeSuccessCount metric.Int64Counter + primeFailureCount metric.Int64Counter } +var attributeKeyDCPChannelSlot = attribute.Key("channel_slot") + // dcpEntry represents one logical DCP slot. In DirectPath fallback mode the // entry pool is a wrapper containing one DirectPath channel and one CloudPath // fallback channel. @@ -216,6 +241,7 @@ type dcpEntry struct { delegate spannerClient client spannerClient parent *dynamicChannelPool + otAttributes atomic.Pointer[[]attribute.KeyValue] unaryLoad atomic.Int32 streamLoad atomic.Int32 errorCount atomic.Int64 // errors since process start; used for debug/diagnostics @@ -319,6 +345,11 @@ func (p *dynamicChannelPool) Close() error { p.stopOnce.Do(func() { p.cancel(); close(p.done) }) p.dialMu.Lock() defer p.dialMu.Unlock() + if p.otRegistration != nil { + if err := p.otRegistration.Unregister(); err != nil { + logf(p.sc.logger, "spanner_dcp: failed to unregister OpenTelemetry metrics: %v", err) + } + } entries := p.getEntries() p.entries.Store(&[]*dcpEntry{}) var errs []error @@ -413,6 +444,7 @@ func (p *dynamicChannelPool) newEntry(ctx context.Context, prime bool) (*dcpEntr entryPool = &dcpFallbackSlot{id: id, direct: primary, cloud: fallback, state: p.fallbackState} } e := &dcpEntry{id: id, metricSlot: metricSlot, pool: entryPool, parent: p} + p.setEntryOTAttributes(e) now := time.Now().UnixNano() e.createdAt.Store(now) e.lastActivity.Store(now) @@ -1009,6 +1041,180 @@ func isDCPFallbackFailure(err error) bool { return c == codes.Unavailable } +func (p *dynamicChannelPool) recordScaleUp(added int) { + if added <= 0 { + return + } + m := p.otMetrics.Load() + if m == nil { + return + } + ctx := context.Background() + attr := p.dcpOTAttributes() + if m.scaleUpCount != nil { + m.scaleUpCount.Add(ctx, 1, metric.WithAttributes(attr...)) + } + if m.scaleUpAddedChannels != nil { + m.scaleUpAddedChannels.Add(ctx, int64(added), metric.WithAttributes(attr...)) + } +} + +func (p *dynamicChannelPool) recordScaleDown(draining int) { + if draining <= 0 { + return + } + m := p.otMetrics.Load() + if m == nil { + return + } + ctx := context.Background() + attr := p.dcpOTAttributes() + if m.scaleDownCount != nil { + m.scaleDownCount.Add(ctx, 1, metric.WithAttributes(attr...)) + } + if m.scaleDownDrainingChannels != nil { + m.scaleDownDrainingChannels.Add(ctx, int64(draining), metric.WithAttributes(attr...)) + } +} + +func (p *dynamicChannelPool) recordDrainWait(d time.Duration) { + if m := p.otMetrics.Load(); m != nil && m.drainWaitDuration != nil { + m.drainWaitDuration.Record(context.Background(), d.Milliseconds(), metric.WithAttributes(p.dcpOTAttributes()...)) + } +} + +func (p *dynamicChannelPool) recordSelection(ctx context.Context, e *dcpEntry) { + if m := p.otMetrics.Load(); m != nil && m.selectionCount != nil && e != nil { + m.selectionCount.Add(metricContext(ctx), 1, metric.WithAttributes(p.dcpOTAttributesForEntry(e)...)) + } +} + +func metricContext(ctx context.Context) context.Context { + if ctx == nil { + return context.Background() + } + return ctx +} + +func (p *dynamicChannelPool) recordPrimeSuccess() { + if m := p.otMetrics.Load(); m != nil && m.primeSuccessCount != nil { + m.primeSuccessCount.Add(context.Background(), 1, metric.WithAttributes(p.dcpOTAttributes()...)) + } +} + +func (p *dynamicChannelPool) recordPrimeFailure() { + if m := p.otMetrics.Load(); m != nil && m.primeFailureCount != nil { + m.primeFailureCount.Add(context.Background(), 1, metric.WithAttributes(p.dcpOTAttributes()...)) + } +} + +func (p *dynamicChannelPool) dcpOTAttributes() []attribute.KeyValue { + otConfig := p.otConfig.Load() + if otConfig == nil || len(otConfig.attributeMap) == 0 { + return nil + } + attr := make([]attribute.KeyValue, len(otConfig.attributeMap)) + copy(attr, otConfig.attributeMap) + return attr +} + +func (p *dynamicChannelPool) dcpOTAttributesForEntry(e *dcpEntry) []attribute.KeyValue { + if e == nil { + return p.dcpOTAttributes() + } + if ptr := e.otAttributes.Load(); ptr != nil { + return *ptr + } + return p.setEntryOTAttributes(e) +} + +func (p *dynamicChannelPool) setEntryOTAttributes(e *dcpEntry) []attribute.KeyValue { + if e == nil { + return nil + } + attr := p.dcpOTAttributes() + if attr == nil { + return nil + } + attr = append(attr, + attributeKeyDCPChannelSlot.Int64(e.metricSlot), + ) + e.otAttributes.Store(&attr) + return attr +} + +func registerDynamicChannelPoolOTMetrics(p *dynamicChannelPool) error { + if p == nil || p.sc == nil { + return nil + } + p.sc.mu.Lock() + otConfig := p.sc.otConfig + p.sc.mu.Unlock() + if otConfig == nil || !otConfig.enabled { + return nil + } + p.otConfig.Store(otConfig) + for _, e := range p.getEntries() { + p.setEntryOTAttributes(e) + } + meter := otConfig.meterProvider.Meter(OtInstrumentationScope, metric.WithInstrumentationVersion(internal.Version)) + m := &dynamicChannelPoolOTMetrics{} + var err error + if m.activeChannelCount, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/active_channel_count", metric.WithDescription("Number of DCP channels currently active."), metric.WithUnit("1")); err != nil { + return err + } + if m.drainingChannelCount, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/draining_channel_count", metric.WithDescription("Number of DCP channels currently draining."), metric.WithUnit("1")); err != nil { + return err + } + if m.channelUnaryLoad, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/channel_unary_load", metric.WithDescription("DCP active unary RPC load per channel."), metric.WithUnit("1")); err != nil { + return err + } + if m.channelStreamLoad, err = meter.Int64ObservableGauge(metricsPrefix+"dcp/channel_stream_load", metric.WithDescription("DCP active streaming RPC load per channel."), metric.WithUnit("1")); err != nil { + return err + } + if m.scaleUpCount, err = meter.Int64Counter(metricsPrefix+"dcp/scale_up_count", metric.WithDescription("Number of DCP scale-up events."), metric.WithUnit("1")); err != nil { + return err + } + if m.scaleUpAddedChannels, err = meter.Int64Counter(metricsPrefix+"dcp/scale_up_added_channels", metric.WithDescription("Number of DCP channels added by scale-up."), metric.WithUnit("1")); err != nil { + return err + } + if m.scaleDownCount, err = meter.Int64Counter(metricsPrefix+"dcp/scale_down_count", metric.WithDescription("Number of DCP scale-down events."), metric.WithUnit("1")); err != nil { + return err + } + if m.scaleDownDrainingChannels, err = meter.Int64Counter(metricsPrefix+"dcp/scale_down_draining_channels", metric.WithDescription("Number of DCP channels marked draining by scale-down."), metric.WithUnit("1")); err != nil { + return err + } + if m.drainWaitDuration, err = meter.Int64Histogram(metricsPrefix+"dcp/drain_wait_duration", metric.WithDescription("Time DCP waits before closing a draining channel."), metric.WithUnit("ms")); err != nil { + return err + } + if m.selectionCount, err = meter.Int64Counter(metricsPrefix+"dcp/selection_count", metric.WithDescription("Number of DCP selections by channel."), metric.WithUnit("1")); err != nil { + return err + } + if m.primeSuccessCount, err = meter.Int64Counter(metricsPrefix+"dcp/prime_success_count", metric.WithDescription("Number of DCP channel priming successes."), metric.WithUnit("1")); err != nil { + return err + } + if m.primeFailureCount, err = meter.Int64Counter(metricsPrefix+"dcp/prime_failure_count", metric.WithDescription("Number of DCP channel priming failures."), metric.WithUnit("1")); err != nil { + return err + } + reg, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + entries := p.getEntries() + for _, e := range entries { + entryAttr := metric.WithAttributes(p.dcpOTAttributesForEntry(e)...) + o.ObserveInt64(m.channelUnaryLoad, int64(e.unaryLoad.Load()), entryAttr) + o.ObserveInt64(m.channelStreamLoad, int64(e.streamLoad.Load()), entryAttr) + } + o.ObserveInt64(m.activeChannelCount, int64(len(entries)), metric.WithAttributes(p.dcpOTAttributes()...)) + o.ObserveInt64(m.drainingChannelCount, p.drainingCount.Load(), metric.WithAttributes(p.dcpOTAttributes()...)) + return nil + }, m.activeChannelCount, m.drainingChannelCount, m.channelUnaryLoad, m.channelStreamLoad) + if err != nil { + return err + } + p.otMetrics.Store(m) + p.otRegistration = reg + return nil +} + type dcpFallbackMonitoredStream struct { grpc.ClientStream once sync.Once @@ -1033,20 +1239,6 @@ func (s *dcpFallbackMonitoredStream) CloseSend() error { return s.ClientStream.CloseSend() } -func (p *dynamicChannelPool) recordScaleUp(added int) {} - -func (p *dynamicChannelPool) recordScaleDown(draining int) {} - -func (p *dynamicChannelPool) recordDrainWait(d time.Duration) {} - -func (p *dynamicChannelPool) recordSelection(ctx context.Context, e *dcpEntry) {} - -func (p *dynamicChannelPool) recordErrorPenalty(ctx context.Context) {} - -func (p *dynamicChannelPool) recordPrimeSuccess() {} - -func (p *dynamicChannelPool) recordPrimeFailure() {} - type dcpSpannerClient struct { entry *dcpEntry delegate spannerClient diff --git a/spanner/test/opentelemetry/test/go.mod b/spanner/test/opentelemetry/test/go.mod index 5de3b549545d..616a9657e3aa 100644 --- a/spanner/test/opentelemetry/test/go.mod +++ b/spanner/test/opentelemetry/test/go.mod @@ -13,6 +13,7 @@ require ( go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/sdk v1.43.0 go.opentelemetry.io/otel/sdk/metric v1.43.0 + golang.org/x/sync v0.20.0 google.golang.org/api v0.279.0 google.golang.org/grpc v1.81.1 google.golang.org/protobuf v1.36.11 @@ -53,7 +54,6 @@ require ( golang.org/x/crypto v0.50.0 // indirect golang.org/x/net v0.53.0 // indirect golang.org/x/oauth2 v0.36.0 // indirect - golang.org/x/sync v0.20.0 // indirect golang.org/x/sys v0.43.0 // indirect golang.org/x/text v0.36.0 // indirect golang.org/x/time v0.15.0 // indirect diff --git a/spanner/test/opentelemetry/test/ot_metrics_test.go b/spanner/test/opentelemetry/test/ot_metrics_test.go index 118306b55127..c6ab1643abef 100644 --- a/spanner/test/opentelemetry/test/ot_metrics_test.go +++ b/spanner/test/opentelemetry/test/ot_metrics_test.go @@ -20,7 +20,9 @@ package test import ( "context" + "fmt" "testing" + "time" "cloud.google.com/go/spanner" "cloud.google.com/go/spanner/apiv1/spannerpb" @@ -29,6 +31,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "golang.org/x/sync/errgroup" "google.golang.org/api/iterator" structpb "google.golang.org/protobuf/types/known/structpb" ) @@ -202,3 +205,136 @@ func validateOTMetric(ctx context.Context, t *testing.T, te *openTelemetryTestEx } metricdatatest.AssertEqual(t, expectedMetric, resourceMetrics.ScopeMetrics[0].Metrics[idx], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } + +func TestOTMetrics_DynamicChannelPoolMetrics(t *testing.T) { + ctx := context.Background() + te := newOpenTelemetryTestExporter(false, false) + t.Cleanup(func() { te.Unregister(ctx) }) + spanner.EnableOpenTelemetryMetrics() + + server, client, teardown := setupMockedTestServerWithConfig(t, spanner.ClientConfig{ + OpenTelemetryMeterProvider: te.mp, + DynamicChannelPoolConfig: spanner.DynamicChannelPoolConfig{ + DCPEnabled: true, + DCPInitialChannels: 1, + DCPMinChannels: 1, + DCPMaxChannels: 3, + DCPMaxRPCPerChannel: 1, + DCPMinRPCPerChannel: 0.5, + DCPScaleDownCheckInterval: 30 * time.Millisecond, + DCPScaleUpCooldown: time.Millisecond, + DCPDownscaleConsecutiveLowLoadChecks: 2, + DCPMaxScaleUpPercent: 100, + DCPMaxRemoveChannels: 2, + DCPDrainIdleGrace: time.Second, + DCPPrimeTimeout: time.Second, + DCPPrimeMaxAttempts: 3, + }, + }) + defer teardown() + putSelect1Result(t, server) + server.TestSpanner.PutExecutionTime(stestutil.MethodExecuteStreamingSql, stestutil.SimulatedExecutionTime{MinimumExecutionTime: 300 * time.Millisecond}) + + var g errgroup.Group + for i := 0; i < 3; i++ { + g.Go(func() error { + iter := client.Single().Query(ctx, spanner.NewStatement(stestutil.SelectSingerIDAlbumIDAlbumTitleFromAlbums)) + defer iter.Stop() + for { + _, err := iter.Next() + if err == iterator.Done { + return nil + } + if err != nil { + return err + } + } + }) + } + if err := g.Wait(); err != nil { + t.Fatalf("query workload failed: %v", err) + } + + var metrics []metricdata.Metrics + waitFor(t, func() error { + resourceMetrics, err := te.metrics(ctx) + if err != nil { + return err + } + if resourceMetrics == nil || len(resourceMetrics.ScopeMetrics) == 0 { + return fmt.Errorf("missing resource metrics") + } + metrics = resourceMetrics.ScopeMetrics[0].Metrics + for _, name := range []string{ + "spanner/dcp/active_channel_count", + "spanner/dcp/draining_channel_count", + "spanner/dcp/channel_unary_load", + "spanner/dcp/channel_stream_load", + "spanner/dcp/selection_count", + "spanner/dcp/scale_up_count", + "spanner/dcp/scale_up_added_channels", + "spanner/dcp/prime_success_count", + } { + if getMetricIndex(metrics, name) == -1 { + return fmt.Errorf("DCP metric %q not found", name) + } + } + for _, name := range []string{"spanner/dcp/selection_count", "spanner/dcp/scale_up_count", "spanner/dcp/scale_up_added_channels", "spanner/dcp/prime_success_count"} { + v, err := int64SumMetricValue(metrics, name) + if err != nil { + return err + } + if v == 0 { + return fmt.Errorf("DCP metric %q value = 0, want > 0", name) + } + } + return nil + }) + + selectionMetric := metrics[getMetricIndex(metrics, "spanner/dcp/selection_count")] + selectionData, ok := selectionMetric.Data.(metricdata.Sum[int64]) + if !ok { + t.Fatalf("selection_count data type = %T, want Sum[int64]", selectionMetric.Data) + } + if len(selectionData.DataPoints) == 0 || selectionData.DataPoints[0].Value == 0 { + t.Fatalf("selection_count datapoints = %+v, want non-zero", selectionData.DataPoints) + } + metricdatatest.AssertHasAttributes[metricdata.DataPoint[int64]](t, selectionData.DataPoints[0], getAttributes(client.ClientID())...) + if _, ok := selectionData.DataPoints[0].Attributes.Value(attribute.Key("channel_slot")); !ok { + t.Fatalf("selection_count datapoint missing channel_slot attribute: %+v", selectionData.DataPoints[0].Attributes) + } + +} + +func int64SumMetricValue(metrics []metricdata.Metrics, metricName string) (int64, error) { + idx := getMetricIndex(metrics, metricName) + if idx == -1 { + return 0, fmt.Errorf("metric %q not found", metricName) + } + data, ok := metrics[idx].Data.(metricdata.Sum[int64]) + if !ok { + return 0, fmt.Errorf("metric %q data type = %T, want Sum[int64]", metricName, metrics[idx].Data) + } + var total int64 + for _, dp := range data.DataPoints { + total += dp.Value + } + return total, nil +} + +func putSelect1Result(t *testing.T, server *stestutil.MockedSpannerInMemTestServer) { + t.Helper() + if err := server.TestSpanner.PutStatementResult("SELECT 1", &stestutil.StatementResult{ + Type: stestutil.StatementResultResultSet, + ResultSet: &spannerpb.ResultSet{ + Metadata: &spannerpb.ResultSetMetadata{ + RowType: &spannerpb.StructType{ + Fields: []*spannerpb.StructType_Field{{Name: "Col1", Type: &spannerpb.Type{Code: spannerpb.TypeCode_INT64}}}, + }, + }, + Rows: []*structpb.ListValue{{Values: []*structpb.Value{{Kind: &structpb.Value_StringValue{StringValue: "1"}}}}}, + }, + }); err != nil { + t.Fatalf("could not add SELECT 1 result: %v", err) + } +}