diff --git a/client/src/raw.rs b/client/src/raw.rs index 9b259f5cc..c9fec24f1 100644 --- a/client/src/raw.rs +++ b/client/src/raw.rs @@ -646,7 +646,21 @@ proxier! { let task_queue = req_mut.task_queue.as_ref() .map(|tq| tq.name.clone()).unwrap_or_default(); match workers.try_reserve_wft_slot(namespace, task_queue) { - Some(s) => slot = Some(s), + Some(reservation) => { + // Populate eager_worker_deployment_options from the slot reservation + if let Some(opts) = reservation.deployment_options { + req_mut.eager_worker_deployment_options = Some(temporal_sdk_core_protos::temporal::api::deployment::v1::WorkerDeploymentOptions { + deployment_name: opts.version.deployment_name, + build_id: opts.version.build_id, + worker_versioning_mode: if opts.use_worker_versioning { + temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into() + } else { + temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into() + }, + }); + } + slot = Some(reservation.slot); + } None => req_mut.request_eager_execution = false } } @@ -1446,6 +1460,24 @@ proxier! { r.extensions_mut().insert(labels); } ); + ( + describe_worker, + DescribeWorkerRequest, + DescribeWorkerResponse, + |r| { + let labels = namespaced_request!(r); + r.extensions_mut().insert(labels); + } + ); + ( + set_worker_deployment_manager, + SetWorkerDeploymentManagerRequest, + SetWorkerDeploymentManagerResponse, + |r| { + let labels = namespaced_request!(r); + r.extensions_mut().insert(labels); + } + ); } proxier! { @@ -1694,7 +1726,7 @@ mod tests { } #[tokio::test] - async fn can_mock_workflow_service() { + async fn can_mock_services() { #[derive(Clone)] struct MyFakeServices {} impl RawGrpcCaller for MyFakeServices {} @@ -1753,4 +1785,131 @@ mod tests { .unwrap(); assert_eq!(r.into_inner().namespaces[0].failover_version, 12345); } + + #[rstest::rstest] + #[case::with_versioning(true)] + #[case::without_versioning(false)] + #[tokio::test] + async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) { + use crate::worker_registry::{MockSlot, MockSlotProvider}; + use temporal_sdk_core_api::worker::{WorkerDeploymentOptions, WorkerDeploymentVersion}; + use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode; + + let expected_mode = if use_worker_versioning { + WorkerVersioningMode::Versioned + } else { + WorkerVersioningMode::Unversioned + }; + + #[derive(Clone)] + struct MyFakeServices { + slot_manager: Arc, + expected_mode: WorkerVersioningMode, + } + impl RawGrpcCaller for MyFakeServices {} + impl RawClientProducer for MyFakeServices { + fn get_workers_info(&self) -> Option> { + Some(self.slot_manager.clone()) + } + fn workflow_client(&mut self) -> Box { + Box::new(MyFakeWfClient { + expected_mode: self.expected_mode, + }) + } + fn operator_client(&mut self) -> Box { + unimplemented!() + } + fn cloud_client(&mut self) -> Box { + unimplemented!() + } + fn test_client(&mut self) -> Box { + unimplemented!() + } + fn health_client(&mut self) -> Box { + unimplemented!() + } + } + + let deployment_opts = WorkerDeploymentOptions { + version: WorkerDeploymentVersion { + deployment_name: "test-deployment".to_string(), + build_id: "test-build-123".to_string(), + }, + use_worker_versioning, + default_versioning_behavior: None, + }; + + let mut mock_provider = MockSlotProvider::new(); + mock_provider + .expect_namespace() + .return_const("test-namespace".to_string()); + mock_provider + .expect_task_queue() + .return_const("test-task-queue".to_string()); + let mut mock_slot = MockSlot::new(); + mock_slot.expect_schedule_wft().returning(|_| Ok(())); + mock_provider + .expect_try_reserve_wft_slot() + .return_once(|| Some(Box::new(mock_slot))); + mock_provider + .expect_deployment_options() + .return_const(Some(deployment_opts.clone())); + + let slot_manager = Arc::new(SlotManager::new()); + slot_manager.register(Box::new(mock_provider)); + + #[derive(Clone)] + struct MyFakeWfClient { + expected_mode: WorkerVersioningMode, + } + impl WorkflowService for MyFakeWfClient { + fn start_workflow_execution( + &mut self, + request: tonic::Request, + ) -> BoxFuture<'_, Result, tonic::Status>> + { + let req = request.into_inner(); + let expected_mode = self.expected_mode; + + assert!( + req.eager_worker_deployment_options.is_some(), + "eager_worker_deployment_options should be populated" + ); + + let opts = req.eager_worker_deployment_options.as_ref().unwrap(); + assert_eq!(opts.deployment_name, "test-deployment"); + assert_eq!(opts.build_id, "test-build-123"); + assert_eq!(opts.worker_versioning_mode, expected_mode as i32); + + async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed() + } + } + + let mut mfs = MyFakeServices { + slot_manager, + expected_mode, + }; + + // Create a request with eager execution enabled + let req = StartWorkflowExecutionRequest { + namespace: "test-namespace".to_string(), + workflow_id: "test-wf-id".to_string(), + workflow_type: Some( + temporal_sdk_core_protos::temporal::api::common::v1::WorkflowType { + name: "test-workflow".to_string(), + }, + ), + task_queue: Some(TaskQueue { + name: "test-task-queue".to_string(), + kind: 0, + normal_name: String::new(), + }), + request_eager_execution: true, + ..Default::default() + }; + + mfs.start_workflow_execution(req.into_request()) + .await + .unwrap(); + } } diff --git a/client/src/worker_registry/mod.rs b/client/src/worker_registry/mod.rs index 90882a718..5fbc78333 100644 --- a/client/src/worker_registry/mod.rs +++ b/client/src/worker_registry/mod.rs @@ -22,6 +22,8 @@ pub trait SlotProvider: std::fmt::Debug { fn task_queue(&self) -> &str; /// Try to reserve a slot on this worker. fn try_reserve_wft_slot(&self) -> Option>; + /// Get the worker deployment options for this worker, if using deployment-based versioning. + fn deployment_options(&self) -> Option; } /// This trait represents a slot reserved for processing a WFT by a worker. @@ -34,6 +36,14 @@ pub trait Slot { ) -> Result<(), anyhow::Error>; } +/// Result of reserving a workflow task slot, including deployment options if applicable. +pub(crate) struct SlotReservation { + /// The reserved slot for processing the workflow task + pub slot: Box, + /// Worker deployment options, if the worker is using deployment-based versioning + pub deployment_options: Option, +} + #[derive(PartialEq, Eq, Hash, Debug, Clone)] struct SlotKey { namespace: String, @@ -71,12 +81,16 @@ impl SlotManagerImpl { &self, namespace: String, task_queue: String, - ) -> Option> { + ) -> Option { let key = SlotKey::new(namespace, task_queue); if let Some(p) = self.providers.get(&key) && let Some(slot) = p.try_reserve_wft_slot() { - return Some(slot); + let deployment_options = p.deployment_options(); + return Some(SlotReservation { + slot, + deployment_options, + }); } None } @@ -126,11 +140,12 @@ impl SlotManager { } /// Try to reserve a compatible processing slot in any of the registered workers. + /// Returns the slot and the worker's deployment options (if using deployment-based versioning). pub(crate) fn try_reserve_wft_slot( &self, namespace: String, task_queue: String, - ) -> Option> { + ) -> Option { self.manager .read() .try_reserve_wft_slot(namespace, task_queue) @@ -188,6 +203,7 @@ mod tests { }); mock_provider.expect_namespace().return_const(namespace); mock_provider.expect_task_queue().return_const(task_queue); + mock_provider.expect_deployment_options().return_const(None); mock_provider } diff --git a/core-c-bridge/src/client.rs b/core-c-bridge/src/client.rs index e5b01f09c..ef8f4535f 100644 --- a/core-c-bridge/src/client.rs +++ b/core-c-bridge/src/client.rs @@ -586,6 +586,9 @@ async fn call_workflow_service( "DescribeTaskQueue" => { rpc_call_on_trait!(client, call, WorkflowService, describe_task_queue) } + "DescribeWorker" => { + rpc_call_on_trait!(client, call, WorkflowService, describe_worker) + } "DescribeWorkerDeployment" => { rpc_call_on_trait!(client, call, WorkflowService, describe_worker_deployment) } @@ -811,6 +814,9 @@ async fn call_workflow_service( set_worker_deployment_current_version ) } + "SetWorkerDeploymentManager" => { + rpc_call_on_trait!(client, call, WorkflowService, set_worker_deployment_manager) + } "SetWorkerDeploymentRampingVersion" => { rpc_call_on_trait!( client, diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 89bb379db..a2faf5f10 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -486,11 +486,18 @@ impl Worker { shutdown_token.child_token(), ); + let deployment_options = match &config.versioning_strategy { + temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased( + opts, + ) => Some(opts.clone()), + _ => None, + }; let provider = SlotProvider::new( config.namespace.clone(), config.task_queue.clone(), wft_slots.clone(), external_wft_tx, + deployment_options, ); let worker_key = Mutex::new(client.workers().register(Box::new(provider))); let sdk_name_and_ver = client.sdk_name_and_version(); diff --git a/core/src/worker/slot_provider.rs b/core/src/worker/slot_provider.rs index 1b5fcba95..e150a7124 100644 --- a/core/src/worker/slot_provider.rs +++ b/core/src/worker/slot_provider.rs @@ -58,6 +58,7 @@ pub(super) struct SlotProvider { task_queue: String, wft_semaphore: MeteredPermitDealer, external_wft_tx: WFTStreamSender, + deployment_options: Option, } impl SlotProvider { @@ -66,12 +67,14 @@ impl SlotProvider { task_queue: String, wft_semaphore: MeteredPermitDealer, external_wft_tx: WFTStreamSender, + deployment_options: Option, ) -> Self { Self { namespace, task_queue, wft_semaphore, external_wft_tx, + deployment_options, } } } @@ -89,6 +92,9 @@ impl SlotProviderTrait for SlotProvider { None => None, } } + fn deployment_options(&self) -> Option { + self.deployment_options.clone() + } } #[cfg(test)] @@ -125,6 +131,7 @@ mod tests { "my_queue".to_string(), wft_semaphore, external_wft_tx, + None, ); let slot = provider @@ -146,6 +153,7 @@ mod tests { "my_queue".to_string(), wft_semaphore, external_wft_tx, + None, ); assert!(provider.try_reserve_wft_slot().is_some()); } @@ -163,6 +171,7 @@ mod tests { "my_queue".to_string(), wft_semaphore.clone(), external_wft_tx, + None, ); let slot = provider.try_reserve_wft_slot(); assert!(slot.is_some()); diff --git a/sdk-core-protos/protos/api_upstream/.github/workflows/create-release.yml b/sdk-core-protos/protos/api_upstream/.github/workflows/create-release.yml index b962fdb8d..988e55b92 100644 --- a/sdk-core-protos/protos/api_upstream/.github/workflows/create-release.yml +++ b/sdk-core-protos/protos/api_upstream/.github/workflows/create-release.yml @@ -16,8 +16,16 @@ on: skip_sdk_check: description: "Skip sdk-go compatibility check" type: boolean - + dispatch_id: + description: An ID used by external tools to identify workflow runs(can be left empty when running manually) + default: "none" + type: string jobs: + dispatch: + runs-on: ubuntu-latest + steps: + - name: Dispatch ${{ inputs.dispatch_id }} + run: echo "Dispatch ${{ inputs.dispatch_id }}" prepare-inputs: name: "Prepare inputs" runs-on: ubuntu-latest diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json index 8591cb0be..cfed16ffd 100644 --- a/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json +++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv2.json @@ -2130,6 +2130,51 @@ ] } }, + "/api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager": { + "post": { + "summary": "Set/unset the ManagerIdentity of a Worker Deployment.\nExperimental. This API might significantly change or be removed in a future release.", + "operationId": "SetWorkerDeploymentManager2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1SetWorkerDeploymentManagerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "deploymentName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceSetWorkerDeploymentManagerBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version": { "post": { "summary": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. Can be used for\ngradual ramp to unversioned workers too.\nExperimental. This API might significantly change or be removed in a future release.", @@ -2296,6 +2341,45 @@ ] } }, + "/api/v1/namespaces/{namespace}/workers/describe/{workerInstanceKey}": { + "get": { + "summary": "DescribeWorker returns information about the specified worker.", + "operationId": "DescribeWorker2", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1DescribeWorkerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "workerInstanceKey", + "description": "Worker instance key to describe.", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/api/v1/namespaces/{namespace}/workers/fetch-config": { "post": { "summary": "FetchWorkerConfig returns the worker configuration for a specific worker.", @@ -2822,7 +2906,7 @@ }, "/api/v1/namespaces/{namespace}/workflows/{execution.workflowId}/history-reverse": { "get": { - "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \norder (starting from last event). Fails with`NotFound` if the specified workflow execution is \nunknown to the service.", + "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse\norder (starting from last event). Fails with`NotFound` if the specified workflow execution is\nunknown to the service.", "operationId": "GetWorkflowExecutionHistoryReverse2", "responses": { "200": { @@ -5871,6 +5955,51 @@ ] } }, + "/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager": { + "post": { + "summary": "Set/unset the ManagerIdentity of a Worker Deployment.\nExperimental. This API might significantly change or be removed in a future release.", + "operationId": "SetWorkerDeploymentManager", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1SetWorkerDeploymentManagerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "deploymentName", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "body", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/WorkflowServiceSetWorkerDeploymentManagerBody" + } + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version": { "post": { "summary": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. Can be used for\ngradual ramp to unversioned workers too.\nExperimental. This API might significantly change or be removed in a future release.", @@ -6037,6 +6166,45 @@ ] } }, + "/namespaces/{namespace}/workers/describe/{workerInstanceKey}": { + "get": { + "summary": "DescribeWorker returns information about the specified worker.", + "operationId": "DescribeWorker", + "responses": { + "200": { + "description": "A successful response.", + "schema": { + "$ref": "#/definitions/v1DescribeWorkerResponse" + } + }, + "default": { + "description": "An unexpected error response.", + "schema": { + "$ref": "#/definitions/rpcStatus" + } + } + }, + "parameters": [ + { + "name": "namespace", + "description": "Namespace this worker belongs to.", + "in": "path", + "required": true, + "type": "string" + }, + { + "name": "workerInstanceKey", + "description": "Worker instance key to describe.", + "in": "path", + "required": true, + "type": "string" + } + ], + "tags": [ + "WorkflowService" + ] + } + }, "/namespaces/{namespace}/workers/fetch-config": { "post": { "summary": "FetchWorkerConfig returns the worker configuration for a specific worker.", @@ -6563,7 +6731,7 @@ }, "/namespaces/{namespace}/workflows/{execution.workflowId}/history-reverse": { "get": { - "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \norder (starting from last event). Fails with`NotFound` if the specified workflow execution is \nunknown to the service.", + "summary": "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse\norder (starting from last event). Fails with`NotFound` if the specified workflow execution is\nunknown to the service.", "operationId": "GetWorkflowExecutionHistoryReverse", "responses": { "200": { @@ -7861,7 +8029,7 @@ }, "type": { "type": "string", - "description": "Pause all running activities of this type." + "description": "Pause all running activities of this type.\nNote: Experimental - the behavior of pause by activity type might change in a future release." }, "reason": { "type": "string", @@ -8314,10 +8482,37 @@ "ignoreMissingTaskQueues": { "type": "boolean", "description": "Optional. By default this request would be rejected if not all the expected Task Queues are\nbeing polled by the new Version, to protect against accidental removal of Task Queues, or\nworker health issues. Pass `true` here to bypass this protection.\nThe set of expected Task Queues is the set of all the Task Queues that were ever poller by\nthe existing Current Version of the Deployment, with the following exclusions:\n - Task Queues that are not used anymore (inferred by having empty backlog and a task\n add_rate of 0.)\n - Task Queues that are moved to another Worker Deployment (inferred by the Task Queue\n having a different Current Version than the Current Version of this deployment.)\nWARNING: Do not set this flag unless you are sure that the missing task queue pollers are not\nneeded. If the request is unexpectedly rejected due to missing pollers, then that means the\npollers have not reached to the server yet. Only set this if you expect those pollers to\nnever arrive." + }, + "allowNoPollers": { + "type": "boolean", + "description": "Optional. By default this request will be rejected if no pollers have been seen for the proposed\nCurrent Version, in order to protect users from routing tasks to pollers that do not exist, leading\nto possible timeouts. Pass `true` here to bypass this protection." } }, "description": "Set/unset the Current Version of a Worker Deployment." }, + "WorkflowServiceSetWorkerDeploymentManagerBody": { + "type": "object", + "properties": { + "managerIdentity": { + "type": "string", + "description": "Arbitrary value for `manager_identity`.\nEmpty will unset the field." + }, + "self": { + "type": "boolean", + "description": "True will set `manager_identity` to `identity`." + }, + "conflictToken": { + "type": "string", + "format": "byte", + "description": "Optional. This can be the value of conflict_token from a Describe, or another Worker\nDeployment API. Passing a non-nil conflict token will cause this request to fail if the\nDeployment's configuration has been modified between the API call that generated the\ntoken and this one." + }, + "identity": { + "type": "string", + "description": "Required. The identity of the client who initiated this request." + } + }, + "description": "Update the ManagerIdentity of a Worker Deployment." + }, "WorkflowServiceSetWorkerDeploymentRampingVersionBody": { "type": "object", "properties": { @@ -8346,6 +8541,10 @@ "ignoreMissingTaskQueues": { "type": "boolean", "description": "Optional. By default this request would be rejected if not all the expected Task Queues are\nbeing polled by the new Version, to protect against accidental removal of Task Queues, or\nworker health issues. Pass `true` here to bypass this protection.\nThe set of expected Task Queues equals to all the Task Queues ever polled from the existing\nCurrent Version of the Deployment, with the following exclusions:\n - Task Queues that are not used anymore (inferred by having empty backlog and a task\n add_rate of 0.)\n - Task Queues that are moved to another Worker Deployment (inferred by the Task Queue\n having a different Current Version than the Current Version of this deployment.)\nWARNING: Do not set this flag unless you are sure that the missing task queue poller are not\nneeded. If the request is unexpectedly rejected due to missing pollers, then that means the\npollers have not reached to the server yet. Only set this if you expect those pollers to\nnever arrive.\nNote: this check only happens when the ramping version is about to change, not every time\nthat the percentage changes. Also note that the check is against the deployment's Current\nVersion, not the previous Ramping Version." + }, + "allowNoPollers": { + "type": "boolean", + "description": "Optional. By default this request will be rejected if no pollers have been seen for the proposed\nCurrent Version, in order to protect users from routing tasks to pollers that do not exist, leading\nto possible timeouts. Pass `true` here to bypass this protection." } }, "description": "Set/unset the Ramping Version of a Worker Deployment and its ramp percentage." @@ -8643,6 +8842,10 @@ "priority": { "$ref": "#/definitions/v1Priority", "title": "Priority metadata" + }, + "eagerWorkerDeploymentOptions": { + "$ref": "#/definitions/v1WorkerDeploymentOptions", + "description": "Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`." } } }, @@ -10875,6 +11078,14 @@ } } }, + "v1DescribeWorkerResponse": { + "type": "object", + "properties": { + "workerInfo": { + "$ref": "#/definitions/v1WorkerInfo" + } + } + }, "v1DescribeWorkflowExecutionResponse": { "type": "object", "properties": { @@ -11198,6 +11409,14 @@ }, "visibilityStore": { "type": "string" + }, + "initialFailoverVersion": { + "type": "string", + "format": "int64" + }, + "failoverVersionIncrement": { + "type": "string", + "format": "int64" } }, "description": "GetClusterInfoResponse contains information about Temporal cluster." @@ -12167,6 +12386,14 @@ "asyncUpdate": { "type": "boolean", "title": "True if the namespace supports async update" + }, + "workerHeartbeats": { + "type": "boolean", + "title": "True if the namespace supports worker heartbeats" + }, + "reportedProblemsSearchAttribute": { + "type": "boolean", + "title": "True if the namespace supports reported problems search attribute" } }, "description": "Namespace capability details. Should contain what features are enabled in a namespace." @@ -13154,7 +13381,7 @@ "priorityKey": { "type": "integer", "format": "int32", - "description": "Priority key is a positive integer from 1 to n, where smaller integers\ncorrespond to higher priorities (tasks run sooner). In general, tasks in\na queue should be processed in close to priority order, although small\ndeviations are possible.\n\nThe maximum priority value (minimum priority) is determined by server\nconfiguration, and defaults to 5.\n\nIf priority is not present (or zero), then the effective priority will be\nthe default priority, which is is calculated by (min+max)/2. With the\ndefault max of 5, and min of 1, that comes out to 3." + "description": "Priority key is a positive integer from 1 to n, where smaller integers\ncorrespond to higher priorities (tasks run sooner). In general, tasks in\na queue should be processed in close to priority order, although small\ndeviations are possible.\n\nThe maximum priority value (minimum priority) is determined by server\nconfiguration, and defaults to 5.\n\nIf priority is not present (or zero), then the effective priority will be\nthe default priority, which is calculated by (min+max)/2. With the\ndefault max of 5, and min of 1, that comes out to 3." }, "fairnessKey": { "type": "string", @@ -14343,6 +14570,20 @@ } } }, + "v1SetWorkerDeploymentManagerResponse": { + "type": "object", + "properties": { + "conflictToken": { + "type": "string", + "format": "byte", + "description": "This value is returned so that it can be optionally passed to APIs\nthat write to the Worker Deployment state to ensure that the state\ndid not change between this API call and a future write." + }, + "previousManagerIdentity": { + "type": "string", + "description": "What the `manager_identity` field was before this change." + } + } + }, "v1SetWorkerDeploymentRampingVersionResponse": { "type": "object", "properties": { @@ -14891,6 +15132,10 @@ "priority": { "$ref": "#/definitions/v1Priority", "title": "Priority metadata" + }, + "eagerWorkerDeploymentOptions": { + "$ref": "#/definitions/v1WorkerDeploymentOptions", + "description": "Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`." } } }, @@ -15453,7 +15698,7 @@ "additionalProperties": { "type": "string" }, - "description": "A key-value map for any customized purpose.\nIf data already exists on the namespace, \nthis will merge with the existing key values." + "description": "A key-value map for any customized purpose.\nIf data already exists on the namespace,\nthis will merge with the existing key values." }, "state": { "$ref": "#/definitions/v1NamespaceState", @@ -15812,6 +16057,10 @@ "lastModifierIdentity": { "type": "string", "description": "Identity of the last client who modified the configuration of this Deployment. Set to the\n`identity` value sent by APIs such as `SetWorkerDeploymentCurrentVersion` and\n`SetWorkerDeploymentRampingVersion`." + }, + "managerIdentity": { + "type": "string", + "description": "Identity of the client that has the exclusive right to make changes to this Worker Deployment.\nEmpty by default.\nIf this is set, clients whose identity does not match `manager_identity` will not be able to make changes\nto this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed." } }, "description": "A Worker Deployment (Deployment, for short) represents all workers serving \na shared set of Task Queues. Typically, a Deployment represents one service or \napplication.\nA Deployment contains multiple Deployment Versions, each representing a different \nversion of workers. (see documentation of WorkerDeploymentVersionInfo)\nDeployment records are created in Temporal server automatically when their\nfirst poller arrives to the server.\nExperimental. Worker Deployments are experimental and might significantly change in the future." diff --git a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml index 7d587b366..88f8737d0 100644 --- a/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml +++ b/sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml @@ -1914,6 +1914,44 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager: + post: + tags: + - WorkflowService + description: |- + Set/unset the ManagerIdentity of a Worker Deployment. + Experimental. This API might significantly change or be removed in a future release. + operationId: SetWorkerDeploymentManager + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: deploymentName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version: post: tags: @@ -2087,6 +2125,38 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /api/v1/namespaces/{namespace}/workers/describe/{workerInstanceKey}: + get: + tags: + - WorkflowService + description: DescribeWorker returns information about the specified worker. + operationId: DescribeWorker + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + - name: workerInstanceKey + in: path + description: Worker instance key to describe. + required: true + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/DescribeWorkerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /api/v1/namespaces/{namespace}/workers/fetch-config: post: tags: @@ -2540,7 +2610,10 @@ paths: get: tags: - WorkflowService - description: "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \n order (starting from last event). Fails with`NotFound` if the specified workflow execution is \n unknown to the service." + description: |- + GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + order (starting from last event). Fails with`NotFound` if the specified workflow execution is + unknown to the service. operationId: GetWorkflowExecutionHistoryReverse parameters: - name: namespace @@ -5265,6 +5338,44 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/worker-deployments/{deploymentName}/set-manager: + post: + tags: + - WorkflowService + description: |- + Set/unset the ManagerIdentity of a Worker Deployment. + Experimental. This API might significantly change or be removed in a future release. + operationId: SetWorkerDeploymentManager + parameters: + - name: namespace + in: path + required: true + schema: + type: string + - name: deploymentName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/SetWorkerDeploymentManagerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/worker-deployments/{deploymentName}/set-ramping-version: post: tags: @@ -5438,6 +5549,38 @@ paths: application/json: schema: $ref: '#/components/schemas/Status' + /namespaces/{namespace}/workers/describe/{workerInstanceKey}: + get: + tags: + - WorkflowService + description: DescribeWorker returns information about the specified worker. + operationId: DescribeWorker + parameters: + - name: namespace + in: path + description: Namespace this worker belongs to. + required: true + schema: + type: string + - name: workerInstanceKey + in: path + description: Worker instance key to describe. + required: true + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/DescribeWorkerResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' /namespaces/{namespace}/workers/fetch-config: post: tags: @@ -5891,7 +6034,10 @@ paths: get: tags: - WorkflowService - description: "GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse \n order (starting from last event). Fails with`NotFound` if the specified workflow execution is \n unknown to the service." + description: |- + GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + order (starting from last event). Fails with`NotFound` if the specified workflow execution is + unknown to the service. operationId: GetWorkflowExecutionHistoryReverse parameters: - name: namespace @@ -7921,6 +8067,11 @@ components: Only set if `report_task_queue_stats` is set to true in the request. (-- api-linter: core::0140::prepositions=disabled aip.dev/not-precedent: "by" is used to clarify the key. --) + DescribeWorkerResponse: + type: object + properties: + workerInfo: + $ref: '#/components/schemas/WorkerInfo' DescribeWorkflowExecutionResponse: type: object properties: @@ -8242,6 +8393,10 @@ components: type: string visibilityStore: type: string + initialFailoverVersion: + type: string + failoverVersionIncrement: + type: string description: GetClusterInfoResponse contains information about Temporal cluster. GetCurrentDeploymentResponse: type: object @@ -9055,6 +9210,12 @@ components: asyncUpdate: type: boolean description: True if the namespace supports async update + workerHeartbeats: + type: boolean + description: True if the namespace supports worker heartbeats + reportedProblemsSearchAttribute: + type: boolean + description: True if the namespace supports reported problems search attribute description: Namespace capability details. Should contain what features are enabled in a namespace. NamespaceReplicationConfig: type: object @@ -9461,7 +9622,9 @@ components: description: Only the activity with this ID will be paused. type: type: string - description: Pause all running activities of this type. + description: |- + Pause all running activities of this type. + Note: Experimental - the behavior of pause by activity type might change in a future release. reason: type: string description: Reason to pause the activity. @@ -9939,7 +10102,7 @@ components: configuration, and defaults to 5. If priority is not present (or zero), then the effective priority will be - the default priority, which is is calculated by (min+max)/2. With the + the default priority, which is calculated by (min+max)/2. With the default max of 5, and min of 1, that comes out to 3. format: int32 fairnessKey: @@ -11347,6 +11510,12 @@ components: needed. If the request is unexpectedly rejected due to missing pollers, then that means the pollers have not reached to the server yet. Only set this if you expect those pollers to never arrive. + allowNoPollers: + type: boolean + description: |- + Optional. By default this request will be rejected if no pollers have been seen for the proposed + Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + to possible timeouts. Pass `true` here to bypass this protection. description: Set/unset the Current Version of a Worker Deployment. SetWorkerDeploymentCurrentVersionResponse: type: object @@ -11365,6 +11534,46 @@ components: allOf: - $ref: '#/components/schemas/WorkerDeploymentVersion' description: The version that was current before executing this operation. + SetWorkerDeploymentManagerRequest: + type: object + properties: + namespace: + type: string + deploymentName: + type: string + managerIdentity: + type: string + description: |- + Arbitrary value for `manager_identity`. + Empty will unset the field. + self: + type: boolean + description: True will set `manager_identity` to `identity`. + conflictToken: + type: string + description: |- + Optional. This can be the value of conflict_token from a Describe, or another Worker + Deployment API. Passing a non-nil conflict token will cause this request to fail if the + Deployment's configuration has been modified between the API call that generated the + token and this one. + format: bytes + identity: + type: string + description: Required. The identity of the client who initiated this request. + description: Update the ManagerIdentity of a Worker Deployment. + SetWorkerDeploymentManagerResponse: + type: object + properties: + conflictToken: + type: string + description: |- + This value is returned so that it can be optionally passed to APIs + that write to the Worker Deployment state to ensure that the state + did not change between this API call and a future write. + format: bytes + previousManagerIdentity: + type: string + description: What the `manager_identity` field was before this change. SetWorkerDeploymentRampingVersionRequest: type: object properties: @@ -11415,6 +11624,12 @@ components: Note: this check only happens when the ramping version is about to change, not every time that the percentage changes. Also note that the check is against the deployment's Current Version, not the previous Ramping Version. + allowNoPollers: + type: boolean + description: |- + Optional. By default this request will be rejected if no pollers have been seen for the proposed + Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + to possible timeouts. Pass `true` here to bypass this protection. description: Set/unset the Ramping Version of a Worker Deployment and its ramp percentage. SetWorkerDeploymentRampingVersionResponse: type: object @@ -11960,6 +12175,10 @@ components: allOf: - $ref: '#/components/schemas/Priority' description: Priority metadata + eagerWorkerDeploymentOptions: + allOf: + - $ref: '#/components/schemas/WorkerDeploymentOptions' + description: Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`. StartWorkflowExecutionResponse: type: object properties: @@ -12551,7 +12770,10 @@ components: type: object additionalProperties: type: string - description: "A key-value map for any customized purpose.\n If data already exists on the namespace, \n this will merge with the existing key values." + description: |- + A key-value map for any customized purpose. + If data already exists on the namespace, + this will merge with the existing key values. state: enum: - NAMESPACE_STATE_UNSPECIFIED @@ -13093,6 +13315,13 @@ components: Identity of the last client who modified the configuration of this Deployment. Set to the `identity` value sent by APIs such as `SetWorkerDeploymentCurrentVersion` and `SetWorkerDeploymentRampingVersion`. + managerIdentity: + type: string + description: |- + Identity of the client that has the exclusive right to make changes to this Worker Deployment. + Empty by default. + If this is set, clients whose identity does not match `manager_identity` will not be able to make changes + to this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed. description: "A Worker Deployment (Deployment, for short) represents all workers serving \n a shared set of Task Queues. Typically, a Deployment represents one service or \n application.\n A Deployment contains multiple Deployment Versions, each representing a different \n version of workers. (see documentation of WorkerDeploymentVersionInfo)\n Deployment records are created in Temporal server automatically when their\n first poller arrives to the server.\n Experimental. Worker Deployments are experimental and might significantly change in the future." WorkerDeploymentInfo_WorkerDeploymentVersionSummary: type: object diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto index 51acfaa2e..838f5fefc 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto @@ -280,7 +280,7 @@ message Priority { // configuration, and defaults to 5. // // If priority is not present (or zero), then the effective priority will be - // the default priority, which is is calculated by (min+max)/2. With the + // the default priority, which is calculated by (min+max)/2. With the // default max of 5, and min of 1, that comes out to 3. int32 priority_key = 1; diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto index 14b4205c5..8f6685a5d 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto @@ -195,6 +195,12 @@ message WorkerDeploymentInfo { // `SetWorkerDeploymentRampingVersion`. string last_modifier_identity = 5; + // Identity of the client that has the exclusive right to make changes to this Worker Deployment. + // Empty by default. + // If this is set, clients whose identity does not match `manager_identity` will not be able to make changes + // to this Worker Deployment. They can either set their own identity as the manager or unset the field to proceed. + string manager_identity = 6; + message WorkerDeploymentVersionSummary { // Deprecated. Use `deployment_version`. string version = 1 [deprecated = true]; diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto b/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto index 405cd53c9..79c44cb05 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/namespace/v1/message.proto @@ -34,6 +34,10 @@ message NamespaceInfo { bool sync_update = 2; // True if the namespace supports async update bool async_update = 3; + // True if the namespace supports worker heartbeats + bool worker_heartbeats = 4; + // True if the namespace supports reported problems search attribute + bool reported_problems_search_attribute = 5; } // Whether scheduled workflows are supported on this namespace. This is only needed @@ -68,8 +72,8 @@ message UpdateNamespaceInfo { string description = 1; string owner_email = 2; // A key-value map for any customized purpose. - // If data already exists on the namespace, - // this will merge with the existing key values. + // If data already exists on the namespace, + // this will merge with the existing key values. map data = 3; // New namespace state, server will reject if transition is not allowed. // Allowed transitions are: diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto index 5059575dc..37ad083c4 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto @@ -194,6 +194,8 @@ message StartWorkflowExecutionRequest { temporal.api.workflow.v1.OnConflictOptions on_conflict_options = 26; // Priority metadata temporal.api.common.v1.Priority priority = 27; + // Deployment Options of the worker who will process the eager task. Passed when `request_eager_execution=true`. + temporal.api.deployment.v1.WorkerDeploymentOptions eager_worker_deployment_options = 28; } message StartWorkflowExecutionResponse { @@ -1157,6 +1159,8 @@ message GetClusterInfoResponse { int32 history_shard_count = 6; string persistence_store = 7; string visibility_store = 8; + int64 initial_failover_version = 9; + int64 failover_version_increment = 10; } message GetSystemInfoRequest { @@ -1938,6 +1942,7 @@ message PauseActivityRequest { // Only the activity with this ID will be paused. string id = 4; // Pause all running activities of this type. + // Note: Experimental - the behavior of pause by activity type might change in a future release. string type = 5; } @@ -2163,6 +2168,10 @@ message SetWorkerDeploymentCurrentVersionRequest { // pollers have not reached to the server yet. Only set this if you expect those pollers to // never arrive. bool ignore_missing_task_queues = 6; + // Optional. By default this request will be rejected if no pollers have been seen for the proposed + // Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + // to possible timeouts. Pass `true` here to bypass this protection. + bool allow_no_pollers = 9; } message SetWorkerDeploymentCurrentVersionResponse { @@ -2215,6 +2224,10 @@ message SetWorkerDeploymentRampingVersionRequest { // that the percentage changes. Also note that the check is against the deployment's Current // Version, not the previous Ramping Version. bool ignore_missing_task_queues = 7; + // Optional. By default this request will be rejected if no pollers have been seen for the proposed + // Current Version, in order to protect users from routing tasks to pollers that do not exist, leading + // to possible timeouts. Pass `true` here to bypass this protection. + bool allow_no_pollers = 10; } message SetWorkerDeploymentRampingVersionResponse { @@ -2248,8 +2261,8 @@ message ListWorkerDeploymentsResponse { google.protobuf.Timestamp create_time = 2; temporal.api.deployment.v1.RoutingConfig routing_config = 3; // Summary of the version that was added most recently in the Worker Deployment. - temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary latest_version_summary = 4; - // Summary of the current version of the Worker Deployment. + temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary latest_version_summary = 4; + // Summary of the current version of the Worker Deployment. temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary current_version_summary = 5; // Summary of the ramping version of the Worker Deployment. temporal.api.deployment.v1.WorkerDeploymentInfo.WorkerDeploymentVersionSummary ramping_version_summary = 6; @@ -2309,6 +2322,39 @@ message UpdateWorkerDeploymentVersionMetadataResponse { temporal.api.deployment.v1.VersionMetadata metadata = 1; } +// Update the ManagerIdentity of a Worker Deployment. +message SetWorkerDeploymentManagerRequest { + string namespace = 1; + string deployment_name = 2; + + oneof new_manager_identity { + // Arbitrary value for `manager_identity`. + // Empty will unset the field. + string manager_identity = 3; + + // True will set `manager_identity` to `identity`. + bool self = 4; + } + + // Optional. This can be the value of conflict_token from a Describe, or another Worker + // Deployment API. Passing a non-nil conflict token will cause this request to fail if the + // Deployment's configuration has been modified between the API call that generated the + // token and this one. + bytes conflict_token = 5; + + // Required. The identity of the client who initiated this request. + string identity = 6; +} + +message SetWorkerDeploymentManagerResponse { + // This value is returned so that it can be optionally passed to APIs + // that write to the Worker Deployment state to ensure that the state + // did not change between this API call and a future write. + bytes conflict_token = 1; + + // What the `manager_identity` field was before this change. + string previous_manager_identity = 2; +} // Returns the Current Deployment of a deployment series. // [cleanup-wv-pre-release] Pre-release deployment APIs, clean up later @@ -2537,3 +2583,15 @@ message UpdateWorkerConfigResponse { // Once we support sending update to a multiple workers - it will be converted into a batch job, and job id will be returned. } } + +message DescribeWorkerRequest { + // Namespace this worker belongs to. + string namespace = 1; + + // Worker instance key to describe. + string worker_instance_key = 2; +} + +message DescribeWorkerResponse { + temporal.api.worker.v1.WorkerInfo worker_info = 1; +} diff --git a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto index cc74230af..dc33b84ef 100644 --- a/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto +++ b/sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto @@ -133,9 +133,9 @@ service WorkflowService { } }; } - - // GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse - // order (starting from last event). Fails with`NotFound` if the specified workflow execution is + + // GetWorkflowExecutionHistoryReverse returns the history of specified workflow execution in reverse + // order (starting from last event). Fails with`NotFound` if the specified workflow execution is // unknown to the service. rpc GetWorkflowExecutionHistoryReverse (GetWorkflowExecutionHistoryReverseRequest) returns (GetWorkflowExecutionHistoryReverseResponse) { option (google.api.http) = { @@ -458,7 +458,8 @@ service WorkflowService { }; } - // ScanWorkflowExecutions is a visibility API to list large amount of workflow executions in a specific namespace without order. + // ScanWorkflowExecutions _was_ a visibility API to list large amount of workflow executions in a specific namespace without order. + // It has since been deprecated in favor of `ListWorkflowExecutions` and rewritten to use `ListWorkflowExecutions` internally. // // Deprecated: Replaced with `ListWorkflowExecutions`. // (-- api-linter: core::0127::http-annotation=disabled @@ -669,8 +670,8 @@ service WorkflowService { // members are compatible with one another. // // A single build id may be mapped to multiple task queues using this API for cases where a single process hosts - // multiple workers. - // + // multiple workers. + // // To query which workers can be retired, use the `GetWorkerTaskReachability` API. // // NOTE: The number of task queues mapped to a single build id is limited by the `limit.taskQueuesPerBuildId` @@ -923,6 +924,19 @@ service WorkflowService { }; } + // Set/unset the ManagerIdentity of a Worker Deployment. + // Experimental. This API might significantly change or be removed in a future release. + rpc SetWorkerDeploymentManager (SetWorkerDeploymentManagerRequest) returns (SetWorkerDeploymentManagerResponse) { + option (google.api.http) = { + post: "/namespaces/{namespace}/worker-deployments/{deployment_name}/set-manager" + body: "*" + additional_bindings { + post: "/api/v1/namespaces/{namespace}/worker-deployments/{deployment_name}/set-manager" + body: "*" + } + }; + } + // Invokes the specified Update function on user Workflow code. rpc UpdateWorkflowExecution(UpdateWorkflowExecutionRequest) returns (UpdateWorkflowExecutionResponse) { option (google.api.http) = { @@ -1235,4 +1249,14 @@ service WorkflowService { } }; } + + // DescribeWorker returns information about the specified worker. + rpc DescribeWorker (DescribeWorkerRequest) returns (DescribeWorkerResponse) { + option (google.api.http) = { + get: "/namespaces/{namespace}/workers/describe/{worker_instance_key}" + additional_bindings { + get: "/api/v1/namespaces/{namespace}/workers/describe/{worker_instance_key}" + } + }; + } }