diff --git a/common/primitives/task_queues.go b/common/primitives/task_queues.go index 2a8433ff706..4a2910d4dcb 100644 --- a/common/primitives/task_queues.go +++ b/common/primitives/task_queues.go @@ -3,6 +3,7 @@ package primitives import ( "fmt" + enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" ) @@ -17,6 +18,21 @@ const ( DLQActivityTQ = "temporal-sys-dlq-activity-tq" ) +// IsInternalTaskQueueKind returns true if the task queue kind identifies a +// server-internal task queue (e.g. worker commands queues). +// All kinds are listed explicitly so that adding a new kind produces a compile error. +func IsInternalTaskQueueKind(kind enumspb.TaskQueueKind) bool { + switch kind { + case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS: + return true + case enumspb.TASK_QUEUE_KIND_UNSPECIFIED, + enumspb.TASK_QUEUE_KIND_NORMAL, + enumspb.TASK_QUEUE_KIND_STICKY: + return false + } + return false +} + func IsInternalPerNsTaskQueue(taskQueue string) bool { return taskQueue == PerNSWorkerTaskQueue } diff --git a/service/matching/handler.go b/service/matching/handler.go index 8359f88baad..53575447db2 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -2,7 +2,6 @@ package matching import ( "context" - "strings" "sync" "time" @@ -21,6 +20,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/visibility/manager" + "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/searchattribute" "go.temporal.io/server/common/testing/testhooks" @@ -156,18 +156,12 @@ func (h *Handler) opMetricsHandler( ) } -// internalTaskQueuePrefix identifies server-internal task queues -// (e.g. /temporal-sys/worker-commands/{namespace}/{worker_grouping_key}). -// Note: BreakdownMetricsByTaskQueue should NOT be enabled for these queues as -// they are per-worker and will cause cardinality explosion. -const internalTaskQueuePrefix = "/temporal-sys/" - // recordNexusTaskRequest emits the nexus_task_requests metric with namespace, // operation, client_name, and is_internal tags. -func (h *Handler) recordNexusTaskRequest(ctx context.Context, namespaceID string, taskQueueName string, operation string) { +func (h *Handler) recordNexusTaskRequest(ctx context.Context, namespaceID string, taskQueueKind enumspb.TaskQueueKind, operation string) { nsName := h.namespaceName(namespace.ID(namespaceID)) clientName, _ := headers.GetClientNameAndVersion(ctx) - isInternal := strings.HasPrefix(taskQueueName, internalTaskQueuePrefix) + isInternal := primitives.IsInternalTaskQueueKind(taskQueueKind) metrics.NexusTaskRequests.With(h.metricsHandler).Record(1, metrics.NamespaceTag(nsName.String()), metrics.OperationTag(operation), @@ -531,7 +525,7 @@ func (h *Handler) PollNexusTaskQueue(ctx context.Context, request *matchingservi // Only record on the initial handler call (ForwardedSource == ""), not on // the forwarded call to the root partition, to avoid double-counting. if request.GetForwardedSource() == "" { - h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetRequest().GetTaskQueue().GetName(), "PollNexusTaskQueue") + h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetRequest().GetTaskQueue().GetKind(), "PollNexusTaskQueue") } if request.GetForwardedSource() != "" { @@ -556,7 +550,7 @@ func (h *Handler) RespondNexusTaskCompleted(ctx context.Context, request *matchi enumspb.TASK_QUEUE_TYPE_NEXUS, metrics.MatchingRespondNexusTaskCompletedScope, ) - h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetName(), "RespondNexusTaskCompleted") + h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetKind(), "RespondNexusTaskCompleted") return h.engine.RespondNexusTaskCompleted(ctx, request, opMetrics) } @@ -569,7 +563,7 @@ func (h *Handler) RespondNexusTaskFailed(ctx context.Context, request *matchings enumspb.TASK_QUEUE_TYPE_NEXUS, metrics.MatchingRespondNexusTaskFailedScope, ) - h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetName(), "RespondNexusTaskFailed") + h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetKind(), "RespondNexusTaskFailed") return h.engine.RespondNexusTaskFailed(ctx, request, opMetrics) } diff --git a/service/matching/handler_test.go b/service/matching/handler_test.go index bbe64cc0591..aee197be787 100644 --- a/service/matching/handler_test.go +++ b/service/matching/handler_test.go @@ -178,7 +178,7 @@ func TestNexusHandlersEmitClientNameMetric(t *testing.T) { "should not have client_name tag when header is absent, got: %v", snap) }) - t.Run("is_internal for /temporal-sys/ task queue", func(t *testing.T) { + t.Run("is_internal for worker commands task queue kind", func(t *testing.T) { captureHandler := metricstest.NewCaptureHandler() capture := captureHandler.StartCapture() defer captureHandler.StopCapture(capture) @@ -186,8 +186,8 @@ func TestNexusHandlersEmitClientNameMetric(t *testing.T) { h, _ := newTestHandler(t, captureHandler) ctx := ctxWithClientName(t, expectedClientName) internalTQ := &taskqueuepb.TaskQueue{ - Name: "/temporal-sys/worker-commands/ns/grouping-key", - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + Name: "some-task-queue", + Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, } _, err := h.PollNexusTaskQueue(ctx, &matchingservice.PollNexusTaskQueueRequest{ @@ -201,7 +201,7 @@ func TestNexusHandlersEmitClientNameMetric(t *testing.T) { snap := capture.Snapshot() require.NotEmpty(t, snap[nexusTaskRequestsMetric]) require.True(t, findMetricWithTag(snap, nexusTaskRequestsMetric, "is_internal", "true"), - "should have is_internal=true for /temporal-sys/ task queue, got: %v", snap) + "should have is_internal=true for worker commands task queue kind, got: %v", snap) }) }