From 69daa0c35feb4e391943226b882520cd2880bb91 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 4 May 2026 11:07:37 -0700 Subject: [PATCH 01/12] Proactively cancel activities on workflow termination and timeout When a workflow is terminated or times out, dispatch cancel commands to workers for in-flight activities that registered a control queue. Extends the mechanism from #9233 to cover forceful workflow close scenarios (including reset, which terminates the old run). Guarded by EnableCancelActivityWorkerCommand. Disabled by default. Co-Authored-By: Claude Opus 4.6 --- service/history/interfaces/mutable_state.go | 1 + .../history/interfaces/mutable_state_mock.go | 14 + service/history/ndc/events_reapplier_test.go | 1 + service/history/ndc/workflow_resetter_test.go | 2 + .../history/workflow/mutable_state_impl.go | 68 ++++ .../workflow/mutable_state_impl_test.go | 158 +++++++++ service/history/workflow/util.go | 13 +- tests/worker_commands_task_test.go | 329 ++++++++++++++++++ 8 files changed, 584 insertions(+), 2 deletions(-) diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index 40db09ed5f2..bae87c91f42 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -48,6 +48,7 @@ type ( AddActivityTaskCancelRequestedEvent(int64, int64, string) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) AddActivityTaskCanceledEvent(int64, int64, int64, *commonpb.Payloads, string) (*historypb.HistoryEvent, error) AddWorkerCommandsTasks(commands []*workerpb.WorkerCommand, controlQueue string) error + GenerateActivityCancelCommandsForClose() error AddActivityTaskCompletedEvent(int64, int64, *workflowservice.RespondActivityTaskCompletedRequest) (*historypb.HistoryEvent, error) AddActivityTaskFailedEvent(int64, int64, *failurepb.Failure, enumspb.RetryState, string, *commonpb.WorkerVersionStamp) (*historypb.HistoryEvent, error) AddActivityTaskScheduledEvent(int64, *commandpb.ScheduleActivityTaskCommandAttributes, bool) (*historypb.HistoryEvent, *persistencespb.ActivityInfo, error) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 741f37e46e5..512e5ea002d 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -647,6 +647,20 @@ func (mr *MockMutableStateMockRecorder) AddWorkerCommandsTasks(commands, control return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerCommandsTasks", reflect.TypeOf((*MockMutableState)(nil).AddWorkerCommandsTasks), commands, controlQueue) } +// GenerateActivityCancelCommandsForClose mocks base method. +func (m *MockMutableState) GenerateActivityCancelCommandsForClose() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateActivityCancelCommandsForClose") + ret0, _ := ret[0].(error) + return ret0 +} + +// GenerateActivityCancelCommandsForClose indicates an expected call of GenerateActivityCancelCommandsForClose. +func (mr *MockMutableStateMockRecorder) GenerateActivityCancelCommandsForClose() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityCancelCommandsForClose", reflect.TypeOf((*MockMutableState)(nil).GenerateActivityCancelCommandsForClose)) +} + // AddWorkflowExecutionCancelRequestedEvent mocks base method. func (m *MockMutableState) AddWorkflowExecutionCancelRequestedEvent(arg0 *historyservice.RequestCancelWorkflowExecutionRequest) (*history.HistoryEvent, error) { m.ctrl.T.Helper() diff --git a/service/history/ndc/events_reapplier_test.go b/service/history/ndc/events_reapplier_test.go index ce3c4b88a43..4ecfb3b9630 100644 --- a/service/history/ndc/events_reapplier_test.go +++ b/service/history/ndc/events_reapplier_test.go @@ -459,6 +459,7 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Termination( false, nil, ).Return(nil, nil) + msCurrent.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) events := []*historypb.HistoryEvent{ {EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED}, event, diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 6a2b1ee1014..a0c1a639eaf 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -561,6 +561,7 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { false, nil, ).Return(&historypb.HistoryEvent{}, nil) + mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) err := s.workflowResetter.terminateWorkflow(mutableState, terminateReason) s.NoError(err) @@ -1255,6 +1256,7 @@ func (s *workflowResetterSuite) TestReapplyEvents() { false, event.Links, ).Return(&historypb.HistoryEvent{}, nil) + ms.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) } } } diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 2513b30643c..f488d7434b3 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/common/searchattribute/sadefs" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/softassert" + "go.temporal.io/server/common/tasktoken" "go.temporal.io/server/common/util" "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/components/callbacks" @@ -4556,6 +4557,73 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo return ms.taskGenerator.GenerateWorkerCommandsTasks(commands, controlQueue) } +// GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all +// in-flight activities that have a worker control queue. Called when the workflow is being +// terminated (or otherwise forcefully closed) to proactively notify workers. +func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { + if !ms.config.EnableCancelActivityWorkerCommand() { + return nil + } + + serializer := tasktoken.NewSerializer() + wfKey := ms.GetWorkflowKey() + nsID := ms.GetNamespaceEntry().ID().String() + + commandsByQueue := make(map[string][]*workerpb.WorkerCommand) + for _, ai := range ms.pendingActivityInfoIDs { + // No control queue means the activity was started before this feature was deployed. + if ai.WorkerControlTaskQueue == "" { + continue + } + if ai.StartedClock == nil { + // StartedClock may be nil for activities started before this feature was deployed. + // Skip cancel command; the activity will time out normally. + ms.logger.Debug("Skipping worker cancel command: activity missing StartedClock (pre-deploy)", + tag.WorkflowNamespaceID(wfKey.NamespaceID), + tag.WorkflowID(wfKey.WorkflowID), + tag.WorkflowRunID(wfKey.RunID), + tag.WorkflowScheduledEventID(ai.ScheduledEventId), + ) + continue + } + + taskToken, err := serializer.Serialize(tasktoken.NewActivityTaskToken( + nsID, + wfKey.WorkflowID, + wfKey.RunID, + ai.ScheduledEventId, + ai.ActivityId, + ai.ActivityType.GetName(), + ai.Attempt, + ai.StartedClock, + ai.Version, + ai.StartVersion, + nil, + )) + if err != nil { + return err + } + + commandsByQueue[ai.WorkerControlTaskQueue] = append( + commandsByQueue[ai.WorkerControlTaskQueue], + &workerpb.WorkerCommand{ + Type: &workerpb.WorkerCommand_CancelActivity{ + CancelActivity: &workerpb.CancelActivityCommand{ + TaskToken: taskToken, + }, + }, + }, + ) + } + + for controlQueue, commands := range commandsByQueue { + if err := ms.AddWorkerCommandsTasks(commands, controlQueue); err != nil { + return err + } + } + return nil +} + func (ms *MutableStateImpl) ApplyActivityTaskCancelRequestedEvent( event *historypb.HistoryEvent, ) error { diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 955cba32f2a..d23cebd30ae 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7285,3 +7285,161 @@ func (s *mutableStateSuite) TestApplyWorkflowExecutionOptionsUpdatedEvent_TimeSk }) } } + +func TestGenerateActivityCancelCommandsForClose(t *testing.T) { + t.Parallel() + + startedClock := &clockspb.VectorClock{ShardId: 1, Clock: 100} + + testCases := []struct { + name string + featureEnabled bool + activities map[int64]*persistencespb.ActivityInfo + expectedQueues map[string]int // controlQueue -> expected command count + expectedNoTasks bool + }{ + { + name: "activities with control queue and started clock", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedQueues: map[string]int{"control-queue-1": 1}, + }, + { + name: "skips activities without control queue", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + { + name: "skips activities without started clock", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + { + name: "multiple activities batched by control queue", + featureEnabled: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "queue-A", + StartedClock: startedClock, + Attempt: 1, + }, + 2: { + ScheduledEventId: 2, + ActivityId: "act-2", + ActivityType: &commonpb.ActivityType{Name: "type2"}, + WorkerControlTaskQueue: "queue-A", + StartedClock: startedClock, + Attempt: 1, + }, + 3: { + ScheduledEventId: 3, + ActivityId: "act-3", + ActivityType: &commonpb.ActivityType{Name: "type3"}, + WorkerControlTaskQueue: "queue-B", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedQueues: map[string]int{"queue-A": 2, "queue-B": 1}, + }, + { + name: "feature flag disabled - no tasks generated", + featureEnabled: false, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockEventsCache := events.NewMockCache(ctrl) + mockConfig := tests.NewDynamicConfig() + mockConfig.EnableCancelActivityWorkerCommand = dynamicconfig.GetBoolPropertyFn(tc.featureEnabled) + + mockShard := shard.NewTestContext( + ctrl, + &persistencespb.ShardInfo{ShardId: 0, RangeId: 1}, + mockConfig, + ) + defer mockShard.StopForTest() + reg := hsm.NewRegistry() + require.NoError(t, RegisterStateMachine(reg)) + require.NoError(t, callbacks.RegisterStateMachine(reg)) + require.NoError(t, nexusoperations.RegisterStateMachines(reg)) + mockShard.SetStateMachineRegistry(reg) + mockShard.SetEventsCacheForTesting(mockEventsCache) + + namespaceEntry := tests.GlobalNamespaceEntry + mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes() + + ms := NewMutableState(mockShard, mockEventsCache, log.NewTestLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC()) + ms.pendingActivityInfoIDs = tc.activities + + err := ms.GenerateActivityCancelCommandsForClose() + require.NoError(t, err) + + if tc.expectedNoTasks { + require.Empty(t, ms.InsertTasks[tasks.CategoryOutbound]) + return + } + + // Verify tasks were generated by checking outbound task messages + var workerCommandTasks []*tasks.WorkerCommandsTask + for _, task := range ms.InsertTasks[tasks.CategoryOutbound] { + if wct, ok := task.(*tasks.WorkerCommandsTask); ok { + workerCommandTasks = append(workerCommandTasks, wct) + } + } + + // Verify each expected queue got the right number of commands + tasksByQueue := make(map[string]int) + for _, wct := range workerCommandTasks { + tasksByQueue[wct.Destination] = len(wct.Commands) + } + require.Equal(t, tc.expectedQueues, tasksByQueue) + }) + } +} diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index cb5091dfc44..cea282db095 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -95,7 +95,12 @@ func TimeoutWorkflow( retryState, continuedRunID, ) - return err + if err != nil { + return err + } + + // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. + return mutableState.GenerateActivityCancelCommandsForClose() } // TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh @@ -142,8 +147,12 @@ func TerminateWorkflow( deleteAfterTerminate, links, ) + if err != nil { + return err + } - return err + // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. + return mutableState.GenerateActivityCancelCommandsForClose() } // FindAutoResetPoint returns the auto reset point diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index 187062797b5..e172b66d6f2 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/google/uuid" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -163,3 +164,331 @@ func TestDispatchCancelToWorker(t *testing.T) { "Cancel command task token must match the activity's original task token") t.Log("SUCCESS: Received ExecuteCommands Nexus request on control queue with matching CancelActivity task token") } + +// TestDispatchCancelOnWorkflowTermination tests that when a workflow is terminated, +// the server proactively dispatches cancel commands for in-flight activities +// that have a worker control queue. The activity is given a long timeout to +// ensure it is still in-flight when the workflow is terminated. +func TestDispatchCancelOnWorkflowTermination(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow is terminated. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Terminate the workflow directly (no cancellation request, no workflow task) + _, err = env.FrontendClient().TerminateWorkflowExecution(ctx, &workflowservice.TerminateWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: activityPollResp.WorkflowExecution, + Reason: "test termination", + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after termination") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow termination") +} + +// TestDispatchCancelOnWorkflowReset tests that when a workflow is reset, +// the old run is terminated and cancel commands are dispatched for its in-flight activities. +// The activity is given a long timeout to ensure it is still in-flight when the workflow is reset. +// Note: reset internally calls TerminateWorkflow, so this exercises the same code path as +// TestDispatchCancelOnWorkflowTermination — it verifies the plumbing rather than a distinct branch. +func TestDispatchCancelOnWorkflowReset(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + startResp, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow is reset. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Find the WFT completed event ID for the reset point + histResp, err := env.FrontendClient().GetWorkflowExecutionHistory(ctx, &workflowservice.GetWorkflowExecutionHistoryRequest{ + Namespace: env.Namespace().String(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + RunId: startResp.RunId, + }, + }) + env.NoError(err) + var wftCompletedEventID int64 + for _, event := range histResp.History.Events { + if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + wftCompletedEventID = event.EventId + break + } + } + env.NotZero(wftCompletedEventID) + + // Reset the workflow — this terminates the current run + _, err = env.FrontendClient().ResetWorkflowExecution(ctx, &workflowservice.ResetWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + RunId: startResp.RunId, + }, + Reason: "test reset", + RequestId: uuid.NewString(), + WorkflowTaskFinishEventId: wftCompletedEventID, + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after reset") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow reset") +} + +// TestDispatchCancelOnWorkflowTimeout tests that when a workflow times out, +// cancel commands are dispatched for in-flight activities with a worker control queue. +// The activity is given a long timeout to ensure it is still in-flight when the workflow times out. +func TestDispatchCancelOnWorkflowTimeout(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow with a short execution timeout + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(3 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when the workflow times out. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Wait for the workflow to time out (3s timeout set above) + // Poll Nexus control queue - should receive cancel command after timeout + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after workflow timeout") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after workflow timeout") +} From d00bac14f85db49c1febde002c04e39e10973de3 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 8 May 2026 11:40:39 -0700 Subject: [PATCH 02/12] Regenerate mutable_state_mock.go to fix alphabetical ordering Co-Authored-By: Claude Opus 4.6 --- .../history/interfaces/mutable_state_mock.go | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 0a8849356a3..9843b19c7ba 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -647,20 +647,6 @@ func (mr *MockMutableStateMockRecorder) AddWorkerCommandsTasks(commands, control return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerCommandsTasks", reflect.TypeOf((*MockMutableState)(nil).AddWorkerCommandsTasks), commands, controlQueue) } -// GenerateActivityCancelCommandsForClose mocks base method. -func (m *MockMutableState) GenerateActivityCancelCommandsForClose() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GenerateActivityCancelCommandsForClose") - ret0, _ := ret[0].(error) - return ret0 -} - -// GenerateActivityCancelCommandsForClose indicates an expected call of GenerateActivityCancelCommandsForClose. -func (mr *MockMutableStateMockRecorder) GenerateActivityCancelCommandsForClose() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityCancelCommandsForClose", reflect.TypeOf((*MockMutableState)(nil).GenerateActivityCancelCommandsForClose)) -} - // AddWorkflowExecutionCancelRequestedEvent mocks base method. func (m *MockMutableState) AddWorkflowExecutionCancelRequestedEvent(arg0 *historyservice.RequestCancelWorkflowExecutionRequest) (*history.HistoryEvent, error) { m.ctrl.T.Helper() @@ -1986,6 +1972,20 @@ func (mr *MockMutableStateMockRecorder) FlushBufferedEvents() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushBufferedEvents", reflect.TypeOf((*MockMutableState)(nil).FlushBufferedEvents)) } +// GenerateActivityCancelCommandsForClose mocks base method. +func (m *MockMutableState) GenerateActivityCancelCommandsForClose() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GenerateActivityCancelCommandsForClose") + ret0, _ := ret[0].(error) + return ret0 +} + +// GenerateActivityCancelCommandsForClose indicates an expected call of GenerateActivityCancelCommandsForClose. +func (mr *MockMutableStateMockRecorder) GenerateActivityCancelCommandsForClose() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GenerateActivityCancelCommandsForClose", reflect.TypeOf((*MockMutableState)(nil).GenerateActivityCancelCommandsForClose)) +} + // GenerateMigrationTasks mocks base method. func (m *MockMutableState) GenerateMigrationTasks(targetClusters []string) ([]tasks.Task, int64, error) { m.ctrl.T.Helper() From 510bc2b68659b8874f49cfa83ac7773c85bda06c Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:35:42 -0700 Subject: [PATCH 03/12] Move cancel command generation into GenerateWorkflowCloseTasks Cancel commands for in-flight activities are now generated from GenerateWorkflowCloseTasks instead of being called explicitly from TerminateWorkflow and TimeoutWorkflow. This covers all workflow close paths (complete, fail, timeout, cancel, terminate, continue-as-new) with a single call site. Co-Authored-By: Claude Opus 4.6 --- service/history/ndc/events_reapplier_test.go | 1 - service/history/ndc/workflow_resetter_test.go | 2 -- service/history/workflow/mutable_state_impl.go | 4 ++-- service/history/workflow/task_generator.go | 3 ++- service/history/workflow/task_generator_test.go | 2 ++ service/history/workflow/util.go | 6 ++---- 6 files changed, 8 insertions(+), 10 deletions(-) diff --git a/service/history/ndc/events_reapplier_test.go b/service/history/ndc/events_reapplier_test.go index 4ecfb3b9630..ce3c4b88a43 100644 --- a/service/history/ndc/events_reapplier_test.go +++ b/service/history/ndc/events_reapplier_test.go @@ -459,7 +459,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Termination( false, nil, ).Return(nil, nil) - msCurrent.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) events := []*historypb.HistoryEvent{ {EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED}, event, diff --git a/service/history/ndc/workflow_resetter_test.go b/service/history/ndc/workflow_resetter_test.go index 40878dd9ebe..21118ce1f45 100644 --- a/service/history/ndc/workflow_resetter_test.go +++ b/service/history/ndc/workflow_resetter_test.go @@ -562,7 +562,6 @@ func (s *workflowResetterSuite) TestTerminateWorkflow() { false, nil, ).Return(&historypb.HistoryEvent{}, nil) - mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) err := s.workflowResetter.terminateWorkflow(mutableState, terminateReason) s.NoError(err) @@ -1257,7 +1256,6 @@ func (s *workflowResetterSuite) TestReapplyEvents() { false, event.Links, ).Return(&historypb.HistoryEvent{}, nil) - ms.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) } } } diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 0b58aa97d36..999c4a773ce 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4538,8 +4538,8 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo } // GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all -// in-flight activities that have a worker control queue. Called when the workflow is being -// terminated (or otherwise forcefully closed) to proactively notify workers. +// in-flight activities that have a worker control queue. Called from GenerateWorkflowCloseTasks +// on every workflow close path to proactively notify workers. func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { if !ms.config.EnableCancelActivityWorkerCommand() { return nil diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 2020be905fc..2aeb97f8798 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -271,7 +271,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks( r.mutableState.AddTasks(closeTasks...) - return nil + // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. + return r.mutableState.GenerateActivityCancelCommandsForClose() } // getRetention returns the retention period for this task generator's workflow execution. diff --git a/service/history/workflow/task_generator_test.go b/service/history/workflow/task_generator_test.go index 507ead3e239..a211b6d9ad1 100644 --- a/service/history/workflow/task_generator_test.go +++ b/service/history/workflow/task_generator_test.go @@ -261,6 +261,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) { return cfg }).AnyTimes() + mutableState.EXPECT().GenerateActivityCancelCommandsForClose().Return(nil) + taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata, log.NewTestLogger()) err := taskGenerator.GenerateWorkflowCloseTasks(p.CloseEventTime, p.DeleteAfterClose, false) require.NoError(t, err) diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 1f9be1cdc96..69101f0ec08 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -100,8 +100,7 @@ func TimeoutWorkflow( return err } - // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. - return mutableState.GenerateActivityCancelCommandsForClose() + return nil } // TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh @@ -152,8 +151,7 @@ func TerminateWorkflow( return err } - // Proactively cancel in-flight activities so they don't run uselessly after the workflow is closed. - return mutableState.GenerateActivityCancelCommandsForClose() + return nil } // FindAutoResetPoint returns the auto reset point From 82617878f12cf2a6070da45c2187a6e1bad65c18 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:37:35 -0700 Subject: [PATCH 04/12] Remove unnecessary comment about caller Co-Authored-By: Claude Opus 4.6 --- service/history/workflow/mutable_state_impl.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 999c4a773ce..52f25e9a641 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4538,8 +4538,7 @@ func (ms *MutableStateImpl) AddWorkerCommandsTasks(commands []*workerpb.WorkerCo } // GenerateActivityCancelCommandsForClose generates WorkerCommandsTasks to cancel all -// in-flight activities that have a worker control queue. Called from GenerateWorkflowCloseTasks -// on every workflow close path to proactively notify workers. +// in-flight activities that have a worker control queue. func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { if !ms.config.EnableCancelActivityWorkerCommand() { return nil From 25107f9a827a1b19b056637fc43788e24184654c Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:38:38 -0700 Subject: [PATCH 05/12] Update StartedClock nil check comment to cover retry backoff case Co-Authored-By: Claude Opus 4.6 --- service/history/workflow/mutable_state_impl.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 52f25e9a641..02458cba59b 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4555,9 +4555,9 @@ func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { continue } if ai.StartedClock == nil { - // StartedClock may be nil for activities started before this feature was deployed. - // Skip cancel command; the activity will time out normally. - ms.logger.Debug("Skipping worker cancel command: activity missing StartedClock (pre-deploy)", + // StartedClock is nil when the activity is not currently started (e.g. in retry backoff) + // or was started before this feature was deployed. Skip cancel command. + ms.logger.Debug("Skipping worker cancel command: activity not currently started", tag.WorkflowNamespaceID(wfKey.NamespaceID), tag.WorkflowID(wfKey.WorkflowID), tag.WorkflowRunID(wfKey.RunID), From 51543fcbffcfcc8f62c565d134cf4d6a63651f35 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:40:31 -0700 Subject: [PATCH 06/12] Simplify error returns in util.go Co-Authored-By: Claude Opus 4.6 --- service/history/workflow/util.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/service/history/workflow/util.go b/service/history/workflow/util.go index 69101f0ec08..85021f35134 100644 --- a/service/history/workflow/util.go +++ b/service/history/workflow/util.go @@ -96,11 +96,7 @@ func TimeoutWorkflow( retryState, continuedRunID, ) - if err != nil { - return err - } - - return nil + return err } // TerminateWorkflow will write a WorkflowExecutionTerminated event with a fresh @@ -147,11 +143,7 @@ func TerminateWorkflow( deleteAfterTerminate, links, ) - if err != nil { - return err - } - - return nil + return err } // FindAutoResetPoint returns the auto reset point From 7b11848c73a6dbab091f9432b53538161d2a7aca Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:42:00 -0700 Subject: [PATCH 07/12] Add e2e test for cancel command dispatch on continue-as-new Co-Authored-By: Claude Opus 4.6 --- tests/worker_commands_task_test.go | 125 +++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index e172b66d6f2..4a57419e305 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -492,3 +492,128 @@ func TestDispatchCancelOnWorkflowTimeout(t *testing.T) { "Cancel command task token must match the activity's original task token") t.Log("SUCCESS: Received cancel command on control queue after workflow timeout") } + +// TestDispatchCancelOnContinueAsNew tests that when a workflow continues-as-new, +// cancel commands are dispatched for in-flight activities from the old run. +// CaN creates a fresh mutable state — pending activities are abandoned, same as terminate. +func TestDispatchCancelOnContinueAsNew(t *testing.T) { + env := testcore.NewEnv(t, testcore.WithDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true)) + + ctx, cancel := context.WithTimeout(context.Background(), 90*time.Second) + defer cancel() + + tv := env.Tv() + poller := env.TaskPoller() + + controlQueueName := tv.ControlQueueName(env.Namespace().String()) + + // Start the workflow + _, err := env.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{ + RequestId: tv.Any().String(), + Namespace: env.Namespace().String(), + WorkflowId: tv.WorkflowID(), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowExecutionTimeout: durationpb.New(60 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + }) + env.NoError(err) + + // Schedule the activity + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: tv.ActivityID(), + ActivityType: tv.ActivityType(), + TaskQueue: tv.TaskQueue(), + // Long timeout to ensure the activity is still in-flight when CaN happens. + ScheduleToCloseTimeout: durationpb.New(5 * time.Minute), + StartToCloseTimeout: durationpb.New(5 * time.Minute), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll for activity task and start it with a worker control queue + activityPollResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: tv.TaskQueue(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + env.NoError(err) + env.NotNil(activityPollResp) + env.NotEmpty(activityPollResp.TaskToken) + + // Signal the workflow to trigger a new workflow task + _, err = env.FrontendClient().SignalWorkflowExecution(ctx, &workflowservice.SignalWorkflowExecutionRequest{ + Namespace: env.Namespace().String(), + WorkflowExecution: activityPollResp.WorkflowExecution, + SignalName: "trigger-can", + Identity: tv.WorkerIdentity(), + }) + env.NoError(err) + + // Handle the workflow task by issuing ContinueAsNew + _, err = poller.PollAndHandleWorkflowTask(tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ + ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + WorkflowRunTimeout: durationpb.New(60 * time.Second), + }, + }, + }, + }, + }, nil + }) + env.NoError(err) + + // Poll Nexus control queue - should receive cancel command for the in-flight activity + var nexusPollResp *workflowservice.PollNexusTaskQueueResponse + env.Eventually(func() bool { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: tv.WorkerIdentity(), + }) + if err == nil && resp != nil && resp.Request != nil { + nexusPollResp = resp + return true + } + return false + }, 120*time.Second, 100*time.Millisecond, "Timed out waiting for cancel command on control queue after continue-as-new") + + // Verify the cancel command + startOp := nexusPollResp.Request.GetStartOperation() + env.NotNil(startOp, "Expected StartOperation in Nexus request") + env.Equal("temporal.api.nexusservices.workerservice.v1.WorkerService", startOp.Service) + env.Equal("ExecuteCommands", startOp.Operation) + + var executeReq workerservicepb.ExecuteCommandsRequest + err = payload.Decode(startOp.Payload, &executeReq) + env.NoError(err) + env.Len(executeReq.Commands, 1, "Expected exactly 1 command") + cancelCmd := executeReq.Commands[0].GetCancelActivity() + env.NotNil(cancelCmd, "Expected CancelActivity command") + env.Equal(activityPollResp.TaskToken, cancelCmd.TaskToken, + "Cancel command task token must match the activity's original task token") + t.Log("SUCCESS: Received cancel command on control queue after continue-as-new") +} From 7886bc024c76a2fa3ea3599fa294a09c1ad322c4 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:57:07 -0700 Subject: [PATCH 08/12] Fix gci import ordering Co-Authored-By: Claude Opus 4.6 --- tests/worker_commands_task_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index 4a57419e305..3cea6f4d478 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -6,6 +6,8 @@ import ( "time" "github.com/google/uuid" + "google.golang.org/protobuf/types/known/durationpb" + commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -15,7 +17,6 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/payload" "go.temporal.io/server/tests/testcore" - "google.golang.org/protobuf/types/known/durationpb" ) // TestDispatchCancelToWorker tests that when an activity cancellation is requested, @@ -572,10 +573,10 @@ func TestDispatchCancelOnContinueAsNew(t *testing.T) { CommandType: enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_ContinueAsNewWorkflowExecutionCommandAttributes{ ContinueAsNewWorkflowExecutionCommandAttributes: &commandpb.ContinueAsNewWorkflowExecutionCommandAttributes{ - WorkflowType: tv.WorkflowType(), - TaskQueue: tv.TaskQueue(), - WorkflowTaskTimeout: durationpb.New(10 * time.Second), - WorkflowRunTimeout: durationpb.New(60 * time.Second), + WorkflowType: tv.WorkflowType(), + TaskQueue: tv.TaskQueue(), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), + WorkflowRunTimeout: durationpb.New(60 * time.Second), }, }, }, From 6681004cd8f9acb271059d0f071b8b2171e059c8 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 15:59:03 -0700 Subject: [PATCH 09/12] Fix gci import ordering (use CI config) Co-Authored-By: Claude Opus 4.6 --- tests/worker_commands_task_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/worker_commands_task_test.go b/tests/worker_commands_task_test.go index 3cea6f4d478..76825c9a3f4 100644 --- a/tests/worker_commands_task_test.go +++ b/tests/worker_commands_task_test.go @@ -6,8 +6,6 @@ import ( "time" "github.com/google/uuid" - "google.golang.org/protobuf/types/known/durationpb" - commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" @@ -17,6 +15,7 @@ import ( "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/payload" "go.temporal.io/server/tests/testcore" + "google.golang.org/protobuf/types/known/durationpb" ) // TestDispatchCancelToWorker tests that when an activity cancellation is requested, From fbd826a2ca85c7a90ee707ce129084b4e67b59e5 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 15 May 2026 14:33:26 -0700 Subject: [PATCH 10/12] Skip cancel command generation on standby cluster Cancel commands are best-effort and only dispatched on the active cluster. Avoid creating tasks on standby that will be immediately dropped by the outbound executor. Co-Authored-By: Claude Opus 4.6 --- service/history/workflow/mutable_state_impl.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 02458cba59b..397a0120f6e 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4544,6 +4544,14 @@ func (ms *MutableStateImpl) GenerateActivityCancelCommandsForClose() error { return nil } + // Cancel commands are best-effort and only dispatched on the active cluster. + // Skip task generation on standby to avoid creating tasks that will be dropped. + activeCluster := ms.clusterMetadata.ClusterNameForFailoverVersion( + ms.namespaceEntry.IsGlobalNamespace(), ms.GetCurrentVersion()) + if activeCluster != ms.clusterMetadata.GetCurrentClusterName() { + return nil + } + serializer := tasktoken.NewSerializer() wfKey := ms.GetWorkflowKey() nsID := ms.GetNamespaceEntry().ID().String() From 3bc0884777d4adaa1028dc449364d59ccb602ab1 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 15 May 2026 14:39:12 -0700 Subject: [PATCH 11/12] Add standby cluster test case for cancel command generation Co-Authored-By: Claude Opus 4.6 --- .../workflow/mutable_state_impl_test.go | 23 ++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 78d7543bf4e..9f546c2f8a9 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7599,6 +7599,7 @@ func TestGenerateActivityCancelCommandsForClose(t *testing.T) { testCases := []struct { name string featureEnabled bool + standby bool activities map[int64]*persistencespb.ActivityInfo expectedQueues map[string]int // controlQueue -> expected command count expectedNoTasks bool @@ -7692,6 +7693,22 @@ func TestGenerateActivityCancelCommandsForClose(t *testing.T) { }, expectedNoTasks: true, }, + { + name: "standby cluster - no tasks generated", + featureEnabled: true, + standby: true, + activities: map[int64]*persistencespb.ActivityInfo{ + 1: { + ScheduledEventId: 1, + ActivityId: "act-1", + ActivityType: &commonpb.ActivityType{Name: "type1"}, + WorkerControlTaskQueue: "control-queue-1", + StartedClock: startedClock, + Attempt: 1, + }, + }, + expectedNoTasks: true, + }, } for _, tc := range testCases { @@ -7716,7 +7733,11 @@ func TestGenerateActivityCancelCommandsForClose(t *testing.T) { namespaceEntry := tests.GlobalNamespaceEntry mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes() - mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(cluster.TestCurrentClusterName).AnyTimes() + activeCluster := cluster.TestCurrentClusterName + if tc.standby { + activeCluster = cluster.TestAlternativeClusterName + } + mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(gomock.Any(), gomock.Any()).Return(activeCluster).AnyTimes() mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes() From 655bcfb154539ec74028550f68138823305197ce Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 15 May 2026 14:40:16 -0700 Subject: [PATCH 12/12] Add comment for standby test field Co-Authored-By: Claude Opus 4.6 --- service/history/workflow/mutable_state_impl_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 9f546c2f8a9..08d6a08e031 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7599,7 +7599,7 @@ func TestGenerateActivityCancelCommandsForClose(t *testing.T) { testCases := []struct { name string featureEnabled bool - standby bool + standby bool // simulate running on a standby (non-active) cluster activities map[int64]*persistencespb.ActivityInfo expectedQueues map[string]int // controlQueue -> expected command count expectedNoTasks bool