diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 74eb393b030..25ae4803254 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -388,10 +388,7 @@ func processActivityWorkflowRules( // activity was paused, need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.RequestId = "" + workflow.ClearActivityStartedState(activityInfo) return nil }); err != nil { return rejectCodeUndefined, err diff --git a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go index abf58eb89ff..6bd0d0222ad 100644 --- a/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go +++ b/service/history/api/respondworkflowtaskcompleted/workflow_task_completed_handler.go @@ -698,9 +698,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity( handler.activityNotStartedCancelled = true } else if ai.WorkerControlTaskQueue != "" { if ai.StartedClock == nil { - // StartedClock may be nil for activities started before this feature was deployed. + // StartedClock is nil when the activity is not currently running on a worker + // (e.g., in retry backoff, or started before this feature was deployed). // Skip cancel command; the activity will time out normally. - handler.logger.Info("Skipping worker cancel command: activity missing StartedClock (pre-deploy)", + handler.logger.Info("Skipping worker cancel command: activity not currently started", tag.WorkflowNamespaceID(handler.mutableState.GetWorkflowKey().NamespaceID), tag.WorkflowID(handler.mutableState.GetWorkflowKey().WorkflowID), tag.WorkflowRunID(handler.mutableState.GetWorkflowKey().RunID), diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index bca92e09fb3..29cedf9a92f 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -1026,10 +1026,7 @@ func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules( if ai.Paused { // need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.RequestId = "" + workflow.ClearActivityStartedState(activityInfo) return nil }); err != nil { return err diff --git a/service/history/workflow/activity.go b/service/history/workflow/activity.go index 8054f4c02ba..e4c77661b1f 100644 --- a/service/history/workflow/activity.go +++ b/service/history/workflow/activity.go @@ -60,6 +60,17 @@ func GetActivityState(ai *persistencespb.ActivityInfo) enumspb.PendingActivitySt return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED } +// ClearActivityStartedState resets the per-attempt "started" fields on an ActivityInfo. +// Called when an activity leaves the started state (retry, pause, etc.) so that stale +// values from the previous attempt don't leak into the next one. +func ClearActivityStartedState(ai *persistencespb.ActivityInfo) { + ai.StartedEventId = common.EmptyEventID + ai.StartVersion = common.EmptyVersion + ai.RequestId = "" + ai.StartedTime = nil + ai.StartedClock = nil +} + func UpdateActivityInfoForRetries( ai *persistencespb.ActivityInfo, version int64, @@ -72,10 +83,7 @@ func UpdateActivityInfoForRetries( ai.Attempt = attempt ai.Version = version ai.ScheduledTime = nextScheduledTime - ai.StartedEventId = common.EmptyEventID - ai.StartVersion = common.EmptyVersion - ai.RequestId = "" - ai.StartedTime = nil + ClearActivityStartedState(ai) // Mark per-attempt timers for recreation. ai.TimerTaskStatus &^= TimerTaskStatusCreatedHeartbeat | TimerTaskStatusCreatedStartToClose | TimerTaskStatusCreatedScheduleToStart ai.RetryLastWorkerIdentity = ai.StartedIdentity diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 862894e92b3..4339f4d39f3 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6431,10 +6431,7 @@ func (ms *MutableStateImpl) RetryActivity( if ai.Paused { // need to update activity if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error { - activityInfo.StartedEventId = common.EmptyEventID - activityInfo.StartVersion = common.EmptyVersion - activityInfo.StartedTime = nil - activityInfo.RequestId = "" + ClearActivityStartedState(activityInfo) activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure) activityInfo.Attempt++ if ms.config.EnableActivityRetryStampIncrement() { diff --git a/service/history/workflow/mutable_state_impl_restart_activity_test.go b/service/history/workflow/mutable_state_impl_restart_activity_test.go index 3cb91b7fbd3..3873a5b9f23 100644 --- a/service/history/workflow/mutable_state_impl_restart_activity_test.go +++ b/service/history/workflow/mutable_state_impl_restart_activity_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/uber-go/tally/v4" commandpb "go.temporal.io/api/command/v1" @@ -14,8 +15,10 @@ import ( enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + clockspb "go.temporal.io/server/api/clock/v1" enumsspb "go.temporal.io/server/api/enums/v1" persistencespb "go.temporal.io/server/api/persistence/v1" + "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" commonclock "go.temporal.io/server/common/clock" "go.temporal.io/server/common/log" @@ -55,6 +58,32 @@ type ( } ) +func TestClearActivityStartedState(t *testing.T) { + ai := &persistencespb.ActivityInfo{ + StartedEventId: 42, + StartVersion: 10, + RequestId: "req-1", + StartedTime: timestamppb.Now(), + StartedClock: &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 99}, + // Fields that should NOT be cleared. + ScheduledEventId: 7, + ActivityId: "activity-1", + Attempt: 3, + } + + ClearActivityStartedState(ai) + + require.Equal(t, common.EmptyEventID, ai.StartedEventId) + require.Equal(t, common.EmptyVersion, ai.StartVersion) + require.Empty(t, ai.RequestId) + require.Nil(t, ai.StartedTime) + require.Nil(t, ai.StartedClock) + // Verify non-started fields are untouched. + require.Equal(t, int64(7), ai.ScheduledEventId) + require.Equal(t, "activity-1", ai.ActivityId) + require.Equal(t, int32(3), ai.Attempt) +} + func TestMutableStateRetryActivitySuite(t *testing.T) { s := new(retryActivitySuite) @@ -165,6 +194,24 @@ func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_bac s.assertTruncateFailureCalled() } +func (s *retryActivitySuite) TestRetryActivity_should_clear_per_attempt_fields() { + s.mutableState.timeSource = s.timeSource + taskGeneratorMock := NewMockTaskGenerator(s.controller) + taskGeneratorMock.EXPECT().GenerateActivityRetryTasks(s.activity) + s.mutableState.taskGenerator = taskGeneratorMock + + // Set per-attempt fields that should be cleared on retry. + s.activity.StartedClock = &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 42} + s.activity.StartedTime = timestamppb.Now() + + _, err := s.mutableState.RetryActivity(s.activity, s.failure) + s.Require().NoError(err) + + s.Nil(s.activity.StartedClock, "StartedClock should be cleared on retry") + s.Nil(s.activity.StartedTime, "StartedTime should be cleared on retry") + s.Equal(common.EmptyEventID, s.activity.StartedEventId, "StartedEventId should be reset to EmptyEventID") +} + // TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set asserts that the activity is retried after NextRetryDelay period specified in the application failure. func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set() { s.mutableState.timeSource = s.timeSource