Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[env]
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
CLI_VERSION_OVERRIDE = "v1.6.3-serverless"
# CLI_VERSION_OVERRIDE = "v1.6.3-serverless"

[alias]
# Not sure why --all-features doesn't work
Expand Down
36 changes: 36 additions & 0 deletions crates/client/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1587,6 +1587,42 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
pause_activity_execution,
PauseActivityExecutionRequest,
PauseActivityExecutionResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
unpause_activity_execution,
UnpauseActivityExecutionRequest,
UnpauseActivityExecutionResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
reset_activity_execution,
ResetActivityExecutionRequest,
ResetActivityExecutionResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
update_activity_execution_options,
UpdateActivityExecutionOptionsRequest,
UpdateActivityExecutionOptionsResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
count_nexus_operation_executions,
CountNexusOperationExecutionsRequest,
Expand Down
6 changes: 5 additions & 1 deletion crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,11 @@ impl Namespace {
Namespace::Name(n) => (n, "".to_owned()),
Namespace::Id(n) => ("".to_owned(), n),
};
DescribeNamespaceRequest { namespace, id }
DescribeNamespaceRequest {
namespace,
id,
weak_consistency: false,
}
}
}

Expand Down
52 changes: 37 additions & 15 deletions crates/client/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ use std::{
sync::Arc,
};
use temporalio_common::{
protos::temporal::api::{
worker::v1::WorkerHeartbeat, workflowservice::v1::PollWorkflowTaskQueueResponse,
protos::{
TaskToken,
temporal::api::{
worker::v1::WorkerHeartbeat, workflowservice::v1::PollWorkflowTaskQueueResponse,
},
},
worker::{WorkerDeploymentOptions, WorkerTaskTypes},
};
Expand Down Expand Up @@ -194,7 +197,13 @@ impl ClientWorkerSetImpl {
v.insert(shared_worker)
}
};
shared_worker.register_callback(worker_instance_key, heartbeat_callback);
shared_worker.register_callback(
worker_instance_key,
WorkerCallbacks {
heartbeat: heartbeat_callback,
cancel_activity: worker.cancel_activity_callback(),
},
);
}

let worker_info =
Expand Down Expand Up @@ -283,13 +292,13 @@ pub trait SharedNamespaceWorkerTrait {
/// Namespace that the shared namespace worker is connected to.
fn namespace(&self) -> String;

/// Registers a heartbeat callback.
fn register_callback(&self, worker_instance_key: Uuid, heartbeat_callback: HeartbeatCallback);
/// Registers worker callbacks.
fn register_callback(&self, worker_instance_key: Uuid, callbacks: WorkerCallbacks);

/// Unregisters a heartbeat callback. Returns the callback removed, as well as a bool that
/// Unregisters worker callbacks. Returns the callbacks removed, as well as a bool that
/// indicates if there are no remaining callbacks in the SharedNamespaceWorker, indicating
/// the shared worker itself can be shut down.
fn unregister_callback(&self, worker_instance_key: Uuid) -> (Option<HeartbeatCallback>, bool);
fn unregister_callback(&self, worker_instance_key: Uuid) -> (Option<WorkerCallbacks>, bool);

/// Returns the number of workers registered to this shared worker.
fn num_workers(&self) -> usize;
Expand Down Expand Up @@ -393,6 +402,17 @@ impl std::fmt::Debug for ClientWorkerSet {
/// Contains a worker heartbeat callback, wrapped for mocking
pub type HeartbeatCallback = Arc<dyn Fn() -> WorkerHeartbeat + Send + Sync>;

/// Callback to cancel an activity by task token. Returns true if the activity was found.
pub type CancelActivityCallback = Arc<dyn Fn(TaskToken) -> bool + Send + Sync>;

/// Bundles all per-worker callbacks registered with the SharedNamespaceWorker.
pub struct WorkerCallbacks {
/// Callback to collect heartbeat data from the worker.
pub heartbeat: HeartbeatCallback,
/// Callback to cancel an activity by task token.
pub cancel_activity: Option<CancelActivityCallback>,
}

/// Represents a complete worker that can handle both slot management
/// and worker heartbeat functionality.
#[cfg_attr(test, mockall::automock)]
Expand Down Expand Up @@ -423,6 +443,9 @@ pub trait ClientWorker: Send + Sync {
/// Returns the heartbeat callback that can be used to get WorkerHeartbeat data.
fn heartbeat_callback(&self) -> Option<HeartbeatCallback>;

/// Returns a callback that can cancel an activity by task token.
fn cancel_activity_callback(&self) -> Option<CancelActivityCallback>;

/// Creates a new worker that implements the [SharedNamespaceWorkerTrait]
fn new_shared_namespace_worker(
&self,
Expand Down Expand Up @@ -713,7 +736,7 @@ mod tests {

struct MockSharedNamespaceWorker {
namespace: String,
callbacks: Arc<RwLock<HashMap<Uuid, HeartbeatCallback>>>,
callbacks: Arc<RwLock<HashMap<Uuid, WorkerCallbacks>>>,
}

impl std::fmt::Debug for MockSharedNamespaceWorker {
Expand All @@ -739,20 +762,16 @@ mod tests {
self.namespace.clone()
}

fn register_callback(
&self,
worker_instance_key: Uuid,
heartbeat_callback: HeartbeatCallback,
) {
fn register_callback(&self, worker_instance_key: Uuid, callbacks: WorkerCallbacks) {
self.callbacks
.write()
.insert(worker_instance_key, heartbeat_callback);
.insert(worker_instance_key, callbacks);
}

fn unregister_callback(
&self,
worker_instance_key: Uuid,
) -> (Option<HeartbeatCallback>, bool) {
) -> (Option<WorkerCallbacks>, bool) {
let mut callbacks = self.callbacks.write();
let callback = callbacks.remove(&worker_instance_key);
let is_empty = callbacks.is_empty();
Expand Down Expand Up @@ -805,6 +824,9 @@ mod tests {
mock_provider
.expect_heartbeat_callback()
.returning(|| Some(Arc::new(WorkerHeartbeat::default)));
mock_provider
.expect_cancel_activity_callback()
.returning(|| None);

let namespace_clone = namespace.clone();
mock_provider
Expand Down
2 changes: 2 additions & 0 deletions crates/common/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const SERDE_DERIVE_PREFIXES: &[&str] = &[
".temporal.api.history",
".temporal.api.namespace",
".temporal.api.nexus",
".temporal.api.nexusservices",
".temporal.api.operatorservice",
".temporal.api.protocol",
".temporal.api.query",
Expand Down Expand Up @@ -162,6 +163,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"./protos/local/temporal/sdk/core/core_interface.proto",
"./protos/api_upstream/temporal/api/sdk/v1/workflow_metadata.proto",
"./protos/api_upstream/temporal/api/workflowservice/v1/service.proto",
"./protos/api_upstream/temporal/api/nexusservices/workerservice/v1/request_response.proto",
"./protos/api_upstream/temporal/api/operatorservice/v1/service.proto",
"./protos/api_upstream/temporal/api/errordetails/v1/message.proto",
"./protos/api_cloud_upstream/temporal/api/cloud/cloudservice/v1/service.proto",
Expand Down
Loading
Loading