diff --git a/bigtable/apply_bulk.go b/bigtable/apply_bulk.go index 10f1ca15ef9f..38f671da3c0c 100644 --- a/bigtable/apply_bulk.go +++ b/bigtable/apply_bulk.go @@ -97,8 +97,9 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply attrMap := make(map[string]interface{}) mt := t.newBuiltinMetricsTracer(ctx, true) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { attrMap["rowCount"] = len(group) trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 3b7585f12a38..a0bca4e2f3d5 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -229,14 +229,16 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts mt := t.newBuiltinMetricsTracer(ctx, true) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - err = t.readRows(ctx, arg, f, mt, opts...) + err = t.readRows(ctx, arg, f, opts...) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return statusErr } -func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) { +func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) { + mt := metricsTracerFromContext(ctx) var prevRowKey string attrMap := make(map[string]interface{}) @@ -254,7 +256,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt * } firstResponseRecorded := false - err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { if rowLimitSet && numRowsRead >= intialRowLimit { return nil } @@ -910,14 +912,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl defer func() { trace.EndSpan(ctx, err) }() mt := t.newBuiltinMetricsTracer(ctx, false) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - err = t.apply(ctx, mt, row, m, opts...) + err = t.apply(ctx, row, m, opts...) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return statusErr } -func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) { +func (t *Table) apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) { after := func(res proto.Message) { for _, o := range opts { o.after(res) @@ -940,7 +943,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, callOptions = append(callOptions, t.c.retryOption) } var res *btpb.MutateRowResponse - err := gaxInvokeWithRecorder(ctx, mt, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error res, err = t.c.client.MutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err @@ -976,7 +979,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, req.FalseMutations = m.mfalse.ops } var cmRes *btpb.CheckAndMutateRowResponse - err = gaxInvokeWithRecorder(ctx, mt, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err @@ -1139,10 +1142,11 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp { // - does not return errors seen while recording the metrics // // - then, calls gax.Invoke with 'callWrapper' as an argument -func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method string, +func gaxInvokeWithRecorder(ctx context.Context, method string, f func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error, opts ...gax.CallOption) error { attemptHeaderMD := metadata.New(nil) attempTrailerMD := metadata.New(nil) + mt := metricsTracerFromContext(ctx) mt.setMethod(method) callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error { @@ -1157,7 +1161,6 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method finalMD := metadata.Join(existingMD, md) newCtx := metadata.NewOutgoingContext(ctx, finalMD) - mt.recordAttemptStart() blockTracker := &blockingLatencyTracker{} mt.currOp.currAttempt.blockingLatencyTracker = blockTracker newCtx = context.WithValue(newCtx, statsContextKey, blockTracker) @@ -1168,9 +1171,6 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method // f makes calls to CBT service err := f(newCtx, &attemptHeaderMD, &attempTrailerMD, callSettings) - // Record attempt specific metrics - mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err) - extractCookies(attemptHeaderMD, op) extractCookies(attempTrailerMD, op) return err diff --git a/bigtable/client.go b/bigtable/client.go index 34a44edcdaa0..6ba8424dc383 100644 --- a/bigtable/client.go +++ b/bigtable/client.go @@ -121,8 +121,11 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C } } - // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. - o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) + // Add gRPC client interceptors to supply Google client information and client-side metrics. + o = append(o, btopt.ClientInterceptorOptions( + []grpc.StreamClientInterceptor{metricsStreamClientInterceptor()}, + []grpc.UnaryClientInterceptor{metricsUnaryClientInterceptor()}, + )...) o = append(o, option.WithGRPCDialOption(grpc.WithStatsHandler(sharedLatencyStatsHandler))) // Default to a small connection pool that can be overridden. o = append(o, @@ -380,26 +383,26 @@ func (c *Client) PingAndWarm(ctx context.Context) (err error) { defer func() { trace.EndSpan(ctx, err) }() mt := c.newBuiltinMetricsTracer(ctx, "", false) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - err = c.pingerWithMetadata(ctx, mt) + err = c.pingerWithMetadata(ctx) statusCode, statusErr := convertToGrpcStatusErr(err) mt.currOp.setStatus(statusCode.String()) return statusErr } -func (c *Client) pingerWithMetadata(ctx context.Context, mt *builtinMetricsTracer) (err error) { +func (c *Client) pingerWithMetadata(ctx context.Context) (err error) { req := &btpb.PingAndWarmRequest{ Name: c.fullInstanceName(), AppProfileId: c.appProfile, } - err = gaxInvokeWithRecorder(ctx, mt, "PingAndWarm", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err = gaxInvokeWithRecorder(ctx, "PingAndWarm", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error _, err = c.client.PingAndWarm(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err }) return err - } func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isStreaming bool) *builtinMetricsTracer { diff --git a/bigtable/metrics.go b/bigtable/metrics.go index 0554bdc8ebfa..05848d7052eb 100644 --- a/bigtable/metrics.go +++ b/bigtable/metrics.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "google.golang.org/api/option" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" @@ -84,10 +85,27 @@ const ( type contextKey string const ( - statsContextKey contextKey = "bigtable/clientBlockingLatencyTracker" - t4t7ContextKey contextKey = "bigtable/t4t7Tracker" + statsContextKey contextKey = "bigtable/clientBlockingLatencyTracker" + t4t7ContextKey contextKey = "bigtable/t4t7Tracker" + metricsTracerContextKey contextKey = "bigtable/metricsTracer" ) +func contextWithMetricsTracer(ctx context.Context, mt *builtinMetricsTracer) context.Context { + return context.WithValue(ctx, metricsTracerContextKey, mt) +} + +func metricsTracerFromContext(ctx context.Context) *builtinMetricsTracer { + if mt, ok := ctx.Value(metricsTracerContextKey).(*builtinMetricsTracer); ok { + return mt + } + return &builtinMetricsTracer{ + builtInEnabled: false, + currOp: opTracer{ + cookies: make(map[string]string), + }, + } +} + // These are effectively constant, but for testing purposes they are mutable var ( // duration between two metric exports @@ -478,6 +496,10 @@ type opTracer struct { // For routing cookie and gRPC attempt number cookies map[string]string + + // Last known location details across all attempts + lastClusterID string + lastZoneID string } func (o *opTracer) setStartTime(t time.Time) { @@ -519,6 +541,9 @@ type attemptTracer struct { // Tracker for client blocking latency blockingLatencyTracker *blockingLatencyTracker + // Client blocking latency in ms + clientBlockingLatency float64 + // Tracker for t4t7 t4t7Tracker *t4t7Tracker } @@ -593,6 +618,23 @@ func (mt *builtinMetricsTracer) setMethod(m string) { // to OpenTelemetry attributes format, // - combines these with common client attributes and returns func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) (attribute.Set, error) { + // Get metric details + mDetails, found := metricsDetails[metricName] + if !found { + return attribute.Set{}, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName) + } + + clusterID := mt.currOp.currAttempt.clusterID + zoneID := mt.currOp.currAttempt.zoneID + status := mt.currOp.status + + if mDetails.recordedPerAttempt { + status = mt.currOp.currAttempt.status + } else { + clusterID = fallbackString(clusterID, mt.currOp.lastClusterID) + zoneID = fallbackString(zoneID, mt.currOp.lastZoneID) + } + attrKeyValues := make([]attribute.KeyValue, 0, maxAttrsLen) // Create attribute key value pairs for attributes common to all metricss attrKeyValues = append(attrKeyValues, @@ -603,24 +645,11 @@ func (mt *builtinMetricsTracer) toOtelMetricAttrs(metricName string) (attribute. // will not add them to Google Cloud Monitoring metric labels attribute.String(monitoredResLabelKeyTable, mt.tableName), - // Irrespective of whether metric is attempt specific or operation specific, - // use last attempt's cluster and zone - attribute.String(monitoredResLabelKeyCluster, mt.currOp.currAttempt.clusterID), - attribute.String(monitoredResLabelKeyZone, mt.currOp.currAttempt.zoneID), + attribute.String(monitoredResLabelKeyCluster, clusterID), + attribute.String(monitoredResLabelKeyZone, zoneID), ) attrKeyValues = append(attrKeyValues, mt.clientAttributes...) - // Get metric details - mDetails, found := metricsDetails[metricName] - if !found { - return attribute.Set{}, fmt.Errorf("unable to create attributes list for unknown metric: %v", metricName) - } - - status := mt.currOp.status - if mDetails.recordedPerAttempt { - status = mt.currOp.currAttempt.status - } - // Add additional attributes to metrics for _, attrKey := range mDetails.additionalAttrs { switch attrKey { @@ -651,6 +680,38 @@ func (mt *builtinMetricsTracer) recordAttemptStart() { mt.currOp.currAttempt.setStartTime(time.Now()) } +// recordAttemptCompletionWithMetadata extracts location, server latency (with t4t7 fallback), +// and client blocking latency from headers, trailers, and active trackers, saves them to +// the current attempt tracer, and then records the attempt metrics. +func (mt *builtinMetricsTracer) recordAttemptCompletionWithMetadata(attemptHeaderMD, attempTrailerMD metadata.MD, err error) { + if !mt.builtInEnabled { + return + } + + // 1. Calculate client blocking latency + if mt.currOp.currAttempt.blockingLatencyTracker != nil { + messageSentNanos := mt.currOp.currAttempt.blockingLatencyTracker.getMessageSentNanos() + if messageSentNanos > 0 { + mt.currOp.currAttempt.clientBlockingLatency = convertToMs(time.Unix(0, messageSentNanos).Sub(mt.currOp.currAttempt.startTime)) + } + } + + // 2. Extract server latency and apply t4t7 fallback + serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD) + if serverLatency == 0 && mt.currOp.currAttempt.t4t7Tracker != nil { + fallbackLatency := mt.currOp.currAttempt.t4t7Tracker.getLatencyMs() + if fallbackLatency > 0 { + serverLatency = fallbackLatency + serverLatencyErr = nil + } + } + mt.currOp.currAttempt.serverLatency = serverLatency + mt.currOp.currAttempt.serverLatencyErr = serverLatencyErr + + // 3. Call recordAttemptCompletion + mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err) +} + // recordAttemptCompletion records as many attempt specific metrics as it can // Ignore errors seen while creating metric attributes since metric can still // be recorded with rest of the attributes @@ -666,27 +727,16 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT // Get location attributes from metadata and set it in tracer // Ignore get location error since the metric can still be recorded with rest of the attributes clusterID, zoneID, _ := extractLocation(attemptHeaderMD, attempTrailerMD) - mt.currOp.currAttempt.setClusterID(clusterID) - mt.currOp.currAttempt.setZoneID(zoneID) - - // Set server latency in tracer - // FYI this is GFE t4t7(not server latency) latency where - // it measures the time between initial metadata send (client) - initial metadata recv(server) - serverLatency, serverLatencyErr := extractServerLatency(attemptHeaderMD, attempTrailerMD) - // If server latency is missing (0), fallback to the client-measured t4t7 latency - // t4t7 for directpath measures the time between the OutHeaders and InHeaders - // t4t7 for cloudpath measures the time between gfe receives the initial metadata of req - // and gfe sends the initial metadata of response to client - if serverLatency == 0 && mt.currOp.currAttempt.t4t7Tracker != nil { - fallbackLatency := mt.currOp.currAttempt.t4t7Tracker.getLatencyMs() - if fallbackLatency > 0 { - serverLatency = fallbackLatency - serverLatencyErr = nil - } + if clusterID != "" { + mt.currOp.currAttempt.setClusterID(clusterID) + mt.currOp.lastClusterID = clusterID } + if zoneID != "" { + mt.currOp.currAttempt.setZoneID(zoneID) + mt.currOp.lastZoneID = zoneID + } + - mt.currOp.currAttempt.setServerLatencyErr(serverLatencyErr) - mt.currOp.currAttempt.setServerLatency(serverLatency) // Calculate elapsed time elapsedTime := convertToMs(time.Since(mt.currOp.currAttempt.startTime)) @@ -696,13 +746,8 @@ func (mt *builtinMetricsTracer) recordAttemptCompletion(attemptHeaderMD, attempT mt.instrumentAttemptLatencies.Record(mt.ctx, elapsedTime, metric.WithAttributeSet(attemptLatAttrs)) // Record client_blocking_latencies - var clientBlockingLatencyMs float64 - if mt.currOp.currAttempt.blockingLatencyTracker != nil { - messageSentNanos := mt.currOp.currAttempt.blockingLatencyTracker.getMessageSentNanos() - clientBlockingLatencyMs = convertToMs(time.Unix(0, int64(messageSentNanos)).Sub(mt.currOp.currAttempt.startTime)) - } clientBlockingLatAttrs, _ := mt.toOtelMetricAttrs(metricNameClientBlockingLatencies) - mt.instrumentClientBlockingLatencies.Record(mt.ctx, clientBlockingLatencyMs, metric.WithAttributeSet(clientBlockingLatAttrs)) + mt.instrumentClientBlockingLatencies.Record(mt.ctx, mt.currOp.currAttempt.clientBlockingLatency, metric.WithAttributeSet(clientBlockingLatAttrs)) // Record server_latencies serverLatAttrs, _ := mt.toOtelMetricAttrs(metricNameServerLatencies) @@ -782,6 +827,18 @@ func (mt *builtinMetricsTracer) incrementAppBlockingLatency(latency float64) { mt.currOp.incrementAppBlockingLatency(latency) } +// recordAttemptClientBlockingLatency records the client blocking latency for the current attempt. +// It is measured per attempt as the duration between when the attempt started preparing and when it was executed or dispatched. +func (mt *builtinMetricsTracer) recordAttemptClientBlockingLatency() { + if !mt.builtInEnabled { + return + } + startTime := mt.currOp.currAttempt.startTime + if !startTime.IsZero() { + mt.currOp.currAttempt.clientBlockingLatency = convertToMs(time.Since(startTime)) + } +} + // blockingLatencyTracker is used to calculate the time between stream creation and the first message send. type blockingLatencyTracker struct { endNanos atomic.Int64 @@ -858,3 +915,99 @@ func (h *latencyStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagIn } func (h *latencyStatsHandler) HandleConn(context.Context, stats.ConnStats) {} + +func metricsUnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + mt := metricsTracerFromContext(ctx) + if !mt.builtInEnabled { + return invoker(ctx, method, req, reply, cc, opts...) + } + + parts := strings.Split(method, "/") + shortMethod := parts[len(parts)-1] + mt.setMethod(shortMethod) + mt.recordAttemptStart() + mt.recordAttemptClientBlockingLatency() + + blockTracker := &blockingLatencyTracker{} + mt.currOp.currAttempt.blockingLatencyTracker = blockTracker + ctx = context.WithValue(ctx, statsContextKey, blockTracker) + + t4t7 := &t4t7Tracker{} + mt.currOp.currAttempt.t4t7Tracker = t4t7 + ctx = context.WithValue(ctx, t4t7ContextKey, t4t7) + + var headerMD, trailerMD metadata.MD + opts = append(opts, grpc.Header(&headerMD), grpc.Trailer(&trailerMD)) + + err := invoker(ctx, method, req, reply, cc, opts...) + + mt.recordAttemptCompletionWithMetadata(headerMD, trailerMD, err) + return err + } +} + +func metricsStreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + mt := metricsTracerFromContext(ctx) + if !mt.builtInEnabled { + return streamer(ctx, desc, cc, method, opts...) + } + + parts := strings.Split(method, "/") + shortMethod := parts[len(parts)-1] + mt.setMethod(shortMethod) + mt.recordAttemptStart() + mt.recordAttemptClientBlockingLatency() + + blockTracker := &blockingLatencyTracker{} + mt.currOp.currAttempt.blockingLatencyTracker = blockTracker + ctx = context.WithValue(ctx, statsContextKey, blockTracker) + + t4t7 := &t4t7Tracker{} + mt.currOp.currAttempt.t4t7Tracker = t4t7 + ctx = context.WithValue(ctx, t4t7ContextKey, t4t7) + + var headerMD, trailerMD metadata.MD + opts = append(opts, grpc.Header(&headerMD), grpc.Trailer(&trailerMD)) + + clientStream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + mt.recordAttemptCompletionWithMetadata(headerMD, trailerMD, err) + return nil, err + } + + wrapped := &metricsWrappedClientStream{ + ClientStream: clientStream, + mt: mt, + headerMD: &headerMD, + trailerMD: &trailerMD, + } + return wrapped, nil + } +} + +type metricsWrappedClientStream struct { + grpc.ClientStream + mt *builtinMetricsTracer + headerMD *metadata.MD + trailerMD *metadata.MD + completed int32 +} + +func (w *metricsWrappedClientStream) RecvMsg(m interface{}) error { + err := w.ClientStream.RecvMsg(m) + if err != nil { + if atomic.CompareAndSwapInt32(&w.completed, 0, 1) { + w.mt.recordAttemptCompletionWithMetadata(*w.headerMD, *w.trailerMD, err) + } + } + return err +} + +func fallbackString(a, b string) string { + if a != "" { + return a + } + return b +} diff --git a/bigtable/metrics_test.go b/bigtable/metrics_test.go index e3502a1c1ee0..c0f8473eda74 100644 --- a/bigtable/metrics_test.go +++ b/bigtable/metrics_test.go @@ -151,7 +151,13 @@ func setupFakeServerWithCustomHandler(projectID, instanceID string, cfg ClientCo return nil, nil, fmt.Errorf("failed to start bttest server: %w", err) } - conn, err := grpc.Dial(rawGrpcServer.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithStatsHandler(sharedLatencyStatsHandler)) + conn, err := grpc.Dial(rawGrpcServer.Addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithStatsHandler(sharedLatencyStatsHandler), + grpc.WithChainUnaryInterceptor(metricsUnaryClientInterceptor()), + grpc.WithChainStreamInterceptor(metricsStreamClientInterceptor()), + ) if err != nil { rawGrpcServer.Close() return nil, nil, fmt.Errorf("failed to dial test server: %w", err) diff --git a/bigtable/query.go b/bigtable/query.go index 2696eaa7c093..31e6aa136fc7 100644 --- a/bigtable/query.go +++ b/bigtable/query.go @@ -108,14 +108,15 @@ func (c *Client) prepareStatementWithMetadata(ctx context.Context, query string, mt := c.newBuiltinMetricsTracer(ctx, "", false) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - preparedStatement, err = c.prepareStatement(ctx, mt, query, paramTypes, opts...) + preparedStatement, err = c.prepareStatement(ctx, query, paramTypes, opts...) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return preparedStatement, statusErr } -func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (*PreparedStatement, error) { +func (c *Client) prepareStatement(ctx context.Context, query string, paramTypes map[string]SQLType, opts ...PrepareOption) (*PreparedStatement, error) { reqParamTypes := map[string]*btpb.Type{} for k, v := range paramTypes { if v == nil { @@ -140,7 +141,7 @@ func (c *Client) prepareStatement(ctx context.Context, mt *builtinMetricsTracer, ParamTypes: reqParamTypes, } var res *btpb.PrepareQueryResponse - err := gaxInvokeWithRecorder(ctx, mt, "PrepareQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, "PrepareQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { var err error res, err = c.client.PrepareQuery(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) return err @@ -287,8 +288,9 @@ func (bs *BoundStatement) Execute(ctx context.Context, f func(ResultRow) bool, o mt := bs.ps.c.newBuiltinMetricsTracer(ctx, "", true) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - err = bs.execute(ctx, f, mt) + err = bs.execute(ctx, f) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return statusErr @@ -299,7 +301,7 @@ func newPreparedQueryData(ps *PreparedStatement) *preparedQueryData { return &data } -func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool, mt *builtinMetricsTracer) error { +func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool) error { // buffer data constructed from the fields in PartialRows` var ongoingResultBatch bytes.Buffer @@ -321,7 +323,7 @@ func (bs *BoundStatement) execute(ctx context.Context, f func(ResultRow) bool, m // // So, do not use latest metadata from `bs.ps` var finalizedStmt *preparedQueryData - err := gaxInvokeWithRecorder(ctx, mt, "ExecuteQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, "ExecuteQuery", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { ctx, cancel := context.WithCancel(ctx) // for aborting the stream defer cancel() diff --git a/bigtable/read_modify_write.go b/bigtable/read_modify_write.go index f8a1a1899969..a3aeb0130db4 100644 --- a/bigtable/read_modify_write.go +++ b/bigtable/read_modify_write.go @@ -31,14 +31,15 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod mt := t.newBuiltinMetricsTracer(ctx, false) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - updatedRow, err := t.applyReadModifyWrite(ctx, mt, row, m) + updatedRow, err := t.applyReadModifyWrite(ctx, row, m) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return updatedRow, statusErr } -func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTracer, row string, m *ReadModifyWrite) (Row, error) { +func (t *Table) applyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { req := &btpb.ReadModifyWriteRowRequest{ AppProfileId: t.c.appProfile, RowKey: []byte(row), @@ -51,7 +52,7 @@ func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTrac } var r Row - err := gaxInvokeWithRecorder(ctx, mt, "ReadModifyWriteRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, "ReadModifyWriteRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD)) if err != nil { return err diff --git a/bigtable/retry_test.go b/bigtable/retry_test.go index 47f63dd6d3cb..7de891c44ac9 100644 --- a/bigtable/retry_test.go +++ b/bigtable/retry_test.go @@ -39,7 +39,12 @@ func setupFakeServer(project, instance string, config ClientConfig, opt ...grpc. if err != nil { return nil, nil, err } - conn, err := grpc.Dial(srv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + conn, err := grpc.Dial(srv.Addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + grpc.WithChainUnaryInterceptor(metricsUnaryClientInterceptor()), + grpc.WithChainStreamInterceptor(metricsStreamClientInterceptor()), + ) if err != nil { return nil, nil, err } diff --git a/bigtable/sample_row_keys.go b/bigtable/sample_row_keys.go index c47e6193ace0..3c91466697b1 100644 --- a/bigtable/sample_row_keys.go +++ b/bigtable/sample_row_keys.go @@ -30,16 +30,17 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { mt := t.newBuiltinMetricsTracer(ctx, true) defer mt.recordOperationCompletion() + ctx = contextWithMetricsTracer(ctx, mt) - rowKeys, err := t.sampleRowKeys(ctx, mt) + rowKeys, err := t.sampleRowKeys(ctx) statusCode, statusErr := convertToGrpcStatusErr(err) mt.setCurrOpStatus(statusCode) return rowKeys, statusErr } -func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) { +func (t *Table) sampleRowKeys(ctx context.Context) ([]string, error) { var sampledRowKeys []string - err := gaxInvokeWithRecorder(ctx, mt, "SampleRowKeys", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { + err := gaxInvokeWithRecorder(ctx, "SampleRowKeys", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error { sampledRowKeys = nil req := &btpb.SampleRowKeysRequest{ AppProfileId: t.c.appProfile,