Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions service/history/interfaces/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,21 @@ type (
GetExternalPayloadCount() int64
AddExternalPayloadCount(count int64)

// AddTasks adds tasks to the mutable state.
// For scheduled tasks (any CategoryTypeScheduled — e.g. timer, archival), if time has been
// skipped (i.e. the virtual time of this mutable state is ahead of wall-clock time), the
// scheduled time of the task is adjusted to wall-clock time, as the dispatch queues run
// against wall-clock time.
AddTasks(tasks ...tasks.Task)
PopTasks() map[tasks.Category][]tasks.Task
DeleteCHASMPureTasks(maxScheduledTime time.Time)

SetUpdateCondition(int64, int64)
GetUpdateCondition() (int64, int64)

// SetSpeculativeWorkflowTaskTimeoutTask submits the task to the shard's in-memory
// scheduled queue, replacing any prior speculative timeout. VisibilityTimestamp must be
// in virtual time; it is converted to wall-clock before scheduling.
SetSpeculativeWorkflowTaskTimeoutTask(task *tasks.WorkflowTaskTimeoutTask) error
CheckSpeculativeWorkflowTaskTimeoutTask(task *tasks.WorkflowTaskTimeoutTask) bool
RemoveSpeculativeWorkflowTaskTimeoutTask()
Expand Down Expand Up @@ -405,8 +413,12 @@ type (
HasRequestID(requestID string) bool
SetSuccessorRunID(runID string)

Now() time.Time // the time of a mutable state may be ahead of the wall-clock time because of time skipping

// Now returns the current time of the mutable state, which may be ahead of
// wall-clock time if time skipping has happened.
Now() time.Time
// ToRealTime converts a virtual timestamp from mutable state to wall-clock time,
// adjusting for accumulated skipped duration which may have happened.
ToRealTime(virtualTime time.Time) time.Time
AddWorkflowExecutionTimeSkippingTransitionedEvent(
ctx context.Context, targetTime time.Time, disabledAfterBound bool) (*historypb.HistoryEvent, error)
ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx context.Context, event *historypb.HistoryEvent) error
Expand Down
14 changes: 14 additions & 0 deletions service/history/interfaces/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions service/history/timer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := mutableState.Now()
referenceTime := t.Now()
timerFired := false
Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() {
Expand All @@ -171,9 +171,7 @@ Loop:
return serviceerror.NewInternal(errString)
}

// when time-skipping happens, the task.FireTime is way before the timerSequenceID.Timestamp,
// but using the virtual time of ms as the reference time, this function will still return true.
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, mutableState.ToRealTime(timerSequenceID.Timestamp)) {
// Timer sequence IDs are sorted; once we encounter a timer whose
// sequence ID has not expired, all subsequent timers will not have
// expired.
Expand Down Expand Up @@ -226,7 +224,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
}

timerSequence := t.getTimerSequence(mutableState)
referenceTime := mutableState.Now()
referenceTime := t.Now()
updateMutableState := false
scheduleWorkflowTask := false

Expand All @@ -237,7 +235,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(
// created.
isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), heartbeatTimeoutVis) {
if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(
ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return err
Expand All @@ -247,7 +245,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask(

Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) {
if !queues.IsTimeExpired(task, referenceTime, mutableState.ToRealTime(timerSequenceID.Timestamp)) {
// timer sequence IDs are sorted, once there is one timer
// sequence ID not expired, all after that wil not expired
break Loop
Expand Down
100 changes: 100 additions & 0 deletions service/history/timer_queue_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestExecuteStateMachineTimerTask_Exe
).AnyTimes()
ms.EXPECT().HSM().Return(root).AnyTimes()
ms.EXPECT().Now().Return(s.now).AnyTimes()
ms.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes()

_, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy())
s.NoError(err)
Expand Down Expand Up @@ -2620,3 +2621,102 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessSingleActivityTimeoutTask
})
}
}

func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat_DedupUnderSkip() {
execution := &commonpb.WorkflowExecution{
WorkflowId: "some random workflow ID",
RunId: uuid.NewString(),
}
workflowType := "some random workflow type"
taskQueueName := "some random task queue"

mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId())
_, err := mutableState.AddWorkflowExecutionStartedEvent(
execution,
&historyservice.StartWorkflowExecutionRequest{
Attempt: 1,
NamespaceId: s.namespaceID.String(),
StartRequest: &workflowservice.StartWorkflowExecutionRequest{
WorkflowType: &commonpb.WorkflowType{Name: workflowType},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName},
WorkflowRunTimeout: durationpb.New(72 * time.Hour),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
},
},
)
s.NoError(err)

wt := addWorkflowTaskScheduledEvent(mutableState)
event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString())
wt.StartedEventID = event.GetEventId()
event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity")

// Long timeouts so the activity-timer loop in executeActivityTimeoutTask
// doesn't fire on its own (it uses ms.Now() = wall+1h under the configured
// skip). HeartbeatTimeout is strictly shorter so CreateNextActivityTimer
// emits the heartbeat task first, setting TimerTaskStatusCreatedHeartbeat.
const (
otherTimeout = 24 * time.Hour
heartbeatTO = 12 * time.Hour
)
scheduledEvent, _ := addActivityTaskScheduledEventWithRetry(
mutableState,
event.GetEventId(),
"activity", "activity-type", "taskqueue", nil,
otherTimeout, otherTimeout, otherTimeout, heartbeatTO,
&commonpb.RetryPolicy{
InitialInterval: durationpb.New(1 * time.Second),
BackoffCoefficient: 1.2,
MaximumInterval: durationpb.New(5 * time.Second),
MaximumAttempts: 5,
},
)
startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), "identity")
// Activity has retry policy → started event is transient (nil) but
// activityInfo.StartedTime is set; this is the same shape as the _Noop test.
s.Nil(startedEvent)

timerSequence := workflow.NewTimerSequence(mutableState)
mutableState.InsertTasks[tasks.CategoryTimer] = nil
modified, err := timerSequence.CreateNextActivityTimer()
s.NoError(err)
s.True(modified)
createdTask := mutableState.InsertTasks[tasks.CategoryTimer][0]
s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, createdTask.(*tasks.ActivityTimeoutTask).TimeoutType)

persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion())
// Attach time-skipping state to the persisted MS so ToRealTime sees a 1h offset
// on reload. After load, pendingActivityTimerHeartbeats[id] is reset to the
// year-2000 sentinel by NewMutableStateFromDB, and ToRealTime(year 2000) = 1999-12-31 23:00.
persistenceMutableState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{
Config: &workflowpb.TimeSkippingConfig{Enabled: true},
AccumulatedSkippedDuration: durationpb.New(time.Hour),
}

// 1999-12-31 23:30 — strictly between ToRealTime(year 2000) and year 2000.
// Lands in the 1h window where the fix and the bug diverge.
heartbeatYear2000Reset := time.Unix(946684800, 0).UTC()
timerTask := &tasks.ActivityTimeoutTask{
WorkflowKey: definition.NewWorkflowKey(
s.namespaceID.String(),
execution.GetWorkflowId(),
execution.GetRunId(),
),
Attempt: 1,
TaskID: s.mustGenerateTaskID(),
TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT,
VisibilityTimestamp: heartbeatYear2000Reset.Add(-30 * time.Minute),
EventID: scheduledEvent.GetEventId(),
}

s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).
Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
// Dedup fires only with the ToRealTime fix; that's what makes
// updateMutableState=true and forces an UpdateWorkflowExecution call.
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).
Return(tests.UpdateWorkflowExecutionResponse, nil)

resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask))
s.NoError(resp.ExecutionErr,
"dedup branch must fire under accumulated skip via ToRealTime(heartbeatTimeoutVis); without it the executor would short-circuit with errNoTimerFired")
}
6 changes: 3 additions & 3 deletions service/history/timer_queue_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask(
if queues.IsTimeExpired(
timerTask,
referenceTime,
timerSequenceID.Timestamp,
mutableState.ToRealTime(timerSequenceID.Timestamp),
) {
return &struct{}{}, nil
}
Expand Down Expand Up @@ -321,7 +321,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
if queues.IsTimeExpired(
timerTask,
referenceTime,
timerSequenceID.Timestamp,
mutableState.ToRealTime(timerSequenceID.Timestamp),
) {
return &struct{}{}, nil
}
Expand All @@ -340,7 +340,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask(
// created.
isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT
ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID)
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), heartbeatTimeoutVis) {
if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) {
if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil {
return nil, err
}
Expand Down
Loading
Loading