Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions common/primitives/task_queues.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package primitives
import (
"fmt"

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
)

Expand All @@ -17,6 +18,21 @@ const (
DLQActivityTQ = "temporal-sys-dlq-activity-tq"
)

// IsInternalTaskQueueKind reports whether kind identifies a server-internal
// task queue (e.g. a worker-commands queue).
//
// Every known kind is listed explicitly, with no default case, so that an
// exhaustiveness linter (such as `exhaustive`, if enabled for this repository)
// can flag this switch when a new TaskQueueKind is added. Note that the Go
// compiler itself does not reject a non-exhaustive switch, so a missing case
// is never a compile error; the trailing return below is what handles any
// kind that is not listed.
func IsInternalTaskQueueKind(kind enumspb.TaskQueueKind) bool {
	switch kind {
	case enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS:
		return true
	case enumspb.TASK_QUEUE_KIND_UNSPECIFIED,
		enumspb.TASK_QUEUE_KIND_NORMAL,
		enumspb.TASK_QUEUE_KIND_STICKY:
		return false
	}
	// Reached only for a kind not listed above; such kinds are treated as
	// not internal.
	return false
}

// IsInternalPerNsTaskQueue reports whether taskQueue is exactly equal to
// PerNSWorkerTaskQueue. The comparison is a plain, case-sensitive string
// equality check.
func IsInternalPerNsTaskQueue(taskQueue string) bool {
	matches := taskQueue == PerNSWorkerTaskQueue
	return matches
}
Expand Down
18 changes: 6 additions & 12 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package matching

import (
"context"
"strings"
"sync"
"time"

Expand All @@ -21,6 +20,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/testing/testhooks"
Expand Down Expand Up @@ -156,18 +156,12 @@ func (h *Handler) opMetricsHandler(
)
}

// internalTaskQueuePrefix identifies server-internal task queues
// (e.g. /temporal-sys/worker-commands/{namespace}/{worker_grouping_key}).
// Note: BreakdownMetricsByTaskQueue should NOT be enabled for these queues as
// they are per-worker and will cause cardinality explosion.
const internalTaskQueuePrefix = "/temporal-sys/"

// recordNexusTaskRequest emits the nexus_task_requests metric with namespace,
// operation, client_name, and is_internal tags.
func (h *Handler) recordNexusTaskRequest(ctx context.Context, namespaceID string, taskQueueName string, operation string) {
func (h *Handler) recordNexusTaskRequest(ctx context.Context, namespaceID string, taskQueueKind enumspb.TaskQueueKind, operation string) {
nsName := h.namespaceName(namespace.ID(namespaceID))
clientName, _ := headers.GetClientNameAndVersion(ctx)
isInternal := strings.HasPrefix(taskQueueName, internalTaskQueuePrefix)
isInternal := primitives.IsInternalTaskQueueKind(taskQueueKind)
metrics.NexusTaskRequests.With(h.metricsHandler).Record(1,
metrics.NamespaceTag(nsName.String()),
metrics.OperationTag(operation),
Expand Down Expand Up @@ -531,7 +525,7 @@ func (h *Handler) PollNexusTaskQueue(ctx context.Context, request *matchingservi
// Only record on the initial handler call (ForwardedSource == ""), not on
// the forwarded call to the root partition, to avoid double-counting.
if request.GetForwardedSource() == "" {
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetRequest().GetTaskQueue().GetName(), "PollNexusTaskQueue")
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetRequest().GetTaskQueue().GetKind(), "PollNexusTaskQueue")
}

if request.GetForwardedSource() != "" {
Expand All @@ -556,7 +550,7 @@ func (h *Handler) RespondNexusTaskCompleted(ctx context.Context, request *matchi
enumspb.TASK_QUEUE_TYPE_NEXUS,
metrics.MatchingRespondNexusTaskCompletedScope,
)
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetName(), "RespondNexusTaskCompleted")
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetKind(), "RespondNexusTaskCompleted")

return h.engine.RespondNexusTaskCompleted(ctx, request, opMetrics)
}
Expand All @@ -569,7 +563,7 @@ func (h *Handler) RespondNexusTaskFailed(ctx context.Context, request *matchings
enumspb.TASK_QUEUE_TYPE_NEXUS,
metrics.MatchingRespondNexusTaskFailedScope,
)
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetName(), "RespondNexusTaskFailed")
h.recordNexusTaskRequest(ctx, request.GetNamespaceId(), request.GetTaskQueue().GetKind(), "RespondNexusTaskFailed")

return h.engine.RespondNexusTaskFailed(ctx, request, opMetrics)
}
Expand Down
8 changes: 4 additions & 4 deletions service/matching/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,16 @@ func TestNexusHandlersEmitClientNameMetric(t *testing.T) {
"should not have client_name tag when header is absent, got: %v", snap)
})

t.Run("is_internal for /temporal-sys/ task queue", func(t *testing.T) {
t.Run("is_internal for worker commands task queue kind", func(t *testing.T) {
captureHandler := metricstest.NewCaptureHandler()
capture := captureHandler.StartCapture()
defer captureHandler.StopCapture(capture)

h, _ := newTestHandler(t, captureHandler)
ctx := ctxWithClientName(t, expectedClientName)
internalTQ := &taskqueuepb.TaskQueue{
Name: "/temporal-sys/worker-commands/ns/grouping-key",
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
Name: "some-task-queue",
Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS,
}

_, err := h.PollNexusTaskQueue(ctx, &matchingservice.PollNexusTaskQueueRequest{
Expand All @@ -201,7 +201,7 @@ func TestNexusHandlersEmitClientNameMetric(t *testing.T) {
snap := capture.Snapshot()
require.NotEmpty(t, snap[nexusTaskRequestsMetric])
require.True(t, findMetricWithTag(snap, nexusTaskRequestsMetric, "is_internal", "true"),
"should have is_internal=true for /temporal-sys/ task queue, got: %v", snap)
"should have is_internal=true for worker commands task queue kind, got: %v", snap)
})
}

Expand Down
Loading