From 18756d2a21bef08a68d7dfc7b889f29bf2ccbfc6 Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Sun, 17 May 2026 18:47:45 -0700 Subject: [PATCH 1/3] make scheduled tasks compatible with time skipping --- service/history/interfaces/mutable_state.go | 16 +- .../history/interfaces/mutable_state_mock.go | 14 + .../timer_queue_active_task_executor.go | 2 +- .../timer_queue_active_task_executor_test.go | 99 ++++ .../timer_queue_standby_task_executor.go | 6 +- .../timer_queue_standby_task_executor_test.go | 219 +++++++++ .../history/workflow/mutable_state_impl.go | 33 +- .../workflow/mutable_state_impl_test.go | 447 +++++++++++++++++- service/history/workflow/task_generator.go | 3 + 9 files changed, 817 insertions(+), 22 deletions(-) diff --git a/service/history/interfaces/mutable_state.go b/service/history/interfaces/mutable_state.go index 8392dcf6361..befcb6cca79 100644 --- a/service/history/interfaces/mutable_state.go +++ b/service/history/interfaces/mutable_state.go @@ -309,6 +309,11 @@ type ( GetExternalPayloadCount() int64 AddExternalPayloadCount(count int64) + // AddTasks adds tasks to the mutable state. + // For scheduled tasks (any CategoryTypeScheduled — e.g. timer, archival), if time has been + // skipped (i.e. the virtual time of this mutable state is ahead of wall-clock time), the + // scheduled time of the task is adjusted to wall-clock time, as the dispatch queues run + // against wall-clock time. AddTasks(tasks ...tasks.Task) PopTasks() map[tasks.Category][]tasks.Task DeleteCHASMPureTasks(maxScheduledTime time.Time) @@ -316,6 +321,9 @@ type ( SetUpdateCondition(int64, int64) GetUpdateCondition() (int64, int64) + // SetSpeculativeWorkflowTaskTimeoutTask submits the task to the shard's in-memory + // scheduled queue, replacing any prior speculative timeout. VisibilityTimestamp must be + // in virtual time; it is converted to wall-clock before scheduling. SetSpeculativeWorkflowTaskTimeoutTask(task *tasks.WorkflowTaskTimeoutTask) error CheckSpeculativeWorkflowTaskTimeoutTask(task *tasks.WorkflowTaskTimeoutTask) bool RemoveSpeculativeWorkflowTaskTimeoutTask() @@ -405,8 +413,12 @@ type ( HasRequestID(requestID string) bool SetSuccessorRunID(runID string) - Now() time.Time // the time of a mutable state may be ahead of the wall-clock time because of time skipping - + // Now returns the current time of the mutable state, which may be ahead of + // wall-clock time if time skipping has happened. + Now() time.Time + // ToRealTime converts a virtual timestamp from mutable state to wall-clock time, + // adjusting for accumulated skipped duration which may have happened. + ToRealTime(virtualTime time.Time) time.Time AddWorkflowExecutionTimeSkippingTransitionedEvent( ctx context.Context, targetTime time.Time, disabledAfterBound bool) (*historypb.HistoryEvent, error) ApplyWorkflowExecutionTimeSkippingTransitionedEvent(ctx context.Context, event *historypb.HistoryEvent) error diff --git a/service/history/interfaces/mutable_state_mock.go b/service/history/interfaces/mutable_state_mock.go index 05faaccf669..03baf2e22e5 100644 --- a/service/history/interfaces/mutable_state_mock.go +++ b/service/history/interfaces/mutable_state_mock.go @@ -3705,6 +3705,20 @@ func (mr *MockMutableStateMockRecorder) TaskQueueScheduleToStartTimeout(name any return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TaskQueueScheduleToStartTimeout", reflect.TypeOf((*MockMutableState)(nil).TaskQueueScheduleToStartTimeout), name) } +// ToRealTime mocks base method. +func (m *MockMutableState) ToRealTime(virtualTime time.Time) time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ToRealTime", virtualTime) + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// ToRealTime indicates an expected call of ToRealTime. +func (mr *MockMutableStateMockRecorder) ToRealTime(virtualTime any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ToRealTime", reflect.TypeOf((*MockMutableState)(nil).ToRealTime), virtualTime) +} + // UpdateActivity mocks base method. func (m *MockMutableState) UpdateActivity(arg0 int64, arg1 ActivityUpdater) error { m.ctrl.T.Helper() diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index 29cedf9a92f..ee11fb1309c 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -237,7 +237,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( // created. isHeartBeatTask := task.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(task.EventID) - if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), heartbeatTimeoutVis) { + if isHeartBeatTask && ok && queues.IsTimeExpired(task, task.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) { if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat( ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil { return err diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index a31284a5020..1eee344e756 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -2620,3 +2620,102 @@ func (s *timerQueueActiveTaskExecutorSuite) TestProcessSingleActivityTimeoutTask }) } } + +func (s *timerQueueActiveTaskExecutorSuite) TestProcessActivityTimeout_Heartbeat_DedupUnderSkip() { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.NewString(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: durationpb.New(72 * time.Hour), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + wt := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString()) + wt.StartedEventID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + + // Long timeouts so the activity-timer loop in executeActivityTimeoutTask + // doesn't fire on its own (it uses ms.Now() = wall+1h under the configured + // skip). HeartbeatTimeout is strictly shorter so CreateNextActivityTimer + // emits the heartbeat task first, setting TimerTaskStatusCreatedHeartbeat. + const ( + otherTimeout = 24 * time.Hour + heartbeatTO = 12 * time.Hour + ) + scheduledEvent, _ := addActivityTaskScheduledEventWithRetry( + mutableState, + event.GetEventId(), + "activity", "activity-type", "taskqueue", nil, + otherTimeout, otherTimeout, otherTimeout, heartbeatTO, + &commonpb.RetryPolicy{ + InitialInterval: durationpb.New(1 * time.Second), + BackoffCoefficient: 1.2, + MaximumInterval: durationpb.New(5 * time.Second), + MaximumAttempts: 5, + }, + ) + startedEvent := addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), "identity") + // Activity has retry policy → started event is transient (nil) but + // activityInfo.StartedTime is set; this is the same shape as the _Noop test. + s.Nil(startedEvent) + + timerSequence := workflow.NewTimerSequence(mutableState) + mutableState.InsertTasks[tasks.CategoryTimer] = nil + modified, err := timerSequence.CreateNextActivityTimer() + s.NoError(err) + s.True(modified) + createdTask := mutableState.InsertTasks[tasks.CategoryTimer][0] + s.Equal(enumspb.TIMEOUT_TYPE_HEARTBEAT, createdTask.(*tasks.ActivityTimeoutTask).TimeoutType) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) + // Attach time-skipping state to the persisted MS so ToRealTime sees a 1h offset + // on reload. After load, pendingActivityTimerHeartbeats[id] is reset to the + // year-2000 sentinel by NewMutableStateFromDB, and ToRealTime(year 2000) = 1999-12-31 23:00. + persistenceMutableState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: true}, + AccumulatedSkippedDuration: durationpb.New(time.Hour), + } + + // 1999-12-31 23:30 — strictly between ToRealTime(year 2000) and year 2000. + // Lands in the 1h window where the fix and the bug diverge. + heartbeatYear2000Reset := time.Unix(946684800, 0).UTC() + timerTask := &tasks.ActivityTimeoutTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 1, + TaskID: s.mustGenerateTaskID(), + TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + VisibilityTimestamp: heartbeatYear2000Reset.Add(-30 * time.Minute), + EventID: scheduledEvent.GetEventId(), + } + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + // Dedup fires only with the ToRealTime fix; that's what makes + // updateMutableState=true and forces an UpdateWorkflowExecution call. + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()). + Return(tests.UpdateWorkflowExecutionResponse, nil) + + resp := s.timerQueueActiveTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr, + "dedup branch must fire under accumulated skip via ToRealTime(heartbeatTimeoutVis); without it the executor would short-circuit with errNoTimerFired") +} diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index d623a828637..e679cd35071 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -239,7 +239,6 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( ctx context.Context, timerTask *tasks.UserTimerTask, ) error { - referenceTime := t.Now() actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer @@ -248,6 +247,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( timerSequence := t.getTimerSequence(mutableState) timerSequenceIDs := timerSequence.LoadAndSortUserTimers() + referenceTime := mutableState.Now() if len(timerSequenceIDs) > 0 { timerSequenceID := timerSequenceIDs[0] _, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID) @@ -299,13 +299,13 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // // the overall solution is to attempt to generate a new activity timer task whenever the // task passed in is safe to be throw away. - referenceTime := t.Now() actionFn := func(ctx context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer return nil, nil } + referenceTime := mutableState.Now() timerSequence := t.getTimerSequence(mutableState) updateMutableState := false timerSequenceIDs := timerSequence.LoadAndSortActivityTimers() @@ -340,7 +340,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // created. isHeartBeatTask := timerTask.TimeoutType == enumspb.TIMEOUT_TYPE_HEARTBEAT ai, heartbeatTimeoutVis, ok := mutableState.GetActivityInfoWithTimerHeartbeat(timerTask.EventID) - if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), heartbeatTimeoutVis) { + if isHeartBeatTask && ok && queues.IsTimeExpired(timerTask, timerTask.GetVisibilityTime(), mutableState.ToRealTime(heartbeatTimeoutVis)) { if err := mutableState.UpdateActivityTaskStatusWithTimerHeartbeat(ai.ScheduledEventId, ai.TimerTaskStatus&^workflow.TimerTaskStatusCreatedHeartbeat, nil); err != nil { return nil, err } diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 1d0554f7174..0ca364ece9f 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -764,6 +764,225 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_Multiple s.Nil(resp.ExecutionErr) } +// TestProcessUserTimerTimeout_StandbyFiresUnderSkip: without virtual time, the +// task's wall fire time arrives before MS's virtual deadline so standby acks +// silently. With virtual time, standby sees the timer as expired and retries +// until the fired event replicates. +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessUserTimerTimeout_StandbyFiresUnderSkip() { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.NewString(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: durationpb.New(72 * time.Hour), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + wt := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString()) + wt.StartedEventID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + + const timerDur = 30 * time.Minute + timerID := "t1" + event, _ = addTimerStartedEvent(mutableState, event.GetEventId(), timerID, timerDur) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, event.GetEventId(), event.GetVersion()) + // Attach time-skipping state on the persistence proto. On reload, the wrapper + // makes ms.Now() return s.now + 1h, so the persisted virtual expiry (~s.now+30min) + // is already past in the virtual frame. + persistenceMutableState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(30 * time.Minute), + } + + // VisibilityTimestamp = virtual_deadline - skip = wall_deadline ≈ s.now - 30min. + // Dispatchable (≤ wall_now = s.now); mirrors what AddTasks would have produced + // if TSI had been present at scheduling time. + timerTask := &tasks.UserTimerTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + TaskID: s.mustGenerateTaskID(), + VisibilityTimestamp: s.now, + EventID: event.EventId, + } + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + s.mockShard.SetCurrentTime(s.clusterName, s.now) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr, + "standby user-timer branch must see virtual expiry as elapsed under accumulated skip and retry until the fired event replicates") +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_StandbyFiresUnderSkip() { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.NewString(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: durationpb.New(72 * time.Hour), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + wt := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString()) + wt.StartedEventID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + + const activityTimeout = 30 * time.Minute + scheduledEvent, _ := addActivityTaskScheduledEvent( + mutableState, + event.GetEventId(), + "activity", "activity-type", "taskqueue", nil, + activityTimeout, activityTimeout, activityTimeout, activityTimeout, + ) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) + persistenceMutableState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + } + + timerTask := &tasks.ActivityTimeoutTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 1, + TaskID: s.mustGenerateTaskID(), + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, + VisibilityTimestamp: s.now.Add(-30 * time.Minute), + EventID: scheduledEvent.GetEventId(), + } + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + + // Same windowing trick as the user-timer test — keep standby time inside the + // NoOp window so NoOp(&struct{}{}) → ErrTaskRetry rather than discard. + s.mockShard.SetCurrentTime(s.clusterName, s.now.Add(-29*time.Minute)) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.Equal(consts.ErrTaskRetry, resp.ExecutionErr, + "standby activity-timer branch must see virtual deadline as elapsed under accumulated skip — referenceTime must come from mutableState.Now() (virtual)") +} + +func (s *timerQueueStandbyTaskExecutorSuite) TestProcessActivityTimeout_StandbyHeartbeat_DedupUnderSkip() { + execution := &commonpb.WorkflowExecution{ + WorkflowId: "some random workflow ID", + RunId: uuid.NewString(), + } + workflowType := "some random workflow type" + taskQueueName := "some random task queue" + + mutableState := workflow.TestGlobalMutableState(s.mockShard, s.mockShard.GetEventsCache(), s.logger, s.version, execution.GetWorkflowId(), execution.GetRunId()) + _, err := mutableState.AddWorkflowExecutionStartedEvent( + execution, + &historyservice.StartWorkflowExecutionRequest{ + Attempt: 1, + NamespaceId: s.namespaceID.String(), + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueueName}, + WorkflowRunTimeout: durationpb.New(72 * time.Hour), + WorkflowTaskTimeout: durationpb.New(1 * time.Second), + }, + }, + ) + s.NoError(err) + + wt := addWorkflowTaskScheduledEvent(mutableState) + event := addWorkflowTaskStartedEvent(mutableState, wt.ScheduledEventID, taskQueueName, uuid.NewString()) + wt.StartedEventID = event.GetEventId() + event = addWorkflowTaskCompletedEvent(&s.Suite, mutableState, wt.ScheduledEventID, wt.StartedEventID, "some random identity") + + // Heartbeat strictly shorter than the other timeouts so it sorts first in + // LoadAndSortActivityTimers — paired with TimerTaskStatusCreatedHeartbeat + // below, this makes CreateNextActivityTimer a no-op (TimerCreated=true), + // so the dedup branch is the only source of modification. + const ( + longTimeout = 24 * time.Hour + heartbeatTimeoutDur = 12 * time.Hour + ) + scheduledEvent, _ := addActivityTaskScheduledEvent( + mutableState, + event.GetEventId(), + "activity", "activity-type", "taskqueue", nil, + longTimeout, longTimeout, longTimeout, heartbeatTimeoutDur, + ) + addActivityTaskStartedEvent(mutableState, scheduledEvent.GetEventId(), "identity") + + // Flip the heartbeat-created bit so NewMutableStateFromDB populates + // pendingActivityTimerHeartbeats[id] with the year-2000 sentinel. Also set + // LastHeartbeatUpdateTime so the timer-sequence treats the heartbeat as a + // real ongoing tracker. + activityInfo := mutableState.GetPendingActivityInfos()[scheduledEvent.GetEventId()] + activityInfo.TimerTaskStatus |= workflow.TimerTaskStatusCreatedHeartbeat + activityInfo.LastHeartbeatUpdateTime = timestamppb.New(s.now) + + persistenceMutableState := s.createPersistenceMutableState(mutableState, scheduledEvent.GetEventId(), scheduledEvent.GetVersion()) + persistenceMutableState.ExecutionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + } + + heartbeatYear2000 := time.Unix(946684800, 0).UTC() + timerTask := &tasks.ActivityTimeoutTask{ + WorkflowKey: definition.NewWorkflowKey( + s.namespaceID.String(), + execution.GetWorkflowId(), + execution.GetRunId(), + ), + Attempt: 1, + TaskID: s.mustGenerateTaskID(), + TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + VisibilityTimestamp: heartbeatYear2000.Add(-30 * time.Minute), + EventID: scheduledEvent.GetEventId(), + } + + s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). + Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + // Dedup branch must fire → bit cleared → CreateNextActivityTimer regenerates + // → UpdateWorkflowExecutionAsPassive routes through persistence.UpdateWorkflowExecution. + // Without ToRealTime the gate would skip and this expectation would go unmet. + s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()). + Return(tests.UpdateWorkflowExecutionResponse, nil) + + s.mockShard.SetCurrentTime(s.clusterName, s.now) + resp := s.timerQueueStandbyTaskExecutor.Execute(context.Background(), s.newTaskExecutable(timerTask)) + s.NoError(resp.ExecutionErr, + "standby heartbeat-dedup gate must fire under accumulated skip via ToRealTime(heartbeatTimeoutVis)") +} + func (s *timerQueueStandbyTaskExecutorSuite) TestProcessWorkflowTaskTimeout_Pending() { execution := &commonpb.WorkflowExecution{ WorkflowId: "some random workflow ID", diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index b890d1fd55c..2b63a67331b 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -1906,11 +1906,7 @@ func (ms *MutableStateImpl) Now() time.Time { // (i.e. they may include skipped time); this offset is the amount to subtract to get real // wall-clock, used by AddTasks when persisting timer-category tasks. func (ms *MutableStateImpl) accumulatedSkippedDuration() time.Duration { - info := ms.executionInfo.GetTimeSkippingInfo() - if info == nil || info.AccumulatedSkippedDuration == nil { - return 0 - } - return info.AccumulatedSkippedDuration.AsDuration() + return ms.executionInfo.GetTimeSkippingInfo().GetAccumulatedSkippedDuration().AsDuration() } // GetWorkflowCloseTime returns workflow closed time, returns a zero time for open workflow @@ -7239,8 +7235,7 @@ func (ms *MutableStateImpl) processCloseCallbacksChasm() error { func (ms *MutableStateImpl) AddTasks( newTasks ...tasks.Task, ) { - now := ms.timeSource.Now() - skip := ms.accumulatedSkippedDuration() + now := ms.Now() for _, task := range newTasks { if chasmTask, ok := task.(*tasks.ChasmTask); ok && chasmTask.GetCategory() == tasks.CategoryVisibility && @@ -7249,19 +7244,23 @@ func (ms *MutableStateImpl) AddTasks( } category := task.GetCategory() + // Drop tasks scheduled too far in the future. VisibilityTime hasn't been + // shifted to wall-clock yet (the conversion runs below), so both sides are + // virtual here; the difference is frame-invariant (skip cancels). Keep + // `now` from ms.Now() so both stay in the same frame. if category.Type() == tasks.CategoryTypeScheduled && task.GetVisibilityTime().Sub(now) > maxScheduledTaskDuration { ms.logger.Info("Dropped long duration scheduled task.", tasks.Tags(task)...) continue } - // Timer tasks are produced with VisibilityTime in the workflow's virtual frame (i.e. - // possibly inflated by accumulated skipped duration). The timer queue dispatches against + // Scheduled tasks are produced with VisibilityTime in the workflow's virtual frame (i.e. + // possibly inflated by accumulated skipped duration). Their dispatch queues run against // real wall-clock, so convert here — callers outside MutableState don't see the virtual // vs. real distinction. The CategoryTypeScheduled drop-check above runs first so it // compares virtual-vs-virtual (now is also virtual). - if skip > 0 && category == tasks.CategoryTimer { - task.SetVisibilityTime(task.GetVisibilityTime().Add(-skip)) + if category.Type() == tasks.CategoryTypeScheduled { + task.SetVisibilityTime(ms.ToRealTime(task.GetVisibilityTime())) } if chasmPureTask, ok := task.(*tasks.ChasmTaskPure); ok { @@ -7326,6 +7325,11 @@ func (ms *MutableStateImpl) SetSpeculativeWorkflowTaskTimeoutTask( return err } task.TaskID = taskID + // Producer stamps VisibilityTimestamp in the workflow's virtual frame (it's derived from + // workflowTask.ScheduledTime / StartedTime, which come from the time-skipping wrapper). + // The in-memory scheduled queue dispatches against real wall-clock, so convert here — + // same boundary contract as AddTasks for persisted scheduled tasks. + task.SetVisibilityTime(ms.ToRealTime(task.GetVisibilityTime())) ms.speculativeWorkflowTaskTimeoutTask = task return ms.shard.AddSpeculativeWorkflowTaskTimeoutTask(task) } @@ -10229,3 +10233,10 @@ func (ms *MutableStateImpl) calculateTimeSkippingTransition() (timeSkippingTrans } return transition, nil } + +func (ms *MutableStateImpl) ToRealTime(virtualTime time.Time) time.Time { + if virtualTime.IsZero() { + return virtualTime + } + return virtualTime.Add(-ms.accumulatedSkippedDuration()) +} diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 54144a30737..2a8aae381ca 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -6972,26 +6972,122 @@ func (s *mutableStateSuite) TestAddTasks_SubtractsSkipFromTimerTasks() { VisibilityTimestamp: virtualTime, EventID: 42, } + activityTimeoutTask := &tasks.ActivityTimeoutTask{ + WorkflowKey: s.mutableState.GetWorkflowKey(), + VisibilityTimestamp: virtualTime, + TimeoutType: enumspb.TIMEOUT_TYPE_HEARTBEAT, + EventID: 5, + } + archivalTask := &tasks.ArchiveExecutionTask{ + WorkflowKey: s.mutableState.GetWorkflowKey(), + VisibilityTimestamp: virtualTime, + } transferTask := &tasks.ActivityTask{ WorkflowKey: s.mutableState.GetWorkflowKey(), VisibilityTimestamp: virtualTime, ScheduledEventID: 42, } - s.mutableState.AddTasks(timerTask, transferTask) + s.mutableState.AddTasks(timerTask, activityTimeoutTask, archivalTask, transferTask) - // Timer-category VisibilityTimestamp is shifted back to real wall-clock; other - // categories are left alone because their timestamps are not virtual-framed. + // Scheduled-category VisibilityTimestamps (Timer, Archival, MemoryTimer) are shifted back + // to real wall-clock because producers stamp them in the workflow's virtual frame. + // Immediate categories (Transfer, Visibility, Outbound) are left alone — their visibility + // timestamp is set by the shard at write time. s.Equal(virtualTime.Add(-skipped), timerTask.VisibilityTimestamp) + s.Equal(virtualTime.Add(-skipped), activityTimeoutTask.VisibilityTimestamp) + s.Equal(virtualTime.Add(-skipped), archivalTask.VisibilityTimestamp) s.Equal(virtualTime, transferTask.VisibilityTimestamp) inserted := s.mutableState.InsertTasks - s.Require().Len(inserted[tasks.CategoryTimer], 1) - s.Equal(virtualTime.Add(-skipped), inserted[tasks.CategoryTimer][0].GetVisibilityTime()) + s.Require().Len(inserted[tasks.CategoryTimer], 2, + "UserTimerTask and ActivityTimeoutTask both live in CategoryTimer") + for _, t := range inserted[tasks.CategoryTimer] { + s.Equal(virtualTime.Add(-skipped), t.GetVisibilityTime()) + } + s.Require().Len(inserted[tasks.CategoryArchival], 1) + s.Equal(virtualTime.Add(-skipped), inserted[tasks.CategoryArchival][0].GetVisibilityTime()) s.Require().Len(inserted[tasks.CategoryTransfer], 1) s.Equal(virtualTime, inserted[tasks.CategoryTransfer][0].GetVisibilityTime()) } +// TestAddCompletedWorkflowEvent_ArchivalConvertsVirtualToWall exercises the full +// close→archival path end-to-end: starting a workflow with accumulated skip +// installs the time-skipping wrapper on the MS time source, the close event is +// then stamped in virtual time by hBuilder, GenerateWorkflowCloseTasks computes +// archiveTime in virtual time, and AddTasks shifts it back to wall time. A +// regression in any of those layers (producer using wall-clock, hBuilder losing +// its wrapped time source, the shift removed from AddTasks) lets a year of +// virtual skip leak into the persisted ArchiveExecutionTask's VisibilityTimestamp. +func (s *mutableStateSuite) TestAddCompletedWorkflowEvent_ArchivalConvertsVirtualToWall() { + const skipped = 365 * 24 * time.Hour + + // tests.GlobalNamespaceEntry already has VisibilityArchivalState=ENABLED; + // the cluster-side mock has to agree for archivalEnabled() to return true. + s.mockShard.Resource.ArchivalMetadata.SetVisibilityEnabledByDefault() + // Pin the jitter delay to 0 so `closeTime + delay` is deterministic — without + // this the archive task lands in a ~5-minute window which smears the assertion. + s.mockConfig.ArchivalProcessorArchiveDelay = func() time.Duration { return 0 } + + s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + + realLowerBound := s.mockShard.GetTimeSource().Now() + + // Start with time-skipping enabled and 1 year of inherited skip. This + // installs the time-skipping wrapper on ms.timeSource and ms.hBuilder, so + // every subsequent event is stamped in virtual time. + _, err := s.mutableState.AddWorkflowExecutionStartedEvent( + &commonpb.WorkflowExecution{ + WorkflowId: tests.WorkflowID, + RunId: tests.RunID, + }, + &historyservice.StartWorkflowExecutionRequest{ + StartRequest: &workflowservice.StartWorkflowExecutionRequest{ + TimeSkippingConfig: &workflowpb.TimeSkippingConfig{Enabled: true}, + }, + InitialSkippedDuration: durationpb.New(skipped), + }, + ) + s.NoError(err) + + // Sanity: the wrapper landed and virtual now is ~1 year ahead of wall now. + s.InDelta(float64(skipped), float64(s.mutableState.Now().Sub(realLowerBound)), float64(time.Minute)) + + _, err = s.mutableState.AddWorkflowTaskScheduledEvent(false, enumsspb.WORKFLOW_TASK_TYPE_NORMAL) + s.NoError(err) + // Clear InsertTasks so we only inspect what the close path produces. + _, _, err = s.mutableState.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive) + s.NoError(err) + + // Run the actual close path. event.EventTime is stamped at virtual close + // (~ wall + 1 year) by hBuilder; GenerateWorkflowCloseTasks computes + // archiveTime = virtualCloseTime + 0; AddTasks shifts by -skip, landing + // VisibilityTimestamp back at real close time. + _, err = s.mutableState.AddCompletedWorkflowEvent( + 5, + &commandpb.CompleteWorkflowExecutionCommandAttributes{}, + "", + ) + s.NoError(err) + realUpperBound := s.mockShard.GetTimeSource().Now() + + archival := s.mutableState.InsertTasks[tasks.CategoryArchival] + s.Require().Len(archival, 1) + archiveTask, ok := archival[0].(*tasks.ArchiveExecutionTask) + s.Require().True(ok) + + // With delay=0 the task must fire within the wall-clock window of this + // test's execution. Without the virtual→wall conversion anywhere in the + // chain, it would fire ~1 year out — the [lowerBound, upperBound] interval + // rejects that conclusively. + s.False(archiveTask.VisibilityTimestamp.Before(realLowerBound), + "archival task fires before workflow started: %v < %v", + archiveTask.VisibilityTimestamp, realLowerBound) + s.False(archiveTask.VisibilityTimestamp.After(realUpperBound), + "archival task fires too far in the future (virtual time leaked into VisibilityTimestamp): %v > %v (skip=%v)", + archiveTask.VisibilityTimestamp, realUpperBound, skipped) +} + func (s *mutableStateSuite) TestAddTasks_NoOpWithoutTimeSkipping() { virtualTime := s.mockShard.GetTimeSource().Now().Add(2 * time.Hour) s.mutableState.executionInfo.TimeSkippingInfo = nil @@ -7006,6 +7102,172 @@ func (s *mutableStateSuite) TestAddTasks_NoOpWithoutTimeSkipping() { s.Equal(virtualTime, timerTask.VisibilityTimestamp) } +// installEngineCapturingSpeculativeTimeout wires a mock engine onto the test shard whose +// NotifyNewTasks records each WorkflowTaskTimeoutTask it receives via the speculative +// in-memory queue path. Returned slice is appended to on every call. +func (s *mutableStateSuite) installEngineCapturingSpeculativeTimeout() *[]*tasks.WorkflowTaskTimeoutTask { + captured := &[]*tasks.WorkflowTaskTimeoutTask{} + engine := historyi.NewMockEngine(s.controller) + engine.EXPECT().NotifyNewTasks(gomock.Any()).Do(func(byCategory map[tasks.Category][]tasks.Task) { + for _, t := range byCategory[tasks.CategoryMemoryTimer] { + wttt, ok := t.(*tasks.WorkflowTaskTimeoutTask) + s.Require().True(ok, "expected *WorkflowTaskTimeoutTask in CategoryMemoryTimer slot") + *captured = append(*captured, wttt) + } + }).AnyTimes() + // Shard teardown calls Stop on whichever engine it's holding. + engine.EXPECT().Stop().AnyTimes() + s.mockShard.SetEngineForTesting(engine) + return captured +} + +func (s *mutableStateSuite) TestSetSpeculativeWorkflowTaskTimeoutTask_SubtractsSkip() { + skipped := 30 * time.Minute + virtualTime := s.mockShard.GetTimeSource().Now().Add(2 * time.Hour) + + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(skipped), + } + captured := s.installEngineCapturingSpeculativeTimeout() + + task := &tasks.WorkflowTaskTimeoutTask{ + WorkflowKey: s.mutableState.GetWorkflowKey(), + VisibilityTimestamp: virtualTime, + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + EventID: 7, + InMemory: true, + } + s.NoError(s.mutableState.SetSpeculativeWorkflowTaskTimeoutTask(task)) + + // Virtual visibility timestamp is converted to wall-clock before the in-memory queue sees + // it — the queue dispatches against real time, so it would otherwise fire `skipped` late. + s.Equal(virtualTime.Add(-skipped), task.VisibilityTimestamp) + s.NotZero(task.TaskID) + s.True(s.mutableState.CheckSpeculativeWorkflowTaskTimeoutTask(task)) + s.Require().Len(*captured, 1) + s.Equal(virtualTime.Add(-skipped), (*captured)[0].GetVisibilityTime()) +} + +func (s *mutableStateSuite) TestSetSpeculativeWorkflowTaskTimeoutTask_NoOpWithoutTimeSkipping() { + virtualTime := s.mockShard.GetTimeSource().Now().Add(2 * time.Hour) + s.mutableState.executionInfo.TimeSkippingInfo = nil + captured := s.installEngineCapturingSpeculativeTimeout() + + task := &tasks.WorkflowTaskTimeoutTask{ + WorkflowKey: s.mutableState.GetWorkflowKey(), + VisibilityTimestamp: virtualTime, + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + EventID: 7, + InMemory: true, + } + s.NoError(s.mutableState.SetSpeculativeWorkflowTaskTimeoutTask(task)) + + s.Equal(virtualTime, task.VisibilityTimestamp) + s.Require().Len(*captured, 1) + s.Equal(virtualTime, (*captured)[0].GetVisibilityTime()) +} + +func (s *mutableStateSuite) TestRemoveSpeculativeWorkflowTaskTimeoutTask_CancelsAndClears() { + s.installEngineCapturingSpeculativeTimeout() + + task := &tasks.WorkflowTaskTimeoutTask{ + WorkflowKey: s.mutableState.GetWorkflowKey(), + VisibilityTimestamp: s.mockShard.GetTimeSource().Now().Add(time.Minute), + TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, + EventID: 7, + InMemory: true, + } + s.NoError(s.mutableState.SetSpeculativeWorkflowTaskTimeoutTask(task)) + s.True(s.mutableState.CheckSpeculativeWorkflowTaskTimeoutTask(task)) + + s.mutableState.RemoveSpeculativeWorkflowTaskTimeoutTask() + + s.False(s.mutableState.CheckSpeculativeWorkflowTaskTimeoutTask(task), "slot should be cleared") + // A second remove on an empty slot is safe. + s.NotPanics(func() { s.mutableState.RemoveSpeculativeWorkflowTaskTimeoutTask() }) +} + +// TestAccumulatedSkippedDuration_NilSafety pins the contract that the unexported +// accumulatedSkippedDuration helper never panics regardless of how much of the proto chain is +// nil, and returns 0 whenever time-skipping is not configured. AddTasks, +// SetSpeculativeWorkflowTaskTimeoutTask, and ToRealTime all rely on this; if it panicked or +// returned non-zero on a nil chain, every non-time-skipping workflow would break. +func (s *mutableStateSuite) TestAccumulatedSkippedDuration_NilSafety() { + s.Run("TimeSkippingInfoNil", func() { + s.mutableState.executionInfo.TimeSkippingInfo = nil + var got time.Duration + s.NotPanics(func() { got = s.mutableState.accumulatedSkippedDuration() }) + s.Equal(time.Duration(0), got) + }) + + s.Run("AccumulatedSkippedDurationNil", func() { + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + // AccumulatedSkippedDuration intentionally nil. + } + var got time.Duration + s.NotPanics(func() { got = s.mutableState.accumulatedSkippedDuration() }) + s.Equal(time.Duration(0), got) + }) + + s.Run("ZeroDuration", func() { + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(0), + } + s.Equal(time.Duration(0), s.mutableState.accumulatedSkippedDuration()) + }) + + s.Run("PositiveDurationRoundTrips", func() { + skipped := 42 * time.Minute + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(skipped), + } + s.Equal(skipped, s.mutableState.accumulatedSkippedDuration()) + }) + + s.Run("ConfigDisabledPreservesAccumulated", func() { + // Config flag does not gate accumulated skip — flipping Enabled to false + // mid-run leaves the offset intact so the virtual frame stays consistent + // with already-stamped events / tasks. + skipped := 17 * time.Minute + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: false}, + AccumulatedSkippedDuration: durationpb.New(skipped), + } + s.Equal(skipped, s.mutableState.accumulatedSkippedDuration()) + }) + + s.Run("ConfigNilPreservesAccumulated", func() { + // Defensive: even a malformed TSI with nil Config but set + // AccumulatedSkippedDuration must surface the offset unchanged. + skipped := 7 * time.Minute + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(skipped), + } + s.Equal(skipped, s.mutableState.accumulatedSkippedDuration()) + }) +} + +// TestMutableStateTimeFrames pins the contract that mutable state exposes virtual time via +// Now() and converts back to wall-clock via ToRealTime() using AccumulatedSkippedDuration. +// This is the foundation that AddTasks and SetSpeculativeWorkflowTaskTimeoutTask rely on. +func (s *mutableStateSuite) TestMutableStateTimeFrames() { + wallclock := s.mockShard.GetTimeSource().Now() + + s.Run("NoTimeSkippingInfo", func() { + s.mutableState.executionInfo.TimeSkippingInfo = nil + s.Equal(wallclock, s.mutableState.ToRealTime(wallclock)) + }) + + s.Run("WithAccumulatedSkip_ToRealTimeSubtracts", func() { + skipped := 45 * time.Minute + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(skipped), + } + virtual := wallclock.Add(skipped) + s.Equal(wallclock, s.mutableState.ToRealTime(virtual)) + }) +} + // TestCloseTransactionTimeSkipping exercises the time-skipping logic that runs inside // closeTransaction: // @@ -7111,6 +7373,55 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping() { s.Equal(int64(0), regenerated.TaskID, "TaskID must be zero — assigned by shard, not the generator") }) + s.Run("Active_Eligible_StateMachineTimerNotDoubleEmittedWhenStartedUnscheduled", func() { + // If the group started Scheduled=false, GenerateDirtySubStateMachineTasks + // (which runs earlier in the close transaction) already emits an SM timer + // task with the current virtual→wall conversion. The time-skipping + // regenerator must NOT add a second one — exactly one task in the mutation, + // with the conversion applied to the SM deadline. + now := s.mockShard.GetTimeSource().Now() + userTimerExpiry := now.Add(2 * time.Hour) + smDeadline := now.Add(3 * time.Hour) + + dbState := buildEligibleState(userTimerExpiry) + dbState.ExecutionInfo.StateMachineTimers = []*persistencespb.StateMachineTimerGroup{ + { + Deadline: timestamppb.New(smDeadline), + Scheduled: false, // group starts without a persisted task + Infos: []*persistencespb.StateMachineTaskInfo{ + { + Ref: &persistencespb.StateMachineRef{Path: []*persistencespb.StateMachineKey{{Type: "x", Id: "y"}}}, + Type: "t", + }, + }, + }, + } + + ms, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 1) + s.Require().NoError(err) + _, err = ms.StartTransaction(s.namespaceEntry) + s.Require().NoError(err) + + mutation, _, err := ms.CloseTransactionAsMutation(context.Background(), historyi.TransactionPolicyActive) + s.Require().NoError(err) + + accumulated := ms.GetExecutionInfo().TimeSkippingInfo.AccumulatedSkippedDuration + s.Require().NotNil(accumulated) + expectedTS := smDeadline.Add(-accumulated.AsDuration()) + + smTaskCount := 0 + var smTask *tasks.StateMachineTimerTask + for _, task := range mutation.Tasks[tasks.CategoryTimer] { + if smt, ok := task.(*tasks.StateMachineTimerTask); ok { + smTaskCount++ + smTask = smt + } + } + s.Equal(1, smTaskCount, "exactly one StateMachineTimerTask expected; regenerator must not double-emit") + s.Require().NotNil(smTask) + s.Equal(expectedTS, smTask.VisibilityTimestamp, "the single emitted task carries the current virtual→wall conversion") + }) + s.Run("Active_TimeSkippingDisabled_NoEvent", func() { // When TimeSkippingInfo is nil, ShouldExecuteTimeSkipping returns false immediately. // No time-skipping event should be emitted and AccumulatedSkippedDuration stays nil. @@ -8360,3 +8671,129 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() { s.InDelta(float64(time.Hour), float64(got), float64(time.Millisecond)) }) } + +func (s *mutableStateSuite) TestToRealTime() { + virtualTime := time.Date(2026, 5, 4, 12, 0, 0, 0, time.UTC) + s.Run("IdentityWhenTimeSkippingInfoNil", func() { + s.mutableState.executionInfo.TimeSkippingInfo = nil + s.Equal(virtualTime, s.mutableState.ToRealTime(virtualTime)) + }) + s.Run("IdentityWhenTimeSkippingInfoWithNilAccumulatedSkippedDuration", func() { + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: true}, + } + s.Equal(virtualTime, s.mutableState.ToRealTime(virtualTime)) + }) + s.Run("SubtractsAccumulatedSkip", func() { + accum := time.Hour + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: false}, + AccumulatedSkippedDuration: durationpb.New(accum), + } + s.Equal(virtualTime.Add(-accum), s.mutableState.ToRealTime(virtualTime)) + }) + s.Run("ZeroInputReturnedUnchangedEvenUnderSkip", func() { + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: true}, + AccumulatedSkippedDuration: durationpb.New(time.Hour), + } + s.True(s.mutableState.ToRealTime(time.Time{}).IsZero()) + }) + s.Run("RoundTripsWithNowYieldsWallClock", func() { + // The defining contract of the virtual-time frame: ToRealTime ∘ Now is + // the identity to wall clock. AddTasks relies on this to stamp timer + // VisibilityTimestamps that fire at the right real moment after a skip. + fixedBase := time.Date(2026, 7, 1, 8, 0, 0, 0, time.UTC) + ts := clock.NewEventTimeSource() + ts.Update(fixedBase) + s.mutableState.timeSource = ts + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(2 * time.Hour), + } + s.mutableState.wrapTimeSourceWithTimeSkipping() + + s.Equal(fixedBase, s.mutableState.ToRealTime(s.mutableState.Now())) + }) + s.Run("PastVirtualTimeStillSubtractsAccumulated", func() { + // Sanity that ToRealTime is unconditional arithmetic — it doesn't gate on + // whether the input is past or future relative to Now. A persisted virtual + // timestamp from an older event should still convert correctly. + accum := 4 * time.Hour + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(accum), + } + past := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) + s.Equal(past.Add(-accum), s.mutableState.ToRealTime(past)) + }) +} + +// TestMutableStateImpl_Now pins the production-facing virtual-time contract +// surfaced by MutableState.Now(). Most call sites go through this method (not +// the underlying timeSource), so the test exercises Now() directly across the +// wrap state machine — pre-wrap returns wall, post-wrap returns virtual, and +// the wrapper's closure must observe live mutations to AccumulatedSkippedDuration. +func (s *mutableStateSuite) TestMutableStateImpl_Now() { + fixedBase := time.Date(2026, 7, 1, 8, 0, 0, 0, time.UTC) + fixedTimeSource := func() *clock.EventTimeSource { + ts := clock.NewEventTimeSource() + ts.Update(fixedBase) + return ts + } + + s.Run("UnwrappedReturnsWallClock", func() { + s.mutableState.timeSource = fixedTimeSource() + s.mutableState.executionInfo.TimeSkippingInfo = nil + s.Equal(fixedBase, s.mutableState.Now()) + }) + + s.Run("WrappedWithoutTSIReturnsWallClock", func() { + // Wrapper installed but no TSI → zero offset, virtual == wall. + s.mutableState.timeSource = fixedTimeSource() + s.mutableState.executionInfo.TimeSkippingInfo = nil + s.mutableState.wrapTimeSourceWithTimeSkipping() + s.Equal(fixedBase, s.mutableState.Now()) + }) + + s.Run("WrappedWithAccumulatedReturnsVirtual", func() { + const skip = 3 * time.Hour + s.mutableState.timeSource = fixedTimeSource() + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(skip), + } + s.mutableState.wrapTimeSourceWithTimeSkipping() + s.Equal(fixedBase.Add(skip), s.mutableState.Now()) + }) + + s.Run("LiveClosureTracksAccumulatedMutation", func() { + // Per spec: the wrapper reads the offset lazily on every call. Mutating + // AccumulatedSkippedDuration in place (as the transition apply path does) + // must be observed by subsequent Now() calls without re-wrapping. + const ( + initial = time.Hour + grown = 4 * time.Hour + ) + s.mutableState.timeSource = fixedTimeSource() + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(initial), + } + s.mutableState.wrapTimeSourceWithTimeSkipping() + s.Equal(fixedBase.Add(initial), s.mutableState.Now()) + + s.mutableState.executionInfo.TimeSkippingInfo.AccumulatedSkippedDuration = durationpb.New(grown) + s.Equal(fixedBase.Add(grown), s.mutableState.Now()) + }) + + s.Run("DisabledConfigStillReturnsVirtualWhenAccumulated", func() { + // AccumulatedSkippedDuration is preserved when Config.Enabled is flipped + // off mid-run; Now() must continue returning virtual time so already-stamped + // virtual timestamps remain coherent. + const skip = time.Hour + s.mutableState.timeSource = fixedTimeSource() + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + Config: &workflowpb.TimeSkippingConfig{Enabled: false}, + AccumulatedSkippedDuration: durationpb.New(skip), + } + s.mutableState.wrapTimeSourceWithTimeSkipping() + s.Equal(fixedBase.Add(skip), s.mutableState.Now()) + }) +} diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index 2aeb97f8798..ee8a2ccc37e 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -1131,5 +1131,8 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { }) } } + // todo@time-skipping: not supported yet + // ChasmTaskPure (in-memory CHASM pure tasks) hasn't been supported yet. + // HSM timer (callbacks, nexusoperations) hasn't been supported yet. return nil } From 592b1e995604c313f4099fcd779725fdea7d2a6a Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Fri, 22 May 2026 10:53:40 -0700 Subject: [PATCH 2/3] timer queues use toRealTime for time validation --- .../timer_queue_active_task_executor.go | 10 +- .../timer_queue_active_task_executor_test.go | 1 + .../timer_queue_standby_task_executor.go | 8 +- .../timer_queue_standby_task_executor_test.go | 3 + .../history/timer_queue_task_executor_base.go | 13 +- .../timer_queue_task_executor_base_test.go | 2 + .../workflow/mutable_state_impl_test.go | 144 ++++++++++-------- service/history/workflow/task_generator.go | 7 +- 8 files changed, 104 insertions(+), 84 deletions(-) diff --git a/service/history/timer_queue_active_task_executor.go b/service/history/timer_queue_active_task_executor.go index ee11fb1309c..efb7d446304 100644 --- a/service/history/timer_queue_active_task_executor.go +++ b/service/history/timer_queue_active_task_executor.go @@ -160,7 +160,7 @@ func (t *timerQueueActiveTaskExecutor) executeUserTimerTimeoutTask( } timerSequence := t.getTimerSequence(mutableState) - referenceTime := mutableState.Now() + referenceTime := t.Now() timerFired := false Loop: for _, timerSequenceID := range timerSequence.LoadAndSortUserTimers() { @@ -171,9 +171,7 @@ Loop: return serviceerror.NewInternal(errString) } - // when time-skipping happens, the task.FireTime is way before the timerSequenceID.Timestamp, - // but using the virtual time of ms as the reference time, this function will still return true. - if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) { + if !queues.IsTimeExpired(task, referenceTime, mutableState.ToRealTime(timerSequenceID.Timestamp)) { // Timer sequence IDs are sorted; once we encounter a timer whose // sequence ID has not expired, all subsequent timers will not have // expired. @@ -226,7 +224,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( } timerSequence := t.getTimerSequence(mutableState) - referenceTime := mutableState.Now() + referenceTime := t.Now() updateMutableState := false scheduleWorkflowTask := false @@ -247,7 +245,7 @@ func (t *timerQueueActiveTaskExecutor) executeActivityTimeoutTask( Loop: for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() { - if !queues.IsTimeExpired(task, referenceTime, timerSequenceID.Timestamp) { + if !queues.IsTimeExpired(task, referenceTime, mutableState.ToRealTime(timerSequenceID.Timestamp)) { // timer sequence IDs are sorted, once there is one timer // sequence ID not expired, all after that wil not expired break Loop diff --git a/service/history/timer_queue_active_task_executor_test.go b/service/history/timer_queue_active_task_executor_test.go index 1eee344e756..4da471c0c99 100644 --- a/service/history/timer_queue_active_task_executor_test.go +++ b/service/history/timer_queue_active_task_executor_test.go @@ -2153,6 +2153,7 @@ func (s *timerQueueActiveTaskExecutorSuite) TestExecuteStateMachineTimerTask_Exe ).AnyTimes() ms.EXPECT().HSM().Return(root).AnyTimes() ms.EXPECT().Now().Return(s.now).AnyTimes() + ms.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() _, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy()) s.NoError(err) diff --git a/service/history/timer_queue_standby_task_executor.go b/service/history/timer_queue_standby_task_executor.go index e679cd35071..96b97992764 100644 --- a/service/history/timer_queue_standby_task_executor.go +++ b/service/history/timer_queue_standby_task_executor.go @@ -239,6 +239,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( ctx context.Context, timerTask *tasks.UserTimerTask, ) error { + referenceTime := t.Now() actionFn := func(_ context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer @@ -247,7 +248,6 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( timerSequence := t.getTimerSequence(mutableState) timerSequenceIDs := timerSequence.LoadAndSortUserTimers() - referenceTime := mutableState.Now() if len(timerSequenceIDs) > 0 { timerSequenceID := timerSequenceIDs[0] _, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID) @@ -260,7 +260,7 @@ func (t *timerQueueStandbyTaskExecutor) executeUserTimerTimeoutTask( if queues.IsTimeExpired( timerTask, referenceTime, - timerSequenceID.Timestamp, + mutableState.ToRealTime(timerSequenceID.Timestamp), ) { return &struct{}{}, nil } @@ -299,13 +299,13 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( // // the overall solution is to attempt to generate a new activity timer task whenever the // task passed in is safe to be throw away. + referenceTime := t.Now() actionFn := func(ctx context.Context, wfContext historyi.WorkflowContext, mutableState historyi.MutableState, _ historyi.ReleaseWorkflowContextFunc) (any, error) { if !mutableState.IsWorkflowExecutionRunning() { // workflow already finished, no need to process the timer return nil, nil } - referenceTime := mutableState.Now() timerSequence := t.getTimerSequence(mutableState) updateMutableState := false timerSequenceIDs := timerSequence.LoadAndSortActivityTimers() @@ -321,7 +321,7 @@ func (t *timerQueueStandbyTaskExecutor) executeActivityTimeoutTask( if queues.IsTimeExpired( timerTask, referenceTime, - timerSequenceID.Timestamp, + mutableState.ToRealTime(timerSequenceID.Timestamp), ) { return &struct{}{}, nil } diff --git a/service/history/timer_queue_standby_task_executor_test.go b/service/history/timer_queue_standby_task_executor_test.go index 0ca364ece9f..9b843781713 100644 --- a/service/history/timer_queue_standby_task_executor_test.go +++ b/service/history/timer_queue_standby_task_executor_test.go @@ -1947,6 +1947,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_Ex ).AnyTimes() ms.EXPECT().HSM().Return(root).AnyTimes() ms.EXPECT().Now().Return(s.mockShard.GetTimeSource().Now()).AnyTimes() + ms.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() _, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy()) s.NoError(err) @@ -2082,6 +2083,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_Va ).AnyTimes() ms.EXPECT().HSM().Return(root).AnyTimes() ms.EXPECT().Now().Return(s.mockShard.GetTimeSource().Now()).AnyTimes() + ms.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() _, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy()) s.NoError(err) @@ -2192,6 +2194,7 @@ func (s *timerQueueStandbyTaskExecutorSuite) TestExecuteStateMachineTimerTask_St ).AnyTimes() ms.EXPECT().HSM().Return(root).AnyTimes() ms.EXPECT().Now().Return(s.mockShard.GetTimeSource().Now()).AnyTimes() + ms.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() _, err = dummy.MachineCollection(root).Add("dummy", dummy.NewDummy()) s.NoError(err) diff --git a/service/history/timer_queue_task_executor_base.go b/service/history/timer_queue_task_executor_base.go index 5372fa39841..0292aa38291 100644 --- a/service/history/timer_queue_task_executor_base.go +++ b/service/history/timer_queue_task_executor_base.go @@ -159,9 +159,11 @@ func (t *timerQueueTaskExecutorBase) deleteHistoryBranch( return nil } -// isValidExpirationTime checks if the expiration time is expired. -// The current time of the mutable state is used to check the expiration because -// when time skipping happens, time related to a mutable state is virtual-framed. +// isValidExpirationTime returns true when the workflow is still running and +// expirationTime has been reached. Callers pass fields from an execution +// (e.g. WorkflowRunExpirationTime), which are stored in the execution's virtual +// frame (if time skipping happens); this converts via ms.ToRealTime before +// comparing to t.Now() because the timer queue dispatches against wall-clock. func (t *timerQueueTaskExecutorBase) isValidExpirationTime( mutableState historyi.MutableState, task tasks.Task, @@ -171,7 +173,7 @@ func (t *timerQueueTaskExecutorBase) isValidExpirationTime( return false } taskShouldTriggerAt := expirationTime.AsTime() - expired := queues.IsTimeExpired(task, mutableState.Now(), taskShouldTriggerAt) + expired := queues.IsTimeExpired(task, t.Now(), mutableState.ToRealTime(taskShouldTriggerAt)) return expired } @@ -306,8 +308,7 @@ func (t *timerQueueTaskExecutorBase) executeStateMachineTimers( // StateMachineTimers are sorted by Deadline, iterate through them as long as the deadline is expired. for len(timers) > 0 { group := timers[0] - // TODO@time-skipping: review needed, used ms.Now() instead of t.Now() for consistency. - if !queues.IsTimeExpired(task, ms.Now(), group.Deadline.AsTime()) { + if !queues.IsTimeExpired(task, t.Now(), ms.ToRealTime(group.Deadline.AsTime())) { break } diff --git a/service/history/timer_queue_task_executor_base_test.go b/service/history/timer_queue_task_executor_base_test.go index cf562a3f168..14806701bb1 100644 --- a/service/history/timer_queue_task_executor_base_test.go +++ b/service/history/timer_queue_task_executor_base_test.go @@ -229,6 +229,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TestIsValidExecutionTimeoutTask() { }).AnyTimes() mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(tc.workflowRunning).AnyTimes() mockMutableState.EXPECT().Now().Return(s.testShardContext.GetTimeSource().Now()).AnyTimes() + mockMutableState.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() isValid := s.timerQueueTaskExecutorBase.isValidWorkflowExecutionTimeoutTask(mockMutableState, timerTask) s.Equal(tc.isValid, isValid) @@ -251,6 +252,7 @@ func (s *timerQueueTaskExecutorBaseSuite) TestIsValidExecutionTimeouts() { mockMutableState := historyi.NewMockMutableState(s.controller) mockMutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() mockMutableState.EXPECT().Now().Return(timeNow).AnyTimes() + mockMutableState.EXPECT().ToRealTime(gomock.Any()).DoAndReturn(func(t time.Time) time.Time { return t }).AnyTimes() testCases := []struct { name string diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 2a8aae381ca..4c5c6b942fa 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -7148,45 +7148,6 @@ func (s *mutableStateSuite) TestSetSpeculativeWorkflowTaskTimeoutTask_SubtractsS s.Equal(virtualTime.Add(-skipped), (*captured)[0].GetVisibilityTime()) } -func (s *mutableStateSuite) TestSetSpeculativeWorkflowTaskTimeoutTask_NoOpWithoutTimeSkipping() { - virtualTime := s.mockShard.GetTimeSource().Now().Add(2 * time.Hour) - s.mutableState.executionInfo.TimeSkippingInfo = nil - captured := s.installEngineCapturingSpeculativeTimeout() - - task := &tasks.WorkflowTaskTimeoutTask{ - WorkflowKey: s.mutableState.GetWorkflowKey(), - VisibilityTimestamp: virtualTime, - TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, - EventID: 7, - InMemory: true, - } - s.NoError(s.mutableState.SetSpeculativeWorkflowTaskTimeoutTask(task)) - - s.Equal(virtualTime, task.VisibilityTimestamp) - s.Require().Len(*captured, 1) - s.Equal(virtualTime, (*captured)[0].GetVisibilityTime()) -} - -func (s *mutableStateSuite) TestRemoveSpeculativeWorkflowTaskTimeoutTask_CancelsAndClears() { - s.installEngineCapturingSpeculativeTimeout() - - task := &tasks.WorkflowTaskTimeoutTask{ - WorkflowKey: s.mutableState.GetWorkflowKey(), - VisibilityTimestamp: s.mockShard.GetTimeSource().Now().Add(time.Minute), - TimeoutType: enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, - EventID: 7, - InMemory: true, - } - s.NoError(s.mutableState.SetSpeculativeWorkflowTaskTimeoutTask(task)) - s.True(s.mutableState.CheckSpeculativeWorkflowTaskTimeoutTask(task)) - - s.mutableState.RemoveSpeculativeWorkflowTaskTimeoutTask() - - s.False(s.mutableState.CheckSpeculativeWorkflowTaskTimeoutTask(task), "slot should be cleared") - // A second remove on an empty slot is safe. - s.NotPanics(func() { s.mutableState.RemoveSpeculativeWorkflowTaskTimeoutTask() }) -} - // TestAccumulatedSkippedDuration_NilSafety pins the contract that the unexported // accumulatedSkippedDuration helper never panics regardless of how much of the proto chain is // nil, and returns 0 whenever time-skipping is not configured. AddTasks, @@ -7247,27 +7208,6 @@ func (s *mutableStateSuite) TestAccumulatedSkippedDuration_NilSafety() { }) } -// TestMutableStateTimeFrames pins the contract that mutable state exposes virtual time via -// Now() and converts back to wall-clock via ToRealTime() using AccumulatedSkippedDuration. -// This is the foundation that AddTasks and SetSpeculativeWorkflowTaskTimeoutTask rely on. -func (s *mutableStateSuite) TestMutableStateTimeFrames() { - wallclock := s.mockShard.GetTimeSource().Now() - - s.Run("NoTimeSkippingInfo", func() { - s.mutableState.executionInfo.TimeSkippingInfo = nil - s.Equal(wallclock, s.mutableState.ToRealTime(wallclock)) - }) - - s.Run("WithAccumulatedSkip_ToRealTimeSubtracts", func() { - skipped := 45 * time.Minute - s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ - AccumulatedSkippedDuration: durationpb.New(skipped), - } - virtual := wallclock.Add(skipped) - s.Equal(wallclock, s.mutableState.ToRealTime(virtual)) - }) -} - // TestCloseTransactionTimeSkipping exercises the time-skipping logic that runs inside // closeTransaction: // @@ -8340,6 +8280,65 @@ func (s *mutableStateSuite) TestInitTimeSkippingInfo() { }) } +// TestTimeSkippingPreservesUnboundedExpiration locks down the invariant that +// time skipping never turns an unbounded workflow (no execution / run timeout) +// into a bounded one. The only write path under skip that touches +// WorkflowExecutionExpirationTime / WorkflowRunExpirationTime is +// shiftWorkflowTimes (driven by initTimeSkippingInfo for propagated runs), +// which gates per-field on timeNotSet — so nil/zero inputs must round-trip +// unchanged. RefreshExpirationTimeoutTask has its own pre-skipping guards +// (weTimeout > 0 / wrTimeout != 0) that are independent of time-skipping +// state. The companion read path ToRealTime is covered by +// TestToRealTime/ZeroInputReturnedUnchangedEvenUnderSkip. +func (s *mutableStateSuite) TestTimeSkippingPreservesUnboundedExpiration() { + const initialDur = 50 * time.Minute + baseTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC) + + s.Run("NilExpiration_StaysNil_AfterPositiveInitialSkip", func() { + s.mutableState.timeSource = clock.NewEventTimeSource().Update(baseTime) + s.mutableState.executionInfo.StartTime = timestamppb.New(baseTime) + s.mutableState.executionInfo.ExecutionTime = timestamppb.New(baseTime) + s.mutableState.executionState.StartTime = timestamppb.New(baseTime) + s.mutableState.executionInfo.WorkflowRunExpirationTime = nil + s.mutableState.executionInfo.WorkflowExecutionExpirationTime = nil + + cfg := &workflowpb.TimeSkippingConfig{Enabled: true} + s.mutableState.initTimeSkippingInfo(cfg, durationpb.New(initialDur), 1) + + s.Nil(s.mutableState.executionInfo.WorkflowRunExpirationTime) + s.Nil(s.mutableState.executionInfo.WorkflowExecutionExpirationTime) + }) + + s.Run("ZeroExpiration_StaysZero_AfterPositiveInitialSkip", func() { + // A proto-wrapped zero time is the second flavor of "not set" — timeNotSet + // treats it the same as nil, so shiftWorkflowTimes must leave it at zero. + s.mutableState.timeSource = clock.NewEventTimeSource().Update(baseTime) + s.mutableState.executionInfo.StartTime = timestamppb.New(baseTime) + s.mutableState.executionInfo.ExecutionTime = timestamppb.New(baseTime) + s.mutableState.executionState.StartTime = timestamppb.New(baseTime) + s.mutableState.executionInfo.WorkflowRunExpirationTime = timestamppb.New(time.Time{}) + s.mutableState.executionInfo.WorkflowExecutionExpirationTime = timestamppb.New(time.Time{}) + + cfg := &workflowpb.TimeSkippingConfig{Enabled: true} + s.mutableState.initTimeSkippingInfo(cfg, durationpb.New(initialDur), 1) + + s.True(s.mutableState.executionInfo.WorkflowRunExpirationTime.AsTime().IsZero()) + s.True(s.mutableState.executionInfo.WorkflowExecutionExpirationTime.AsTime().IsZero()) + }) + + s.Run("ShiftWorkflowTimes_DirectCall_NilAndZero_NotShifted", func() { + // Directly exercise shiftWorkflowTimes to pin the contract independent of + // the initTimeSkippingInfo wrapper. accum > 0 must be a no-op on + // unset expiration fields regardless of how it was invoked. + s.mutableState.executionInfo.WorkflowRunExpirationTime = nil + s.mutableState.executionInfo.WorkflowExecutionExpirationTime = timestamppb.New(time.Time{}) + s.mutableState.shiftWorkflowTimes(durationpb.New(2 * time.Hour)) + s.Nil(s.mutableState.executionInfo.WorkflowRunExpirationTime) + s.True(s.mutableState.executionInfo.WorkflowExecutionExpirationTime.AsTime().IsZero()) + }) + +} + // TestUpdateTimeSkippingInfo verifies updateTimeSkippingInfo replaces Config but // preserves AccumulatedSkippedDuration, and re-runs applyTimeSkippingBound. func (s *mutableStateSuite) TestUpdateTimeSkippingInfo() { @@ -8672,6 +8671,8 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() { }) } +// TestToRealTime tests ms.ToRealTime() exhaustively as this function is also used by executions that don't +// use time skipping and need to be tested thoroughly. This function converts virtual time to wall-clock time. func (s *mutableStateSuite) TestToRealTime() { virtualTime := time.Date(2026, 5, 4, 12, 0, 0, 0, time.UTC) s.Run("IdentityWhenTimeSkippingInfoNil", func() { @@ -8725,13 +8726,26 @@ func (s *mutableStateSuite) TestToRealTime() { past := time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC) s.Equal(past.Add(-accum), s.mutableState.ToRealTime(past)) }) + s.Run("ToRealTimeDoesNotMutateState", func() { + ts := clock.NewEventTimeSource() + ts.Update(time.Date(2026, 7, 1, 8, 0, 0, 0, time.UTC)) + s.mutableState.timeSource = ts + s.mutableState.executionInfo.TimeSkippingInfo = &persistencespb.TimeSkippingInfo{ + AccumulatedSkippedDuration: durationpb.New(time.Hour), + } + + tsiBefore := proto.Clone(s.mutableState.executionInfo.TimeSkippingInfo).(*persistencespb.TimeSkippingInfo) + nowBefore := s.mutableState.timeSource.Now() + + _ = s.mutableState.ToRealTime(virtualTime) + + s.Equal(nowBefore, s.mutableState.timeSource.Now()) + s.True(proto.Equal(tsiBefore, s.mutableState.executionInfo.TimeSkippingInfo)) + }) } -// TestMutableStateImpl_Now pins the production-facing virtual-time contract -// surfaced by MutableState.Now(). Most call sites go through this method (not -// the underlying timeSource), so the test exercises Now() directly across the -// wrap state machine — pre-wrap returns wall, post-wrap returns virtual, and -// the wrapper's closure must observe live mutations to AccumulatedSkippedDuration. +// TestMutableStateImpl_Now tests ms.Now() exhaustively as this function is also used by executions that don't +// use time skipping and need to be tested thoroughly. func (s *mutableStateSuite) TestMutableStateImpl_Now() { fixedBase := time.Date(2026, 7, 1, 8, 0, 0, 0, time.UTC) fixedTimeSource := func() *clock.EventTimeSource { diff --git a/service/history/workflow/task_generator.go b/service/history/workflow/task_generator.go index ee8a2ccc37e..4d252fc6a42 100644 --- a/service/history/workflow/task_generator.go +++ b/service/history/workflow/task_generator.go @@ -1047,6 +1047,9 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { } // Task regeneration: mutableState.AddTask will adapt virtual time to wall time. + // WorkflowTask, Activity, HSM(only nexusoperations) timer tasks won't be regenerated + // because time skipping pauses when there are in-flight work. + // (1) user timers — regenerate one task per pending user timer. User timers // are only one of the task types that may need regeneration, so continue to // the timeout timers below even when none are pending. @@ -1131,8 +1134,6 @@ func (r *TaskGeneratorImpl) RegenerateTimerTasksForTimeSkipping() error { }) } } - // todo@time-skipping: not supported yet - // ChasmTaskPure (in-memory CHASM pure tasks) hasn't been supported yet. - // HSM timer (callbacks, nexusoperations) hasn't been supported yet. + // todo@time-skipping: ChasmTaskPure is not supported yet. return nil } From f6fc1cc349a2fba7ad62dbde9d7627faa98eb53a Mon Sep 17 00:00:00 2001 From: Feiyang Xie Date: Tue, 26 May 2026 12:38:51 -0700 Subject: [PATCH 3/3] refine add task comment