Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,14 @@ operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20
Setting this to 0 prevents the search attribute from being set when a problem is detected, and unset when the problem is resolved.`,
)

// NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute is a per-namespace integer
// setting registered with a default of 0. See the description string below for the
// operator-facing semantics.
// NOTE(review): the history-service consumer in this change compares the value against
// ActivityInfo.Attempt using >=. If Attempt is 1-based, a value of 1 would match every
// pending activity, including ones that have never failed — confirm the intended minimum.
NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = NewNamespaceIntSetting(
"system.numConsecutiveActivityRetryProblemsToTriggerSearchAttribute",
0,
`NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute is the minimum number of activity retry
attempts before the TemporalReportedProblems search attribute is updated with activity retry failure
information. Setting this to 0 (the default) disables this feature.`,
)

PollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting(
"system.pollWaitForNamespaceRateLimitToken",
false,
Expand Down
4 changes: 3 additions & 1 deletion service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ type Config struct {
MaxLocalParentWorkflowVerificationDuration dynamicconfig.DurationPropertyFn

NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter
NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter

// Worker-Versioning related settings
EnableSuggestCaNOnNewTargetVersion dynamicconfig.BoolPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -810,7 +811,8 @@ func NewConfig(

LogAllReqErrors: dynamicconfig.LogAllReqErrors.Get(dc),

NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc),
NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc),
NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute.Get(dc),

// Worker-Versioning related
UseRevisionNumberForWorkerVersioning: dynamicconfig.UseRevisionNumberForWorkerVersioning.Get(dc),
Expand Down
116 changes: 86 additions & 30 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4491,6 +4491,10 @@ func (ms *MutableStateImpl) AddActivityTaskCompletedEvent(
return nil, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return nil, err
}

return event, nil
}

Expand Down Expand Up @@ -4541,6 +4545,10 @@ func (ms *MutableStateImpl) AddActivityTaskFailedEvent(
return nil, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return nil, err
}

return event, nil
}

Expand Down Expand Up @@ -4593,6 +4601,10 @@ func (ms *MutableStateImpl) AddActivityTaskTimedOutEvent(
return nil, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return nil, err
}

return event, nil
}

Expand Down Expand Up @@ -4804,6 +4816,10 @@ func (ms *MutableStateImpl) AddActivityTaskCanceledEvent(
return nil, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return nil, err
}

return event, nil
}

Expand Down Expand Up @@ -6793,6 +6809,10 @@ func (ms *MutableStateImpl) RetryActivity(
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
}

// TODO: uncomment once RETRY_STATE_PAUSED is supported
// return enumspb.RETRY_STATE_PAUSED, nil
return enumspb.RETRY_STATE_IN_PROGRESS, nil
Expand Down Expand Up @@ -6831,6 +6851,11 @@ func (ms *MutableStateImpl) RetryActivity(
if err := ms.taskGenerator.GenerateActivityRetryTasks(ai); err != nil {
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
}

if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err
}

return enumspb.RETRY_STATE_IN_PROGRESS, nil
}

Expand Down Expand Up @@ -6894,16 +6919,27 @@ func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater histo
}

prevPause := ai.Paused
prevAttempt := ai.Attempt
var originalSize int
if prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]; ok {
originalSize = prev.Size()
prevPause = prev.Paused
prevAttempt = prev.Attempt
}

if err := updater(ai, ms); err != nil {
return err
}

// If the attempt count changed (e.g., reset to 1 via ResetActivity or UnpauseWithReset),
// recompute the reported-problems SA so that entries are cleared when retries no longer
// meet the threshold.
if prevAttempt != ai.Attempt {
if err := ms.maybeUpdateActivityReportedProblems(); err != nil {
return err
}
}

if prevPause != ai.Paused {
err := ms.updatePauseInfoSearchAttribute()
if err != nil {
Expand Down Expand Up @@ -6978,22 +7014,37 @@ func (ms *MutableStateImpl) updatePauseInfoSearchAttribute() error {

func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error {
var reportedProblems []string
switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) {
case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause:
reportedProblems = []string{
"category=WorkflowTaskFailed",
fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()),
}
case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType:
reportedProblems = []string{
"category=WorkflowTaskTimedOut",
fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()),

// Workflow task failure entries — only when WFT reporting is enabled (threshold > 0).
// Gating here ensures that if the config is set to 0 (disabled), any stale
// LastWorkflowTaskFailure state (e.g., left over from a previous config value) does
// not bleed into the search attribute.
wftThreshold := ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String())
if wftThreshold > 0 {
switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) {
case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause:
reportedProblems = append(reportedProblems,
"category=WorkflowTaskFailed",
fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()),
)
case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType:
reportedProblems = append(reportedProblems,
"category=WorkflowTaskTimedOut",
fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()),
)
}
}

reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST)
if err != nil {
return err
// Activity retry failure entry: added when any pending activity has accumulated
// enough retries to meet the configured threshold.
activityThreshold := ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String())
if activityThreshold > 0 {
for _, ai := range ms.pendingActivityInfoIDs {
if int(ai.Attempt) >= activityThreshold {
reportedProblems = append(reportedProblems, "category=ActivityRetryFailed")
break
}
}
}

exeInfo := ms.executionInfo
Expand All @@ -7016,31 +7067,36 @@ func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error {
return nil
}

// Log the search attribute change
// Log the search attribute change.
ms.logReportedProblemsChange(existingProblems, reportedProblems)

ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload})
if len(reportedProblems) == 0 {
ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil})
} else {
reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST)
if err != nil {
return err
}
ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload})
}
return ms.taskGenerator.GenerateUpsertVisibilityTask()
}

func (ms *MutableStateImpl) RemoveReportedProblemsSearchAttribute() error {
if ms.executionInfo.SearchAttributes == nil {
return nil
}

temporalReportedProblems := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems]
if temporalReportedProblems == nil {
return nil
}

// Log the removal of the search attribute
ms.logReportedProblemsChange(ms.decodeReportedProblems(temporalReportedProblems), nil)

// Clear the WFT failure state and recompute the search attribute. This preserves
// any activity-retry entries that should remain even after a WFT succeeds.
ms.executionInfo.LastWorkflowTaskFailure = nil
return ms.UpdateReportedProblemsSearchAttribute()
}

// Just remove the search attribute entirely for now
ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil})
return ms.taskGenerator.GenerateUpsertVisibilityTask()
// maybeUpdateActivityReportedProblems recomputes the TemporalReportedProblems search
// attribute on behalf of activity-retry reporting. The recompute only runs while
// NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute is positive for this
// workflow's namespace; when it is 0 (the default) the call returns nil and does nothing.
func (ms *MutableStateImpl) maybeUpdateActivityReportedProblems() error {
	nsName := ms.GetNamespaceEntry().Name().String()
	if threshold := ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(nsName); threshold <= 0 {
		// Feature disabled for this namespace.
		return nil
	}
	return ms.UpdateReportedProblemsSearchAttribute()
}

// logReportedProblemsChange logs changes to the TemporalReportedProblems search attribute
Expand Down
112 changes: 112 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8671,6 +8671,118 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() {
})
}

// newReportedProblemsReproMutableState builds a bare-bones MutableStateImpl for unit tests
// of UpdateReportedProblemsSearchAttribute.
//
// wftThreshold is returned by NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute and
// activityThreshold by NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute, for any
// namespace name. The mutable state's task generator is swapped for the returned mock; no
// expectations are registered on it here, so the caller must set them up.
func newReportedProblemsReproMutableState(t *testing.T, wftThreshold, activityThreshold int) (*MutableStateImpl, *MockTaskGenerator) {
	t.Helper()
	ctrl := gomock.NewController(t)

	// Pin both thresholds irrespective of the namespace argument.
	dynCfg := tests.NewDynamicConfig()
	dynCfg.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute = func(string) int { return wftThreshold }
	dynCfg.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = func(string) int { return activityThreshold }

	shardCtx := shard.NewTestContext(ctrl, &persistencespb.ShardInfo{ShardId: 0, RangeId: 1}, dynCfg)
	t.Cleanup(shardCtx.StopForTest)

	registry := hsm.NewRegistry()
	require.NoError(t, RegisterStateMachine(registry))
	shardCtx.SetStateMachineRegistry(registry)

	// The events cache accepts any number of PutEvent calls.
	eventsCache := events.NewMockCache(ctrl)
	eventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes()
	shardCtx.SetEventsCacheForTesting(eventsCache)

	// Namespace and cluster lookups are stubbed permissively (AnyTimes).
	nsEntry := tests.GlobalNamespaceEntry
	shardCtx.Resource.NamespaceCache.EXPECT().
		GetNamespaceByID(tests.NamespaceID).
		Return(nsEntry, nil).
		AnyTimes()
	shardCtx.Resource.ClusterMetadata.EXPECT().
		ClusterNameForFailoverVersion(nsEntry.IsGlobalNamespace(), nsEntry.FailoverVersion(tests.WorkflowID)).
		Return(cluster.TestCurrentClusterName).
		AnyTimes()
	shardCtx.Resource.ClusterMetadata.EXPECT().
		GetCurrentClusterName().
		Return(cluster.TestCurrentClusterName).
		AnyTimes()
	shardCtx.Resource.ClusterMetadata.EXPECT().
		GetClusterID().
		Return(int64(1)).
		AnyTimes()

	mutableState := NewMutableState(
		shardCtx,
		eventsCache,
		shardCtx.GetLogger(),
		nsEntry,
		tests.WorkflowID,
		tests.RunID,
		time.Now().UTC(),
	)

	taskGen := NewMockTaskGenerator(ctrl)
	mutableState.taskGenerator = taskGen

	return mutableState, taskGen
}

// reportedProblemsForRepro reads the TemporalReportedProblems search attribute from the
// given mutable state and decodes it as a keyword list. It returns nil when no payload is
// stored under that key, and fails the test if decoding errors or the decoded value is not
// a []string.
func reportedProblemsForRepro(t *testing.T, ms *MutableStateImpl) []string {
	t.Helper()

	raw := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems]
	if raw == nil {
		return nil
	}

	value, err := sadefs.DecodeValue(raw, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, false)
	require.NoError(t, err)

	entries, isStringSlice := value.([]string)
	require.True(t, isStringSlice, "expected []string from TemporalReportedProblems payload")
	return entries
}

// TestReportedProblems_MixedWFTAndActivity covers the case where workflow-task reporting is
// off (threshold 0) while activity-retry reporting is on (threshold 2). With a pending
// activity at attempt 2 and a non-nil LastWorkflowTaskFailure left in the execution info,
// recomputing the search attribute must yield the activity entry and no workflow-task entry.
func TestReportedProblems_MixedWFTAndActivity(t *testing.T) {
	const (
		wftThreshold      = 0
		activityThreshold = 2
	)
	ms, taskGen := newReportedProblemsReproMutableState(t, wftThreshold, activityThreshold)

	// Stale workflow-task failure state, as if recorded while the WFT threshold was > 0.
	staleFailure := &persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause{
		LastWorkflowTaskFailureCause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE,
	}
	ms.executionInfo.LastWorkflowTaskFailure = staleFailure

	// A pending activity whose attempt count equals the activity threshold.
	pending := &persistencespb.ActivityInfo{
		ScheduledEventId: 1,
		ActivityId:       "activity",
		Attempt:          2,
	}
	ms.pendingActivityInfoIDs[pending.ScheduledEventId] = pending

	taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil)
	err := ms.UpdateReportedProblemsSearchAttribute()
	require.NoError(t, err)

	got := reportedProblemsForRepro(t, ms)
	require.Contains(t, got, "category=ActivityRetryFailed",
		"activity entry should appear when activity threshold is met")
	require.NotContains(t, got, "category=WorkflowTaskFailed",
		"WFT entry should not appear when WFT threshold is 0 (disabled), even with stale LastWorkflowTaskFailure")
}

// TestReportedProblems_ActivityClearedAfterAttemptReset verifies that the
// category=ActivityRetryFailed entry is removed from TemporalReportedProblems when an
// activity's attempt count drops below the threshold (e.g., via ResetActivity or
// UnpauseWithReset). UpdateActivity triggers this recompute whenever the attempt changes.
func TestReportedProblems_ActivityClearedAfterAttemptReset(t *testing.T) {
ms, taskGen := newReportedProblemsReproMutableState(t, 0 /* wftThreshold */, 2 /* activityThreshold */)

// Prime the SA: activity at attempt 2 meets the threshold.
ms.pendingActivityInfoIDs[1] = &persistencespb.ActivityInfo{
ScheduledEventId: 1,
ActivityId: "activity",
Attempt: 2,
}
taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil)
require.NoError(t, ms.UpdateReportedProblemsSearchAttribute())
require.Contains(t, reportedProblemsForRepro(t, ms), "category=ActivityRetryFailed",
"precondition: SA entry should be present before reset")

// Simulate what UpdateActivity does when attempt changes (e.g., ResetActivity sets Attempt=1).
ms.pendingActivityInfoIDs[1].Attempt = 1

taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil)
require.NoError(t, ms.maybeUpdateActivityReportedProblems())

problems := reportedProblemsForRepro(t, ms)
require.NotContains(t, problems, "category=ActivityRetryFailed",
"activity entry should be cleared once attempt count drops below threshold")

// TestToRealTime tests ms.ToRealTime() exhaustively as this function is also used by executions that don't
// use time skipping and need to be tested thoroughly. This function converts virtual time to wall-clock time.
func (s *mutableStateSuite) TestToRealTime() {
Expand Down
Loading