diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 76fd9d44f0..6029ee9189 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -321,6 +321,14 @@ operator API calls (highest priority). Should be >0.0 and <= 1.0 (defaults to 20 Setting this to 0 prevents the search attribute from being set when a problem is detected, and unset when the problem is resolved.`, ) + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = NewNamespaceIntSetting( + "system.numConsecutiveActivityRetryProblemsToTriggerSearchAttribute", + 0, + `NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute is the minimum number of activity retry +attempts before the TemporalReportedProblems search attribute is updated with activity retry failure +information. Setting this to 0 (the default) disables this feature.`, + ) + PollWaitForNamespaceRateLimitToken = NewNamespaceBoolSetting( "system.pollWaitForNamespaceRateLimitToken", false, diff --git a/service/history/configs/config.go b/service/history/configs/config.go index e8e7c592eb..f2004924c4 100644 --- a/service/history/configs/config.go +++ b/service/history/configs/config.go @@ -423,6 +423,7 @@ type Config struct { MaxLocalParentWorkflowVerificationDuration dynamicconfig.DurationPropertyFn NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute dynamicconfig.IntPropertyFnWithNamespaceFilter // Worker-Versioning related settings EnableSuggestCaNOnNewTargetVersion dynamicconfig.BoolPropertyFnWithNamespaceFilter @@ -810,7 +811,8 @@ func NewConfig( LogAllReqErrors: dynamicconfig.LogAllReqErrors.Get(dc), - NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc), + NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute.Get(dc), + NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute: dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute.Get(dc), // Worker-Versioning related UseRevisionNumberForWorkerVersioning: dynamicconfig.UseRevisionNumberForWorkerVersioning.Get(dc), diff --git a/service/history/workflow/mutable_state_impl.go b/service/history/workflow/mutable_state_impl.go index 2b63a67331..4defd8f971 100644 --- a/service/history/workflow/mutable_state_impl.go +++ b/service/history/workflow/mutable_state_impl.go @@ -4491,6 +4491,10 @@ func (ms *MutableStateImpl) AddActivityTaskCompletedEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4541,6 +4545,10 @@ func (ms *MutableStateImpl) AddActivityTaskFailedEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4593,6 +4601,10 @@ func (ms *MutableStateImpl) AddActivityTaskTimedOutEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -4804,6 +4816,10 @@ func (ms *MutableStateImpl) AddActivityTaskCanceledEvent( return nil, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return nil, err + } + return event, nil } @@ -6793,6 +6809,10 @@ func (ms *MutableStateImpl) RetryActivity( return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err } + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err + } + // TODO: uncomment once RETRY_STATE_PAUSED is supported // return enumspb.RETRY_STATE_PAUSED, nil return enumspb.RETRY_STATE_IN_PROGRESS, nil @@ -6831,6 +6851,11 @@ func (ms *MutableStateImpl) RetryActivity( if err := ms.taskGenerator.GenerateActivityRetryTasks(ai); err != nil { return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err } + + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return enumspb.RETRY_STATE_INTERNAL_SERVER_ERROR, err + } + return enumspb.RETRY_STATE_IN_PROGRESS, nil } @@ -6894,16 +6919,27 @@ func (ms *MutableStateImpl) UpdateActivity(scheduledEventId int64, updater histo } prevPause := ai.Paused + prevAttempt := ai.Attempt var originalSize int if prev, ok := ms.pendingActivityInfoIDs[ai.ScheduledEventId]; ok { originalSize = prev.Size() prevPause = prev.Paused + prevAttempt = prev.Attempt } if err := updater(ai, ms); err != nil { return err } + // If the attempt count changed (e.g., reset to 1 via ResetActivity or UnpauseWithReset), + // recompute the reported-problems SA so that entries are cleared when retries no longer + // meet the threshold. + if prevAttempt != ai.Attempt { + if err := ms.maybeUpdateActivityReportedProblems(); err != nil { + return err + } + } + if prevPause != ai.Paused { err := ms.updatePauseInfoSearchAttribute() if err != nil { @@ -6978,22 +7014,37 @@ func (ms *MutableStateImpl) updatePauseInfoSearchAttribute() error { func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error { var reportedProblems []string - switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) { - case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause: - reportedProblems = []string{ - "category=WorkflowTaskFailed", - fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()), - } - case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType: - reportedProblems = []string{ - "category=WorkflowTaskTimedOut", - fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()), + + // Workflow task failure entries — only when WFT reporting is enabled (threshold > 0). + // Gating here ensures that if the config is set to 0 (disabled), any stale + // LastWorkflowTaskFailure state (e.g., left over from a previous config value) does + // not bleed into the search attribute. + wftThreshold := ms.config.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) + if wftThreshold > 0 { + switch wftFailure := ms.executionInfo.LastWorkflowTaskFailure.(type) { + case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause: + reportedProblems = append(reportedProblems, + "category=WorkflowTaskFailed", + fmt.Sprintf("cause=WorkflowTaskFailedCause%s", wftFailure.LastWorkflowTaskFailureCause.String()), + ) + case *persistencespb.WorkflowExecutionInfo_LastWorkflowTaskTimedOutType: + reportedProblems = append(reportedProblems, + "category=WorkflowTaskTimedOut", + fmt.Sprintf("cause=WorkflowTaskTimedOutCause%s", wftFailure.LastWorkflowTaskTimedOutType.String()), + ) } } - reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) - if err != nil { - return err + // Activity retry failure entry: added when any pending activity has accumulated + // enough retries to meet the configured threshold. + activityThreshold := ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) + if activityThreshold > 0 { + for _, ai := range ms.pendingActivityInfoIDs { + if int(ai.Attempt) >= activityThreshold { + reportedProblems = append(reportedProblems, "category=ActivityRetryFailed") + break + } + } } exeInfo := ms.executionInfo @@ -7016,31 +7067,36 @@ func (ms *MutableStateImpl) UpdateReportedProblemsSearchAttribute() error { return nil } - // Log the search attribute change + // Log the search attribute change. ms.logReportedProblemsChange(existingProblems, reportedProblems) - ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload}) + if len(reportedProblems) == 0 { + ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil}) + } else { + reportedProblemsPayload, err := sadefs.EncodeValue(reportedProblems, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST) + if err != nil { + return err + } + ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: reportedProblemsPayload}) + } return ms.taskGenerator.GenerateUpsertVisibilityTask() } func (ms *MutableStateImpl) RemoveReportedProblemsSearchAttribute() error { - if ms.executionInfo.SearchAttributes == nil { - return nil - } - - temporalReportedProblems := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems] - if temporalReportedProblems == nil { - return nil - } - - // Log the removal of the search attribute - ms.logReportedProblemsChange(ms.decodeReportedProblems(temporalReportedProblems), nil) - + // Clear the WFT failure state and recompute the search attribute. This preserves + // any activity-retry entries that should remain even after a WFT succeeds. ms.executionInfo.LastWorkflowTaskFailure = nil + return ms.UpdateReportedProblemsSearchAttribute() +} - // Just remove the search attribute entirely for now - ms.updateSearchAttributes(map[string]*commonpb.Payload{sadefs.TemporalReportedProblems: nil}) - return ms.taskGenerator.GenerateUpsertVisibilityTask() +// maybeUpdateActivityReportedProblems updates the TemporalReportedProblems search attribute +// for activity retry failures if the feature is enabled. It is a no-op when the threshold +// dynamic config is set to 0 (disabled, the default). +func (ms *MutableStateImpl) maybeUpdateActivityReportedProblems() error { + if ms.config.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute(ms.GetNamespaceEntry().Name().String()) > 0 { + return ms.UpdateReportedProblemsSearchAttribute() + } + return nil } // logReportedProblemsChange logs changes to the TemporalReportedProblems search attribute diff --git a/service/history/workflow/mutable_state_impl_test.go b/service/history/workflow/mutable_state_impl_test.go index 4c5c6b942f..484ca320b8 100644 --- a/service/history/workflow/mutable_state_impl_test.go +++ b/service/history/workflow/mutable_state_impl_test.go @@ -8671,6 +8671,118 @@ func (s *mutableStateSuite) TestCloseTransactionTimeSkipping_Bound() { }) } +// newReportedProblemsReproMutableState creates a minimal MutableStateImpl suitable for +// unit-testing UpdateReportedProblemsSearchAttribute. wftThreshold and activityThreshold +// map to NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute and +// NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute respectively. +// The caller is responsible for setting up taskGenerator expectations. +func newReportedProblemsReproMutableState(t *testing.T, wftThreshold, activityThreshold int) (*MutableStateImpl, *MockTaskGenerator) { + t.Helper() + ctrl := gomock.NewController(t) + + cfg := tests.NewDynamicConfig() + cfg.NumConsecutiveWorkflowTaskProblemsToTriggerSearchAttribute = func(string) int { return wftThreshold } + cfg.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute = func(string) int { return activityThreshold } + + mockShard := shard.NewTestContext(ctrl, &persistencespb.ShardInfo{ShardId: 0, RangeId: 1}, cfg) + t.Cleanup(mockShard.StopForTest) + + reg := hsm.NewRegistry() + require.NoError(t, RegisterStateMachine(reg)) + mockShard.SetStateMachineRegistry(reg) + + mockEventsCache := events.NewMockCache(ctrl) + mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any()).AnyTimes() + mockShard.SetEventsCacheForTesting(mockEventsCache) + + namespaceEntry := tests.GlobalNamespaceEntry + mockShard.Resource.NamespaceCache.EXPECT().GetNamespaceByID(tests.NamespaceID).Return(namespaceEntry, nil).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().ClusterNameForFailoverVersion(namespaceEntry.IsGlobalNamespace(), namespaceEntry.FailoverVersion(tests.WorkflowID)).Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes() + mockShard.Resource.ClusterMetadata.EXPECT().GetClusterID().Return(int64(1)).AnyTimes() + + ms := NewMutableState(mockShard, mockEventsCache, mockShard.GetLogger(), namespaceEntry, tests.WorkflowID, tests.RunID, time.Now().UTC()) + + mockTaskGen := NewMockTaskGenerator(ctrl) + ms.taskGenerator = mockTaskGen + + return ms, mockTaskGen +} + +// reportedProblemsForRepro decodes the TemporalReportedProblems keyword-list from the +// mutable state's search attributes and returns the entries as a string slice. Returns nil +// if the attribute is absent. +func reportedProblemsForRepro(t *testing.T, ms *MutableStateImpl) []string { + t.Helper() + payload := ms.executionInfo.SearchAttributes[sadefs.TemporalReportedProblems] + if payload == nil { + return nil + } + decoded, err := sadefs.DecodeValue(payload, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, false) + require.NoError(t, err) + problems, ok := decoded.([]string) + require.True(t, ok, "expected []string from TemporalReportedProblems payload") + return problems +} + +// TestReportedProblems_MixedWFTAndActivity verifies that when WFT reporting is disabled +// (threshold = 0) but activity retry reporting is enabled (threshold = 2), a recompute that +// finds a pending activity at attempt 2 includes the activity entry but does NOT include a +// WFT entry — even when LastWorkflowTaskFailure is non-nil (stale state from a prior config). +func TestReportedProblems_MixedWFTAndActivity(t *testing.T) { + ms, taskGen := newReportedProblemsReproMutableState(t, 0 /* wftThreshold */, 2 /* activityThreshold */) + + // Simulate stale LastWorkflowTaskFailure state (e.g., left over from when the WFT threshold + // was previously > 0). + ms.executionInfo.LastWorkflowTaskFailure = &persistencespb.WorkflowExecutionInfo_LastWorkflowTaskFailureCause{ + LastWorkflowTaskFailureCause: enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE, + } + + // Activity has reached the retry threshold. + ms.pendingActivityInfoIDs[1] = &persistencespb.ActivityInfo{ + ScheduledEventId: 1, + ActivityId: "activity", + Attempt: 2, + } + + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.UpdateReportedProblemsSearchAttribute()) + + problems := reportedProblemsForRepro(t, ms) + require.Contains(t, problems, "category=ActivityRetryFailed", + "activity entry should appear when activity threshold is met") + require.NotContains(t, problems, "category=WorkflowTaskFailed", + "WFT entry should not appear when WFT threshold is 0 (disabled), even with stale LastWorkflowTaskFailure") +} + +// TestReportedProblems_ActivityClearedAfterAttemptReset verifies that the +// category=ActivityRetryFailed entry is removed from TemporalReportedProblems when an +// activity's attempt count drops below the threshold (e.g., via ResetActivity or +// UnpauseWithReset). UpdateActivity triggers this recompute whenever the attempt changes. +func TestReportedProblems_ActivityClearedAfterAttemptReset(t *testing.T) { + ms, taskGen := newReportedProblemsReproMutableState(t, 0 /* wftThreshold */, 2 /* activityThreshold */) + + // Prime the SA: activity at attempt 2 meets the threshold. + ms.pendingActivityInfoIDs[1] = &persistencespb.ActivityInfo{ + ScheduledEventId: 1, + ActivityId: "activity", + Attempt: 2, + } + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.UpdateReportedProblemsSearchAttribute()) + require.Contains(t, reportedProblemsForRepro(t, ms), "category=ActivityRetryFailed", + "precondition: SA entry should be present before reset") + + // Simulate what UpdateActivity does when attempt changes (e.g., ResetActivity sets Attempt=1). + ms.pendingActivityInfoIDs[1].Attempt = 1 + + taskGen.EXPECT().GenerateUpsertVisibilityTask().Return(nil) + require.NoError(t, ms.maybeUpdateActivityReportedProblems()) + + problems := reportedProblemsForRepro(t, ms) + require.NotContains(t, problems, "category=ActivityRetryFailed", + "activity entry should be cleared once attempt count drops below threshold") + // TestToRealTime tests ms.ToRealTime() exhaustively as this function is also used by executions that don't // use time skipping and need to be tested thoroughly. This function converts virtual time to wall-clock time. func (s *mutableStateSuite) TestToRealTime() { diff --git a/tests/activity_reported_problems_test.go b/tests/activity_reported_problems_test.go new file mode 100644 index 0000000000..948d7e6da1 --- /dev/null +++ b/tests/activity_reported_problems_test.go @@ -0,0 +1,140 @@ +package tests + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/searchattribute/sadefs" + "go.temporal.io/server/tests/testcore" +) + +type ActivityReportedProblemsTestSuite struct { + testcore.FunctionalTestBase + activityShouldFail atomic.Bool +} + +func TestActivityReportedProblemsTestSuite(t *testing.T) { + s := new(ActivityReportedProblemsTestSuite) + suite.Run(t, s) +} + +func (s *ActivityReportedProblemsTestSuite) SetupTest() { + s.FunctionalTestBase.SetupTest() + // Trigger after 3 retry attempts (i.e., activity is on its 3rd or higher attempt). + s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute, 3) +} + +func (s *ActivityReportedProblemsTestSuite) failingActivity() (string, error) { + if s.activityShouldFail.Load() { + return "", temporal.NewApplicationError("forced-activity-failure", "ForcedError") + } + return "done!", nil +} + +// workflowWithRetryingActivity runs an activity with a retry policy and waits for it to finish. +func (s *ActivityReportedProblemsTestSuite) workflowWithRetryingActivity(ctx workflow.Context) (string, error) { + var ret string + err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 10, + }, + }), s.failingActivity).Get(ctx, &ret) + if err != nil { + return "", err + } + return ret, nil +} + +// TestActivityReportedProblems_SetAndClear verifies that the TemporalReportedProblems search +// attribute is set once the activity retry threshold is reached, and cleared when the activity +// eventually succeeds. +func (s *ActivityReportedProblemsTestSuite) TestActivityReportedProblems_SetAndClear() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + s.activityShouldFail.Store(true) + + s.SdkWorker().RegisterWorkflow(s.workflowWithRetryingActivity) + s.SdkWorker().RegisterActivity(s.failingActivity) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithRetryingActivity) + s.NoError(err) + + // Wait until TemporalReportedProblems is set with the activity retry category. + s.EventuallyWithT(func(t *assert.CollectT) { + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + saVal, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + require.True(t, ok) + require.Contains(t, saVal, "category=ActivityRetryFailed") + }, 20*time.Second, 500*time.Millisecond) + + // Unblock the activity so the workflow can complete. + s.activityShouldFail.Store(false) + + var out string + s.NoError(workflowRun.Get(ctx, &out)) + s.Equal("done!", out) + + // After the workflow completes, the search attribute should be gone. + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.False(ok) +} + +// TestActivityReportedProblems_DisabledByDefault verifies that when the threshold is set to 0 +// (the default), the search attribute is never set even when the activity retries many times. +func (s *ActivityReportedProblemsTestSuite) TestActivityReportedProblems_DisabledByDefault() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Override to disabled (threshold = 0). + cleanup := s.OverrideDynamicConfig(dynamicconfig.NumConsecutiveActivityRetryProblemsToTriggerSearchAttribute, 0) + defer cleanup() + + s.activityShouldFail.Store(true) + + s.SdkWorker().RegisterWorkflow(s.workflowWithRetryingActivity) + s.SdkWorker().RegisterActivity(s.failingActivity) + + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: testcore.RandomizeStr("wf_id-" + s.T().Name()), + TaskQueue: s.TaskQueue(), + } + + workflowRun, err := s.SdkClient().ExecuteWorkflow(ctx, workflowOptions, s.workflowWithRetryingActivity) + s.NoError(err) + + // Wait for several retries but confirm the SA is never set. + s.EventuallyWithT(func(t *assert.CollectT) { + exec, err := s.SdkClient().DescribeWorkflowExecution(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + require.NoError(t, err) + // Confirm at least 3 retry attempts have happened. + require.NotEmpty(t, exec.PendingActivities) + require.GreaterOrEqual(t, exec.PendingActivities[0].Attempt, int32(3)) + }, 20*time.Second, 500*time.Millisecond) + + description, err := s.SdkClient().DescribeWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID()) + s.NoError(err) + _, ok := description.TypedSearchAttributes.GetKeywordList(temporal.NewSearchAttributeKeyKeywordList(sadefs.TemporalReportedProblems)) + s.False(ok, "TemporalReportedProblems should not be set when threshold is disabled") + + // Clean up. + s.NoError(s.SdkClient().TerminateWorkflow(ctx, workflowRun.GetID(), workflowRun.GetRunID(), "test cleanup")) +}