Skip to content
51 changes: 51 additions & 0 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,29 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
}, nil
}

// buildCancelCommandTaskToken serializes the task token that is embedded in a cancel command.
//
// The token carries the namespace ID and business ID taken from the execution key, the activity
// type name, the attempt count of the last attempt, and the serialized reference to this
// component. It identifies the same activity as the token returned in the poll response, but the
// two are not byte-identical: matching builds poll tokens with additional fields (Clock, Version,
// etc.).
//
// activityRef is not read by this method; the parameter exists so the method can be handed to
// chasm.ReadComponent together with an argument (see the cancel command dispatch task's Execute).
//
// Returns the marshaled token, or the error from ctx.Ref or from marshaling.
func (a *Activity) buildCancelCommandTaskToken(ctx chasm.Context, activityRef chasm.ComponentRef) ([]byte, error) {
	serializedRef, err := ctx.Ref(a)
	if err != nil {
		return nil, err
	}

	lastAttempt := a.LastAttempt.Get(ctx)
	executionKey := ctx.ExecutionKey()

	return (&tokenspb.Task{
		NamespaceId:  executionKey.NamespaceID,
		ActivityId:   executionKey.BusinessID,
		ActivityType: a.GetActivityType().GetName(),
		Attempt:      lastAttempt.GetCount(),
		ComponentRef: serializedRef,
	}).Marshal()
}

// HandleStarted updates the activity on recording activity task started and populates the response.
func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservice.RecordActivityTaskStartedRequest) (
*historyservice.RecordActivityTaskStartedResponse, error,
Expand Down Expand Up @@ -505,6 +528,13 @@ func (a *Activity) Terminate(
return chasm.TerminateComponentResponse{}, nil
}

// If the activity is running on a worker, proactively notify the worker via Nexus.
// Must be done before the transition since it checks current status.
if a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED {
a.addCancelCommandDispatchTask(ctx)
}

metricsHandler, err := a.enrichMetricsHandler(ctx, metrics.ActivityTerminatedScope)
if err != nil {
return chasm.TerminateComponentResponse{}, err
Expand All @@ -527,6 +557,23 @@ func (a *Activity) getOrCreateLastHeartbeat(ctx chasm.MutableContext) *activityp
return heartbeat
}

// addCancelCommandDispatchTask registers a CancelCommandDispatchTask on this activity so that a
// cancel command gets dispatched to the worker through the Nexus worker commands control queue.
//
// The task's destination is the worker control task queue recorded on the last attempt. When that
// queue name is empty (the worker has no control queue, i.e. does not support worker commands),
// nothing is scheduled.
func (a *Activity) addCancelCommandDispatchTask(ctx chasm.MutableContext) {
	destination := a.LastAttempt.Get(ctx).GetWorkerControlTaskQueue()
	if destination != "" {
		attrs := chasm.TaskAttributes{Destination: destination}
		ctx.AddTask(a, attrs, &activitypb.CancelCommandDispatchTask{})
	}
}

func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request *activitypb.RequestCancelActivityExecutionRequest) (
*activitypb.RequestCancelActivityExecutionResponse, error,
) {
Expand All @@ -551,6 +598,10 @@ func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, request
return nil, err
}

if !isCancelImmediately {
a.addCancelCommandDispatchTask(ctx)
}

if isCancelImmediately {
details := &commonpb.Payloads{
Payloads: []*commonpb.Payload{
Expand Down
184 changes: 184 additions & 0 deletions chasm/lib/activity/activity_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,30 @@ package activity

import (
"context"
"errors"
"fmt"
"time"

"github.com/nexus-rpc/sdk-go/nexus"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
nexuspb "go.temporal.io/api/nexus/v1"
workerservicepb "go.temporal.io/api/nexusservices/workerservice/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
workerpb "go.temporal.io/api/worker/v1"
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/chasm"
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
"go.temporal.io/server/common/debug"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
commonnexus "go.temporal.io/server/common/nexus"
"go.temporal.io/server/common/resource"
"go.temporal.io/server/common/util"
"go.temporal.io/server/service/history/configs"
"go.uber.org/fx"
"google.golang.org/protobuf/proto"
)

type activityDispatchTaskHandlerOptions struct {
Expand Down Expand Up @@ -277,3 +293,171 @@ func (h *heartbeatTimeoutTaskHandler) Execute(
fromStatus: activity.GetStatus(),
})
}

// cancelCommandDispatchTaskHandler is the task handler for CancelCommandDispatchTask. It
// dispatches a cancel command to the worker via the Nexus worker commands control queue.
// This is a best-effort mechanism — the activity will eventually time out if the worker
// doesn't respond.
type cancelCommandDispatchTaskHandler struct {
// Embedded side-effect task handler base, specialized for CancelCommandDispatchTask.
chasm.SideEffectTaskHandlerBase[*activitypb.CancelCommandDispatchTask]
// opts holds the handler's dependencies: matching client, config, metrics handler and logger.
opts cancelCommandDispatchTaskHandlerOptions
}

// cancelCommandDispatchTaskHandlerOptions bundles the dependencies of
// cancelCommandDispatchTaskHandler. It embeds fx.In, so the fields are presumably populated
// by fx dependency injection — confirm against the module wiring.
type cancelCommandDispatchTaskHandlerOptions struct {
fx.In

// MatchingClient is used to dispatch the Nexus task that carries the worker commands.
MatchingClient resource.MatchingClient
// Config provides EnableCancelActivityWorkerCommand, which gates Execute.
Config *configs.Config
// MetricsHandler records the WorkerCommandsSent metric, tagged with the dispatch outcome.
MetricsHandler metrics.Handler
// Logger reports dispatch failures.
Logger log.Logger
}

// newCancelCommandDispatchTaskHandler returns a cancelCommandDispatchTaskHandler that uses the
// supplied options for its dependencies.
func newCancelCommandDispatchTaskHandler(opts cancelCommandDispatchTaskHandlerOptions) *cancelCommandDispatchTaskHandler {
	handler := new(cancelCommandDispatchTaskHandler)
	handler.opts = opts
	return handler
}

// Validate reports whether a CancelCommandDispatchTask should still be executed for the activity.
//
// The task is valid only while the activity status is CANCEL_REQUESTED or TERMINATED, i.e. a
// cancel or terminate was issued for it. Every other status makes the task invalid. The returned
// error is always nil.
func (h *cancelCommandDispatchTaskHandler) Validate(
	_ chasm.Context,
	activity *Activity,
	_ chasm.TaskAttributes,
	_ *activitypb.CancelCommandDispatchTask,
) (bool, error) {
	switch activity.Status {
	case activitypb.ACTIVITY_EXECUTION_STATUS_CANCEL_REQUESTED,
		activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED:
		return true, nil
	default:
		return false, nil
	}
}

const (
// cancelCommandDispatchTimeout bounds a single dispatch attempt in dispatchToWorker.
// The base value of ten seconds is scaled by debug.TimeoutMultiplier.
cancelCommandDispatchTimeout = time.Second * 10 * debug.TimeoutMultiplier

// workerCommandsServiceName and workerCommandsOperationName are the Nexus service and
// operation names set on the StartOperation request that carries worker commands.
workerCommandsServiceName = "temporal.api.nexusservices.workerservice.v1.WorkerService"
workerCommandsOperationName = "ExecuteCommands"
)

// Execute sends a cancel-activity worker command for the referenced activity.
//
// It does nothing and returns nil when EnableCancelActivityWorkerCommand is off. Otherwise it
// reads the activity component to build the cancel command task token, wraps the token in a
// single CancelActivity worker command, and dispatches that command to the control queue named
// by taskAttrs.Destination in the activity's namespace.
//
// Returns the error from reading the component or from dispatchToWorker.
func (h *cancelCommandDispatchTaskHandler) Execute(
	ctx context.Context,
	activityRef chasm.ComponentRef,
	taskAttrs chasm.TaskAttributes,
	_ *activitypb.CancelCommandDispatchTask,
) error {
	if enabled := h.opts.Config.EnableCancelActivityWorkerCommand(); !enabled {
		return nil
	}

	// The token is built under a component read so it reflects the activity's stored state.
	token, err := chasm.ReadComponent(ctx, activityRef, (*Activity).buildCancelCommandTaskToken, activityRef)
	if err != nil {
		return err
	}

	cancelActivity := &workerpb.CancelActivityCommand{TaskToken: token}
	commands := []*workerpb.WorkerCommand{
		{Type: &workerpb.WorkerCommand_CancelActivity{CancelActivity: cancelActivity}},
	}
	return h.dispatchToWorker(ctx, activityRef.NamespaceID, taskAttrs.Destination, commands)
}

// dispatchToWorker delivers the given worker commands to controlQueue as a Nexus StartOperation
// request routed through matching.
//
// The commands are wrapped in an ExecuteCommandsRequest, proto-marshaled, and placed in a payload
// whose "encoding" metadata is "binary/protobuf". The dispatch is bounded by
// cancelCommandDispatchTimeout. Each outcome after a successful encode is recorded on the
// WorkerCommandsSent metric.
//
// Returns:
//   - a wrapped error if the request cannot be encoded;
//   - the RPC error if the matching call itself fails (outcome "rpc_error");
//   - nil if the dispatch response carries no error (outcome "success");
//   - otherwise whatever handleDispatchError decides for the response error.
func (h *cancelCommandDispatchTaskHandler) dispatchToWorker(
	ctx context.Context,
	namespaceID string,
	controlQueue string,
	commands []*workerpb.WorkerCommand,
) error {
	dispatchCtx, cancel := context.WithTimeout(ctx, cancelCommandDispatchTimeout)
	defer cancel()

	encoded, err := proto.Marshal(&workerservicepb.ExecuteCommandsRequest{Commands: commands})
	if err != nil {
		return fmt.Errorf("failed to encode worker commands request: %w", err)
	}

	startOperation := &nexuspb.StartOperationRequest{
		Service:   workerCommandsServiceName,
		Operation: workerCommandsOperationName,
		Payload: &commonpb.Payload{
			Metadata: map[string][]byte{"encoding": []byte("binary/protobuf")},
			Data:     encoded,
		},
	}
	dispatchRequest := &matchingservice.DispatchNexusTaskRequest{
		NamespaceId: namespaceID,
		TaskQueue: &taskqueuepb.TaskQueue{
			Name: controlQueue,
			Kind: enumspb.TASK_QUEUE_KIND_WORKER_COMMANDS,
		},
		Request: &nexuspb.Request{
			Header:  map[string]string{},
			Variant: &nexuspb.Request_StartOperation{StartOperation: startOperation},
		},
	}

	resp, err := h.opts.MatchingClient.DispatchNexusTask(dispatchCtx, dispatchRequest)
	if err != nil {
		h.opts.Logger.Warn("Failed to dispatch cancel command",
			tag.NewStringTag("control_queue", controlQueue),
			tag.Error(err))
		metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("rpc_error"))
		return err
	}

	// The RPC succeeded; the response may still carry a Nexus-level failure.
	if nexusErr := commonnexus.DispatchResponseToError(resp); nexusErr != nil {
		return h.handleDispatchError(nexusErr, controlQueue)
	}

	metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("success"))
	return nil
}

// handleDispatchError classifies an error extracted from a Nexus dispatch response, logs it,
// records the outcome on the WorkerCommandsSent metric, and decides whether the error is
// returned to the caller.
//
// Classification:
//   - not a *nexus.HandlerError: a failure returned by the worker itself. Logged at error level
//     with outcome "worker_error"; returns nil.
//   - handler error of type UpstreamTimeout: treated as no worker polling the control queue.
//     Outcome "no_poller"; returns nexusErr.
//   - handler error that is not retryable: outcome "non_retryable_error"; returns nil.
//   - any other handler error: outcome "transport_error"; returns nexusErr.
func (h *cancelCommandDispatchTaskHandler) handleDispatchError(nexusErr error, controlQueue string) error {
	queueTag := tag.NewStringTag("control_queue", controlQueue)

	var handlerErr *nexus.HandlerError
	if !errors.As(nexusErr, &handlerErr) {
		// Worker-returned failure (ApplicationError, CanceledError, etc.). The worker received
		// and processed the request but answered with an error. This is permanent: the worker
		// contract requires success for all defined commands, so it points to a bug or a
		// version incompatibility, and retrying won't help.
		h.opts.Logger.Error("Worker returned failure for cancel command", queueTag, tag.Error(nexusErr))
		metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("worker_error"))
		return nil
	}

	// Handler-level error (transport, timeout, internal).
	switch {
	case handlerErr.Type == nexus.HandlerErrorTypeUpstreamTimeout:
		h.opts.Logger.Warn("No worker polling control queue", queueTag)
		metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("no_poller"))
		return nexusErr
	case !handlerErr.Retryable():
		h.opts.Logger.Error("Cancel command non-retryable handler error", queueTag, tag.Error(nexusErr))
		metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("non_retryable_error"))
		return nil
	default:
		h.opts.Logger.Warn("Cancel command transport failure", queueTag, tag.Error(nexusErr))
		metrics.WorkerCommandsSent.With(h.opts.MetricsHandler).Record(1, metrics.OutcomeTag("transport_error"))
		return nexusErr
	}
}
Loading
Loading