Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,7 @@ func processActivityWorkflowRules(

// activity was paused, need to update activity
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
activityInfo.StartedEventId = common.EmptyEventID
activityInfo.StartVersion = common.EmptyVersion
activityInfo.StartedTime = nil
activityInfo.RequestId = ""
workflow.ClearActivityStartedState(activityInfo)
return nil
}); err != nil {
return rejectCodeUndefined, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,9 +698,10 @@ func (handler *workflowTaskCompletedHandler) handleCommandRequestCancelActivity(
handler.activityNotStartedCancelled = true
} else if ai.WorkerControlTaskQueue != "" {
if ai.StartedClock == nil {
// StartedClock may be nil for activities started before this feature was deployed.
// StartedClock is nil when the activity is not currently running on a worker
// (e.g., in retry backoff, or started before this feature was deployed).
// Skip cancel command; the activity will time out normally.
handler.logger.Info("Skipping worker cancel command: activity missing StartedClock (pre-deploy)",
handler.logger.Info("Skipping worker cancel command: activity not currently started",
tag.WorkflowNamespaceID(handler.mutableState.GetWorkflowKey().NamespaceID),
tag.WorkflowID(handler.mutableState.GetWorkflowKey().WorkflowID),
tag.WorkflowRunID(handler.mutableState.GetWorkflowKey().RunID),
Expand Down
5 changes: 1 addition & 4 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,10 +1026,7 @@ func (t *timerQueueActiveTaskExecutor) processActivityWorkflowRules(
if ai.Paused {
// need to update activity
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
activityInfo.StartedEventId = common.EmptyEventID
activityInfo.StartVersion = common.EmptyVersion
activityInfo.StartedTime = nil
activityInfo.RequestId = ""
workflow.ClearActivityStartedState(activityInfo)
return nil
}); err != nil {
return err
Expand Down
16 changes: 12 additions & 4 deletions service/history/workflow/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func GetActivityState(ai *persistencespb.ActivityInfo) enumspb.PendingActivitySt
return enumspb.PENDING_ACTIVITY_STATE_SCHEDULED
}

// ClearActivityStartedState resets the per-attempt "started" fields on an ActivityInfo.
// Called when an activity leaves the started state (retry, pause, etc.) so that stale
// values from the previous attempt don't leak into the next one.
func ClearActivityStartedState(ai *persistencespb.ActivityInfo) {
ai.StartedEventId = common.EmptyEventID
ai.StartVersion = common.EmptyVersion
ai.RequestId = ""
ai.StartedTime = nil
ai.StartedClock = nil
}

func UpdateActivityInfoForRetries(
ai *persistencespb.ActivityInfo,
version int64,
Expand All @@ -72,10 +83,7 @@ func UpdateActivityInfoForRetries(
ai.Attempt = attempt
ai.Version = version
ai.ScheduledTime = nextScheduledTime
ai.StartedEventId = common.EmptyEventID
ai.StartVersion = common.EmptyVersion
ai.RequestId = ""
ai.StartedTime = nil
ClearActivityStartedState(ai)
// Mark per-attempt timers for recreation.
ai.TimerTaskStatus &^= TimerTaskStatusCreatedHeartbeat | TimerTaskStatusCreatedStartToClose | TimerTaskStatusCreatedScheduleToStart
ai.RetryLastWorkerIdentity = ai.StartedIdentity
Expand Down
5 changes: 1 addition & 4 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6431,10 +6431,7 @@ func (ms *MutableStateImpl) RetryActivity(
if ai.Paused {
// need to update activity
if err := ms.UpdateActivity(ai.ScheduledEventId, func(activityInfo *persistencespb.ActivityInfo, _ historyi.MutableState) error {
activityInfo.StartedEventId = common.EmptyEventID
activityInfo.StartVersion = common.EmptyVersion
activityInfo.StartedTime = nil
activityInfo.RequestId = ""
ClearActivityStartedState(activityInfo)
activityInfo.RetryLastFailure = ms.truncateRetryableActivityFailure(activityFailure)
activityInfo.Attempt++
if ms.config.EnableActivityRetryStampIncrement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally/v4"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
failurepb "go.temporal.io/api/failure/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
clockspb "go.temporal.io/server/api/clock/v1"
enumsspb "go.temporal.io/server/api/enums/v1"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
commonclock "go.temporal.io/server/common/clock"
"go.temporal.io/server/common/log"
Expand Down Expand Up @@ -55,6 +58,32 @@ type (
}
)

func TestClearActivityStartedState(t *testing.T) {
ai := &persistencespb.ActivityInfo{
StartedEventId: 42,
StartVersion: 10,
RequestId: "req-1",
StartedTime: timestamppb.Now(),
StartedClock: &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 99},
// Fields that should NOT be cleared.
ScheduledEventId: 7,
ActivityId: "activity-1",
Attempt: 3,
}

ClearActivityStartedState(ai)

require.Equal(t, common.EmptyEventID, ai.StartedEventId)
require.Equal(t, common.EmptyVersion, ai.StartVersion)
require.Empty(t, ai.RequestId)
require.Nil(t, ai.StartedTime)
require.Nil(t, ai.StartedClock)
// Verify non-started fields are untouched.
require.Equal(t, int64(7), ai.ScheduledEventId)
require.Equal(t, "activity-1", ai.ActivityId)
require.Equal(t, int32(3), ai.Attempt)
}

func TestMutableStateRetryActivitySuite(t *testing.T) {
s := new(retryActivitySuite)

Expand Down Expand Up @@ -165,6 +194,24 @@ func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_bac
s.assertTruncateFailureCalled()
}

func (s *retryActivitySuite) TestRetryActivity_should_clear_per_attempt_fields() {
s.mutableState.timeSource = s.timeSource
taskGeneratorMock := NewMockTaskGenerator(s.controller)
taskGeneratorMock.EXPECT().GenerateActivityRetryTasks(s.activity)
s.mutableState.taskGenerator = taskGeneratorMock

// Set per-attempt fields that should be cleared on retry.
s.activity.StartedClock = &clockspb.VectorClock{ClusterId: 1, ShardId: 1, Clock: 42}
s.activity.StartedTime = timestamppb.Now()

_, err := s.mutableState.RetryActivity(s.activity, s.failure)
s.Require().NoError(err)

s.Nil(s.activity.StartedClock, "StartedClock should be cleared on retry")
s.Nil(s.activity.StartedTime, "StartedTime should be cleared on retry")
s.Equal(common.EmptyEventID, s.activity.StartedEventId, "StartedEventId should be reset to EmptyEventID")
}

// TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set asserts that the activity is retried after NextRetryDelay period specified in the application failure.
func (s *retryActivitySuite) TestRetryActivity_should_be_scheduled_when_next_retry_delay_is_set() {
s.mutableState.timeSource = s.timeSource
Expand Down
Loading