diff --git a/service/history/worker_commands_task_dispatcher.go b/service/history/worker_commands_task_dispatcher.go index 0d9510429c6..84b48fe3656 100644 --- a/service/history/worker_commands_task_dispatcher.go +++ b/service/history/worker_commands_task_dispatcher.go @@ -147,7 +147,7 @@ func (d *workerCommandsTaskDispatcher) dispatchToWorker( NamespaceId: task.NamespaceID, TaskQueue: &taskqueuepb.TaskQueue{ Name: task.Destination, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, }, Request: nexusRequest, }) diff --git a/service/history/worker_commands_task_dispatcher_test.go b/service/history/worker_commands_task_dispatcher_test.go index 6b71edd6522..a2391678afb 100644 --- a/service/history/worker_commands_task_dispatcher_test.go +++ b/service/history/worker_commands_task_dispatcher_test.go @@ -7,6 +7,7 @@ import ( "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/require" + enumspb "go.temporal.io/api/enums/v1" nexuspb "go.temporal.io/api/nexus/v1" workerpb "go.temporal.io/api/worker/v1" "go.temporal.io/sdk/temporal" @@ -141,25 +142,34 @@ func TestExecute_DispatchSuccess(t *testing.T) { logger: log.NewNoopLogger(), } - mockClient.EXPECT().DispatchNexusTask(gomock.Any(), gomock.Any()).Return( - &matchingservice.DispatchNexusTaskResponse{ - Outcome: &matchingservice.DispatchNexusTaskResponse_Response{ - Response: &nexuspb.Response{ - Variant: &nexuspb.Response_StartOperation{ - StartOperation: &nexuspb.StartOperationResponse{ - Variant: &nexuspb.StartOperationResponse_SyncSuccess{ - SyncSuccess: &nexuspb.StartOperationResponse_Sync{}, + var capturedReq *matchingservice.DispatchNexusTaskRequest + mockClient.EXPECT().DispatchNexusTask(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, req *matchingservice.DispatchNexusTaskRequest, _ ...any) (*matchingservice.DispatchNexusTaskResponse, error) { + capturedReq = req + return &matchingservice.DispatchNexusTaskResponse{ + Outcome: &matchingservice.DispatchNexusTaskResponse_Response{ + Response: &nexuspb.Response{ + Variant: &nexuspb.Response_StartOperation{ + StartOperation: &nexuspb.StartOperationResponse{ + Variant: &nexuspb.StartOperationResponse_SyncSuccess{ + SyncSuccess: &nexuspb.StartOperationResponse_Sync{}, + }, }, }, }, }, - }, - }, nil) + }, nil + }) task := testWorkerCommandsTask() err := d.execute(context.Background(), task, 1 /* attempt */) require.NoError(t, err) + require.NotNil(t, capturedReq) + require.Equal(t, enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, capturedReq.TaskQueue.Kind, + "dispatch request must use TASK_QUEUE_KIND_WORKER_COMMANDS, not TASK_QUEUE_KIND_NORMAL") + require.Equal(t, task.Destination, capturedReq.TaskQueue.Name) + requireMetricValue(t, capture.Snapshot(), "success") }