From 54ec32c8352be90693bf29b97eb052ba02f20660 Mon Sep 17 00:00:00 2001 From: Bruce Arctor <5032356+brucearctor@users.noreply.github.com> Date: Wed, 13 May 2026 20:18:06 -0700 Subject: [PATCH] feat: add ExecuteStandaloneActivity to TestWorkflowEnvironment Add ExecuteStandaloneActivity method to TestWorkflowEnvironment that executes standalone activities through the full ClientOutboundInterceptor chain, enabling testing of interceptor-dependent features such as OpenTelemetry tracing, metrics, and header propagation. Unlike TestActivityEnvironment.ExecuteActivity (which runs the activity function directly via taskHandler.Execute), this method builds and traverses the client interceptor chain before executing the activity, matching the real Client.ExecuteActivity flow. Implementation: - testClientOutboundInterceptor: terminal interceptor replacing gRPC calls with local test execution - testClientActivityHandleImpl: implements ClientActivityHandle for the test environment - executeStandaloneActivity: builds interceptor chain from registered WorkerOptions.Interceptors and routes through it Closes #2318 --- internal/internal_workflow_testsuite.go | 179 ++++++++ internal/workflow_testsuite.go | 27 ++ ...flow_testsuite_standalone_activity_test.go | 388 ++++++++++++++++++ 3 files changed, 594 insertions(+) create mode 100644 internal/workflow_testsuite_standalone_activity_test.go diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index c3703bc0d..4235e7644 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -3944,3 +3944,182 @@ func (res *updateResult) post_callbacks(env *testWorkflowEnvironmentImpl) { } res.callbacks = []updateCallbacksWrapper{} } + +// testClientOutboundInterceptor is the terminal interceptor in the +// ClientOutboundInterceptor chain for TestWorkflowEnvironment. Instead of +// making gRPC calls to a real server, it executes activities directly via the +// test activity task handler. +type testClientOutboundInterceptor struct { + ClientOutboundInterceptorBase + env *testWorkflowEnvironmentImpl +} + +func (t *testClientOutboundInterceptor) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + if in.Options == nil { + return nil, errors.New("options are required") + } + if in.Options.ID == "" { + in.Options.ID = uuid.NewString() + } + if in.Options.TaskQueue == "" { + in.Options.TaskQueue = defaultTestTaskQueue + } + if in.Options.ScheduleToCloseTimeout == 0 && in.Options.StartToCloseTimeout == 0 { + in.Options.StartToCloseTimeout = 10 * time.Minute + } + + dc := t.env.GetDataConverter() + input, err := encodeArgs(dc, in.Args) + if err != nil { + return nil, fmt.Errorf("failed to encode activity args: %w", err) + } + + scheduleTaskAttr := &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: in.Options.ID, + ActivityType: &commonpb.ActivityType{Name: in.ActivityType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: in.Options.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: input, + ScheduleToCloseTimeout: durationpb.New(in.Options.ScheduleToCloseTimeout), + StartToCloseTimeout: durationpb.New(in.Options.StartToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(in.Options.ScheduleToStartTimeout), + HeartbeatTimeout: durationpb.New(in.Options.HeartbeatTimeout), + RetryPolicy: convertToPBRetryPolicy(in.Options.RetryPolicy), + } + + // Propagate headers from context + header, err := headerPropagated(ctx, t.env.contextPropagators) + if err != nil { + return nil, fmt.Errorf("failed to propagate headers: %w", err) + } + scheduleTaskAttr.Header = header + + task := newTestActivityTask(t.env.workflowInfo.Namespace, scheduleTaskAttr) + activityRunID := getStringID(t.env.nextID()) + task.ActivityRunId = activityRunID + + taskHandler := t.env.newTestActivityTaskHandler(in.Options.TaskQueue, dc) + t.env.addNewActivityHandle(task, func(result *commonpb.Payloads, err error) {}, dc, t.env.GetFailureConverter()) + + result, err := taskHandler.Execute(in.Options.TaskQueue, task) + if err != nil { + if err == context.DeadlineExceeded { + return nil, fmt.Errorf("activity %v timed out: %w", in.ActivityType, err) + } + return nil, fmt.Errorf("activity %v panicked: %w", in.ActivityType, err) + } + + if result == ErrActivityResultPending { + return &testClientActivityHandleImpl{ + id: in.Options.ID, + runID: activityRunID, + env: t.env, + }, nil + } + + handle := &testClientActivityHandleImpl{ + id: in.Options.ID, + runID: activityRunID, + env: t.env, + } + + switch request := result.(type) { + case *workflowservice.RespondActivityTaskCanceledRequest: + handle.result = &ClientPollActivityResultOutput{ + Error: NewCanceledError(newEncodedValues(request.Details, dc)), + } + case *workflowservice.RespondActivityTaskFailedRequest: + handle.result = &ClientPollActivityResultOutput{ + Error: t.env.GetFailureConverter().FailureToError(request.GetFailure()), + } + case *workflowservice.RespondActivityTaskCompletedRequest: + handle.result = &ClientPollActivityResultOutput{ + Result: newEncodedValue(request.Result, dc), + } + default: + return nil, fmt.Errorf("unsupported activity result type %T", result) + } + + return handle, nil +} + +func (*testClientOutboundInterceptor) mustEmbedClientOutboundInterceptorBase() {} + +// testClientActivityHandleImpl implements ClientActivityHandle for the test +// environment. Since the test environment executes activities synchronously, +// the result is typically available immediately. +type testClientActivityHandleImpl struct { + id string + runID string + env *testWorkflowEnvironmentImpl + result *ClientPollActivityResultOutput +} + +func (h *testClientActivityHandleImpl) GetID() string { + return h.id +} + +func (h *testClientActivityHandleImpl) GetRunID() string { + return h.runID +} + +func (h *testClientActivityHandleImpl) Get(ctx context.Context, valuePtr any) error { + if h.result == nil { + return fmt.Errorf("activity result not available") + } + if h.result.Error != nil { + return h.result.Error + } + if h.result.Result != nil && valuePtr != nil { + return h.result.Result.Get(valuePtr) + } + return nil +} + +func (h *testClientActivityHandleImpl) Describe(ctx context.Context, options ClientDescribeActivityOptions) (*ClientActivityExecutionDescription, error) { + return nil, fmt.Errorf("Describe is not supported in the test environment") +} + +func (h *testClientActivityHandleImpl) Cancel(ctx context.Context, options ClientCancelActivityOptions) error { + return fmt.Errorf("Cancel is not supported in the test environment for standalone activities") +} + +func (h *testClientActivityHandleImpl) Terminate(ctx context.Context, options ClientTerminateActivityOptions) error { + return fmt.Errorf("Terminate is not supported in the test environment for standalone activities") +} + +// executeStandaloneActivity runs a standalone activity through the +// ClientOutboundInterceptor chain, allowing user interceptors (tracing, +// metrics, etc.) to be exercised during tests. +func (env *testWorkflowEnvironmentImpl) executeStandaloneActivity( + ctx context.Context, + options ClientStartActivityOptions, + activityFn interface{}, + args ...interface{}, +) (ClientActivityHandle, error) { + activityType, err := getValidatedActivityFunction(activityFn, args, env.registry) + if err != nil { + return nil, err + } + + // Set up header context so interceptors can read/write headers + ctx = contextWithNewHeader(ctx) + + // Build the client interceptor chain: user interceptors wrapping our + // terminal test interceptor. The chain is built per-call, matching the + // real client's initialization pattern. + var chain ClientOutboundInterceptor = &testClientOutboundInterceptor{env: env} + for i := len(env.registry.interceptors) - 1; i >= 0; i-- { + if ci, ok := env.registry.interceptors[i].(ClientInterceptor); ok { + chain = ci.InterceptClient(chain) + } + } + + return chain.ExecuteActivity(ctx, &ClientExecuteActivityInput{ + Options: &options, + ActivityType: activityType.Name, + Args: args, + }) +} diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3d15a2c59..36a2ef76c 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -331,6 +331,33 @@ func (e *TestWorkflowEnvironment) RegisterNexusService(s *nexus.Service) { e.impl.RegisterNexusService(s) } +// ExecuteStandaloneActivity executes a standalone activity through the full +// client interceptor chain. Unlike [TestActivityEnvironment.ExecuteActivity], +// which runs the activity function directly, this method invokes the +// [interceptor.ClientOutboundInterceptor] chain, enabling testing of +// interceptor-dependent features such as tracing, metrics, and header +// propagation. +// +// The activity is executed synchronously. The returned [ClientActivityHandle] +// can be used to retrieve the activity result via [ClientActivityHandle.Get]. +// +// Interceptors are sourced from [TestWorkflowEnvironment.SetWorkerOptions]. +// Only interceptors implementing [interceptor.ClientInterceptor] participate +// in the chain; pure [interceptor.WorkerInterceptor] implementations are +// skipped. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/testsuite.TestWorkflowEnvironment.ExecuteStandaloneActivity] +func (e *TestWorkflowEnvironment) ExecuteStandaloneActivity( + ctx context.Context, + options ClientStartActivityOptions, + activity interface{}, + args ...interface{}, +) (ClientActivityHandle, error) { + return e.impl.executeStandaloneActivity(ctx, options, activity, args...) +} + // SetStartTime sets the start time of the workflow. This is optional, default start time will be the wall clock time when // workflow starts. Start time is the workflow.Now(ctx) time at the beginning of the workflow. func (e *TestWorkflowEnvironment) SetStartTime(startTime time.Time) { diff --git a/internal/workflow_testsuite_standalone_activity_test.go b/internal/workflow_testsuite_standalone_activity_test.go new file mode 100644 index 000000000..6bb9bd447 --- /dev/null +++ b/internal/workflow_testsuite_standalone_activity_test.go @@ -0,0 +1,388 @@ +package internal + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "go.temporal.io/sdk/converter" +) + +func testStandaloneGreetActivity(ctx context.Context, name string) (string, error) { + return "Hello, " + name + "!", nil +} + +func TestExecuteStandaloneActivity_BasicExecution(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "test-activity-1", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, // 10 minutes in nanoseconds + }, + testStandaloneGreetActivity, + "World", + ) + require.NoError(t, err) + require.NotNil(t, handle) + require.Equal(t, "test-activity-1", handle.GetID()) + require.NotEmpty(t, handle.GetRunID()) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, World!", result) +} + +func TestExecuteStandaloneActivity_DefaultOptions(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneGreetActivity) + + // Test with minimal options — defaults should be applied + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneGreetActivity, + "Temporal", + ) + require.NoError(t, err) + require.NotNil(t, handle) + require.NotEmpty(t, handle.GetID()) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Temporal!", result) +} + +func testStandaloneFailActivity(ctx context.Context) (string, error) { + return "", NewApplicationError("intentional failure", "TEST_ERROR", false, nil) +} + +func TestExecuteStandaloneActivity_Failure(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneFailActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "fail-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneFailActivity, + ) + require.NoError(t, err) + require.NotNil(t, handle) + + var result string + err = handle.Get(context.Background(), &result) + require.Error(t, err) + require.Contains(t, err.Error(), "intentional failure") +} + +// tracingInterceptor records whether ExecuteActivity was called on the +// ClientOutboundInterceptor chain. +type standaloneActivityTestInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + executeActivityCalled bool + activityType string +} + +func (s *standaloneActivityTestInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &standaloneActivityTestOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + interceptor: s, + } +} + +type standaloneActivityTestOutbound struct { + ClientOutboundInterceptorBase + interceptor *standaloneActivityTestInterceptor +} + +func (s *standaloneActivityTestOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + s.interceptor.executeActivityCalled = true + s.interceptor.activityType = in.ActivityType + + // Verify header is available (proves contextWithNewHeader was called) + header := Header(ctx) + if header == nil { + panic("expected non-nil header in interceptor") + } + + return s.Next.ExecuteActivity(ctx, in) +} + +func TestExecuteStandaloneActivity_InterceptorChain(t *testing.T) { + interceptor := &standaloneActivityTestInterceptor{} + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + Interceptors: []WorkerInterceptor{interceptor}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "intercepted-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "Intercepted", + ) + require.NoError(t, err) + require.NotNil(t, handle) + + // Verify interceptor was called + require.True(t, interceptor.executeActivityCalled, "expected interceptor ExecuteActivity to be called") + require.Equal(t, "testStandaloneGreetActivity", interceptor.activityType) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Intercepted!", result) +} + +func TestExecuteStandaloneActivity_UnregisteredActivity(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + // The test suite panics when no activities are registered for the task queue. + // This matches the existing behavior of TestActivityEnvironment.ExecuteActivity. + require.Panics(t, func() { + _, _ = env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "unregistered", + TaskQueue: "test-queue", + }, + func(ctx context.Context) error { return nil }, + ) + }) +} + +func TestExecuteStandaloneActivity_CustomDataConverter(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetDataConverter(converter.GetDefaultDataConverter()) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "custom-dc-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "DataConverter", + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, DataConverter!", result) +} + +// TestExecuteStandaloneActivity_InterceptorTestProxy validates that the +// interceptortest.CallRecordingInvoker (the SDK's canonical proxy pattern) +// correctly records ExecuteActivity calls made via ExecuteStandaloneActivity. +// This is the critical test for issue #2318: it proves that standalone activity +// execution traverses the ClientOutboundInterceptor chain, enabling OTel +// tracing support (PR #2302). +func TestExecuteStandaloneActivity_InterceptorTestProxy(t *testing.T) { + var rec callRecordingInterceptor + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + Interceptors: []WorkerInterceptor{&rec}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "proxy-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "Proxy", + ) + require.NoError(t, err) + require.NotNil(t, handle) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Proxy!", result) + + // Verify the interceptor recorded the call with correct method and args + require.True(t, rec.executeActivityCalled) + require.Equal(t, "testStandaloneGreetActivity", rec.activityType) + require.NotNil(t, rec.options) + require.Equal(t, "proxy-activity", rec.options.ID) +} + +// callRecordingInterceptor is a lightweight recording interceptor that +// captures ExecuteActivity calls including the full input options. +type callRecordingInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + executeActivityCalled bool + activityType string + options *ClientStartActivityOptions +} + +func (c *callRecordingInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &callRecordingOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + rec: c, + } +} + +type callRecordingOutbound struct { + ClientOutboundInterceptorBase + rec *callRecordingInterceptor +} + +func (c *callRecordingOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + c.rec.executeActivityCalled = true + c.rec.activityType = in.ActivityType + c.rec.options = in.Options + return c.Next.ExecuteActivity(ctx, in) +} + +// TestExecuteStandaloneActivity_MultipleInterceptors verifies that multiple +// interceptors are chained correctly, with the outermost interceptor called +// first (matching the real client's behavior). +func TestExecuteStandaloneActivity_MultipleInterceptors(t *testing.T) { + var callOrder []string + + outer := &orderTrackingInterceptor{name: "outer", callOrder: &callOrder} + inner := &orderTrackingInterceptor{name: "inner", callOrder: &callOrder} + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + // Earlier interceptors wrap later ones + Interceptors: []WorkerInterceptor{outer, inner}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneGreetActivity, + "Multi", + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Multi!", result) + + // Outer interceptor should be called first + require.Equal(t, []string{"outer", "inner"}, callOrder) +} + +type orderTrackingInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + name string + callOrder *[]string +} + +func (o *orderTrackingInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &orderTrackingOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + interceptor: o, + } +} + +type orderTrackingOutbound struct { + ClientOutboundInterceptorBase + interceptor *orderTrackingInterceptor +} + +func (o *orderTrackingOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + *o.interceptor.callOrder = append(*o.interceptor.callOrder, o.interceptor.name) + return o.Next.ExecuteActivity(ctx, in) +} + +func testStandaloneVoidActivity(ctx context.Context) error { + return nil +} + +// TestExecuteStandaloneActivity_VoidReturn verifies that activities with no +// return value (only error) work correctly. +func TestExecuteStandaloneActivity_VoidReturn(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneVoidActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneVoidActivity, + ) + require.NoError(t, err) + require.NotNil(t, handle) + + // Get with nil valuePtr should succeed + err = handle.Get(context.Background(), nil) + require.NoError(t, err) +} + +func testStandaloneMultiArgActivity(ctx context.Context, first string, second int, third bool) (string, error) { + if third { + return fmt.Sprintf("%s-%d-true", first, second), nil + } + return fmt.Sprintf("%s-%d-false", first, second), nil +} + +// TestExecuteStandaloneActivity_MultipleArgs verifies that activities with +// multiple arguments are encoded and passed correctly. +func TestExecuteStandaloneActivity_MultipleArgs(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneMultiArgActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneMultiArgActivity, + "hello", 42, true, + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "hello-42-true", result) +}