Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 161 additions & 2 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,21 @@ proxier! {
let task_queue = req_mut.task_queue.as_ref()
.map(|tq| tq.name.clone()).unwrap_or_default();
match workers.try_reserve_wft_slot(namespace, task_queue) {
Some(s) => slot = Some(s),
Some(reservation) => {
// Populate eager_worker_deployment_options from the slot reservation
if let Some(opts) = reservation.deployment_options {
req_mut.eager_worker_deployment_options = Some(temporal_sdk_core_protos::temporal::api::deployment::v1::WorkerDeploymentOptions {
deployment_name: opts.version.deployment_name,
build_id: opts.version.build_id,
worker_versioning_mode: if opts.use_worker_versioning {
temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Versioned.into()
} else {
temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
},
});
}
slot = Some(reservation.slot);
}
None => req_mut.request_eager_execution = false
}
}
Expand Down Expand Up @@ -1446,6 +1460,24 @@ proxier! {
r.extensions_mut().insert(labels);
}
);
(
describe_worker,
DescribeWorkerRequest,
DescribeWorkerResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
(
set_worker_deployment_manager,
SetWorkerDeploymentManagerRequest,
SetWorkerDeploymentManagerResponse,
|r| {
let labels = namespaced_request!(r);
r.extensions_mut().insert(labels);
}
);
}

proxier! {
Expand Down Expand Up @@ -1694,7 +1726,7 @@ mod tests {
}

#[tokio::test]
async fn can_mock_workflow_service() {
async fn can_mock_services() {
#[derive(Clone)]
struct MyFakeServices {}
impl RawGrpcCaller for MyFakeServices {}
Expand Down Expand Up @@ -1753,4 +1785,131 @@ mod tests {
.unwrap();
assert_eq!(r.into_inner().namespaces[0].failover_version, 12345);
}

#[rstest::rstest]
#[case::with_versioning(true)]
#[case::without_versioning(false)]
#[tokio::test]
async fn eager_reservations_attach_deployment_options(#[case] use_worker_versioning: bool) {
use crate::worker_registry::{MockSlot, MockSlotProvider};
use temporal_sdk_core_api::worker::{WorkerDeploymentOptions, WorkerDeploymentVersion};
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkerVersioningMode;

let expected_mode = if use_worker_versioning {
WorkerVersioningMode::Versioned
} else {
WorkerVersioningMode::Unversioned
};

#[derive(Clone)]
struct MyFakeServices {
slot_manager: Arc<SlotManager>,
expected_mode: WorkerVersioningMode,
}
impl RawGrpcCaller for MyFakeServices {}
impl RawClientProducer for MyFakeServices {
fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
Some(self.slot_manager.clone())
}
fn workflow_client(&mut self) -> Box<dyn WorkflowService> {
Box::new(MyFakeWfClient {
expected_mode: self.expected_mode,
})
}
fn operator_client(&mut self) -> Box<dyn OperatorService> {
unimplemented!()
}
fn cloud_client(&mut self) -> Box<dyn CloudService> {
unimplemented!()
}
fn test_client(&mut self) -> Box<dyn TestService> {
unimplemented!()
}
fn health_client(&mut self) -> Box<dyn HealthService> {
unimplemented!()
}
}

let deployment_opts = WorkerDeploymentOptions {
version: WorkerDeploymentVersion {
deployment_name: "test-deployment".to_string(),
build_id: "test-build-123".to_string(),
},
use_worker_versioning,
default_versioning_behavior: None,
};

let mut mock_provider = MockSlotProvider::new();
mock_provider
.expect_namespace()
.return_const("test-namespace".to_string());
mock_provider
.expect_task_queue()
.return_const("test-task-queue".to_string());
let mut mock_slot = MockSlot::new();
mock_slot.expect_schedule_wft().returning(|_| Ok(()));
mock_provider
.expect_try_reserve_wft_slot()
.return_once(|| Some(Box::new(mock_slot)));
mock_provider
.expect_deployment_options()
.return_const(Some(deployment_opts.clone()));

let slot_manager = Arc::new(SlotManager::new());
slot_manager.register(Box::new(mock_provider));

#[derive(Clone)]
struct MyFakeWfClient {
expected_mode: WorkerVersioningMode,
}
impl WorkflowService for MyFakeWfClient {
fn start_workflow_execution(
&mut self,
request: tonic::Request<StartWorkflowExecutionRequest>,
) -> BoxFuture<'_, Result<tonic::Response<StartWorkflowExecutionResponse>, tonic::Status>>
{
let req = request.into_inner();
let expected_mode = self.expected_mode;

assert!(
req.eager_worker_deployment_options.is_some(),
"eager_worker_deployment_options should be populated"
);

let opts = req.eager_worker_deployment_options.as_ref().unwrap();
assert_eq!(opts.deployment_name, "test-deployment");
assert_eq!(opts.build_id, "test-build-123");
assert_eq!(opts.worker_versioning_mode, expected_mode as i32);

async { Ok(Response::new(StartWorkflowExecutionResponse::default())) }.boxed()
}
}

let mut mfs = MyFakeServices {
slot_manager,
expected_mode,
};

// Create a request with eager execution enabled
let req = StartWorkflowExecutionRequest {
namespace: "test-namespace".to_string(),
workflow_id: "test-wf-id".to_string(),
workflow_type: Some(
temporal_sdk_core_protos::temporal::api::common::v1::WorkflowType {
name: "test-workflow".to_string(),
},
),
task_queue: Some(TaskQueue {
name: "test-task-queue".to_string(),
kind: 0,
normal_name: String::new(),
}),
request_eager_execution: true,
..Default::default()
};

mfs.start_workflow_execution(req.into_request())
.await
.unwrap();
}
}
22 changes: 19 additions & 3 deletions client/src/worker_registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub trait SlotProvider: std::fmt::Debug {
fn task_queue(&self) -> &str;
/// Try to reserve a slot on this worker.
fn try_reserve_wft_slot(&self) -> Option<Box<dyn Slot + Send>>;
/// Get the worker deployment options for this worker, if using deployment-based versioning.
fn deployment_options(&self) -> Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>;
}

/// This trait represents a slot reserved for processing a WFT by a worker.
Expand All @@ -34,6 +36,14 @@ pub trait Slot {
) -> Result<(), anyhow::Error>;
}

/// Result of reserving a workflow task slot, including deployment options if applicable.
pub(crate) struct SlotReservation {
/// The reserved slot for processing the workflow task
pub slot: Box<dyn Slot + Send>,
/// Worker deployment options, if the worker is using deployment-based versioning
pub deployment_options: Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>,
}

#[derive(PartialEq, Eq, Hash, Debug, Clone)]
struct SlotKey {
namespace: String,
Expand Down Expand Up @@ -71,12 +81,16 @@ impl SlotManagerImpl {
&self,
namespace: String,
task_queue: String,
) -> Option<Box<dyn Slot + Send>> {
) -> Option<SlotReservation> {
let key = SlotKey::new(namespace, task_queue);
if let Some(p) = self.providers.get(&key)
&& let Some(slot) = p.try_reserve_wft_slot()
{
return Some(slot);
let deployment_options = p.deployment_options();
return Some(SlotReservation {
slot,
deployment_options,
});
}
None
}
Expand Down Expand Up @@ -126,11 +140,12 @@ impl SlotManager {
}

/// Try to reserve a compatible processing slot in any of the registered workers.
/// Returns the slot and the worker's deployment options (if using deployment-based versioning).
pub(crate) fn try_reserve_wft_slot(
&self,
namespace: String,
task_queue: String,
) -> Option<Box<dyn Slot + Send>> {
) -> Option<SlotReservation> {
self.manager
.read()
.try_reserve_wft_slot(namespace, task_queue)
Expand Down Expand Up @@ -188,6 +203,7 @@ mod tests {
});
mock_provider.expect_namespace().return_const(namespace);
mock_provider.expect_task_queue().return_const(task_queue);
mock_provider.expect_deployment_options().return_const(None);
mock_provider
}

Expand Down
6 changes: 6 additions & 0 deletions core-c-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ async fn call_workflow_service(
"DescribeTaskQueue" => {
rpc_call_on_trait!(client, call, WorkflowService, describe_task_queue)
}
"DescribeWorker" => {
rpc_call_on_trait!(client, call, WorkflowService, describe_worker)
}
"DescribeWorkerDeployment" => {
rpc_call_on_trait!(client, call, WorkflowService, describe_worker_deployment)
}
Expand Down Expand Up @@ -811,6 +814,9 @@ async fn call_workflow_service(
set_worker_deployment_current_version
)
}
"SetWorkerDeploymentManager" => {
rpc_call_on_trait!(client, call, WorkflowService, set_worker_deployment_manager)
}
"SetWorkerDeploymentRampingVersion" => {
rpc_call_on_trait!(
client,
Expand Down
7 changes: 7 additions & 0 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,11 +486,18 @@ impl Worker {
shutdown_token.child_token(),
);

let deployment_options = match &config.versioning_strategy {
temporal_sdk_core_api::worker::WorkerVersioningStrategy::WorkerDeploymentBased(
opts,
) => Some(opts.clone()),
_ => None,
};
let provider = SlotProvider::new(
config.namespace.clone(),
config.task_queue.clone(),
wft_slots.clone(),
external_wft_tx,
deployment_options,
);
let worker_key = Mutex::new(client.workers().register(Box::new(provider)));
let sdk_name_and_ver = client.sdk_name_and_version();
Expand Down
9 changes: 9 additions & 0 deletions core/src/worker/slot_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub(super) struct SlotProvider {
task_queue: String,
wft_semaphore: MeteredPermitDealer<WorkflowSlotKind>,
external_wft_tx: WFTStreamSender,
deployment_options: Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>,
}

impl SlotProvider {
Expand All @@ -66,12 +67,14 @@ impl SlotProvider {
task_queue: String,
wft_semaphore: MeteredPermitDealer<WorkflowSlotKind>,
external_wft_tx: WFTStreamSender,
deployment_options: Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions>,
) -> Self {
Self {
namespace,
task_queue,
wft_semaphore,
external_wft_tx,
deployment_options,
}
}
}
Expand All @@ -89,6 +92,9 @@ impl SlotProviderTrait for SlotProvider {
None => None,
}
}
fn deployment_options(&self) -> Option<temporal_sdk_core_api::worker::WorkerDeploymentOptions> {
self.deployment_options.clone()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -125,6 +131,7 @@ mod tests {
"my_queue".to_string(),
wft_semaphore,
external_wft_tx,
None,
);

let slot = provider
Expand All @@ -146,6 +153,7 @@ mod tests {
"my_queue".to_string(),
wft_semaphore,
external_wft_tx,
None,
);
assert!(provider.try_reserve_wft_slot().is_some());
}
Expand All @@ -163,6 +171,7 @@ mod tests {
"my_queue".to_string(),
wft_semaphore.clone(),
external_wft_tx,
None,
);
let slot = provider.try_reserve_wft_slot();
assert!(slot.is_some());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ on:
skip_sdk_check:
description: "Skip sdk-go compatibility check"
type: boolean

dispatch_id:
description: An ID used by external tools to identify workflow runs(can be left empty when running manually)
default: "none"
type: string
jobs:
dispatch:
runs-on: ubuntu-latest
steps:
- name: Dispatch ${{ inputs.dispatch_id }}
run: echo "Dispatch ${{ inputs.dispatch_id }}"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semgrep identified a blocking 🔴 issue in your code:
Using variable interpolation ${{...}} with github context data in a run: step could allow an attacker to inject their own code into the runner. This would allow them to steal secrets and code. github context data can have arbitrary user input and should be treated as untrusted. Instead, use an intermediate environment variable with env: to store the data and use the environment variable in the run: script. Be sure to use double-quotes the environment variable, like this: "$ENVVAR".

To resolve this comment:

🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.

💬 Ignore this finding

Reply with Semgrep commands to ignore this finding.

  • /fp <comment> for false positive
  • /ar <comment> for acceptable risk
  • /other <comment> for all other reasons

Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by run-shell-injection.

You can view more details about this finding in the Semgrep AppSec Platform.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/other this is from an upstream repo and doesn't run here

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status updated to ignored by @Sushisource. Re-run the pipeline to make this finding non-blocking.

Reply with /open to re-open this finding

prepare-inputs:
name: "Prepare inputs"
runs-on: ubuntu-latest
Expand Down
Loading
Loading