diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index c3703bc0d..4235e7644 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -3944,3 +3944,182 @@ func (res *updateResult) post_callbacks(env *testWorkflowEnvironmentImpl) { } res.callbacks = []updateCallbacksWrapper{} } + +// testClientOutboundInterceptor is the terminal interceptor in the +// ClientOutboundInterceptor chain for TestWorkflowEnvironment. Instead of +// making gRPC calls to a real server, it executes activities directly via the +// test activity task handler. +type testClientOutboundInterceptor struct { + ClientOutboundInterceptorBase + env *testWorkflowEnvironmentImpl +} + +func (t *testClientOutboundInterceptor) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + if in.Options == nil { + return nil, errors.New("options are required") + } + if in.Options.ID == "" { + in.Options.ID = uuid.NewString() + } + if in.Options.TaskQueue == "" { + in.Options.TaskQueue = defaultTestTaskQueue + } + if in.Options.ScheduleToCloseTimeout == 0 && in.Options.StartToCloseTimeout == 0 { + in.Options.StartToCloseTimeout = 10 * time.Minute + } + + dc := t.env.GetDataConverter() + input, err := encodeArgs(dc, in.Args) + if err != nil { + return nil, fmt.Errorf("failed to encode activity args: %w", err) + } + + scheduleTaskAttr := &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: in.Options.ID, + ActivityType: &commonpb.ActivityType{Name: in.ActivityType}, + TaskQueue: &taskqueuepb.TaskQueue{Name: in.Options.TaskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}, + Input: input, + ScheduleToCloseTimeout: durationpb.New(in.Options.ScheduleToCloseTimeout), + StartToCloseTimeout: durationpb.New(in.Options.StartToCloseTimeout), + ScheduleToStartTimeout: durationpb.New(in.Options.ScheduleToStartTimeout), + HeartbeatTimeout: durationpb.New(in.Options.HeartbeatTimeout), + RetryPolicy: convertToPBRetryPolicy(in.Options.RetryPolicy), + } + + // Propagate headers from context + header, err := headerPropagated(ctx, t.env.contextPropagators) + if err != nil { + return nil, fmt.Errorf("failed to propagate headers: %w", err) + } + scheduleTaskAttr.Header = header + + task := newTestActivityTask(t.env.workflowInfo.Namespace, scheduleTaskAttr) + activityRunID := getStringID(t.env.nextID()) + task.ActivityRunId = activityRunID + + taskHandler := t.env.newTestActivityTaskHandler(in.Options.TaskQueue, dc) + t.env.addNewActivityHandle(task, func(result *commonpb.Payloads, err error) {}, dc, t.env.GetFailureConverter()) + + result, err := taskHandler.Execute(in.Options.TaskQueue, task) + if err != nil { + if err == context.DeadlineExceeded { + return nil, fmt.Errorf("activity %v timed out: %w", in.ActivityType, err) + } + return nil, fmt.Errorf("activity %v panicked: %w", in.ActivityType, err) + } + + if result == ErrActivityResultPending { + return &testClientActivityHandleImpl{ + id: in.Options.ID, + runID: activityRunID, + env: t.env, + }, nil + } + + handle := &testClientActivityHandleImpl{ + id: in.Options.ID, + runID: activityRunID, + env: t.env, + } + + switch request := result.(type) { + case *workflowservice.RespondActivityTaskCanceledRequest: + handle.result = &ClientPollActivityResultOutput{ + Error: NewCanceledError(newEncodedValues(request.Details, dc)), + } + case *workflowservice.RespondActivityTaskFailedRequest: + handle.result = &ClientPollActivityResultOutput{ + Error: t.env.GetFailureConverter().FailureToError(request.GetFailure()), + } + case *workflowservice.RespondActivityTaskCompletedRequest: + handle.result = &ClientPollActivityResultOutput{ + Result: newEncodedValue(request.Result, dc), + } + default: + return nil, fmt.Errorf("unsupported activity result type %T", result) + } + + return handle, nil +} + +func (*testClientOutboundInterceptor) mustEmbedClientOutboundInterceptorBase() {} + +// testClientActivityHandleImpl implements ClientActivityHandle for the test +// environment. Since the test environment executes activities synchronously, +// the result is typically available immediately. +type testClientActivityHandleImpl struct { + id string + runID string + env *testWorkflowEnvironmentImpl + result *ClientPollActivityResultOutput +} + +func (h *testClientActivityHandleImpl) GetID() string { + return h.id +} + +func (h *testClientActivityHandleImpl) GetRunID() string { + return h.runID +} + +func (h *testClientActivityHandleImpl) Get(ctx context.Context, valuePtr any) error { + if h.result == nil { + return fmt.Errorf("activity result not available") + } + if h.result.Error != nil { + return h.result.Error + } + if h.result.Result != nil && valuePtr != nil { + return h.result.Result.Get(valuePtr) + } + return nil +} + +func (h *testClientActivityHandleImpl) Describe(ctx context.Context, options ClientDescribeActivityOptions) (*ClientActivityExecutionDescription, error) { + return nil, fmt.Errorf("Describe is not supported in the test environment") +} + +func (h *testClientActivityHandleImpl) Cancel(ctx context.Context, options ClientCancelActivityOptions) error { + return fmt.Errorf("Cancel is not supported in the test environment for standalone activities") +} + +func (h *testClientActivityHandleImpl) Terminate(ctx context.Context, options ClientTerminateActivityOptions) error { + return fmt.Errorf("Terminate is not supported in the test environment for standalone activities") +} + +// executeStandaloneActivity runs a standalone activity through the +// ClientOutboundInterceptor chain, allowing user interceptors (tracing, +// metrics, etc.) to be exercised during tests. +func (env *testWorkflowEnvironmentImpl) executeStandaloneActivity( + ctx context.Context, + options ClientStartActivityOptions, + activityFn interface{}, + args ...interface{}, +) (ClientActivityHandle, error) { + activityType, err := getValidatedActivityFunction(activityFn, args, env.registry) + if err != nil { + return nil, err + } + + // Set up header context so interceptors can read/write headers + ctx = contextWithNewHeader(ctx) + + // Build the client interceptor chain: user interceptors wrapping our + // terminal test interceptor. The chain is built per-call, matching the + // real client's initialization pattern. + var chain ClientOutboundInterceptor = &testClientOutboundInterceptor{env: env} + for i := len(env.registry.interceptors) - 1; i >= 0; i-- { + if ci, ok := env.registry.interceptors[i].(ClientInterceptor); ok { + chain = ci.InterceptClient(chain) + } + } + + return chain.ExecuteActivity(ctx, &ClientExecuteActivityInput{ + Options: &options, + ActivityType: activityType.Name, + Args: args, + }) +} diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3d15a2c59..36a2ef76c 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -331,6 +331,33 @@ func (e *TestWorkflowEnvironment) RegisterNexusService(s *nexus.Service) { e.impl.RegisterNexusService(s) } +// ExecuteStandaloneActivity executes a standalone activity through the full +// client interceptor chain. Unlike [TestActivityEnvironment.ExecuteActivity], +// which runs the activity function directly, this method invokes the +// [interceptor.ClientOutboundInterceptor] chain, enabling testing of +// interceptor-dependent features such as tracing, metrics, and header +// propagation. +// +// The activity is executed synchronously. The returned [ClientActivityHandle] +// can be used to retrieve the activity result via [ClientActivityHandle.Get]. +// +// Interceptors are sourced from [TestWorkflowEnvironment.SetWorkerOptions]. +// Only interceptors implementing [interceptor.ClientInterceptor] participate +// in the chain; pure [interceptor.WorkerInterceptor] implementations are +// skipped. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/testsuite.TestWorkflowEnvironment.ExecuteStandaloneActivity] +func (e *TestWorkflowEnvironment) ExecuteStandaloneActivity( + ctx context.Context, + options ClientStartActivityOptions, + activity interface{}, + args ...interface{}, +) (ClientActivityHandle, error) { + return e.impl.executeStandaloneActivity(ctx, options, activity, args...) +} + // SetStartTime sets the start time of the workflow. This is optional, default start time will be the wall clock time when // workflow starts. Start time is the workflow.Now(ctx) time at the beginning of the workflow. func (e *TestWorkflowEnvironment) SetStartTime(startTime time.Time) { diff --git a/internal/workflow_testsuite_standalone_activity_test.go b/internal/workflow_testsuite_standalone_activity_test.go new file mode 100644 index 000000000..6bb9bd447 --- /dev/null +++ b/internal/workflow_testsuite_standalone_activity_test.go @@ -0,0 +1,388 @@ +package internal + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "go.temporal.io/sdk/converter" +) + +func testStandaloneGreetActivity(ctx context.Context, name string) (string, error) { + return "Hello, " + name + "!", nil +} + +func TestExecuteStandaloneActivity_BasicExecution(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "test-activity-1", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, // 10 minutes in nanoseconds + }, + testStandaloneGreetActivity, + "World", + ) + require.NoError(t, err) + require.NotNil(t, handle) + require.Equal(t, "test-activity-1", handle.GetID()) + require.NotEmpty(t, handle.GetRunID()) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, World!", result) +} + +func TestExecuteStandaloneActivity_DefaultOptions(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneGreetActivity) + + // Test with minimal options — defaults should be applied + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneGreetActivity, + "Temporal", + ) + require.NoError(t, err) + require.NotNil(t, handle) + require.NotEmpty(t, handle.GetID()) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Temporal!", result) +} + +func testStandaloneFailActivity(ctx context.Context) (string, error) { + return "", NewApplicationError("intentional failure", "TEST_ERROR", false, nil) +} + +func TestExecuteStandaloneActivity_Failure(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneFailActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "fail-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneFailActivity, + ) + require.NoError(t, err) + require.NotNil(t, handle) + + var result string + err = handle.Get(context.Background(), &result) + require.Error(t, err) + require.Contains(t, err.Error(), "intentional failure") +} + +// tracingInterceptor records whether ExecuteActivity was called on the +// ClientOutboundInterceptor chain. +type standaloneActivityTestInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + executeActivityCalled bool + activityType string +} + +func (s *standaloneActivityTestInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &standaloneActivityTestOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + interceptor: s, + } +} + +type standaloneActivityTestOutbound struct { + ClientOutboundInterceptorBase + interceptor *standaloneActivityTestInterceptor +} + +func (s *standaloneActivityTestOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + s.interceptor.executeActivityCalled = true + s.interceptor.activityType = in.ActivityType + + // Verify header is available (proves contextWithNewHeader was called) + header := Header(ctx) + if header == nil { + panic("expected non-nil header in interceptor") + } + + return s.Next.ExecuteActivity(ctx, in) +} + +func TestExecuteStandaloneActivity_InterceptorChain(t *testing.T) { + interceptor := &standaloneActivityTestInterceptor{} + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + Interceptors: []WorkerInterceptor{interceptor}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "intercepted-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "Intercepted", + ) + require.NoError(t, err) + require.NotNil(t, handle) + + // Verify interceptor was called + require.True(t, interceptor.executeActivityCalled, "expected interceptor ExecuteActivity to be called") + require.Equal(t, "testStandaloneGreetActivity", interceptor.activityType) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Intercepted!", result) +} + +func TestExecuteStandaloneActivity_UnregisteredActivity(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + + // The test suite panics when no activities are registered for the task queue. + // This matches the existing behavior of TestActivityEnvironment.ExecuteActivity. + require.Panics(t, func() { + _, _ = env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "unregistered", + TaskQueue: "test-queue", + }, + func(ctx context.Context) error { return nil }, + ) + }) +} + +func TestExecuteStandaloneActivity_CustomDataConverter(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetDataConverter(converter.GetDefaultDataConverter()) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "custom-dc-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "DataConverter", + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, DataConverter!", result) +} + +// TestExecuteStandaloneActivity_InterceptorTestProxy validates that the +// interceptortest.CallRecordingInvoker (the SDK's canonical proxy pattern) +// correctly records ExecuteActivity calls made via ExecuteStandaloneActivity. +// This is the critical test for issue #2318: it proves that standalone activity +// execution traverses the ClientOutboundInterceptor chain, enabling OTel +// tracing support (PR #2302). +func TestExecuteStandaloneActivity_InterceptorTestProxy(t *testing.T) { + var rec callRecordingInterceptor + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + Interceptors: []WorkerInterceptor{&rec}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{ + ID: "proxy-activity", + TaskQueue: "test-queue", + StartToCloseTimeout: 10 * 60 * 1e9, + }, + testStandaloneGreetActivity, + "Proxy", + ) + require.NoError(t, err) + require.NotNil(t, handle) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Proxy!", result) + + // Verify the interceptor recorded the call with correct method and args + require.True(t, rec.executeActivityCalled) + require.Equal(t, "testStandaloneGreetActivity", rec.activityType) + require.NotNil(t, rec.options) + require.Equal(t, "proxy-activity", rec.options.ID) +} + +// callRecordingInterceptor is a lightweight recording interceptor that +// captures ExecuteActivity calls including the full input options. +type callRecordingInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + executeActivityCalled bool + activityType string + options *ClientStartActivityOptions +} + +func (c *callRecordingInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &callRecordingOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + rec: c, + } +} + +type callRecordingOutbound struct { + ClientOutboundInterceptorBase + rec *callRecordingInterceptor +} + +func (c *callRecordingOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + c.rec.executeActivityCalled = true + c.rec.activityType = in.ActivityType + c.rec.options = in.Options + return c.Next.ExecuteActivity(ctx, in) +} + +// TestExecuteStandaloneActivity_MultipleInterceptors verifies that multiple +// interceptors are chained correctly, with the outermost interceptor called +// first (matching the real client's behavior). +func TestExecuteStandaloneActivity_MultipleInterceptors(t *testing.T) { + var callOrder []string + + outer := &orderTrackingInterceptor{name: "outer", callOrder: &callOrder} + inner := &orderTrackingInterceptor{name: "inner", callOrder: &callOrder} + + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(WorkerOptions{ + // Earlier interceptors wrap later ones + Interceptors: []WorkerInterceptor{outer, inner}, + }) + env.RegisterActivity(testStandaloneGreetActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneGreetActivity, + "Multi", + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "Hello, Multi!", result) + + // Outer interceptor should be called first + require.Equal(t, []string{"outer", "inner"}, callOrder) +} + +type orderTrackingInterceptor struct { + WorkerInterceptorBase + ClientInterceptorBase + name string + callOrder *[]string +} + +func (o *orderTrackingInterceptor) InterceptClient(next ClientOutboundInterceptor) ClientOutboundInterceptor { + return &orderTrackingOutbound{ + ClientOutboundInterceptorBase: ClientOutboundInterceptorBase{Next: next}, + interceptor: o, + } +} + +type orderTrackingOutbound struct { + ClientOutboundInterceptorBase + interceptor *orderTrackingInterceptor +} + +func (o *orderTrackingOutbound) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + *o.interceptor.callOrder = append(*o.interceptor.callOrder, o.interceptor.name) + return o.Next.ExecuteActivity(ctx, in) +} + +func testStandaloneVoidActivity(ctx context.Context) error { + return nil +} + +// TestExecuteStandaloneActivity_VoidReturn verifies that activities with no +// return value (only error) work correctly. +func TestExecuteStandaloneActivity_VoidReturn(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneVoidActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneVoidActivity, + ) + require.NoError(t, err) + require.NotNil(t, handle) + + // Get with nil valuePtr should succeed + err = handle.Get(context.Background(), nil) + require.NoError(t, err) +} + +func testStandaloneMultiArgActivity(ctx context.Context, first string, second int, third bool) (string, error) { + if third { + return fmt.Sprintf("%s-%d-true", first, second), nil + } + return fmt.Sprintf("%s-%d-false", first, second), nil +} + +// TestExecuteStandaloneActivity_MultipleArgs verifies that activities with +// multiple arguments are encoded and passed correctly. +func TestExecuteStandaloneActivity_MultipleArgs(t *testing.T) { + var suite WorkflowTestSuite + env := suite.NewTestWorkflowEnvironment() + env.RegisterActivity(testStandaloneMultiArgActivity) + + handle, err := env.ExecuteStandaloneActivity( + context.Background(), + ClientStartActivityOptions{}, + testStandaloneMultiArgActivity, + "hello", 42, true, + ) + require.NoError(t, err) + + var result string + err = handle.Get(context.Background(), &result) + require.NoError(t, err) + require.Equal(t, "hello-42-true", result) +}