Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bigtable/apply_bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
attrMap := make(map[string]interface{})
mt := t.newBuiltinMetricsTracer(ctx, true)
defer mt.recordOperationCompletion()
ctx = contextWithMetricsTracer(ctx, mt)

err = gaxInvokeWithRecorder(ctx, mt, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, "MutateRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
attrMap["rowCount"] = len(group)
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
err := t.doApplyBulk(ctx, group, headerMD, trailerMD, opts...)
Expand Down
24 changes: 12 additions & 12 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,16 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts

mt := t.newBuiltinMetricsTracer(ctx, true)
defer mt.recordOperationCompletion()
ctx = contextWithMetricsTracer(ctx, mt)

err = t.readRows(ctx, arg, f, mt, opts...)
err = t.readRows(ctx, arg, f, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode)
return statusErr
}

func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
mt := metricsTracerFromContext(ctx)
var prevRowKey string
attrMap := make(map[string]interface{})

Expand All @@ -254,7 +256,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
}

firstResponseRecorded := false
err = gaxInvokeWithRecorder(ctx, mt, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, methodNameReadRows, func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
if rowLimitSet && numRowsRead >= intialRowLimit {
return nil
}
Expand Down Expand Up @@ -910,14 +912,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
defer func() { trace.EndSpan(ctx, err) }()
mt := t.newBuiltinMetricsTracer(ctx, false)
defer mt.recordOperationCompletion()
ctx = contextWithMetricsTracer(ctx, mt)

err = t.apply(ctx, mt, row, m, opts...)
err = t.apply(ctx, row, m, opts...)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.setCurrOpStatus(statusCode)
return statusErr
}

func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) {
func (t *Table) apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand All @@ -940,7 +943,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
callOptions = append(callOptions, t.c.retryOption)
}
var res *btpb.MutateRowResponse
err := gaxInvokeWithRecorder(ctx, mt, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err := gaxInvokeWithRecorder(ctx, "MutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
var err error
res, err = t.c.client.MutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
return err
Expand Down Expand Up @@ -976,7 +979,7 @@ func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string,
req.FalseMutations = m.mfalse.ops
}
var cmRes *btpb.CheckAndMutateRowResponse
err = gaxInvokeWithRecorder(ctx, mt, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, "CheckAndMutateRow", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
var err error
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
return err
Expand Down Expand Up @@ -1139,10 +1142,11 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
// - does not return errors seen while recording the metrics
//
// - then, calls gax.Invoke with 'callWrapper' as an argument
func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method string,
func gaxInvokeWithRecorder(ctx context.Context, method string,
f func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error, opts ...gax.CallOption) error {
attemptHeaderMD := metadata.New(nil)
attempTrailerMD := metadata.New(nil)
mt := metricsTracerFromContext(ctx)
mt.setMethod(method)

callWrapper := func(ctx context.Context, callSettings gax.CallSettings) error {
Expand All @@ -1157,7 +1161,6 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
finalMD := metadata.Join(existingMD, md)
newCtx := metadata.NewOutgoingContext(ctx, finalMD)

mt.recordAttemptStart()
blockTracker := &blockingLatencyTracker{}
mt.currOp.currAttempt.blockingLatencyTracker = blockTracker
newCtx = context.WithValue(newCtx, statsContextKey, blockTracker)
Expand All @@ -1168,9 +1171,6 @@ func gaxInvokeWithRecorder(ctx context.Context, mt *builtinMetricsTracer, method
// f makes calls to CBT service
err := f(newCtx, &attemptHeaderMD, &attempTrailerMD, callSettings)

// Record attempt specific metrics
mt.recordAttemptCompletion(attemptHeaderMD, attempTrailerMD, err)

extractCookies(attemptHeaderMD, op)
extractCookies(attempTrailerMD, op)
return err
Expand Down
15 changes: 9 additions & 6 deletions bigtable/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
}
}

// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
// Add gRPC client interceptors to supply Google client information and client-side metrics.
o = append(o, btopt.ClientInterceptorOptions(
[]grpc.StreamClientInterceptor{metricsStreamClientInterceptor()},
[]grpc.UnaryClientInterceptor{metricsUnaryClientInterceptor()},
)...)
o = append(o, option.WithGRPCDialOption(grpc.WithStatsHandler(sharedLatencyStatsHandler)))
// Default to a small connection pool that can be overridden.
o = append(o,
Expand Down Expand Up @@ -380,26 +383,26 @@ func (c *Client) PingAndWarm(ctx context.Context) (err error) {
defer func() { trace.EndSpan(ctx, err) }()
mt := c.newBuiltinMetricsTracer(ctx, "", false)
defer mt.recordOperationCompletion()
ctx = contextWithMetricsTracer(ctx, mt)

err = c.pingerWithMetadata(ctx, mt)
err = c.pingerWithMetadata(ctx)
statusCode, statusErr := convertToGrpcStatusErr(err)
mt.currOp.setStatus(statusCode.String())
return statusErr
}

func (c *Client) pingerWithMetadata(ctx context.Context, mt *builtinMetricsTracer) (err error) {
func (c *Client) pingerWithMetadata(ctx context.Context) (err error) {
req := &btpb.PingAndWarmRequest{
Name: c.fullInstanceName(),
AppProfileId: c.appProfile,
}
err = gaxInvokeWithRecorder(ctx, mt, "PingAndWarm", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
err = gaxInvokeWithRecorder(ctx, "PingAndWarm", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
var err error
_, err = c.client.PingAndWarm(ctx, req, grpc.Header(headerMD), grpc.Trailer(trailerMD))
return err
})

return err

}

func (c *Client) newBuiltinMetricsTracer(ctx context.Context, table string, isStreaming bool) *builtinMetricsTracer {
Expand Down
Loading
Loading