From cbe63fe245419d4dcfd2f1086a81a153ee4d792a Mon Sep 17 00:00:00 2001 From: sandeep erelli Date: Mon, 18 May 2026 09:42:27 -0700 Subject: [PATCH 1/2] Surface repeated activity retry failures in TemporalReportedProblems Add NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute dynamic config (namespace-scoped, default 0 = disabled). When enabled, the TemporalReportedProblems search attribute gains a "category=ActivityRetryFailed" entry whenever any pending activity's attempt count reaches the configured threshold. The entry is removed automatically when the activity reaches a terminal state (completed, failed with no retry, timed out, or canceled). Activity-retry entries are managed independently of the existing workflow-task-failure entries: a successful WFT no longer wipes activity entries, and vice versa. Implementation details: - UpdateReportedProblemsSearchAttribute now scans pendingActivityInfoIDs and merges activity entries with WFT entries into the keyword list. - RemoveReportedProblemsSearchAttribute clears the WFT state and delegates to UpdateReportedProblemsSearchAttribute so activity entries are preserved when only the WFT side is resolved. - maybeUpdateActivityReportedProblems is called from RetryActivity and from the Add{Completed,Failed,TimedOut,Canceled}Activity methods to keep the SA in sync with activity state changes. - Integration tests cover set+clear behavior and the disabled (0) default. Fixes: https://github.com/temporalio/temporal/issues/10149 Co-Authored-By: Claude Sonnet 4.6 --- common/dynamicconfig/constants.go | 8 + service/history/configs/config.go | 4 +- .../history/workflow/mutable_state_impl.go | 87 ++++++++--- tests/activity_reported_problems_test.go | 140 ++++++++++++++++++ 4 files changed, 214 insertions(+), 25 deletions(-) create mode 100644 tests/activity_reported_problems_test.go diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 30bc93c04a..39fe8a6239 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -321,6 +321,14 @@ operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20 Setting this to 0 prevents the search attribute from being set when a problem is detected, and unset when the problem is resolved.`, ) + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = NewNamespaceIntSetting( + "system.numConsecutiveActivityRetryProblemsToTriggerSearchAttribute", + 0, + `NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute is the minimum number of activity retry +attempts before the TemporalReportedProblems search attribute is updated with activity retry failure +information. Setting this to 0 (the default) disables this feature.`, + ) + PollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting( "system.pollWaitForNamespaceRateLimitToken", false, diff --git a/service/history/configs/config.go b/service/history/configs/config.go index 57cc6ae1e1..5cbf4060c1 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -421,6 +421,7 @@ type Config struct { MaxLocalParentWorkflowVerificationDuration dynamicconfig.DurationPropertyFn NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter // Worker-Versioning related settings EnableSuggestCaNOnNewTargetVersion dynamicconfig.BoolPropertyFnWithNamespaceFilter @@ -806,7 +807,8 @@ func NewConfig( LogAllReqErrors: dynamicconfig.LogAllReqErrors.Get(dc), - NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc), + NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc), + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute.Get(dc), // Worker-Versioning related UseRevisionNumberForWorkerVersioning: dynamicconfig.UseRevisionNumberForWorkerVersioning.Get(dc), diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index e5d410475c..e1c8750583 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4403,6 +4403,10 @@ func (ms *MutableStateImpl) AddActivityTaskCompletedEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4453,6 +4457,10 @@ func (ms *MutableStateImpl) AddActivityTaskFailedEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4505,6 +4513,10 @@ func (ms *MutableStateImpl) AddActivityTaskTimedOutEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4642,6 +4654,10 @@ func (ms *MutableStateImpl) AddActivityTaskCanceledEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -6506,6 +6522,10 @@ func (ms *MutableStateImpl) RetryActivity( return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err + } + // TODO: uncomment once RETRY_STATE_PAUSED is supported // return enumspb.RETRY_STATE_PAUSED, nil return enumspb.RETRY_STATE_IN_PROGRESS, nil @@ -6544,6 +6564,11 @@ func (ms *MutableStateImpl) RetryActivity( if err := ms.taskGenerator.GenerateActivityRetryTasks(ai); err != nil { return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err } + + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err + } + return enumspb.RETRY_STATE_IN_PROGRESS, nil } @@ -6691,22 +6716,31 @@ func (ms *MutableStateImpl) updatePauseInfoSearchAttribute() error { func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error { var reportedProblems []string + + // Workflow task failure entries. switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) { case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause: - reportedProblems = []string{ + reportedProblems = append(reportedProblems, "category=WorkflowTaskFailed", fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()), - } + ) case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType: - reportedProblems = []string{ + reportedProblems = append(reportedProblems, "category=WorkflowTaskTimedOut", fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()), - } + ) } - reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) - if err != nil { - return err + // Activity retry failure entry: added when any pending activity has accumulated + // enough retries to meet the configured threshold. + activityThreshold := ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) + if activityThreshold > 0 { + for _, ai := range ms.pendingActivityInfoIDs { + if int(ai.Attempt) >= activityThreshold { + reportedProblems = append(reportedProblems, "category=ActivityRetryFailed") + break + } + } } exeInfo := ms.executionInfo @@ -6729,31 +6763,36 @@ func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error { return nil } - // Log the search attribute change + // Log the search attribute change. ms.logReportedProblemsChange(existingProblems, reportedProblems) - ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload}) + if len(reportedProblems) == 0 { + ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil}) + } else { + reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) + if err != nil { + return err + } + ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload}) + } return ms.taskGenerator.GenerateUpsertVisibilityTask() } func (ms *MutableStateImpl) RemoveReportedProblemsSearchAttribute() error { - if ms.executionInfo.SearchAttributes == nil { - return nil - } - - temporalReportedProblems := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems] - if temporalReportedProblems == nil { - return nil - } - - // Log the removal of the search attribute - ms.logReportedProblemsChange(ms.decodeReportedProblems(temporalReportedProblems), nil) - + // Clear the WFT failure state and recompute the search attribute. This preserves + // any activity-retry entries that should remain even after a WFT succeeds. ms.executionInfo.LastWorkflowTaskFailure = nil + return ms.UpdateReportedProblemsSearchAttribute() +} - // Just remove the search attribute entirely for now - ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil}) - return ms.taskGenerator.GenerateUpsertVisibilityTask() +// maybeUpdateActivityReportedProblems updates the TemporalReportedProblems search attribute +// for activity retry failures if the feature is enabled. It is a no-op when the threshold +// dynamic config is set to 0 (disabled, the default). +func (ms *MutableStateImpl) maybeUpdateActivityReportedProblems() error { + if ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) > 0 { + return ms.UpdateReportedProblemsSearchAttribute() + } + return nil } // logReportedProblemsChange logs changes to the TemporalReportedProblems search attribute diff --git a/tests/activity_reported_problems_test.go b/tests/activity_reported_problems_test.go new file mode 100644 index 0000000000..948d7e6da1 --- /dev/null +++ b/tests/activity_reported_problems_test.go @@ -0,0 +1,140 @@ +package tests + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/searchattribute/sadefs" + "go.temporal.io/server/tests/testcore" +) + +type ActivityReportedProblemsTestSuite struct { + testcore.FunctionalTestBase + activityShouldFail atomic.Bool +} + +func TestActivityReportedProblemsTestSuite(t *testing.T) { + s := new(ActivityReportedProblemsTestSuite) + suite.Run(t, s) +} + +func (s *ActivityReportedProblemsTestSuite) SetupTest() { + s.FunctionalTestBase.SetupTest() + // Trigger after 3 retry attempts (i.e., activity is on its 3rd or higher attempt). + s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute, 3) +} + +func (s *ActivityReportedProblemsTestSuite) failingActivity() (string, error) { + if s.activityShouldFail.Load() { + return "", temporal.NewApplicationError("forced-activity-failure", "ForcedError") + } + return "done!", nil +} + +// workflowWithRetryingActivity runs an activity with a retry policy and waits for it to finish. +func (s *ActivityReportedProblemsTestSuite) workflowWithRetryingActivity(ctx workflow.Context) (string, error) { + var ret string + err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 10, + }, + }), s.failingActivity).Get(ctx, &ret) + if err != nil { + return "", err + } + return ret, nil +} + +// TestActivityReportedProblems_SetAndClear verifies that the TemporalReportedProblems search +// attribute is set once the activity retry threshold is reached, and cleared when the activity +// eventually succeeds. +func (s *ActivityReportedProblemsTestSuite) TestActivityReportedProblems_SetAndClear() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.activityShouldFail.Store(true) + + s.SdkWorker().RegisterWorkflow(s.workflowWithRetryingActivity) + s.SdkWorker().RegisterActivity(s.failingActivity) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithRetryingActivity) + s.NoError(err) + + // Wait until TemporalReportedProblems is set with the activity retry category. + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok) + require.Contains(t, saVal, "category=ActivityRetryFailed") + }, 20*time.Second, 500*time.Millisecond) + + // Unblock the activity so the workflow can complete. + s.activityShouldFail.Store(false) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + s.Equal("done!", out) + + // After the workflow completes, the search attribute should be gone. + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.False(ok) +} + +// TestActivityReportedProblems_DisabledByDefault verifies that when the threshold is set to 0 +// (the default), the search attribute is never set even when the activity retries many times. +func (s *ActivityReportedProblemsTestSuite) TestActivityReportedProblems_DisabledByDefault() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Override to disabled (threshold = 0). + cleanup := s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute, 0) + defer cleanup() + + s.activityShouldFail.Store(true) + + s.SdkWorker().RegisterWorkflow(s.workflowWithRetryingActivity) + s.SdkWorker().RegisterActivity(s.failingActivity) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithRetryingActivity) + s.NoError(err) + + // Wait for several retries but confirm the SA is never set. + s.EventuallyWithT(func(t *assert.CollectT) { + exec, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + // Confirm at least 3 retry attempts have happened. + require.NotEmpty(t, exec.PendingActivities) + require.GreaterOrEqual(t, exec.PendingActivities[0].Attempt, int32(3)) + }, 20*time.Second, 500*time.Millisecond) + + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.False(ok, "TemporalReportedProblems should not be set when threshold is disabled") + + // Clean up. + s.NoError(s.SdkClient().TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup")) +} From 8e5b4e08613ab5f1fcd7927fe7ccaf246ff71359 Mon Sep 17 00:00:00 2001 From: sandeep erelli Date: Tue, 26 May 2026 16:36:45 -0700 Subject: [PATCH 2/2] Address PR review: gate WFT entries on threshold and clear SA on activity reset - `UpdateReportedProblemsSearchAttribute`: gate WFT entries on NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute so that stale LastWorkflowTaskFailure state is not surfaced when WFT reporting is disabled (threshold=0). - `UpdateActivity`: track prevAttempt and call maybeUpdateActivityReportedProblems when the attempt count changes so that ResetActivity / UnpauseWithReset paths immediately clear the category=ActivityRetryFailed SA entry when retries drop below threshold. - Add TestReportedProblems_MixedWFTAndActivity: documents that with WFT threshold=0 and activity threshold=2, a recompute includes the activity entry but not the WFT entry even when LastWorkflowTaskFailure is set. - Add TestReportedProblems_ActivityClearedAfterAttemptReset: documents that the SA entry is cleared when the pending activity's attempt count drops below the threshold. Co-Authored-By: Claude Sonnet 4.6 --- .../history/workflow/mutable_state_impl.go | 41 +++++-- .../workflow/mutable_state_impl_test.go | 113 ++++++++++++++++++ 2 files changed, 142 insertions(+), 12 deletions(-) diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 41e4e06c28..93ffcaf118 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -6923,16 +6923,27 @@ func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater histo } prevPause := ai.Paused + prevAttempt := ai.Attempt var originalSize int if prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]; ok { originalSize = prev.Size() prevPause = prev.Paused + prevAttempt = prev.Attempt } if err := updater(ai, ms); err != nil { return err } + // If the attempt count changed (e.g., reset to 1 via ResetActivity or UnpauseWithReset), + // recompute the reported-problems SA so that entries are cleared when retries no longer + // meet the threshold. + if prevAttempt != ai.Attempt { + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return err + } + } + if prevPause != ai.Paused { err := ms.updatePauseInfoSearchAttribute() if err != nil { @@ -7008,18 +7019,24 @@ func (ms *MutableStateImpl) updatePauseInfoSearchAttribute() error { func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error { var reportedProblems []string - // Workflow task failure entries. - switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) { - case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause: - reportedProblems = append(reportedProblems, - "category=WorkflowTaskFailed", - fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()), - ) - case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType: - reportedProblems = append(reportedProblems, - "category=WorkflowTaskTimedOut", - fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()), - ) + // Workflow task failure entries — only when WFT reporting is enabled (threshold > 0). + // Gating here ensures that if the config is set to 0 (disabled), any stale + // LastWorkflowTaskFailure state (e.g., left over from a previous config value) does + // not bleed into the search attribute. + wftThreshold := ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) + if wftThreshold > 0 { + switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) { + case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause: + reportedProblems = append(reportedProblems, + "category=WorkflowTaskFailed", + fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()), + ) + case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType: + reportedProblems = append(reportedProblems, + "category=WorkflowTaskTimedOut", + fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()), + ) + } } // Activity retry failure entry: added when any pending activity has accumulated diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 54144a3073..e4bb2dcb0e 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -8360,3 +8360,116 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() { s.InDelta(float64(time.Hour), float64(got), float64(time.Millisecond)) }) } + +// newReportedProblemsReproMutableState creates a minimal MutableStateImpl suitable for +// unit-testing UpdateReportedProblemsSearchAttribute. wftThreshold and activityThreshold +// map to NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute and +// NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute respectively. +// The caller is responsible for setting up taskGenerator expectations. +func newReportedProblemsReproMutableState(t *testing.T, wftThreshold, activityThreshold int) (*MutableStateImpl, *MockTaskGenerator) { + t.Helper() + ctrl := gomock.NewController(t) + + cfg := tests.NewDynamicConfig() + cfg.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute = func(string) int { return wftThreshold } + cfg.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = func(string) int { return activityThreshold } + + mockShard := shard.NewTestContext(ctrl, &persistencespb.ShardInfo{ShardId: 0, RangeId: 1}, cfg) + t.Cleanup(mockShard.StopForTest) + + reg := hsm.NewRegistry() + require.NoError(t, RegisterStateMachine(reg)) + mockShard.SetStateMachineRegistry(reg) + + mockEventsCache := events.NewMockCache(ctrl) + mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + mockShard.SetEventsCacheForTesting(mockEventsCache) + + namespaceEntry := tests.GlobalNamespaceEntry + mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), namespaceEntry.FailoverVersion(tests.WorkflowID)).Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes() + + ms := NewMutableState(mockShard, mockEventsCache, mockShard.GetLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC()) + + mockTaskGen := NewMockTaskGenerator(ctrl) + ms.taskGenerator = mockTaskGen + + return ms, mockTaskGen +} + +// reportedProblemsForRepro decodes the TemporalReportedProblems keyword-list from the +// mutable state's search attributes and returns the entries as a string slice. Returns nil +// if the attribute is absent. +func reportedProblemsForRepro(t *testing.T, ms *MutableStateImpl) []string { + t.Helper() + payload := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems] + if payload == nil { + return nil + } + decoded, err := sadefs.DecodeValue(payload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, false) + require.NoError(t, err) + problems, ok := decoded.([]string) + require.True(t, ok, "expected []string from TemporalReportedProblems payload") + return problems +} + +// TestReportedProblems_MixedWFTAndActivity verifies that when WFT reporting is disabled +// (threshold = 0) but activity retry reporting is enabled (threshold = 2), a recompute that +// finds a pending activity at attempt 2 includes the activity entry but does NOT include a +// WFT entry — even when LastWorkflowTaskFailure is non-nil (stale state from a prior config). +func TestReportedProblems_MixedWFTAndActivity(t *testing.T) { + ms, taskGen := newReportedProblemsReproMutableState(t, 0 /* wftThreshold */, 2 /* activityThreshold */) + + // Simulate stale LastWorkflowTaskFailure state (e.g., left over from when the WFT threshold + // was previously > 0). + ms.executionInfo.LastWorkflowTaskFailure = &persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause{ + LastWorkflowTaskFailureCause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + } + + // Activity has reached the retry threshold. + ms.pendingActivityInfoIDs[1] = &persistencespb.ActivityInfo{ + ScheduledEventId: 1, + ActivityId: "activity", + Attempt: 2, + } + + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.UpdateReportedProblemsSearchAttribute()) + + problems := reportedProblemsForRepro(t, ms) + require.Contains(t, problems, "category=ActivityRetryFailed", + "activity entry should appear when activity threshold is met") + require.NotContains(t, problems, "category=WorkflowTaskFailed", + "WFT entry should not appear when WFT threshold is 0 (disabled), even with stale LastWorkflowTaskFailure") +} + +// TestReportedProblems_ActivityClearedAfterAttemptReset verifies that the +// category=ActivityRetryFailed entry is removed from TemporalReportedProblems when an +// activity's attempt count drops below the threshold (e.g., via ResetActivity or +// UnpauseWithReset). UpdateActivity triggers this recompute whenever the attempt changes. +func TestReportedProblems_ActivityClearedAfterAttemptReset(t *testing.T) { + ms, taskGen := newReportedProblemsReproMutableState(t, 0 /* wftThreshold */, 2 /* activityThreshold */) + + // Prime the SA: activity at attempt 2 meets the threshold. + ms.pendingActivityInfoIDs[1] = &persistencespb.ActivityInfo{ + ScheduledEventId: 1, + ActivityId: "activity", + Attempt: 2, + } + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.UpdateReportedProblemsSearchAttribute()) + require.Contains(t, reportedProblemsForRepro(t, ms), "category=ActivityRetryFailed", + "precondition: SA entry should be present before reset") + + // Simulate what UpdateActivity does when attempt changes (e.g., ResetActivity sets Attempt=1). + ms.pendingActivityInfoIDs[1].Attempt = 1 + + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.maybeUpdateActivityReportedProblems()) + + problems := reportedProblemsForRepro(t, ms) + require.NotContains(t, problems, "category=ActivityRetryFailed", + "activity entry should be cleared once attempt count drops below threshold") +}