From 4a12438718a957b1e9d1c85dea2b477586bf72f7 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 10:48:31 -0700 Subject: [PATCH 1/2] Fix worker commands dispatch to use TASK_QUEUE_KIND_WORKER_COMMANDS The workerCommandsTaskDispatcher was incorrectly using TASK_QUEUE_KIND_NORMAL when dispatching cancel commands to the worker's control queue. This caused matching to create a NormalPartition instead of a WorkerCommandsPartition, which has different routing/partitioning/scaling behavior. Co-Authored-By: Claude Opus 4.6 --- .../worker_commands_task_dispatcher.go | 2 +- .../worker_commands_task_dispatcher_test.go | 30 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/service/history/worker_commands_task_dispatcher.go b/service/history/worker_commands_task_dispatcher.go index 0d9510429c6..84b48fe3656 100644 --- a/service/history/worker_commands_task_dispatcher.go +++ b/service/history/worker_commands_task_dispatcher.go @@ -147,7 +147,7 @@ func (d *workerCommandsTaskDispatcher) dispatchToWorker( NamespaceId: task.NamespaceID, TaskQueue: &taskqueuepb.TaskQueue{ Name: task.Destination, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, }, Request: nexusRequest, }) diff --git a/service/history/worker_commands_task_dispatcher_test.go b/service/history/worker_commands_task_dispatcher_test.go index 6b71edd6522..7c9e8ff6b9d 100644 --- a/service/history/worker_commands_task_dispatcher_test.go +++ b/service/history/worker_commands_task_dispatcher_test.go @@ -7,6 +7,7 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" + enumspb "go.temporal.io/api/enums/v1" nexuspb "go.temporal.io/api/nexus/v1" workerpb "go.temporal.io/api/worker/v1" "go.temporal.io/sdk/temporal" @@ -141,25 +142,34 @@ func TestExecute_DispatchSuccess(t *testing.T) { logger: log.NewNoopLogger(), } - mockClient.EXPECT().DispatchNexusTask(gomock.Any(), gomock.Any()).Return( - &matchingservice.DispatchNexusTaskResponse{ - Outcome: &matchingservice.DispatchNexusTaskResponse_Response{ - Response: &nexuspb.Response{ - Variant: &nexuspb.Response_StartOperation{ - StartOperation: &nexuspb.StartOperationResponse{ - Variant: &nexuspb.StartOperationResponse_SyncSuccess{ - SyncSuccess: &nexuspb.StartOperationResponse_Sync{}, + var capturedReq *matchingservice.DispatchNexusTaskRequest + mockClient.EXPECT().DispatchNexusTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, req *matchingservice.DispatchNexusTaskRequest, _ ...interface{}) (*matchingservice.DispatchNexusTaskResponse, error) { + capturedReq = req + return &matchingservice.DispatchNexusTaskResponse{ + Outcome: &matchingservice.DispatchNexusTaskResponse_Response{ + Response: &nexuspb.Response{ + Variant: &nexuspb.Response_StartOperation{ + StartOperation: &nexuspb.StartOperationResponse{ + Variant: &nexuspb.StartOperationResponse_SyncSuccess{ + SyncSuccess: &nexuspb.StartOperationResponse_Sync{}, + }, }, }, }, }, - }, - }, nil) + }, nil + }) task := testWorkerCommandsTask() err := d.execute(context.Background(), task, 1 /* attempt */) require.NoError(t, err) + require.NotNil(t, capturedReq) + require.Equal(t, enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, capturedReq.TaskQueue.Kind, + "dispatch request must use TASK_QUEUE_KIND_WORKER_COMMANDS, not TASK_QUEUE_KIND_NORMAL") + require.Equal(t, task.Destination, capturedReq.TaskQueue.Name) + requireMetricValue(t, capture.Snapshot(), "success") } From 921df715847198e7d0b05fee128f1f8d6723fb09 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 12:30:42 -0700 Subject: [PATCH 2/2] Fix fmt lint: use `any` instead of `interface{}` in test Co-Authored-By: Claude Opus 4.6 --- service/history/worker_commands_task_dispatcher_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/worker_commands_task_dispatcher_test.go b/service/history/worker_commands_task_dispatcher_test.go index 7c9e8ff6b9d..a2391678afb 100644 --- a/service/history/worker_commands_task_dispatcher_test.go +++ b/service/history/worker_commands_task_dispatcher_test.go @@ -144,7 +144,7 @@ func TestExecute_DispatchSuccess(t *testing.T) { var capturedReq *matchingservice.DispatchNexusTaskRequest mockClient.EXPECT().DispatchNexusTask(gomock.Any(), gomock.Any()).DoAndReturn( - func(_ context.Context, req *matchingservice.DispatchNexusTaskRequest, _ ...interface{}) (*matchingservice.DispatchNexusTaskResponse, error) { + func(_ context.Context, req *matchingservice.DispatchNexusTaskRequest, _ ...any) (*matchingservice.DispatchNexusTaskResponse, error) { capturedReq = req return &matchingservice.DispatchNexusTaskResponse{ Outcome: &matchingservice.DispatchNexusTaskResponse_Response{