Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 77 additions & 85 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,9 @@ func (a *Activity) UpdateActivityExecutionOptions(

attempt.Stamp++

if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED {
// Re-create the start-to-close timeout task with the new stamp and (possibly updated) timeout.
// The old task was invalidated by the stamp increment above.
if timeout := a.GetStartToCloseTimeout().AsDuration(); timeout > 0 {
Expand Down Expand Up @@ -793,7 +795,7 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
return nil, serviceerror.NewFailedPrecondition("cannot pause an activity with a pending cancellation")
}
if a.PauseState != nil {
if a.isPaused() {
newReqID := req.GetFrontendRequest().GetRequestId()
existingReqID := a.PauseState.GetRequestId()
if newReqID != "" && existingReqID == newReqID {
Expand All @@ -807,19 +809,19 @@ func (a *Activity) handlePauseRequested(ctx chasm.MutableContext, req *activityp
return nil, err
}

if TransitionPaused.Possible(a) {
// SCHEDULED → real PAUSED status; stamp bumped to invalidate the pending dispatch task.
if err := TransitionPaused.Apply(a, ctx, pauseEvent{
req: req.GetFrontendRequest(),
metricsHandler: metricsHandler,
}); err != nil {
event := pauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler}
switch a.GetStatus() {
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
if err := TransitionPaused.Apply(a, ctx, event); err != nil {
return nil, err
}
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED:
if err := TransitionPauseRequested.Apply(a, ctx, event); err != nil {
return nil, err
}
return &activitypb.PauseActivityExecutionResponse{}, nil
default:
return nil, serviceerror.NewFailedPreconditionf("activity is in non-pausable state %v", a.GetStatus())
}
// STARTED → flag-only pause. Status stays STARTED so the worker's token remains valid.
// The worker will see ActivityPaused=true on the next heartbeat.
a.pause(ctx, pauseEvent{req.GetFrontendRequest(), metricsHandler})
return &activitypb.PauseActivityExecutionResponse{}, nil
}

Expand All @@ -829,8 +831,7 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit
if a.isTerminal() {
return nil, serviceerror.NewFailedPreconditionf("activity is in terminal state %v", a.GetStatus())
}
// Not paused → no-op.
if a.PauseState == nil {
if !a.isPaused() {
return &activitypb.UnpauseActivityExecutionResponse{}, nil
}

Expand All @@ -839,31 +840,32 @@ func (a *Activity) handleUnpauseRequested(ctx chasm.MutableContext, req *activit
return nil, err
}

if TransitionUnpaused.Possible(a) {
if err := TransitionUnpaused.Apply(a, ctx, unpauseEvent{
req: req.GetFrontendRequest(),
metricsHandler: metricsHandler,
}); err != nil {
event := unpauseEvent{req: req.GetFrontendRequest(), metricsHandler: metricsHandler}
switch a.GetStatus() {
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
if err := TransitionUnpaused.Apply(a, ctx, event); err != nil {
return nil, err
}
return &activitypb.UnpauseActivityExecutionResponse{}, nil
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED:
if err := TransitionUnpausedToStarted.Apply(a, ctx, event); err != nil {
return nil, err
}
default:
return nil, serviceerror.NewFailedPreconditionf("activity is in non-unpausable state %v", a.GetStatus())
}
return &activitypb.UnpauseActivityExecutionResponse{}, nil
}

// Flag-based pause (status is STARTED, CANCEL_REQUESTED, or SCHEDULED after retry while paused).
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
// Worker continues with its existing token — no stamp bump needed, no dispatch task.
// Cancel takes precedence over pause. Unpause clears the pause flag but does not re-dispatch;
// the activity remains CANCEL_REQUESTED and will be cancelled when the worker responds.
a.PauseState = nil
a.emitOnUnpausedMetrics(metricsHandler)
return &activitypb.UnpauseActivityExecutionResponse{}, nil
// isPaused reports whether the activity status is PAUSED (currently paused, waiting) or
// PAUSE_REQUESTED (a pause request is pending while the worker is still running).
func (a *Activity) isPaused() bool {
switch a.GetStatus() {
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED,
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED:
return true
default:
return false
}
a.unpause(ctx, unpauseEvent{
req: req.GetFrontendRequest(),
metricsHandler: metricsHandler,
})
return &activitypb.UnpauseActivityExecutionResponse{}, nil
}

func (a *Activity) unpause(
Expand Down Expand Up @@ -897,7 +899,7 @@ func (a *Activity) unpause(
a.emitOnUnpausedMetrics(event.metricsHandler)
}

func (a *Activity) pause(
func (a *Activity) recordPauseState(
ctx chasm.MutableContext,
event pauseEvent,
) {
Expand Down Expand Up @@ -970,44 +972,42 @@ func (a *Activity) handleReset(ctx chasm.MutableContext, req *activitypb.ResetAc

switch a.Status {
case activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED:
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED:
// Activity is running. Defer reset to the next retry so we don't break
// the running worker's task token (which encodes the current attempt count).
a.ActivityReset = true
if frontendReq.GetResetHeartbeat() {
a.ResetHeartbeats = true
}
if !keepPaused {
// Clear PauseState now so TransitionRescheduled can dispatch without being
// blocked by the validator (which drops dispatch tasks when PauseState != nil).
a.PauseState = nil
if a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED && !keepPaused {
// Unpause; the deferred reset will apply on the next retry via STARTED->SCHEDULED.
if err := TransitionUnpausedToStarted.Apply(a, ctx, unpauseEvent{
req: &workflowservice.UnpauseActivityExecutionRequest{},
metricsHandler: metricsHandler,
}); err != nil {
return nil, err
}
}
a.emitOnResetMetrics(metricsHandler)
return &activitypb.ResetActivityExecutionResponse{}, nil

case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED,
activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
// A SCHEDULED activity can carry PauseState when a pause was applied concurrently
// (e.g. deferred from a STARTED→retry path). In that case the dispatch task emitted
// by TransitionReset would be dropped by the validator, leaving the activity stuck.
// Treat any non-nil PauseState the same way as the explicit PAUSED status.
if a.PauseState != nil {
if keepPaused {
// Reset counts but keep the activity paused.
// No dispatch task — the user must unpause to re-dispatch.
attempt := a.LastAttempt.Get(ctx)
attempt.Count = 1
attempt.Stamp++
attempt.CurrentRetryInterval = nil
if frontendReq.GetResetHeartbeat() {
a.clearHeartbeat(ctx)
}
a.emitOnResetMetrics(metricsHandler)
return &activitypb.ResetActivityExecutionResponse{}, nil
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
if keepPaused {
// Reset counts but keep the activity paused.
attempt := a.LastAttempt.Get(ctx)
attempt.Count = 1
attempt.Stamp++
attempt.CurrentRetryInterval = nil
if frontendReq.GetResetHeartbeat() {
a.clearHeartbeat(ctx)
}
// keepPaused=false: clear pause state so the dispatch task isn't dropped.
a.PauseState = nil
a.emitOnResetMetrics(metricsHandler)
return &activitypb.ResetActivityExecutionResponse{}, nil
}
fallthrough

case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED:
if err := TransitionReset.Apply(a, ctx, resetEvent{
req: frontendReq,
scheduleTime: scheduleTime,
Expand Down Expand Up @@ -1073,9 +1073,8 @@ func (a *Activity) recordFailedAttempt(
}

// tryReschedule attempts to reschedule the activity for retry. Returns true if rescheduled, false
// if retry is not possible. When the activity has PauseState set (flag-based pause from STARTED),
// the retry transitions to SCHEDULED normally but the dispatch task is blocked by the pause flag
// until the activity is unpaused.
// if retry is not possible. If a pause request is pending (status PAUSE_REQUESTED), the activity
// transitions to PAUSED; otherwise it transitions to SCHEDULED.
func (a *Activity) tryReschedule(
ctx chasm.MutableContext,
overridingRetryInterval time.Duration,
Expand All @@ -1085,14 +1084,15 @@ func (a *Activity) tryReschedule(
if !shouldRetry {
return false, nil
}
return true, TransitionRescheduled.Apply(a, ctx, rescheduleEvent{
retryInterval: retryInterval,
failure: failure,
})
event := rescheduleEvent{retryInterval: retryInterval, failure: failure}
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED {
return true, TransitionAttemptFailedToPaused.Apply(a, ctx, event)
}
return true, TransitionRescheduled.Apply(a, ctx, event)
}

func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration) {
if !TransitionRescheduled.Possible(a) {
if !TransitionRescheduled.Possible(a) && !TransitionAttemptFailedToPaused.Possible(a) {
return false, 0
}
attempt := a.LastAttempt.Get(ctx)
Expand Down Expand Up @@ -1187,7 +1187,7 @@ func (a *Activity) RecordHeartbeat(
}
return &historyservice.RecordActivityTaskHeartbeatResponse{
CancelRequested: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
ActivityPaused: a.PauseState != nil,
ActivityPaused: a.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED,
ActivityReset: a.ActivityReset,
}, nil
}
Expand All @@ -1198,7 +1198,8 @@ func InternalStatusToAPIStatus(status activitypb.ActivityExecutionStatus) enumsp
case activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED,
activitypb.ACTIVITY_EXECUTION_STATUS_STARTED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED,
activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED:
return enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
return enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED
Expand Down Expand Up @@ -1227,6 +1228,8 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb
return enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSED:
return enumspb.PENDING_ACTIVITY_STATE_PAUSED
case activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED:
return enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED,
activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
Expand All @@ -1241,19 +1244,7 @@ func internalStatusToRunState(status activitypb.ActivityExecutionStatus) enumspb

func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) *apiactivitypb.ActivityExecutionInfo {
status := InternalStatusToAPIStatus(a.GetStatus())
// Derive the external run state with hybrid pause logic:
// PAUSED status (real) → PAUSED
// STARTED + PauseState != nil (pause requested while running) → PAUSE_REQUESTED
// SCHEDULED + PauseState != nil (retry while paused flag set) → PAUSED
// All other cases → derived from internal status directly
var runState enumspb.PendingActivityState
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && a.PauseState != nil {
runState = enumspb.PENDING_ACTIVITY_STATE_PAUSE_REQUESTED
} else if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && a.PauseState != nil {
runState = enumspb.PENDING_ACTIVITY_STATE_PAUSED
} else {
runState = internalStatusToRunState(a.GetStatus())
}
runState := internalStatusToRunState(a.GetStatus())

requestData := a.RequestData.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
Expand Down Expand Up @@ -1466,7 +1457,8 @@ func (a *Activity) validateActivityTaskToken(
requestNamespaceID string,
) error {
if a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED &&
a.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED {
return serviceerror.NewNotFound("activity task not found")
}
if token.Attempt != ByIDTokenAttempt && token.Attempt != a.LastAttempt.Get(ctx).GetCount() {
Expand Down
12 changes: 4 additions & 8 deletions chasm/lib/activity/activity_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ func (h *activityDispatchTaskHandler) Validate(
_ chasm.TaskAttributes,
task *activitypb.ActivityDispatchTask,
) (bool, error) {
// Do not dispatch while the activity has a pause flag set (SCHEDULED + PauseState from a retry
// while a STARTED activity was flag-paused). TransitionStarted.Possible already returns false for
// real PAUSED status activities (source must be SCHEDULED, and PAUSED → SCHEDULED via unpause).
return (TransitionStarted.Possible(activity) &&
activity.PauseState == nil &&
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil
}

Expand Down Expand Up @@ -96,9 +92,7 @@ func (h *scheduleToStartTimeoutTaskHandler) Validate(
_ chasm.TaskAttributes,
task *activitypb.ScheduleToStartTimeoutTask,
) (bool, error) {
// Do not time out a SCHEDULED activity that has the pause flag set (retry while paused).
return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED &&
activity.PauseState == nil &&
Comment thread
dandavison marked this conversation as resolved.
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp()), nil
}

Expand Down Expand Up @@ -182,7 +176,8 @@ func (h *startToCloseTimeoutTaskHandler) Validate(
task *activitypb.StartToCloseTimeoutTask,
) (bool, error) {
valid := ((activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED) &&
activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED ||
activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED) &&
task.Stamp == activity.LastAttempt.Get(ctx).GetStamp())
return valid, nil
}
Expand Down Expand Up @@ -243,7 +238,8 @@ func (h *heartbeatTimeoutTaskHandler) Validate(
// last heartbeat was received after hb_i. If so, we reject this timeout task. Otherwise, the
// Execute function runs and we fail the attempt.
if activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED &&
activity.Status != activitypb.ACTIVITY_EXECUTION_STATUS_PAUSE_REQUESTED {
return false, nil
}
// Task attempt must still match current attempt.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading