diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index 277368a26ed..c5bda00afb9 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -49,6 +49,7 @@ type ( AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error) AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error + GenerateActivityCancelCommandsForClose() error AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error) AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error) AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 5fcba16e810..5fa88dc7a9b 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -1972,6 +1972,20 @@ func (mr *MockMutableStateMockRecorder) FlushBufferedEvents() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushBufferedEvents", reflect.TypeOf((*MockMutableState)(nil).FlushBufferedEvents)) } +// GenerateActivityCancelCommandsForClose mocks base method. +func (m *MockMutableState) GenerateActivityCancelCommandsForClose() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateActivityCancelCommandsForClose") + ret0, _ := ret[0].(error) + return ret0 +} + +// GenerateActivityCancelCommandsForClose indicates an expected call of GenerateActivityCancelCommandsForClose. +func (mr *MockMutableStateMockRecorder) GenerateActivityCancelCommandsForClose() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityCancelCommandsForClose", reflect.TypeOf((*MockMutableState)(nil).GenerateActivityCancelCommandsForClose)) +} + // GenerateEventLoadToken mocks base method. func (m *MockMutableState) GenerateEventLoadToken(event *history.HistoryEvent) ([]byte, error) { m.ctrl.T.Helper() diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 862894e92b3..397a0120f6e 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/common/searchattribute/sadefs" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/softassert" + "go.temporal.io/server/common/tasktoken" "go.temporal.io/server/common/util" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/components/callbacks" @@ -4536,6 +4537,80 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue) } +// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all +// in-flight activities that have a worker control queue. +func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { + if !ms.config.EnableCancelActivityWorkerCommand() { + return nil + } + + // Cancel commands are best-effort and only dispatched on the active cluster. + // Skip task generation on standby to avoid creating tasks that will be dropped. + activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion( + ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion()) + if activeCluster != ms.clusterMetadata.GetCurrentClusterName() { + return nil + } + + serializer := tasktoken.NewSerializer() + wfKey := ms.GetWorkflowKey() + nsID := ms.GetNamespaceEntry().ID().String() + + commandsByQueue := make(map[string][]*workerpb.WorkerCommand) + for _, ai := range ms.pendingActivityInfoIDs { + // No control queue means the activity was started before this feature was deployed. + if ai.WorkerControlTaskQueue == "" { + continue + } + if ai.StartedClock == nil { + // StartedClock is nil when the activity is not currently started (e.g. in retry backoff) + // or was started before this feature was deployed. Skip cancel command. + ms.logger.Debug("Skipping worker cancel command: activity not currently started", + tag.WorkflowNamespaceID(wfKey.NamespaceID), + tag.WorkflowID(wfKey.WorkflowID), + tag.WorkflowRunID(wfKey.RunID), + tag.WorkflowScheduledEventID(ai.ScheduledEventId), + ) + continue + } + + taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken( + nsID, + wfKey.WorkflowID, + wfKey.RunID, + ai.ScheduledEventId, + ai.ActivityId, + ai.ActivityType.GetName(), + ai.Attempt, + ai.StartedClock, + ai.Version, + ai.StartVersion, + nil, + )) + if err != nil { + return err + } + + commandsByQueue[ai.WorkerControlTaskQueue] = append( + commandsByQueue[ai.WorkerControlTaskQueue], + &workerpb.WorkerCommand{ + Type: &workerpb.WorkerCommand_CancelActivity{ + CancelActivity: &workerpb.CancelActivityCommand{ + TaskToken: taskToken, + }, + }, + }, + ) + } + + for controlQueue, commands := range commandsByQueue { + if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil { + return err + } + } + return nil +} + func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent( event *historypb.HistoryEvent, ) error { diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 24de0d9ac43..08d6a08e031 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7591,6 +7591,185 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk } } +func TestGenerateActivityCancelCommandsForClose(t *testing.T) { + t.Parallel() + + startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100} + + testCases := []struct { + name string + featureEnabled bool + standby bool // simulate running on a standby (non-active) cluster + activities map[int64]*persistencespb.ActivityInfo + expectedQueues map[string]int // controlQueue -> expected command count + expectedNoTasks bool + }{ + { + name: "activities with control queue and started clock", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedQueues: map[string]int{"control-queue-1": 1}, + }, + { + name: "skips activities without control queue", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + { + name: "skips activities without started clock", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + { + name: "multiple activities batched by control queue", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "queue-A", + StartedClock: startedClock, + Attempt: 1, + }, + 2: { + ScheduledEventId: 2, + ActivityId: "act-2", + ActivityType: &commonpb.ActivityType{Name: "type2"}, + WorkerControlTaskQueue: "queue-A", + StartedClock: startedClock, + Attempt: 1, + }, + 3: { + ScheduledEventId: 3, + ActivityId: "act-3", + ActivityType: &commonpb.ActivityType{Name: "type3"}, + WorkerControlTaskQueue: "queue-B", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1}, + }, + { + name: "feature flag disabled - no tasks generated", + featureEnabled: false, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + { + name: "standby cluster - no tasks generated", + featureEnabled: true, + standby: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockEventsCache := events.NewMockCache(ctrl) + mockConfig := tests.NewDynamicConfig() + mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled) + + mockShard := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ShardId: 0, RangeId: 1}, + mockConfig, + ) + defer mockShard.StopForTest() + reg := hsm.NewRegistry() + require.NoError(t, RegisterStateMachine(reg)) + require.NoError(t, callbacks.RegisterStateMachine(reg)) + require.NoError(t, nexusoperations.RegisterStateMachines(reg)) + mockShard.SetStateMachineRegistry(reg) + mockShard.SetEventsCacheForTesting(mockEventsCache) + + namespaceEntry := tests.GlobalNamespaceEntry + mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes() + activeCluster := cluster.TestCurrentClusterName + if tc.standby { + activeCluster = cluster.TestAlternativeClusterName + } + mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(activeCluster).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes() + + ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC()) + ms.pendingActivityInfoIDs = tc.activities + + err := ms.GenerateActivityCancelCommandsForClose() + require.NoError(t, err) + + if tc.expectedNoTasks { + require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound]) + return + } + + // Verify tasks were generated by checking outbound task messages + var workerCommandTasks []*tasks.WorkerCommandsTask + for _, task := range ms.InsertTasks[tasks.CategoryOutbound] { + if wct, ok := task.(*tasks.WorkerCommandsTask); ok { + workerCommandTasks = append(workerCommandTasks, wct) + } + } + + // Verify each expected queue got the right number of commands + tasksByQueue := make(map[string]int) + for _, wct := range workerCommandTasks { + tasksByQueue[wct.Destination] = len(wct.Commands) + } + require.Equal(t, tc.expectedQueues, tasksByQueue) + }) + } +} + // TestApplyTimeSkippingBound covers the full branch table of applyTimeSkippingBound: // MaxElapsedDuration set / nil duration / nil bound / nil config / Enabled=false / // MaxSkippedDuration clearing a stale CurrentElapsedDurationBound. The first-init diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 2020be905fc..2aeb97f8798 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -271,7 +271,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( r.mutableState.AddTasks(closeTasks...) - return nil + // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. + return r.mutableState.GenerateActivityCancelCommandsForClose() } // getRetention returns the retention period for this task generator's workflow execution. diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 507ead3e239..a211b6d9ad1 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -261,6 +261,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { return cfg }).AnyTimes() + mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) + taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata, log.NewTestLogger()) err := taskGenerator.GenerateWorkflowCloseTasks(p.CloseEventTime, p.DeleteAfterClose, false) require.NoError(t, err) diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index f42b1e2a362..85021f35134 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -143,7 +143,6 @@ func TerminateWorkflow( deleteAfterTerminate, links, ) - return err } diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index 187062797b5..76825c9a3f4 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/google/uuid" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -163,3 +164,456 @@ func TestDispatchCancelToWorker(t *testing.T) { "Cancel command task token must match the activity's original task token") t.Log("SUCCESS: Received ExecuteCommands Nexus request on control queue with matching CancelActivity task token") } + +// TestDispatchCancelOnWorkflowTermination tests that when a workflow is terminated, +// the server proactively dispatches cancel commands for in-flight activities +// that have a worker control queue. The activity is given a long timeout to +// ensure it is still in-flight when the workflow is terminated. +func TestDispatchCancelOnWorkflowTermination(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow is terminated. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Terminate the workflow directly (no cancellation request, no workflow task) + _, err = env.FrontendClient().TerminateWorkflowExecution(ctx, &workflowservice.TerminateWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: activityPollResp.WorkflowExecution, + Reason: "test termination", + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after termination") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow termination") +} + +// TestDispatchCancelOnWorkflowReset tests that when a workflow is reset, +// the old run is terminated and cancel commands are dispatched for its in-flight activities. +// The activity is given a long timeout to ensure it is still in-flight when the workflow is reset. +// Note: reset internally calls TerminateWorkflow, so this exercises the same code path as +// TestDispatchCancelOnWorkflowTermination — it verifies the plumbing rather than a distinct branch. +func TestDispatchCancelOnWorkflowReset(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow is reset. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Find the WFT completed event ID for the reset point + histResp, err := env.FrontendClient().GetWorkflowExecutionHistory(ctx, &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: env.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + RunId: startResp.RunId, + }, + }) + env.NoError(err) + var wftCompletedEventID int64 + for _, event := range histResp.History.Events { + if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + wftCompletedEventID = event.EventId + break + } + } + env.NotZero(wftCompletedEventID) + + // Reset the workflow — this terminates the current run + _, err = env.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + RunId: startResp.RunId, + }, + Reason: "test reset", + RequestId: uuid.NewString(), + WorkflowTaskFinishEventId: wftCompletedEventID, + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after reset") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow reset") +} + +// TestDispatchCancelOnWorkflowTimeout tests that when a workflow times out, +// cancel commands are dispatched for in-flight activities with a worker control queue. +// The activity is given a long timeout to ensure it is still in-flight when the workflow times out. +func TestDispatchCancelOnWorkflowTimeout(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow with a short execution timeout + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(3 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow times out. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Wait for the workflow to time out (3s timeout set above) + // Poll Nexus control queue - should receive cancel command after timeout + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after workflow timeout") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow timeout") +} + +// TestDispatchCancelOnContinueAsNew tests that when a workflow continues-as-new, +// cancel commands are dispatched for in-flight activities from the old run. +// CaN creates a fresh mutable state — pending activities are abandoned, same as terminate. +func TestDispatchCancelOnContinueAsNew(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when CaN happens. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Signal the workflow to trigger a new workflow task + _, err = env.FrontendClient().SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: activityPollResp.WorkflowExecution, + SignalName: "trigger-can", + Identity: tv.WorkerIdentity(), + }) + env.NoError(err) + + // Handle the workflow task by issuing ContinueAsNew + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ + ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + WorkflowRunTimeout: durationpb.New(60 * time.Second), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after continue-as-new") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after continue-as-new") +}