Add support for standalone callbacks#10192
Conversation
0d3b36b to
ad9f7cb
Compare
edb387a to
40f2f7e
Compare
| } | ||
|
|
||
| // ScheduleToCloseTimeout | ||
| if req.GetScheduleToCloseTimeout() == nil || req.GetScheduleToCloseTimeout().AsDuration() <= 0 { |
There was a problem hiding this comment.
I wonder if we want a MaxOperationScheduleToCloseTimeout? SANO has it.
There was a problem hiding this comment.
I don't think we do, assuming that there is a system-wide hard cap on callbacks. (Which IIRC was like 10s.) If it is actually possible to be longer than that, then adding and maybe wiring something in would make sense.
There was a problem hiding this comment.
It's important to distinguish the scope of the various timeouts. If we have/had a 10s timeout on the callback delivery attempt, that would be different from the "schedule to close" timeout which covers the entire lifecycle from creation to terminal state - including all retries.
There was a problem hiding this comment.
As you can see, MaxOperationScheduleToCloseTimeout is set to zero by default. But we do set a limit in Cloud to ensure that all nexus operations eventually terminate. I think we'd want the same thing here for the same reason.
|
@stephanos thank you for the close look and detailed review, PTAL. |
bergundy
left a comment
There was a problem hiding this comment.
I got all the way to frontend_validation.go but didn't complete reviewing that file yet.
It's a big PR, submitting my feedback so you can already take action and not be blocked until I complete reviewing the entire change.
Not blocking this PR but we should validate that temporal:// URLs have a token either via the old header format or the new structured Token field.
There was a problem hiding this comment.
Would you remove the WorkflowClosed message that is unused here please?
|
|
||
| // (standalone only) User-supplied business ID set when StartCallbackExecution() is | ||
| // called. Used to identify the callback for operations like Describe- or Terminate-. | ||
| string callback_id = 12; |
There was a problem hiding this comment.
This is available via the chasm context, no need to duplicate this information.
|
|
||
| // (standalone only) Schedule-to-close timeout from when StartCallbackExecution() | ||
| // is called to when the result gets delivered. | ||
| google.protobuf.Duration completion_schedule_to_close_timeout = 13; |
There was a problem hiding this comment.
Agree with @stephanos the callback lifetime is the completion lifetime, no need to qualify this field.
| CALLBACK_STATUS_FAILED = 4; | ||
| // Callback has succeeded. | ||
| CALLBACK_STATUS_SUCCEEDED = 5; | ||
| // Callback was terminated by request. |
There was a problem hiding this comment.
| // Callback was terminated by request. | |
| // Callback was terminated by request. Only relevant for standalone callbacks. |
|
|
||
| rpc DescribeCallbackExecution(DescribeCallbackExecutionRequest) returns (DescribeCallbackExecutionResponse) { | ||
| option (temporal.server.api.routing.v1.routing).business_id = "frontend_request.callback_id"; | ||
| option (temporal.server.api.common.v1.api_category).category = API_CATEGORY_STANDARD; |
There was a problem hiding this comment.
This should be a long poll API for the purpose of this annotation (liveness detection).
There was a problem hiding this comment.
hm both SANO and SAA have this as API_CATEGORY_STANDARD - did that slip through 2 reviews before?
There was a problem hiding this comment.
Roey confirmed that all these should be in the long-poll category. cc #10256 for fixing up the remaining ones.
| func (h *frontendHandler) checkFeatureEnabled(requestProto namespacer) error { | ||
| // Confirm CHASM is enabled. | ||
| targetNamespaceName := requestProto.GetNamespace() | ||
| if !h.config.CHASMEnabled(targetNamespaceName) || !h.config.CHASMCallbacksEnabled(targetNamespaceName) { |
There was a problem hiding this comment.
CHASM callbacks is a feature flag for workflow callbacks, we should name it clearly and not check this here.
| type namespacer interface{ GetNamespace() string } | ||
|
|
||
| // Looks up the namespace ID from the user-supplied namespace name in the request proto. | ||
| func (h *frontendHandler) getTargetNamespace(requestProto namespacer) (namespace.ID, error) { |
There was a problem hiding this comment.
Please don't use the get prefix for getters in Go: https://go.dev/doc/effective_go#Getters
| return nil, err | ||
| } | ||
|
|
||
| resp, err := chasm.ListExecutions[*Callback, *callbackpb.CallbackExecutionListInfo]( |
There was a problem hiding this comment.
As mentioned above, there's no need for a structured memo to implement this API.
| // requiredField is a tuple of a required field name and its value. | ||
| // Used instead of a map[string]string to provide deterministic | ||
| // errors if multiple fields aren't set. | ||
| type requiredField struct { |
There was a problem hiding this comment.
We've had different approaches to provide multi-error feedback from our APIs but don't have a standard in the codebase. I'm okay with introducing yet another form in this PR and have ideas for how to improve the validation process in general that I want us to tackle later on.
| // requiredField is a tuple of a required field name and its value. | ||
| // Used instead of a map[string]string to provide deterministic | ||
| // errors if multiple fields aren't set. | ||
| type requiredField struct { |
There was a problem hiding this comment.
nit: this is a bit more accurate for what this struct represents.
| type requiredField struct { | |
| type requiredStringField struct { |
| ) | ||
|
|
||
| // Test suite for the Nexus "Standalone Callbacks". Which are Nexus operations corresponding to | ||
| // aysynchronous actions that take place outside of Temporal. (e.g. waiting for a payment to |
There was a problem hiding this comment.
| // aysynchronous actions that take place outside of Temporal. (e.g. waiting for a payment to | |
| // asynchronous actions that take place outside of Temporal. (e.g. waiting for a payment to |
| ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) | ||
| defer cancel() |
There was a problem hiding this comment.
nit: you don't really need those (anymore), technically. It's better to use testcore.NewContext() as it gives you a context with a default timeout. We're in the process of making that become s.Context() but that's not merged yet. The only benefit of doing this is using an even tighter timeout than the default. But that can backfire in CI when things take longer. So my recommendation is to remove it.
There was a problem hiding this comment.
Good to know! I updated all the callsites to use ctx := env.Context(). So it's a big longer than I had with the custom timeout, but didn't wire in a lower value like the ones I had before. Since you have scared me worrying about how short durations would interact with under-powered and over-subscribed CI machines.
| s.NotNil(pollResp.GetOutcome(), "expected terminal outcome after timeout") | ||
| s.NotNil(pollResp.GetOutcome().GetFailure(), "expected failure outcome after timeout") | ||
| s.Contains(pollResp.GetOutcome().GetFailure().GetMessage(), "timed out") | ||
| s.NotNil(pollResp.GetOutcome().GetFailure().GetTimeoutFailureInfo()) | ||
| s.Equal(enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetOutcome().GetFailure().GetTimeoutFailureInfo().GetTimeoutType()) |
There was a problem hiding this comment.
(nit) (I didn't get around to codify this in testing.md yet) I find it often useful to - where possible - try to use protorequire.ProtoEqual to assert the whole object. There's an option to ignore fields that can't be checked like UUIDs. I find it makes it much easier to see the full picture and also get a better diff.
There was a problem hiding this comment.
I'll keep that in mind in the future. For this PR, I'm inclined to keep things as-is just to explicitly verify individual fields. But if I need to revisit this code in the future, I'll switch to that and just have a wantProto := ... to compare against.
| defer cancel() | ||
|
|
||
| // Short timeout so it fires quickly during the test. | ||
| callbackID := "timeout-test-" + uuid.NewString() |
There was a problem hiding this comment.
So you don't actually need the uuid.NewString() part since the testEnv you get will create a unique namespace for you. And as long as that isolation is sufficient and you don't re-use the identifier in the same test, you're good.
There was a problem hiding this comment.
Good to know. I'm inclined to keep this as-is, since it looks wrong to see a hard-coded ID like that. But yes, I see how it is unnecessary.
| listenAddr := nexustest.AllocListenAddress() | ||
| nexustest.NewNexusServer(s.T(), listenAddr, h) | ||
|
|
||
| _, err := env.OperatorClient().CreateNexusEndpoint(ctx, &operatorservice.CreateNexusEndpointRequest{ |
There was a problem hiding this comment.
You could consider leveraging NexusTestEnv like a lot of the other Nexus tests; it has a createRandomExternalNexusServer and more already.
There was a problem hiding this comment.
I'll keep that in mind for future tests. For now, I'm inclined to keep this more verbose and explicit. But if I ever need to modify this code, I'll look into simplifying it with existing or new test helpers. (But I'm still trying to build muscle memory re: Nexus registration mechanics, and understanding the parts of the RPC handshake, etc.)
|
|
||
| var LongPollTimeout = dynamicconfig.NewNamespaceDurationSetting( | ||
| "callback.longPollTimeout", | ||
| 20*time.Second, |
There was a problem hiding this comment.
Let's make that a constant in common/constants.go, too, while we're at it.
chrsmith
left a comment
There was a problem hiding this comment.
Still working on the feedback (thanks!)
When I'm finished I'll split this all up into a series of smaller PRs applied to a feature branch. That'll make it easier to understand and review the changes.
| "RequestID": c.RequestId, | ||
| // Only set for standalone callbacks. | ||
| "CallbackID": c.CallbackId, |
There was a problem hiding this comment.
Good catch, I missed that.
| } | ||
|
|
||
| // Setup the completion's headers. | ||
| completion.Header = callback.Header |
There was a problem hiding this comment.
It is for one of the cases (internal or outbound, I don't recall off the top of my head). Let's go over this in a separate PR after I've split this across.
This change came from applying a suggestion Stpehan suggested that might need some more thought to do safely.
| // This shouldn't happen outside of tests, since the Nexus machinery | ||
| // would prevent an invalid transition anyways. (e.g. terminating | ||
| // an already terminated Callback.) | ||
| if c.LifecycleState(ctx).IsClosed() { |
There was a problem hiding this comment.
Yup, sorry about that.
|
|
||
| // completionSource returns the completionSource from the callback, which depends on whether it | ||
| // is embedded or is running in standalone mode. | ||
| func (c *Callback) completionSource(ctx chasm.Context) CompletionSource { |
There was a problem hiding this comment.
Good point! I'll just have Callback implement CompletionSource, and then just call that.
|
@bergundy , @stephanos I've addressed all the feedback so far and rebased. The only meaningful change was I backed out moving some of the "outbound request setup logic" from Will ping folks on Slack if we should just do another review here, or if I should push a series of smaller PRs into a feature branch. |
## What changed? Mark `DescribeNexusOperationRequest` as long poll API category. ## Why? I response to #10192 (comment)
bf79b70 to
9a0ee41
Compare
bergundy
left a comment
There was a problem hiding this comment.
I reviewed everything but standalone_callbacks_test.go. This is getting really close to done.
| // requestID (unique per API call) + idx (position within the request) ensures unique, idempotent callback IDs. | ||
| id := fmt.Sprintf("%s-%d", requestID, idx) | ||
| callbackObj := callback.NewCallback(requestID, registrationTime, &callbackspb.CallbackState{}, chasmCB) | ||
| callbackObj := callback.NewEmbeddedCallback(requestID, registrationTime, chasmCB) |
There was a problem hiding this comment.
I would also accept the constructor taking a chasm context and extracting the time but what you have is fine too.
There was a problem hiding this comment.
Ack. I'll keep that in mind if I need to modify this in the future, but for now I'll just keep it as-is.
| return map[string]string{ | ||
| "request-id": c.RequestId, | ||
| // Only set for standalone callbacks. | ||
| "callback-id": ctx.ExecutionKey().BusinessID, |
There was a problem hiding this comment.
nit: ContextMetadata is only called on the root component, the comment makes it sound like that's not the case.
There was a problem hiding this comment.
I removed that comment, and moved it to the function declaration.
// ContextMetadata is used for root CHASM components, so this is only applicable
// for the standalone callback case.
func (c *Callback) ContextMetadata(ctx chasm.Context) map[string]string {
I know it might be cleaner to just drop the comment all together, but IMHO it's important to call out any differences between the embedded vs. standalone case. (What do you think? Do you agree, or is this just noise? I'm still norming on what sort of things to call out.)
| } | ||
|
|
||
| func callbackCompletionToNexusCompleteOperationOpts( | ||
| cb *Callback, |
There was a problem hiding this comment.
nit: this can be a method on Callback.
There was a problem hiding this comment.
I'm inclined to leave this as-is, because I can't think of a good clean way to add it without adding more error handling. (Since the function would need to check if the Callback is in embedded mode or not.)
| case *callbackpb.CallbackExecutionCompletion_Failure: | ||
| f, err := commonnexus.TemporalFailureToNexusFailure(completion.GetFailure()) | ||
| if err != nil { | ||
| wrappedErr := fmt.Errorf("failed to convert failure: %w", err) | ||
| return nexusrpc.CompleteOperationOptions{}, wrappedErr | ||
| } | ||
| opErr := &nexus.OperationError{ | ||
| State: nexus.OperationStateFailed, | ||
| Message: "operation failed", | ||
| Cause: &nexus.FailureError{Failure: f}, | ||
| } | ||
| if err := nexusrpc.MarkAsWrapperError(nexusrpc.DefaultFailureConverter(), opErr); err != nil { | ||
| wrappedErr := fmt.Errorf("failed to mark wrapper error: %w", err) | ||
| return nexusrpc.CompleteOperationOptions{}, wrappedErr | ||
| } | ||
| nexusCompletion.Error = opErr | ||
| return nexusCompletion, nil |
There was a problem hiding this comment.
FYI, I'm tracking this as tech debt. We don't want to have to do all of this conversion every time we extract a completion from a component.
There was a problem hiding this comment.
Thanks, and agreed. This can probably be standardized across all CHASM components.
| return nexusCompletion, nil | ||
|
|
||
| default: | ||
| return nexusrpc.CompleteOperationOptions{}, serviceerror.NewInvalidArgument("no completion result provided") |
There was a problem hiding this comment.
This is an internal error, it's our fault not the user's fault.
| // Assert info object is updated. | ||
| require.Equal(t, callbackspb.CALLBACK_STATUS_SUCCEEDED, callback.StateMachineState()) | ||
| require.Equal(t, int32(2), callback.Attempt) | ||
| require.Nil(t, callback.LastAttemptFailure) |
There was a problem hiding this comment.
This should not reset the field if it existed before. For SAA we use a message that stored the last attempt failure time to preserve as much of the last failure as possible. I would do that here too, can wait for a follow up PR given that it changes existing behavior.
There was a problem hiding this comment.
Makes sense. It's an easy change to make, done.
| t.Run(test.Name, func(t *testing.T) { | ||
| currentTime := time.Now().UTC() | ||
| mctx := &chasm.MockMutableContext{} | ||
| mctx.HandleNow = func(chasm.Component) time.Time { return currentTime } |
There was a problem hiding this comment.
nit: This could be initialized directly as a field on the mutable context struct literal.
There was a problem hiding this comment.
I'll just keep it as is, none of these seem strictly better IMHO.
mctx := &chasm.MockMutableContext{MockContext: chasm.MockContext{HandleNow: func(chasm.Component) time.Time { return currentTime }}}
mctx := &chasm.MockMutableContext{MockContext: chasm.MockContext{
HandleNow: func(chasm.Component) time.Time { return currentTime },
}}
mctx := &chasm.MockMutableContext{
MockContext: chasm.MockContext{
HandleNow: func(chasm.Component) time.Time { return currentTime },
},
}| const ( | ||
| // Copy of common/nexus.CallbackTokenHeader, to avoid import cycle. | ||
| // Header to identify the callback being resolved for callbacks to resolve Nexus operations. | ||
| commonnexusCallbackTokenHeader = "Temporal-Callback-Token" |
There was a problem hiding this comment.
Ah, I forgot that this header was Temporal specific and not generic Nexus. I'd be fine if you moved the logic into the Temporal part of the codebase.
I would also love for us to use a Temporal agnostic header field for this but we should do that as a separate step.
Another thing we should do is make the token structured in the CompletionRequest struct and prevent the need to extract it from header in our handler implementation.
There was a problem hiding this comment.
OK, I'll back out the changes to nexusrpc/ and just set the required header in callback/invocable_outbound.go like before. As far as how to improve this, let's chat about that in a separate PR.
I'm definitely onboard with refactoring this, since the "headers x { Callback, Operation } Token" x { Caller, Temporal, Handler }" situation is certainly hard to keep straight.
| // ErrOperationNotStarted is returned when a completion arrives before the operation has | ||
| // started and no operation token is provided. This error is used by the callback invocation | ||
| // layer to detect this specific condition and retry without triggering the circuit breaker. | ||
| var ErrOperationNotStarted = serviceerror.NewFailedPrecondition("nexus operation not started") |
There was a problem hiding this comment.
I suggest that you create a new server internal service error with the FailedPrecondition status code and use a type assertion from the frontend handler to translate this error to a non retryable Nexus handler error.
There was a problem hiding this comment.
I think I did that right, but please double check the commit that wired it in.
Specifically, the code is returning a &serviceerror.NexusOperationNotStarted{}. But this isn't a GRPC error wrapping that NexusOperationNotStarted. (So I don't think any sort of "failed precondition" would get surfaced upstream.)
| // reject the request. This handles the race where a completion arrives before the start | ||
| // handler returns with the operation token. The caller will retry and by then the start | ||
| // handler will have returned and recorded the token. | ||
| if operationToken == "" { |
There was a problem hiding this comment.
We need a follow up to delay the completion request using chasm.PollComponent for a second to allow the start request to be recorded. I don't want these errors showing up for users.
There was a problem hiding this comment.
OK. I filed https://temporalio.atlassian.net/browse/NEXUS-369 to track that.
I tried to write a unit test to confirm the current behavior, but inside invocable_outbound.go we only retry these errors IFF the callback URL is temporal://system. So I'd need to spend more time figuring out how we can simulate that within functional or integration tests.
I did add unit tests for invocable_outbound.go to cover that case, but that isn't where we'd want to have that logic.
|
I think I got to everything. However, I believe the way I wired in the However, I assume the way we are surfacing errors is fine for the time being, until we can address NEXUS-369: Improve "callback invoked before Nexus operation started" situation, which would entail adding some sort of delay or polling to start the callback until the target Nexus operation has actually started. WDYT? |
What changed?
Adds support for "standalone" callbacks. Today, the CHASM
Callbackcomponent is used to deliver an arbitrary payload to a URL. (e.g. when a Workflow has completed.) As part of supplying the Nexus Connector Foundations, this feature adds CRUD operations on callbacks directly. So callers can invokeStartCallbackExecution(...)and get the durability guarantees to ensure that the callback actually gets invoked.Why?
The primary (only?) use-case for this is for completing Nexus operations. With this capability, a Handler can implement a Nexus operation outside of Temporal. And when that operation completes, simply call
StartCallbackExecution(...)with the right callback URL and Token. Then the CHASMCallbackmachinery will attempt to deliver the result for the Nexus operation. (Rather than, say, the Nexus operation to be implemented as a Workflow that is separately polling the async or out-of-band process.)How did you test it?
Potential risks
TBD. Will need to ask around.