From 55562689b7449db19fb883179d8e81db00e95911 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 10:26:30 -0700 Subject: [PATCH 1/8] Dispatch cancel command to worker for standalone activities When a standalone activity's cancellation is requested or it is terminated while running on a worker, proactively dispatch a cancel command via the Nexus worker commands control queue. This avoids relying on the worker to discover cancellation only through heartbeat responses. Changes: - Add worker_control_task_queue field to ActivityAttemptState proto - Store control queue from poll request in TransitionStarted - Add CancelCommandDispatchTask side-effect task - Schedule dispatch task on cancel request and terminate - Dispatch cancel command via Nexus to matching service Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity.go | 50 +++++ chasm/lib/activity/activity_tasks.go | 181 ++++++++++++++++++ chasm/lib/activity/fx.go | 1 + .../gen/activitypb/v1/activity_state.pb.go | 20 +- .../gen/activitypb/v1/tasks.go-helpers.pb.go | 37 ++++ .../activity/gen/activitypb/v1/tasks.pb.go | 46 ++++- chasm/lib/activity/library.go | 33 ++-- .../activity/proto/v1/activity_state.proto | 4 + chasm/lib/activity/proto/v1/tasks.proto | 4 + chasm/lib/activity/statemachine.go | 1 + 10 files changed, 357 insertions(+), 20 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index df45a9ac490..afb3a676946 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -201,6 +201,28 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s }, nil } +// buildCancelCommandTaskToken builds the serialized task token for a cancel command. +// This token matches what the worker received when the activity was dispatched. +func (a *Activity) buildCancelCommandTaskToken(ctx chasm.Context, activityRef chasm.ComponentRef) ([]byte, error) { + componentRefBytes, err := ctx.Ref(a) + if err != nil { + return nil, err + } + + attempt := a.LastAttempt.Get(ctx) + key := ctx.ExecutionKey() + + token := &tokenspb.Task{ + NamespaceId: key.NamespaceID, + ActivityId: key.BusinessID, + ActivityType: a.GetActivityType().GetName(), + Attempt: attempt.GetCount(), + ComponentRef: componentRefBytes, + } + + return token.Marshal() +} + // HandleStarted updates the activity on recording activity task started and populates the response. func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) ( *historyservice.RecordActivityTaskStartedResponse, error, @@ -505,6 +527,13 @@ func (a *Activity) Terminate( return chasm.TerminateComponentResponse{}, nil } + // If the activity is running on a worker, proactively notify the worker via Nexus. + // Must be done before the transition since it checks current status. + if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED || + a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED { + a.addCancelCommandDispatchTask(ctx) + } + metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityTerminatedScope) if err != nil { return chasm.TerminateComponentResponse{}, err @@ -527,6 +556,23 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp return heartbeat } +// addCancelCommandDispatchTask schedules a side-effect task to dispatch a cancel command to the +// worker via the Nexus worker commands control queue. No-op if the worker doesn't support worker +// commands (i.e., has no control queue). +func (a *Activity) addCancelCommandDispatchTask(ctx chasm.MutableContext) { + controlQueue := a.LastAttempt.Get(ctx).GetWorkerControlTaskQueue() + if controlQueue == "" { + return + } + ctx.AddTask( + a, + chasm.TaskAttributes{ + Destination: controlQueue, + }, + &activitypb.CancelCommandDispatchTask{}, + ) +} + func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request *activitypb.RequestCancelActivityExecutionRequest) ( *activitypb.RequestCancelActivityExecutionResponse, error, ) { @@ -551,6 +597,10 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request return nil, err } + if !isCancelImmediately { + a.addCancelCommandDispatchTask(ctx) + } + if isCancelImmediately { details := &commonpb.Payloads{ Payloads: []*commonpb.Payload{ diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index e22b2f586a6..1339a2ab930 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -2,14 +2,30 @@ package activity import ( "context" + "errors" + "fmt" + "time" + "github.com/nexus-rpc/sdk-go/nexus" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" + nexuspb "go.temporal.io/api/nexus/v1" + workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + workerpb "go.temporal.io/api/worker/v1" + "go.temporal.io/sdk/temporal" + "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" + "go.temporal.io/server/common/debug" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/util" + "go.temporal.io/server/service/history/configs" "go.uber.org/fx" + "google.golang.org/protobuf/proto" ) type activityDispatchTaskHandlerOptions struct { @@ -277,3 +293,168 @@ func (h *heartbeatTimeoutTaskHandler) Execute( fromStatus: activity.GetStatus(), }) } + +// cancelCommandDispatchTaskHandler dispatches a cancel command to the worker via the Nexus +// worker commands control queue. This is a best-effort mechanism — the activity will eventually +// time out if the worker doesn't respond. +type cancelCommandDispatchTaskHandler struct { + chasm.SideEffectTaskHandlerBase[*activitypb.CancelCommandDispatchTask] + opts cancelCommandDispatchTaskHandlerOptions +} + +type cancelCommandDispatchTaskHandlerOptions struct { + fx.In + + MatchingClient resource.MatchingClient + Config *configs.Config + MetricsHandler metrics.Handler + Logger log.Logger +} + +func newCancelCommandDispatchTaskHandler(opts cancelCommandDispatchTaskHandlerOptions) *cancelCommandDispatchTaskHandler { + return &cancelCommandDispatchTaskHandler{opts: opts} +} + +func (h *cancelCommandDispatchTaskHandler) Validate( + _ chasm.Context, + activity *Activity, + _ chasm.TaskAttributes, + _ *activitypb.CancelCommandDispatchTask, +) (bool, error) { + // Valid if the activity is in a state where it has been requested to cancel or terminated + // (meaning it was running on a worker when the cancel/terminate was issued). + return activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED || + activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, nil +} + +const ( + cancelCommandDispatchTimeout = time.Second * 10 * debug.TimeoutMultiplier + cancelCommandDispatchMaxAttempt = 3 + + workerCommandsServiceName = "temporal.api.nexusservices.workerservice.v1.WorkerService" + workerCommandsOperationName = "ExecuteCommands" +) + +func (h *cancelCommandDispatchTaskHandler) Execute( + ctx context.Context, + activityRef chasm.ComponentRef, + taskAttrs chasm.TaskAttributes, + _ *activitypb.CancelCommandDispatchTask, +) error { + if !h.opts.Config.EnableCancelActivityWorkerCommand() { + return nil + } + + // Read the activity to build the task token for the cancel command. + taskToken, err := chasm.ReadComponent( + ctx, + activityRef, + (*Activity).buildCancelCommandTaskToken, + activityRef, + ) + if err != nil { + return err + } + + command := &workerpb.WorkerCommand{ + Type: &workerpb.WorkerCommand_CancelActivity{ + CancelActivity: &workerpb.CancelActivityCommand{ + TaskToken: taskToken, + }, + }, + } + + return h.dispatchToWorker(ctx, activityRef.NamespaceID, taskAttrs.Destination, []*workerpb.WorkerCommand{command}) +} + +func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( + ctx context.Context, + namespaceID string, + controlQueue string, + commands []*workerpb.WorkerCommand, +) error { + ctx, cancel := context.WithTimeout(ctx, cancelCommandDispatchTimeout) + defer cancel() + + request := &workerservicepb.ExecuteCommandsRequest{ + Commands: commands, + } + requestData, err := proto.Marshal(request) + if err != nil { + return fmt.Errorf("failed to encode worker commands request: %w", err) + } + requestPayload := &commonpb.Payload{ + Metadata: map[string][]byte{ + "encoding": []byte("binary/protobuf"), + }, + Data: requestData, + } + + nexusRequest := &nexuspb.Request{ + Header: map[string]string{}, + Variant: &nexuspb.Request_StartOperation{ + StartOperation: &nexuspb.StartOperationRequest{ + Service: workerCommandsServiceName, + Operation: workerCommandsOperationName, + Payload: requestPayload, + }, + }, + } + + resp, err := h.opts.MatchingClient.DispatchNexusTask(ctx, &matchingservice.DispatchNexusTaskRequest{ + NamespaceId: namespaceID, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: controlQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Request: nexusRequest, + }) + if err != nil { + h.opts.Logger.Warn("Failed to dispatch cancel command", + tag.NewStringTag("control_queue", controlQueue), + tag.Error(err)) + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("rpc_error")) + return err + } + + nexusErr := cancelCommandDispatchResponseToError(resp) + if nexusErr == nil { + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("success")) + return nil + } + + // Non-retryable errors are dropped — the activity will eventually time out. + var handlerErr *nexus.HandlerError + if errors.As(nexusErr, &handlerErr) && !handlerErr.Retryable() { + h.opts.Logger.Error("Cancel command non-retryable error", + tag.NewStringTag("control_queue", controlQueue), + tag.Error(nexusErr)) + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("non_retryable_error")) + return nil + } + + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("transport_error")) + return nexusErr +} + +// cancelCommandDispatchResponseToError converts a DispatchNexusTaskResponse into a Go error. +func cancelCommandDispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { + switch t := resp.GetOutcome().(type) { + case *matchingservice.DispatchNexusTaskResponse_Failure: + return temporal.GetDefaultFailureConverter().FailureToError(t.Failure) + case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: + return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") + case *matchingservice.DispatchNexusTaskResponse_Response: + startResp := t.Response.GetStartOperation() + switch startResp.GetVariant().(type) { + case *nexuspb.StartOperationResponse_SyncSuccess, *nexuspb.StartOperationResponse_AsyncSuccess: + return nil + case *nexuspb.StartOperationResponse_Failure: + return temporal.GetDefaultFailureConverter().FailureToError(startResp.GetFailure()) + default: + return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "unknown start operation response") + } + default: + return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown dispatch outcome") + } +} diff --git a/chasm/lib/activity/fx.go b/chasm/lib/activity/fx.go index 905042382c2..0639862b381 100644 --- a/chasm/lib/activity/fx.go +++ b/chasm/lib/activity/fx.go @@ -12,6 +12,7 @@ var HistoryModule = fx.Module( fx.Provide( ConfigProvider, newActivityDispatchTaskHandler, + newCancelCommandDispatchTaskHandler, newScheduleToStartTimeoutTaskHandler, newScheduleToCloseTimeoutTaskHandler, newStartToCloseTimeoutTaskHandler, diff --git a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go index 3e95ee84f59..90034d167b6 100644 --- a/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go @@ -456,8 +456,11 @@ type ActivityAttemptState struct { // The request ID that came from matching's RecordActivityTaskStarted API call. Used to make this API idempotent in // case of implicit retries. StartRequestId string `protobuf:"bytes,9,opt,name=start_request_id,json=startRequestId,proto3" json:"start_request_id,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // The worker's control task queue for sending commands (e.g. cancel) via Nexus. + // Set when the worker reports it during poll. Empty if the worker doesn't support worker commands. + WorkerControlTaskQueue string `protobuf:"bytes,10,opt,name=worker_control_task_queue,json=workerControlTaskQueue,proto3" json:"worker_control_task_queue,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *ActivityAttemptState) Reset() { @@ -553,6 +556,13 @@ func (x *ActivityAttemptState) GetStartRequestId() string { return "" } +func (x *ActivityAttemptState) GetWorkerControlTaskQueue() string { + if x != nil { + return x.WorkerControlTaskQueue + } + return "" +} + type ActivityHeartbeatState struct { state protoimpl.MessageState `protogen:"open.v1"` // Details provided in the last recorded activity heartbeat. @@ -934,7 +944,7 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x06reason\x18\x04 \x01(\tR\x06reason\"7\n" + "\x16ActivityTerminateState\x12\x1d\n" + "\n" + - "request_id\x18\x01 \x01(\tR\trequestId\"\xe8\x05\n" + + "request_id\x18\x01 \x01(\tR\trequestId\"\xa3\x06\n" + "\x14ActivityAttemptState\x12\x14\n" + "\x05count\x18\x01 \x01(\x05R\x05count\x12O\n" + "\x16current_retry_interval\x18\x02 \x01(\v2\x19.google.protobuf.DurationR\x14currentRetryInterval\x12=\n" + @@ -944,7 +954,9 @@ const file_temporal_server_chasm_lib_activity_proto_v1_activity_state_proto_rawD "\x05stamp\x18\x06 \x01(\x05R\x05stamp\x120\n" + "\x14last_worker_identity\x18\a \x01(\tR\x12lastWorkerIdentity\x12k\n" + "\x17last_deployment_version\x18\b \x01(\v23.temporal.api.deployment.v1.WorkerDeploymentVersionR\x15lastDeploymentVersion\x12(\n" + - "\x10start_request_id\x18\t \x01(\tR\x0estartRequestId\x1a\x80\x01\n" + + "\x10start_request_id\x18\t \x01(\tR\x0estartRequestId\x129\n" + + "\x19worker_control_task_queue\x18\n" + + " \x01(\tR\x16workerControlTaskQueue\x1a\x80\x01\n" + "\x12LastFailureDetails\x12.\n" + "\x04time\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\x12:\n" + "\afailure\x18\x02 \x01(\v2 .temporal.api.failure.v1.FailureR\afailure\"\xc9\x01\n" + diff --git a/chasm/lib/activity/gen/activitypb/v1/tasks.go-helpers.pb.go b/chasm/lib/activity/gen/activitypb/v1/tasks.go-helpers.pb.go index d7628a6e9e6..a4173d9659f 100644 --- a/chasm/lib/activity/gen/activitypb/v1/tasks.go-helpers.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/tasks.go-helpers.pb.go @@ -189,3 +189,40 @@ func (this *HeartbeatTimeoutTask) Equal(that interface{}) bool { return proto.Equal(this, that1) } + +// Marshal an object of type CancelCommandDispatchTask to the protobuf v3 wire format +func (val *CancelCommandDispatchTask) Marshal() ([]byte, error) { + return proto.Marshal(val) +} + +// Unmarshal an object of type CancelCommandDispatchTask from the protobuf v3 wire format +func (val *CancelCommandDispatchTask) Unmarshal(buf []byte) error { + return proto.Unmarshal(buf, val) +} + +// Size returns the size of the object, in bytes, once serialized +func (val *CancelCommandDispatchTask) Size() int { + return proto.Size(val) +} + +// Equal returns whether two CancelCommandDispatchTask values are equivalent by recursively +// comparing the message's fields. +// For more information see the documentation for +// https://pkg.go.dev/google.golang.org/protobuf/proto#Equal +func (this *CancelCommandDispatchTask) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + var that1 *CancelCommandDispatchTask + switch t := that.(type) { + case *CancelCommandDispatchTask: + that1 = t + case CancelCommandDispatchTask: + that1 = &t + default: + return false + } + + return proto.Equal(this, that1) +} diff --git a/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go b/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go index 796574e7db2..23fc96a8db5 100644 --- a/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go +++ b/chasm/lib/activity/gen/activitypb/v1/tasks.pb.go @@ -239,6 +239,44 @@ func (x *HeartbeatTimeoutTask) GetStamp() int32 { return 0 } +// CancelCommandDispatchTask is a side-effect task that dispatches a cancel command to the worker +// via the Nexus worker commands control queue. +type CancelCommandDispatchTask struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *CancelCommandDispatchTask) Reset() { + *x = CancelCommandDispatchTask{} + mi := &file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *CancelCommandDispatchTask) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CancelCommandDispatchTask) ProtoMessage() {} + +func (x *CancelCommandDispatchTask) ProtoReflect() protoreflect.Message { + mi := &file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CancelCommandDispatchTask.ProtoReflect.Descriptor instead. +func (*CancelCommandDispatchTask) Descriptor() ([]byte, []int) { + return file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDescGZIP(), []int{5} +} + var File_temporal_server_chasm_lib_activity_proto_v1_tasks_proto protoreflect.FileDescriptor const file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDesc = "" + @@ -252,7 +290,8 @@ const file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDesc = "" "\x17StartToCloseTimeoutTask\x12\x14\n" + "\x05stamp\x18\x01 \x01(\x05R\x05stamp\",\n" + "\x14HeartbeatTimeoutTask\x12\x14\n" + - "\x05stamp\x18\x01 \x01(\x05R\x05stampBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" + "\x05stamp\x18\x01 \x01(\x05R\x05stamp\"\x1b\n" + + "\x19CancelCommandDispatchTaskBDZBgo.temporal.io/server/chasm/lib/activity/gen/activitypb;activitypbb\x06proto3" var ( file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDescOnce sync.Once @@ -266,13 +305,14 @@ func file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDescGZIP() return file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDescData } -var file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_goTypes = []any{ (*ActivityDispatchTask)(nil), // 0: temporal.server.chasm.lib.activity.proto.v1.ActivityDispatchTask (*ScheduleToStartTimeoutTask)(nil), // 1: temporal.server.chasm.lib.activity.proto.v1.ScheduleToStartTimeoutTask (*ScheduleToCloseTimeoutTask)(nil), // 2: temporal.server.chasm.lib.activity.proto.v1.ScheduleToCloseTimeoutTask (*StartToCloseTimeoutTask)(nil), // 3: temporal.server.chasm.lib.activity.proto.v1.StartToCloseTimeoutTask (*HeartbeatTimeoutTask)(nil), // 4: temporal.server.chasm.lib.activity.proto.v1.HeartbeatTimeoutTask + (*CancelCommandDispatchTask)(nil), // 5: temporal.server.chasm.lib.activity.proto.v1.CancelCommandDispatchTask } var file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -293,7 +333,7 @@ func file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDesc), len(file_temporal_server_chasm_lib_activity_proto_v1_tasks_proto_rawDesc)), NumEnums: 0, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index 83e3d9067af..cb128c20111 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -77,17 +77,19 @@ func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { type library struct { componentOnlyLibrary - handler *handler - activityDispatchTaskHandler *activityDispatchTaskHandler - scheduleToStartTimeoutTaskHandler *scheduleToStartTimeoutTaskHandler - scheduleToCloseTimeoutTaskHandler *scheduleToCloseTimeoutTaskHandler - startToCloseTimeoutTaskHandler *startToCloseTimeoutTaskHandler - heartbeatTimeoutTaskHandler *heartbeatTimeoutTaskHandler + handler *handler + activityDispatchTaskHandler *activityDispatchTaskHandler + cancelCommandDispatchTaskHandler *cancelCommandDispatchTaskHandler + scheduleToStartTimeoutTaskHandler *scheduleToStartTimeoutTaskHandler + scheduleToCloseTimeoutTaskHandler *scheduleToCloseTimeoutTaskHandler + startToCloseTimeoutTaskHandler *startToCloseTimeoutTaskHandler + heartbeatTimeoutTaskHandler *heartbeatTimeoutTaskHandler } func newLibrary( handler *handler, activityDispatchTaskHandler *activityDispatchTaskHandler, + cancelCommandDispatchTaskHandler *cancelCommandDispatchTaskHandler, scheduleToStartTimeoutTaskHandler *scheduleToStartTimeoutTaskHandler, scheduleToCloseTimeoutTaskHandler *scheduleToCloseTimeoutTaskHandler, startToCloseTimeoutTaskHandler *startToCloseTimeoutTaskHandler, @@ -96,13 +98,14 @@ func newLibrary( namespaceRegistry namespace.Registry, ) *library { return &library{ - componentOnlyLibrary: *newComponentOnlyLibrary(config, namespaceRegistry), - handler: handler, - activityDispatchTaskHandler: activityDispatchTaskHandler, - scheduleToStartTimeoutTaskHandler: scheduleToStartTimeoutTaskHandler, - scheduleToCloseTimeoutTaskHandler: scheduleToCloseTimeoutTaskHandler, - startToCloseTimeoutTaskHandler: startToCloseTimeoutTaskHandler, - heartbeatTimeoutTaskHandler: heartbeatTimeoutTaskHandler, + componentOnlyLibrary: *newComponentOnlyLibrary(config, namespaceRegistry), + handler: handler, + activityDispatchTaskHandler: activityDispatchTaskHandler, + cancelCommandDispatchTaskHandler: cancelCommandDispatchTaskHandler, + scheduleToStartTimeoutTaskHandler: scheduleToStartTimeoutTaskHandler, + scheduleToCloseTimeoutTaskHandler: scheduleToCloseTimeoutTaskHandler, + startToCloseTimeoutTaskHandler: startToCloseTimeoutTaskHandler, + heartbeatTimeoutTaskHandler: heartbeatTimeoutTaskHandler, } } @@ -132,5 +135,9 @@ func (l *library) Tasks() []*chasm.RegistrableTask { "heartbeatTimer", l.heartbeatTimeoutTaskHandler, ), + chasm.NewRegistrableSideEffectTask( + "cancelCommandDispatch", + l.cancelCommandDispatchTaskHandler, + ), } } diff --git a/chasm/lib/activity/proto/v1/activity_state.proto b/chasm/lib/activity/proto/v1/activity_state.proto index 931afb0b881..7519ff46be0 100644 --- a/chasm/lib/activity/proto/v1/activity_state.proto +++ b/chasm/lib/activity/proto/v1/activity_state.proto @@ -155,6 +155,10 @@ message ActivityAttemptState { // The request ID that came from matching's RecordActivityTaskStarted API call. Used to make this API idempotent in // case of implicit retries. string start_request_id = 9; + + // The worker's control task queue for sending commands (e.g. cancel) via Nexus. + // Set when the worker reports it during poll. Empty if the worker doesn't support worker commands. + string worker_control_task_queue = 10; } message ActivityHeartbeatState { diff --git a/chasm/lib/activity/proto/v1/tasks.proto b/chasm/lib/activity/proto/v1/tasks.proto index 9a1996e3dd2..70dd3ea992a 100644 --- a/chasm/lib/activity/proto/v1/tasks.proto +++ b/chasm/lib/activity/proto/v1/tasks.proto @@ -26,3 +26,7 @@ message HeartbeatTimeoutTask { // The current stamp for this activity execution. Used for task validation. See also [ActivityAttemptState]. int32 stamp = 1; } + +// CancelCommandDispatchTask is a side-effect task that dispatches a cancel command to the worker +// via the Nexus worker commands control queue. +message CancelCommandDispatchTask {} diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index b594e56a6d1..5cd7e9c8a86 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -146,6 +146,7 @@ var TransitionStarted = chasm.NewTransition( attempt.StartedTime = timestamppb.New(ctx.Now(a)) attempt.StartRequestId = request.GetRequestId() attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity() + attempt.WorkerControlTaskQueue = request.GetPollRequest().GetWorkerControlTaskQueue() if versionDirective := request.GetVersionDirective().GetDeploymentVersion(); versionDirective != nil { attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{ BuildId: versionDirective.GetBuildId(), From cd991a370d59a188a06cb16d62b542bf07c5ef06 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 10:31:32 -0700 Subject: [PATCH 2/8] Add unit tests for standalone activity cancel command dispatch Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity_test.go | 212 ++++++++++++++++++++++++++++ 1 file changed, 212 insertions(+) diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index b614c3eba7a..f7797afe6e2 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" commonpb "go.temporal.io/api/common/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/server/api/historyservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" @@ -305,3 +306,214 @@ func TestContextMetadata(t *testing.T) { require.Nil(t, md) }) } + +func TestTransitionStartedStoresWorkerControlTaskQueue(t *testing.T) { + testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + ctx := &chasm.MockMutableContext{ + MockContext: chasm.MockContext{ + HandleNow: func(chasm.Component) time.Time { return testTime }, + HandleExecutionKey: func() chasm.ExecutionKey { + return chasm.ExecutionKey{BusinessID: "test-activity-id", RunID: "test-run-id"} + }, + }, + } + + attemptState := &activitypb.ActivityAttemptState{Count: 1, Stamp: 1} + a := &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-type"}, + Status: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-queue"}, + StartToCloseTimeout: durationpb.New(3 * time.Minute), + }, + LastAttempt: chasm.NewDataField(ctx, attemptState), + RequestData: chasm.NewDataField(ctx, &activitypb.ActivityRequestData{}), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + + request := &historyservice.RecordActivityTaskStartedRequest{ + Stamp: 1, + RequestId: "req-1", + PollRequest: &workflowservice.PollActivityTaskQueueRequest{ + WorkerControlTaskQueue: "test-control-queue", + }, + } + + _, err := a.HandleStarted(ctx, request) + require.NoError(t, err) + require.Equal(t, "test-control-queue", a.LastAttempt.Get(ctx).GetWorkerControlTaskQueue()) +} + +func TestCancelRequestDispatchesCancelCommand(t *testing.T) { + testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + testCases := []struct { + name string + activityStatus activitypb.ActivityExecutionStatus + controlQueue string + expectDispatchTask bool + }{ + { + name: "started with control queue dispatches cancel task", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + controlQueue: "test-control-queue", + expectDispatchTask: true, + }, + { + name: "started without control queue does not dispatch", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + controlQueue: "", + expectDispatchTask: false, + }, + { + name: "scheduled cancels immediately, no dispatch", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + controlQueue: "", + expectDispatchTask: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceName(gomock.Any()).Return(namespace.Name("test-ns"), nil).AnyTimes() + + ctx := &chasm.MockMutableContext{ + MockContext: chasm.MockContext{ + HandleNow: func(chasm.Component) time.Time { return testTime }, + GoCtx: context.WithValue(context.Background(), ctxKeyActivityContext, &activityContext{ + config: &Config{ + BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true), + }, + namespaceRegistry: nsRegistry, + }), + }, + } + + a := &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-type"}, + Status: tc.activityStatus, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-queue"}, + ScheduleToCloseTimeout: durationpb.New(10 * time.Minute), + StartToCloseTimeout: durationpb.New(3 * time.Minute), + }, + LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{ + Count: 1, + Stamp: 1, + WorkerControlTaskQueue: tc.controlQueue, + }), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + + req := &activitypb.RequestCancelActivityExecutionRequest{ + FrontendRequest: &workflowservice.RequestCancelActivityExecutionRequest{ + RequestId: "cancel-req-1", + Identity: "test-identity", + }, + } + _, err := a.handleCancellationRequested(ctx, req) + require.NoError(t, err) + + hasCancelTask := false + for _, task := range ctx.Tasks { + if _, ok := task.Payload.(*activitypb.CancelCommandDispatchTask); ok { + hasCancelTask = true + require.Equal(t, tc.controlQueue, task.Attributes.Destination) + } + } + require.Equal(t, tc.expectDispatchTask, hasCancelTask, + "expected dispatch task: %v, but found: %v", tc.expectDispatchTask, hasCancelTask) + }) + } +} + +func TestTerminateDispatchesCancelCommand(t *testing.T) { + testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + + testCases := []struct { + name string + activityStatus activitypb.ActivityExecutionStatus + controlQueue string + expectDispatchTask bool + }{ + { + name: "started with control queue dispatches cancel task", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + controlQueue: "test-control-queue", + expectDispatchTask: true, + }, + { + name: "cancel_requested with control queue dispatches cancel task", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED, + controlQueue: "test-control-queue", + expectDispatchTask: true, + }, + { + name: "started without control queue does not dispatch", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, + controlQueue: "", + expectDispatchTask: false, + }, + { + name: "scheduled does not dispatch", + activityStatus: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + controlQueue: "", + expectDispatchTask: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + nsRegistry := namespace.NewMockRegistry(ctrl) + nsRegistry.EXPECT().GetNamespaceName(gomock.Any()).Return(namespace.Name("test-ns"), nil).AnyTimes() + + ctx := &chasm.MockMutableContext{ + MockContext: chasm.MockContext{ + HandleNow: func(chasm.Component) time.Time { return testTime }, + GoCtx: context.WithValue(context.Background(), ctxKeyActivityContext, &activityContext{ + config: &Config{ + BreakdownMetricsByTaskQueue: dynamicconfig.GetBoolPropertyFnFilteredByTaskQueue(true), + }, + namespaceRegistry: nsRegistry, + }), + }, + } + + a := &Activity{ + ActivityState: &activitypb.ActivityState{ + ActivityType: &commonpb.ActivityType{Name: "test-type"}, + Status: tc.activityStatus, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-queue"}, + ScheduleToCloseTimeout: durationpb.New(10 * time.Minute), + StartToCloseTimeout: durationpb.New(3 * time.Minute), + }, + LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{ + Count: 1, + Stamp: 1, + WorkerControlTaskQueue: tc.controlQueue, + }), + Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}), + } + + _, err := a.Terminate(ctx, chasm.TerminateComponentRequest{ + Reason: "test terminate", + }) + require.NoError(t, err) + + hasCancelTask := false + for _, task := range ctx.Tasks { + if _, ok := task.Payload.(*activitypb.CancelCommandDispatchTask); ok { + hasCancelTask = true + require.Equal(t, tc.controlQueue, task.Attributes.Destination) + } + } + require.Equal(t, tc.expectDispatchTask, hasCancelTask, + "expected dispatch task: %v, but found: %v", tc.expectDispatchTask, hasCancelTask) + }) + } +} From ae47ada3a65f2007a7333ba36863f99f1d77f61b Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 10:49:35 -0700 Subject: [PATCH 3/8] Use TASK_QUEUE_KIND_WORKER_COMMANDS for cancel command dispatch Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity_tasks.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 1339a2ab930..9c222346a0e 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -405,7 +405,7 @@ func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( NamespaceId: namespaceID, TaskQueue: &taskqueuepb.TaskQueue{ Name: controlQueue, - Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS, }, Request: nexusRequest, }) From 94a9a6d2213bb279cb5a6306a840569ccd536485 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 11:05:15 -0700 Subject: [PATCH 4/8] Remove unused constant and add TODO for duplicated dispatch helper Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity_tasks.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 9c222346a0e..c6d5ababb0d 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -328,8 +328,7 @@ func (h *cancelCommandDispatchTaskHandler) Validate( } const ( - cancelCommandDispatchTimeout = time.Second * 10 * debug.TimeoutMultiplier - cancelCommandDispatchMaxAttempt = 3 + cancelCommandDispatchTimeout = time.Second * 10 * debug.TimeoutMultiplier workerCommandsServiceName = "temporal.api.nexusservices.workerservice.v1.WorkerService" workerCommandsOperationName = "ExecuteCommands" @@ -438,6 +437,7 @@ func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( } // cancelCommandDispatchResponseToError converts a DispatchNexusTaskResponse into a Go error. +// TODO: consolidate with service/history.dispatchResponseToError into a shared package. func cancelCommandDispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { switch t := resp.GetOutcome().(type) { case *matchingservice.DispatchNexusTaskResponse_Failure: From a1a2d84c8e37ea8d96e204989f2f17b9b7d31a28 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 11:28:50 -0700 Subject: [PATCH 5/8] Add e2e tests for cancel command dispatch on standalone activities Tests both cancel-request and terminate paths: start activity with worker control queue, trigger cancellation/termination, verify cancel command arrives on the Nexus control queue with correct task token. Co-Authored-By: Claude Opus 4.6 --- tests/standalone_activity_test.go | 142 ++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index 49ce3fe4dc4..c36cdd319c2 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -15,6 +15,7 @@ import ( commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" + workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1" "go.temporal.io/api/operatorservice/v1" sdkpb "go.temporal.io/api/sdk/v1" "go.temporal.io/api/serviceerror" @@ -6328,3 +6329,144 @@ func (s *standaloneActivityTestSuite) TestCallbacks() { require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, descResp.GetInfo().GetStatus()) }) } + +func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { + t := s.T() + ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) + defer cancel() + + s.OverrideDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true) + + controlQueueName := s.tv.ControlQueueName(s.Namespace().String()) + + tokenSerializer := tasktoken.NewSerializer() + + // assertCancelTokenMatchesPoll verifies the cancel command's task token identifies the same + // activity as the poll response's token. The tokens won't be byte-identical because: + // 1. Matching builds poll tokens with additional fields (Clock, Version, etc.) + // 2. The ComponentRef version advances after state mutations (cancel/terminate) + // We compare the stable identity fields that the SDK uses to find the running activity. + assertCancelTokenMatchesPoll := func(t *testing.T, pollToken, cancelToken []byte) { + t.Helper() + pollTask, err := tokenSerializer.Deserialize(pollToken) + require.NoError(t, err) + cancelTask, err := tokenSerializer.Deserialize(cancelToken) + require.NoError(t, err) + require.Equal(t, pollTask.GetActivityId(), cancelTask.GetActivityId()) + require.Equal(t, pollTask.GetNamespaceId(), cancelTask.GetNamespaceId()) + require.Equal(t, pollTask.GetActivityType(), cancelTask.GetActivityType()) + require.Equal(t, pollTask.GetAttempt(), cancelTask.GetAttempt()) + require.NotEmpty(t, cancelTask.GetComponentRef(), "cancel token must have a ComponentRef") + } + + // pollNexusControlQueue polls the worker commands control queue for a cancel command and + // returns the decoded ExecuteCommandsRequest. Returns nil if no task is received. + pollNexusControlQueue := func() *workerservicepb.ExecuteCommandsRequest { + pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) + defer pollCancel() + resp, err := s.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, + Identity: s.tv.WorkerIdentity(), + }) + if err != nil || resp == nil || resp.Request == nil { + return nil + } + startOp := resp.Request.GetStartOperation() + if startOp == nil { + return nil + } + var executeReq workerservicepb.ExecuteCommandsRequest + if err := payload.Decode(startOp.Payload, &executeReq); err != nil { + return nil + } + return &executeReq + } + + t.Run("CancelRequest", func(t *testing.T) { + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + // Poll with a worker control task queue so the activity stores it. + pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: s.tv.WorkerIdentity(), + WorkerInstanceKey: s.tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + require.NoError(t, err) + require.NotEmpty(t, pollTaskResp.TaskToken) + + // Request cancellation — should dispatch cancel command to the control queue. + _, err = s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Identity: "canceller", + RequestId: s.tv.RequestID(), + Reason: "test cancel", + }) + require.NoError(t, err) + + var executeReq *workerservicepb.ExecuteCommandsRequest + s.Eventually(func() bool { + executeReq = pollNexusControlQueue() + return executeReq != nil + }, 15*time.Second, 100*time.Millisecond, "cancel command not received on control queue") + + require.Len(t, executeReq.Commands, 1) + cancelCmd := executeReq.Commands[0].GetCancelActivity() + require.NotNil(t, cancelCmd, "expected CancelActivity command") + assertCancelTokenMatchesPoll(t, pollTaskResp.TaskToken, cancelCmd.TaskToken) + }) + + t.Run("Terminate", func(t *testing.T) { + activityID := testcore.RandomizeStr(t.Name()) + taskQueue := testcore.RandomizeStr(t.Name()) + + startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + runID := startResp.RunId + + // Poll with a worker control task queue. + pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: s.Namespace().String(), + TaskQueue: &taskqueuepb.TaskQueue{ + Name: taskQueue, + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + Identity: s.tv.WorkerIdentity(), + WorkerInstanceKey: s.tv.WorkerInstanceKey(), + WorkerControlTaskQueue: controlQueueName, + }) + require.NoError(t, err) + require.NotEmpty(t, pollTaskResp.TaskToken) + + // Terminate — should dispatch cancel command to the control queue. + _, err = s.FrontendClient().TerminateActivityExecution(ctx, &workflowservice.TerminateActivityExecutionRequest{ + Namespace: s.Namespace().String(), + ActivityId: activityID, + RunId: runID, + Reason: "test terminate", + Identity: "terminator", + }) + require.NoError(t, err) + + var executeReq *workerservicepb.ExecuteCommandsRequest + s.Eventually(func() bool { + executeReq = pollNexusControlQueue() + return executeReq != nil + }, 15*time.Second, 100*time.Millisecond, "cancel command not received on control queue after terminate") + + require.Len(t, executeReq.Commands, 1) + cancelCmd := executeReq.Commands[0].GetCancelActivity() + require.NotNil(t, cancelCmd, "expected CancelActivity command") + assertCancelTokenMatchesPoll(t, pollTaskResp.TaskToken, cancelCmd.TaskToken) + }) +} From d68c034c705d50c14a1f483f26d7c24e4e14fb1a Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Mon, 11 May 2026 11:43:30 -0700 Subject: [PATCH 6/8] Use shared DispatchResponseToError from common/nexus Replace the duplicated cancelCommandDispatchResponseToError with the shared commonnexus.DispatchResponseToError now available from the merged kannan/move-dispatch-response-to-error branch. Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity_tasks.go | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index c6d5ababb0d..a1d59d9c66c 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -13,7 +13,6 @@ import ( workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" workerpb "go.temporal.io/api/worker/v1" - "go.temporal.io/sdk/temporal" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1" @@ -21,6 +20,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" + commonnexus "go.temporal.io/server/common/nexus" "go.temporal.io/server/common/resource" "go.temporal.io/server/common/util" "go.temporal.io/server/service/history/configs" @@ -416,7 +416,7 @@ func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( return err } - nexusErr := cancelCommandDispatchResponseToError(resp) + nexusErr := commonnexus.DispatchResponseToError(resp) if nexusErr == nil { metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("success")) return nil @@ -435,26 +435,3 @@ func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("transport_error")) return nexusErr } - -// cancelCommandDispatchResponseToError converts a DispatchNexusTaskResponse into a Go error. -// TODO: consolidate with service/history.dispatchResponseToError into a shared package. -func cancelCommandDispatchResponseToError(resp *matchingservice.DispatchNexusTaskResponse) error { - switch t := resp.GetOutcome().(type) { - case *matchingservice.DispatchNexusTaskResponse_Failure: - return temporal.GetDefaultFailureConverter().FailureToError(t.Failure) - case *matchingservice.DispatchNexusTaskResponse_RequestTimeout: - return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeUpstreamTimeout, "upstream timeout") - case *matchingservice.DispatchNexusTaskResponse_Response: - startResp := t.Response.GetStartOperation() - switch startResp.GetVariant().(type) { - case *nexuspb.StartOperationResponse_SyncSuccess, *nexuspb.StartOperationResponse_AsyncSuccess: - return nil - case *nexuspb.StartOperationResponse_Failure: - return temporal.GetDefaultFailureConverter().FailureToError(startResp.GetFailure()) - default: - return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "unknown start operation response") - } - default: - return nexus.NewHandlerErrorf(nexus.HandlerErrorTypeInternal, "empty or unknown dispatch outcome") - } -} From 7759b0d34d03a0ad8d00d34a527d5386c76251a1 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 11:54:25 -0700 Subject: [PATCH 7/8] Improve cancel command dispatch error handling Port error handling pattern from workerCommandsTaskDispatcher: distinguish UpstreamTimeout (no_poller metric), non-retryable handler errors, transport errors, and permanent worker-returned failures. Fix misleading comment on buildCancelCommandTaskToken. Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity.go | 3 ++- chasm/lib/activity/activity_tasks.go | 40 +++++++++++++++++++++++----- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index afb3a676946..59f02aff550 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -202,7 +202,8 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s } // buildCancelCommandTaskToken builds the serialized task token for a cancel command. -// This token matches what the worker received when the activity was dispatched. +// This token identifies the same activity as the poll response token but is not byte-identical — +// matching builds poll tokens with additional fields (Clock, Version, etc.). func (a *Activity) buildCancelCommandTaskToken(ctx chasm.Context, activityRef chasm.ComponentRef) ([]byte, error) { componentRefBytes, err := ctx.Ref(a) if err != nil { diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index a1d59d9c66c..3396f5539dc 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -422,16 +422,42 @@ func (h *cancelCommandDispatchTaskHandler) dispatchToWorker( return nil } - // Non-retryable errors are dropped — the activity will eventually time out. + return h.handleDispatchError(nexusErr, controlQueue) +} + +func (h *cancelCommandDispatchTaskHandler) handleDispatchError(nexusErr error, controlQueue string) error { var handlerErr *nexus.HandlerError - if errors.As(nexusErr, &handlerErr) && !handlerErr.Retryable() { - h.opts.Logger.Error("Cancel command non-retryable error", + if errors.As(nexusErr, &handlerErr) { + // Handler-level error (transport, timeout, internal). + if handlerErr.Type == nexus.HandlerErrorTypeUpstreamTimeout { + h.opts.Logger.Warn("No worker polling control queue", + tag.NewStringTag("control_queue", controlQueue)) + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("no_poller")) + return nexusErr + } + + if !handlerErr.Retryable() { + h.opts.Logger.Error("Cancel command non-retryable handler error", + tag.NewStringTag("control_queue", controlQueue), + tag.Error(nexusErr)) + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("non_retryable_error")) + return nil + } + + h.opts.Logger.Warn("Cancel command transport failure", tag.NewStringTag("control_queue", controlQueue), tag.Error(nexusErr)) - metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("non_retryable_error")) - return nil + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("transport_error")) + return nexusErr } - metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("transport_error")) - return nexusErr + // Worker-returned failure (ApplicationError, CanceledError, etc.). The worker received + // and processed the request but returned an error. Permanent — the worker contract + // requires success for all defined commands, so this indicates a bug or version + // incompatibility. Retrying won't help. + h.opts.Logger.Error("Worker returned failure for cancel command", + tag.NewStringTag("control_queue", controlQueue), + tag.Error(nexusErr)) + metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("worker_error")) + return nil } From 2800c8e3cf5442e1a878f9484b77394c4016dcfa Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Tue, 12 May 2026 11:57:36 -0700 Subject: [PATCH 8/8] Fix e2e test to use env pattern and fix formatting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test was using s.OverrideDynamicConfig, s.FrontendClient(), s.tv etc. which don't exist on the suite — must use env from newTestEnv(). Also fix gofmt formatting in activity_test.go and library.go. Co-Authored-By: Claude Opus 4.6 --- chasm/lib/activity/activity_test.go | 12 ++++---- chasm/lib/activity/library.go | 30 +++++++++---------- tests/standalone_activity_test.go | 46 +++++++++++++++-------------- 3 files changed, 45 insertions(+), 43 deletions(-) diff --git a/chasm/lib/activity/activity_test.go b/chasm/lib/activity/activity_test.go index f7797afe6e2..70a39e69066 100644 --- a/chasm/lib/activity/activity_test.go +++ b/chasm/lib/activity/activity_test.go @@ -321,9 +321,9 @@ func TestTransitionStartedStoresWorkerControlTaskQueue(t *testing.T) { attemptState := &activitypb.ActivityAttemptState{Count: 1, Stamp: 1} a := &Activity{ ActivityState: &activitypb.ActivityState{ - ActivityType: &commonpb.ActivityType{Name: "test-type"}, - Status: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, - TaskQueue: &taskqueuepb.TaskQueue{Name: "test-queue"}, + ActivityType: &commonpb.ActivityType{Name: "test-type"}, + Status: activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, + TaskQueue: &taskqueuepb.TaskQueue{Name: "test-queue"}, StartToCloseTimeout: durationpb.New(3 * time.Minute), }, LastAttempt: chasm.NewDataField(ctx, attemptState), @@ -348,9 +348,9 @@ func TestCancelRequestDispatchesCancelCommand(t *testing.T) { testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) testCases := []struct { - name string - activityStatus activitypb.ActivityExecutionStatus - controlQueue string + name string + activityStatus activitypb.ActivityExecutionStatus + controlQueue string expectDispatchTask bool }{ { diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index cb128c20111..269648a4a0f 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -77,13 +77,13 @@ func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { type library struct { componentOnlyLibrary - handler *handler - activityDispatchTaskHandler *activityDispatchTaskHandler - cancelCommandDispatchTaskHandler *cancelCommandDispatchTaskHandler - scheduleToStartTimeoutTaskHandler *scheduleToStartTimeoutTaskHandler - scheduleToCloseTimeoutTaskHandler *scheduleToCloseTimeoutTaskHandler - startToCloseTimeoutTaskHandler *startToCloseTimeoutTaskHandler - heartbeatTimeoutTaskHandler *heartbeatTimeoutTaskHandler + handler *handler + activityDispatchTaskHandler *activityDispatchTaskHandler + cancelCommandDispatchTaskHandler *cancelCommandDispatchTaskHandler + scheduleToStartTimeoutTaskHandler *scheduleToStartTimeoutTaskHandler + scheduleToCloseTimeoutTaskHandler *scheduleToCloseTimeoutTaskHandler + startToCloseTimeoutTaskHandler *startToCloseTimeoutTaskHandler + heartbeatTimeoutTaskHandler *heartbeatTimeoutTaskHandler } func newLibrary( @@ -98,14 +98,14 @@ func newLibrary( namespaceRegistry namespace.Registry, ) *library { return &library{ - componentOnlyLibrary: *newComponentOnlyLibrary(config, namespaceRegistry), - handler: handler, - activityDispatchTaskHandler: activityDispatchTaskHandler, - cancelCommandDispatchTaskHandler: cancelCommandDispatchTaskHandler, - scheduleToStartTimeoutTaskHandler: scheduleToStartTimeoutTaskHandler, - scheduleToCloseTimeoutTaskHandler: scheduleToCloseTimeoutTaskHandler, - startToCloseTimeoutTaskHandler: startToCloseTimeoutTaskHandler, - heartbeatTimeoutTaskHandler: heartbeatTimeoutTaskHandler, + componentOnlyLibrary: *newComponentOnlyLibrary(config, namespaceRegistry), + handler: handler, + activityDispatchTaskHandler: activityDispatchTaskHandler, + cancelCommandDispatchTaskHandler: cancelCommandDispatchTaskHandler, + scheduleToStartTimeoutTaskHandler: scheduleToStartTimeoutTaskHandler, + scheduleToCloseTimeoutTaskHandler: scheduleToCloseTimeoutTaskHandler, + startToCloseTimeoutTaskHandler: startToCloseTimeoutTaskHandler, + heartbeatTimeoutTaskHandler: heartbeatTimeoutTaskHandler, } } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index c36cdd319c2..d88ab8c9fcc 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -6331,13 +6331,15 @@ func (s *standaloneActivityTestSuite) TestCallbacks() { } func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { + env := s.newTestEnv() t := s.T() ctx, cancel := context.WithTimeout(t.Context(), 30*time.Second) defer cancel() - s.OverrideDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true) + env.OverrideDynamicConfig(dynamicconfig.EnableCancelActivityWorkerCommand, true) - controlQueueName := s.tv.ControlQueueName(s.Namespace().String()) + tv := env.Tv() + controlQueueName := tv.ControlQueueName(env.Namespace().String()) tokenSerializer := tasktoken.NewSerializer() @@ -6364,10 +6366,10 @@ func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { pollNexusControlQueue := func() *workerservicepb.ExecuteCommandsRequest { pollCtx, pollCancel := context.WithTimeout(ctx, 5*time.Second) defer pollCancel() - resp, err := s.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ - Namespace: s.Namespace().String(), + resp, err := env.FrontendClient().PollNexusTaskQueue(pollCtx, &workflowservice.PollNexusTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{Name: controlQueueName, Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS}, - Identity: s.tv.WorkerIdentity(), + Identity: tv.WorkerIdentity(), }) if err != nil || resp == nil || resp.Request == nil { return nil @@ -6387,36 +6389,36 @@ func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { activityID := testcore.RandomizeStr(t.Name()) taskQueue := testcore.RandomizeStr(t.Name()) - startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + startResp := env.startAndValidateActivity(ctx, t, activityID, taskQueue) runID := startResp.RunId // Poll with a worker control task queue so the activity stores it. - pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ - Namespace: s.Namespace().String(), + pollTaskResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{ Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, }, - Identity: s.tv.WorkerIdentity(), - WorkerInstanceKey: s.tv.WorkerInstanceKey(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), WorkerControlTaskQueue: controlQueueName, }) require.NoError(t, err) require.NotEmpty(t, pollTaskResp.TaskToken) // Request cancellation — should dispatch cancel command to the control queue. - _, err = s.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ - Namespace: s.Namespace().String(), + _, err = env.FrontendClient().RequestCancelActivityExecution(ctx, &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: env.Namespace().String(), ActivityId: activityID, RunId: runID, Identity: "canceller", - RequestId: s.tv.RequestID(), + RequestId: tv.RequestID(), Reason: "test cancel", }) require.NoError(t, err) var executeReq *workerservicepb.ExecuteCommandsRequest - s.Eventually(func() bool { + require.Eventually(t, func() bool { executeReq = pollNexusControlQueue() return executeReq != nil }, 15*time.Second, 100*time.Millisecond, "cancel command not received on control queue") @@ -6431,26 +6433,26 @@ func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { activityID := testcore.RandomizeStr(t.Name()) taskQueue := testcore.RandomizeStr(t.Name()) - startResp := s.startAndValidateActivity(ctx, t, activityID, taskQueue) + startResp := env.startAndValidateActivity(ctx, t, activityID, taskQueue) runID := startResp.RunId // Poll with a worker control task queue. - pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ - Namespace: s.Namespace().String(), + pollTaskResp, err := env.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ + Namespace: env.Namespace().String(), TaskQueue: &taskqueuepb.TaskQueue{ Name: taskQueue, Kind: enumspb.TASK_QUEUE_KIND_NORMAL, }, - Identity: s.tv.WorkerIdentity(), - WorkerInstanceKey: s.tv.WorkerInstanceKey(), + Identity: tv.WorkerIdentity(), + WorkerInstanceKey: tv.WorkerInstanceKey(), WorkerControlTaskQueue: controlQueueName, }) require.NoError(t, err) require.NotEmpty(t, pollTaskResp.TaskToken) // Terminate — should dispatch cancel command to the control queue. - _, err = s.FrontendClient().TerminateActivityExecution(ctx, &workflowservice.TerminateActivityExecutionRequest{ - Namespace: s.Namespace().String(), + _, err = env.FrontendClient().TerminateActivityExecution(ctx, &workflowservice.TerminateActivityExecutionRequest{ + Namespace: env.Namespace().String(), ActivityId: activityID, RunId: runID, Reason: "test terminate", @@ -6459,7 +6461,7 @@ func (s *standaloneActivityTestSuite) TestDispatchCancelCommandToWorker() { require.NoError(t, err) var executeReq *workerservicepb.ExecuteCommandsRequest - s.Eventually(func() bool { + require.Eventually(t, func() bool { executeReq = pollNexusControlQueue() return executeReq != nil }, 15*time.Second, 100*time.Millisecond, "cancel command not received on control queue after terminate")