From 35434da1979c2fe45c443593f3d272a5903c7ec9 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 13:12:36 -0700 Subject: [PATCH 1/6] Clear StartedClock on activity retry StartedClock is a per-attempt field set in RecordActivityTaskStarted and used to reconstruct a task token matching the one sent to the worker. It was not being cleared in UpdateActivityInfoForRetries or the paused-activity retry path, leaving a stale value during retry backoff. This caused cancel commands (#9233) to be dispatched for activities not currently running on any worker. Clear StartedClock alongside the other per-attempt fields (StartedEventId, StartedTime, StartVersion) and update the cancel command skip log message to reflect that nil StartedClock can mean retry backoff, not just pre-deploy. Co-Authored-By: Claude Opus 4.6 --- .../workflow_task_completed_handler.go | 5 +++-- service/history/workflow/activity.go | 1 + service/history/workflow/mutable_state_impl.go | 1 + 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index abf58eb89ff..6bd0d0222ad 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -698,9 +698,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity( handler.activityNotStartedCancelled = true } else if ai.WorkerControlTaskQueue != "" { if ai.StartedClock == nil { - // StartedClock may be nil for activities started before this feature was deployed. + // StartedClock is nil when the activity is not currently running on a worker + // (e.g., in retry backoff, or started before this feature was deployed). // Skip cancel command; the activity will time out normally. 
- handler.logger.Info("Skipping worker cancel command: activity missing StartedClock (pre-deploy)", + handler.logger.Info("Skipping worker cancel command: activity not currently started", tag.WorkflowNamespaceID(handler.mutableState.GetWorkflowKey().NamespaceID), tag.WorkflowID(handler.mutableState.GetWorkflowKey().WorkflowID), tag.WorkflowRunID(handler.mutableState.GetWorkflowKey().RunID), diff --git a/service/history/workflow/activity.go b/service/history/workflow/activity.go index 8054f4c02ba..610b9a8c5b5 100644 --- a/service/history/workflow/activity.go +++ b/service/history/workflow/activity.go @@ -76,6 +76,7 @@ func UpdateActivityInfoForRetries( ai.StartVersion = common.EmptyVersion ai.RequestId = "" ai.StartedTime = nil + ai.StartedClock = nil // Mark per-attempt timers for recreation. ai.TimerTaskStatus &^= TimerTaskStatusCreatedHeartbeat | TimerTaskStatusCreatedStartToClose | TimerTaskStatusCreatedScheduleToStart ai.RetryLastWorkerIdentity = ai.StartedIdentity diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 862894e92b3..40986e11c89 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6434,6 +6434,7 @@ func (ms *MutableStateImpl) RetryActivity( activityInfo.StartedEventId = common.EmptyEventID activityInfo.StartVersion = common.EmptyVersion activityInfo.StartedTime = nil + activityInfo.StartedClock = nil activityInfo.RequestId = "" activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure) activityInfo.Attempt++ From 774c731520b2781597ee9f2241fc9d90fe86334a Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 13:22:56 -0700 Subject: [PATCH 2/6] Add test for per-attempt field clearing on activity retry Verify that StartedClock, StartedTime, and StartedEventId are all cleared when an activity is retried. 
Co-Authored-By: Claude Opus 4.6 --- ...utable_state_impl_restart_activity_test.go | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 3cb91b7fbd3..10f96178d92 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -14,8 +14,10 @@ import ( enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + clockspb "go.temporal.io/server/api/clock/v1" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" commonclock "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" @@ -165,6 +167,24 @@ func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_bac s.assertTruncateFailureCalled() } +func (s *retryActivitySuite) TestRetryActivity_should_clear_per_attempt_fields() { + s.mutableState.timeSource = s.timeSource + taskGeneratorMock := NewMockTaskGenerator(s.controller) + taskGeneratorMock.EXPECT().GenerateActivityRetryTasks(s.activity) + s.mutableState.taskGenerator = taskGeneratorMock + + // Set per-attempt fields that should be cleared on retry. 
+ s.activity.StartedClock = &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 42} + s.activity.StartedTime = timestamppb.Now() + + _, err := s.mutableState.RetryActivity(s.activity, s.failure) + s.NoError(err) + + s.Nil(s.activity.StartedClock, "StartedClock should be cleared on retry") + s.Nil(s.activity.StartedTime, "StartedTime should be cleared on retry") + s.Equal(common.EmptyEventID, s.activity.StartedEventId, "StartedEventId should be reset to EmptyEventID") +} + // TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set asserts that the activity is retried after NextRetryDelay period specified in the application failure. func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set() { s.mutableState.timeSource = s.timeSource From 68c8fc8f26ee5889873819823079b5bc4837eeb4 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 13:24:08 -0700 Subject: [PATCH 3/6] Clear StartedClock in pause paths too Two additional paths reset per-attempt fields when an activity is paused (timer executor and RecordActivityTaskStarted) but were also missing the StartedClock clear. 
Co-Authored-By: Claude Opus 4.6 --- service/history/api/recordactivitytaskstarted/api.go | 1 + service/history/timer_queue_active_task_executor.go | 1 + 2 files changed, 2 insertions(+) diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 74eb393b030..40ea1bf444c 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -391,6 +391,7 @@ func processActivityWorkflowRules( activityInfo.StartedEventId = common.EmptyEventID activityInfo.StartVersion = common.EmptyVersion activityInfo.StartedTime = nil + activityInfo.StartedClock = nil activityInfo.RequestId = "" return nil }); err != nil { diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index bca92e09fb3..bed02284c90 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -1029,6 +1029,7 @@ func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules( activityInfo.StartedEventId = common.EmptyEventID activityInfo.StartVersion = common.EmptyVersion activityInfo.StartedTime = nil + activityInfo.StartedClock = nil activityInfo.RequestId = "" return nil }); err != nil { From 297f04eaaf43a56543ba68c68ebb2d453c8a6ade Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 13:28:16 -0700 Subject: [PATCH 4/6] Extract ClearActivityStartedState helper Centralize the per-attempt field reset into a single exported function in the workflow package. All four call sites (normal retry, paused retry, timer executor pause, RecordActivityTaskStarted pause) now call this helper instead of duplicating the field assignments. 
Co-Authored-By: Claude Opus 4.6 --- .../api/recordactivitytaskstarted/api.go | 6 +----- .../history/timer_queue_active_task_executor.go | 6 +----- service/history/workflow/activity.go | 17 ++++++++++++----- service/history/workflow/mutable_state_impl.go | 6 +----- 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 40ea1bf444c..25ae4803254 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -388,11 +388,7 @@ func processActivityWorkflowRules( // activity was paused, need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.StartedClock = nil - activityInfo.RequestId = "" + workflow.ClearActivityStartedState(activityInfo) return nil }); err != nil { return rejectCodeUndefined, err diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index bed02284c90..29cedf9a92f 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -1026,11 +1026,7 @@ func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules( if ai.Paused { // need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.StartedClock = nil - activityInfo.RequestId = "" + workflow.ClearActivityStartedState(activityInfo) return nil }); err != nil { return err diff --git 
a/service/history/workflow/activity.go b/service/history/workflow/activity.go index 610b9a8c5b5..e4c77661b1f 100644 --- a/service/history/workflow/activity.go +++ b/service/history/workflow/activity.go @@ -60,6 +60,17 @@ func GetActivityState(ai *persistencespb.ActivityInfo) enumspb.PendingActivitySt return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED } +// ClearActivityStartedState resets the per-attempt "started" fields on an ActivityInfo. +// Called when an activity leaves the started state (retry, pause, etc.) so that stale +// values from the previous attempt don't leak into the next one. +func ClearActivityStartedState(ai *persistencespb.ActivityInfo) { + ai.StartedEventId = common.EmptyEventID + ai.StartVersion = common.EmptyVersion + ai.RequestId = "" + ai.StartedTime = nil + ai.StartedClock = nil +} + func UpdateActivityInfoForRetries( ai *persistencespb.ActivityInfo, version int64, @@ -72,11 +83,7 @@ func UpdateActivityInfoForRetries( ai.Attempt = attempt ai.Version = version ai.ScheduledTime = nextScheduledTime - ai.StartedEventId = common.EmptyEventID - ai.StartVersion = common.EmptyVersion - ai.RequestId = "" - ai.StartedTime = nil - ai.StartedClock = nil + ClearActivityStartedState(ai) // Mark per-attempt timers for recreation. 
ai.TimerTaskStatus &^= TimerTaskStatusCreatedHeartbeat | TimerTaskStatusCreatedStartToClose | TimerTaskStatusCreatedScheduleToStart ai.RetryLastWorkerIdentity = ai.StartedIdentity diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 40986e11c89..4339f4d39f3 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6431,11 +6431,7 @@ func (ms *MutableStateImpl) RetryActivity( if ai.Paused { // need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.StartedClock = nil - activityInfo.RequestId = "" + ClearActivityStartedState(activityInfo) activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure) activityInfo.Attempt++ if ms.config.EnableActivityRetryStampIncrement() { From af6ca8d41d0418875a33e09e8f42316d8ce7075b Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 13:33:56 -0700 Subject: [PATCH 5/6] Add unit test for ClearActivityStartedState Verify that all per-attempt fields are cleared and non-started fields are left untouched. 
Co-Authored-By: Claude Opus 4.6 --- ...utable_state_impl_restart_activity_test.go | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 10f96178d92..3d27646af96 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally/v4" commandpb "go.temporal.io/api/command/v1" @@ -57,6 +58,32 @@ type ( } ) +func TestClearActivityStartedState(t *testing.T) { + ai := &persistencespb.ActivityInfo{ + StartedEventId: 42, + StartVersion: 10, + RequestId: "req-1", + StartedTime: timestamppb.Now(), + StartedClock: &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 99}, + // Fields that should NOT be cleared. + ScheduledEventId: 7, + ActivityId: "activity-1", + Attempt: 3, + } + + ClearActivityStartedState(ai) + + require.Equal(t, common.EmptyEventID, ai.StartedEventId) + require.Equal(t, common.EmptyVersion, ai.StartVersion) + require.Empty(t, ai.RequestId) + require.Nil(t, ai.StartedTime) + require.Nil(t, ai.StartedClock) + // Verify non-started fields are untouched. 
+ require.Equal(t, int64(7), ai.ScheduledEventId) + require.Equal(t, "activity-1", ai.ActivityId) + require.Equal(t, int32(3), ai.Attempt) +} + func TestMutableStateRetryActivitySuite(t *testing.T) { s := new(retryActivitySuite) From 315b05823033108e6da0b054bae19f3f91cae037 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 14:55:30 -0700 Subject: [PATCH 6/6] Fix testifylint: use s.Require().NoError for error assertion Co-Authored-By: Claude Opus 4.6 --- .../workflow/mutable_state_impl_restart_activity_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 3d27646af96..3873a5b9f23 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -205,7 +205,7 @@ func (s *retryActivitySuite) TestRetryActivity_should_clear_per_attempt_fields() s.activity.StartedTime = timestamppb.Now() _, err := s.mutableState.RetryActivity(s.activity, s.failure) - s.NoError(err) + s.Require().NoError(err) s.Nil(s.activity.StartedClock, "StartedClock should be cleared on retry") s.Nil(s.activity.StartedTime, "StartedTime should be cleared on retry")