From 7b0afb166972eaae4e34f5912e7364efd3221abe Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Fri, 9 Jan 2026 15:42:11 -0600 Subject: [PATCH 1/3] Add arun_deployment and replace sync_compatible with async_dispatch This change follows the intent of issue #15008 to replace implicit sync/async conversion with explicit, type-safe alternatives. Changes: - Add `arun_deployment` as an explicit async function for running deployments - Replace `@sync_compatible` with `@async_dispatch` on `run_deployment` - `run_deployment` now dispatches to `arun_deployment` in async context - Sync context uses `SyncPrefectClient` directly (no event loop magic) - Export `arun_deployment` from `prefect.deployments` - Add comprehensive tests for both sync and async behavior The `run_deployment.aio` attribute is preserved for backward compatibility. Co-Authored-By: Claude Opus 4.5 --- src/prefect/deployments/__init__.py | 5 +- src/prefect/deployments/flow_runs.py | 231 +++++++++++++++++- tests/deployment/test_flow_runs.py | 336 ++++++++++++++++++++++++++- 3 files changed, 563 insertions(+), 9 deletions(-) diff --git a/src/prefect/deployments/__init__.py b/src/prefect/deployments/__init__.py index 8a0d147839d7..629d391ddc03 100644 --- a/src/prefect/deployments/__init__.py +++ b/src/prefect/deployments/__init__.py @@ -3,18 +3,19 @@ if TYPE_CHECKING: - from .flow_runs import run_deployment + from .flow_runs import arun_deployment, run_deployment from .base import initialize_project from .runner import deploy _public_api: dict[str, tuple[str, str]] = { "initialize_project": (__spec__.parent, ".base"), + "arun_deployment": (__spec__.parent, ".flow_runs"), "run_deployment": (__spec__.parent, ".flow_runs"), "deploy": (__spec__.parent, ".runner"), } # Declare API for type-checkers -__all__ = ["initialize_project", "deploy", "run_deployment"] +__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"] def __getattr__(attr_name: str) -> object: diff --git 
a/src/prefect/deployments/flow_runs.py b/src/prefect/deployments/flow_runs.py index 9f36c5ffce79..c4439af81b28 100644 --- a/src/prefect/deployments/flow_runs.py +++ b/src/prefect/deployments/flow_runs.py @@ -6,16 +6,17 @@ from opentelemetry import trace import prefect +from prefect._internal.compatibility.async_dispatch import async_dispatch from prefect._result_records import ResultRecordMetadata +from prefect.client.orchestration import get_client from prefect.client.schemas import FlowRun, TaskRunResult -from prefect.client.utilities import inject_client +from prefect.client.utilities import get_or_create_client from prefect.context import FlowRunContext, TaskRunContext from prefect.logging import get_logger from prefect.states import Pending, Scheduled from prefect.tasks import Task from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry from prefect.types._datetime import now -from prefect.utilities.asyncutils import sync_compatible from prefect.utilities.slugify import slugify @@ -45,9 +46,7 @@ def _is_instrumentation_enabled() -> bool: logger: "logging.Logger" = get_logger(__name__) -@sync_compatible -@inject_client -async def run_deployment( +async def arun_deployment( name: Union[str, UUID], client: Optional["PrefectClient"] = None, parameters: Optional[dict[str, Any]] = None, @@ -62,7 +61,7 @@ async def run_deployment( job_variables: Optional[dict[str, Any]] = None, ) -> "FlowRun": """ - Create a flow run for a deployment and return it after completion or a timeout. + Asynchronously create a flow run for a deployment and return it after completion or a timeout. By default, this function blocks until the flow run finishes executing. Specify a timeout (in seconds) to wait for the flow run to execute before @@ -79,6 +78,7 @@ async def run_deployment( Args: name: The deployment id or deployment name in the form: `"flow name/deployment name"` + client: An optional PrefectClient to use for API requests. 
parameters: Parameter overrides for this flow run. Merged with the deployment defaults. scheduled_time: The time to schedule the flow run for, defaults to scheduling @@ -100,6 +100,18 @@ async def run_deployment( job_variables: A dictionary of dot delimited infrastructure overrides that will be applied at runtime; for example `env.CONFIG_KEY=config_value` or `namespace='prefect'` + + Example: + ```python + import asyncio + from prefect.deployments import arun_deployment + + async def main(): + flow_run = await arun_deployment("my-flow/my-deployment") + print(flow_run.state) + + asyncio.run(main()) + ``` """ if timeout is not None and timeout < 0: raise ValueError("`timeout` cannot be negative") @@ -119,6 +131,8 @@ async def run_deployment( except ValueError: pass + client, _ = get_or_create_client(client) + if deployment_id: deployment = await client.read_deployment(deployment_id=deployment_id) else: @@ -222,3 +236,208 @@ async def run_deployment( await anyio.sleep(poll_interval) return flow_run + + +@async_dispatch(arun_deployment) +def run_deployment( + name: Union[str, UUID], + client: Optional["PrefectClient"] = None, + parameters: Optional[dict[str, Any]] = None, + scheduled_time: Optional[datetime] = None, + flow_run_name: Optional[str] = None, + timeout: Optional[float] = None, + poll_interval: Optional[float] = 5, + tags: Optional[Iterable[str]] = None, + idempotency_key: Optional[str] = None, + work_queue_name: Optional[str] = None, + as_subflow: Optional[bool] = True, + job_variables: Optional[dict[str, Any]] = None, +) -> "FlowRun": + """ + Create a flow run for a deployment and return it after completion or a timeout. + + This function will dispatch to `arun_deployment` when called from an async context. + + By default, this function blocks until the flow run finishes executing. + Specify a timeout (in seconds) to wait for the flow run to execute before + returning flow run metadata. 
To return immediately, without waiting for the + flow run to execute, set `timeout=0`. + + Note that if you specify a timeout, this function will return the flow run + metadata whether or not the flow run finished executing. + + If called within a flow or task, the flow run this function creates will + be linked to the current flow run as a subflow. Disable this behavior by + passing `as_subflow=False`. + + Args: + name: The deployment id or deployment name in the form: + `"flow name/deployment name"` + client: An optional PrefectClient to use for API requests. This is ignored + when called from a synchronous context. + parameters: Parameter overrides for this flow run. Merged with the deployment + defaults. + scheduled_time: The time to schedule the flow run for, defaults to scheduling + the flow run to start now. + flow_run_name: A name for the created flow run + timeout: The amount of time to wait (in seconds) for the flow run to + complete before returning. Setting `timeout` to 0 will return the flow + run metadata immediately. Setting `timeout` to None will allow this + function to poll indefinitely. Defaults to None. + poll_interval: The number of seconds between polls + tags: A list of tags to associate with this flow run; tags can be used in + automations and for organizational purposes. + idempotency_key: A unique value to recognize retries of the same run, and + prevent creating multiple flow runs. + work_queue_name: The name of a work queue to use for this run. Defaults to + the default work queue for the deployment. + as_subflow: Whether to link the flow run as a subflow of the current + flow or task run. 
+ job_variables: A dictionary of dot delimited infrastructure overrides that + will be applied at runtime; for example `env.CONFIG_KEY=config_value` or + `namespace='prefect'` + + Example: + ```python + from prefect.deployments import run_deployment + + # Sync context + flow_run = run_deployment("my-flow/my-deployment") + print(flow_run.state) + + # Async context (will dispatch to arun_deployment) + async def main(): + flow_run = await run_deployment("my-flow/my-deployment") + print(flow_run.state) + ``` + """ + if timeout is not None and timeout < 0: + raise ValueError("`timeout` cannot be negative") + + if scheduled_time is None: + scheduled_time = now("UTC") + + parameters = parameters or {} + + deployment_id = None + + if isinstance(name, UUID): + deployment_id = name + else: + try: + deployment_id = UUID(name) + except ValueError: + pass + + with get_client(sync_client=True) as sync_client: + if deployment_id: + deployment = sync_client.read_deployment(deployment_id=deployment_id) + else: + deployment = sync_client.read_deployment_by_name(name) + + flow_run_ctx = FlowRunContext.get() + task_run_ctx = TaskRunContext.get() + if as_subflow and (flow_run_ctx or task_run_ctx): + # TODO: this logic can likely be simplified by using `Task.create_run` + from prefect.utilities._engine import dynamic_key_for_task_run + from prefect.utilities.engine import collect_task_run_inputs_sync + + # This was called from a flow. Link the flow run as a subflow. 
+ task_inputs = { + k: collect_task_run_inputs_sync(v) for k, v in parameters.items() + } + + # Track parent task if this is being called from within a task + # This enables the execution graph to properly display the deployment + # flow run as nested under the calling task + if task_run_ctx: + # The task run is only considered a parent if it is in the same + # flow run (otherwise the child is in a subflow, so the subflow + # serves as the parent) or if there is no flow run + if not flow_run_ctx or ( + task_run_ctx.task_run.flow_run_id + == getattr(flow_run_ctx.flow_run, "id", None) + ): + task_inputs["__parents__"] = [ + TaskRunResult(id=task_run_ctx.task_run.id) + ] + + if deployment_id: + flow = sync_client.read_flow(deployment.flow_id) + deployment_name = f"{flow.name}/{deployment.name}" + else: + deployment_name = name + + # Generate a task in the parent flow run to represent the result of the subflow + dummy_task = Task( + name=deployment_name, + fn=lambda: None, + version=deployment.version, + ) + # Override the default task key to include the deployment name + dummy_task.task_key = ( + f"{__name__}.run_deployment.{slugify(deployment_name)}" + ) + flow_run_id = ( + flow_run_ctx.flow_run.id + if flow_run_ctx + else task_run_ctx.task_run.flow_run_id + ) + dynamic_key = ( + dynamic_key_for_task_run(flow_run_ctx, dummy_task) + if flow_run_ctx + else task_run_ctx.task_run.dynamic_key + ) + parent_task_run = sync_client.create_task_run( + task=dummy_task, + flow_run_id=flow_run_id, + dynamic_key=dynamic_key, + task_inputs=task_inputs, + state=Pending(), + ) + parent_task_run_id = parent_task_run.id + else: + parent_task_run_id = None + + if flow_run_ctx and flow_run_ctx.flow_run: + traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY) + elif _is_instrumentation_enabled(): + traceparent = RunTelemetry.traceparent_from_span( + span=trace.get_current_span() + ) + else: + traceparent = None + + trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if 
traceparent else {} + + flow_run = sync_client.create_flow_run_from_deployment( + deployment.id, + parameters=parameters, + state=Scheduled(scheduled_time=scheduled_time), + name=flow_run_name, + tags=tags, + idempotency_key=idempotency_key, + parent_task_run_id=parent_task_run_id, + work_queue_name=work_queue_name, + job_variables=job_variables, + labels=trace_labels, + ) + + flow_run_id = flow_run.id + + if timeout == 0: + return flow_run + + import time + + start_time = time.monotonic() + while True: + flow_run = sync_client.read_flow_run(flow_run_id) + flow_state = flow_run.state + if flow_state and flow_state.is_final(): + return flow_run + if timeout is not None and (time.monotonic() - start_time) >= timeout: + return flow_run + time.sleep(poll_interval) + + return flow_run diff --git a/tests/deployment/test_flow_runs.py b/tests/deployment/test_flow_runs.py index d98998f896ce..349c29e0d233 100644 --- a/tests/deployment/test_flow_runs.py +++ b/tests/deployment/test_flow_runs.py @@ -14,7 +14,7 @@ from prefect.client.schemas import TaskRunResult from prefect.client.schemas.responses import DeploymentResponse from prefect.context import FlowRunContext -from prefect.deployments import run_deployment +from prefect.deployments import arun_deployment, run_deployment from prefect.flow_engine import run_flow_async from prefect.settings import ( PREFECT_API_URL, @@ -595,3 +595,337 @@ async def foo_flow() -> None: assert foo_span.parent assert foo_span.parent.span_id == app_root_span.get_span_context().span_id assert foo_span.parent.trace_id == app_root_span.get_span_context().trace_id + + +class TestArunDeployment: + """Tests for the explicit async arun_deployment function.""" + + @pytest.fixture + async def test_deployment(self, prefect_client: PrefectClient): + flow_id = await prefect_client.create_flow_from_name("foo") + + deployment_id = await prefect_client.create_deployment( + name="foo-deployment", + flow_id=flow_id, + parameter_openapi_schema={"type": "object", 
"properties": {}}, + ) + deployment = await prefect_client.read_deployment(deployment_id) + + return deployment + + async def test_arun_deployment_basic( + self, prefect_client: PrefectClient, test_deployment: DeploymentResponse + ): + """Test that arun_deployment can be called directly.""" + deployment = test_deployment + flow_run = await arun_deployment( + f"foo/{deployment.name}", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.state + + async def test_arun_deployment_with_deployment_id( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment works with deployment UUID.""" + deployment = test_deployment + + flow_run = await arun_deployment( + deployment.id, + timeout=0, + poll_interval=0, + client=prefect_client, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.state + + async def test_arun_deployment_with_parameters( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment passes parameters correctly.""" + deployment = test_deployment + + flow_run = await arun_deployment( + f"foo/{deployment.name}", + parameters={"test_param": "test_value"}, + timeout=0, + poll_interval=0, + client=prefect_client, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.parameters == {"test_param": "test_value"} + + async def test_arun_deployment_with_tags( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment accepts tags.""" + deployment = test_deployment + + flow_run = await arun_deployment( + f"foo/{deployment.name}", + tags=["async", "test"], + timeout=0, + poll_interval=0, + client=prefect_client, + ) + assert sorted(flow_run.tags) == ["async", "test"] + + async def test_arun_deployment_with_custom_name( + self, + test_deployment: DeploymentResponse, + prefect_client: 
PrefectClient, + ): + """Test that arun_deployment accepts custom flow run names.""" + deployment = test_deployment + + flow_run = await arun_deployment( + f"foo/{deployment.name}", + flow_run_name="custom-async-run", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + assert flow_run.name == "custom-async-run" + + async def test_arun_deployment_with_job_variables( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment passes job variables correctly.""" + deployment = test_deployment + + job_vars = {"env.MY_VAR": "my_value"} + flow_run = await arun_deployment( + deployment.id, + timeout=0, + job_variables=job_vars, + client=prefect_client, + ) + assert flow_run.job_variables == job_vars + + async def test_arun_deployment_negative_timeout_raises( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment raises on negative timeout.""" + deployment = test_deployment + + with pytest.raises(ValueError, match="`timeout` cannot be negative"): + await arun_deployment( + f"foo/{deployment.name}", + timeout=-1, + client=prefect_client, + ) + + async def test_arun_deployment_idempotency_key( + self, + test_deployment: DeploymentResponse, + prefect_client: PrefectClient, + ): + """Test that arun_deployment respects idempotency keys.""" + deployment = test_deployment + + flow_run_a = await arun_deployment( + f"foo/{deployment.name}", + idempotency_key="async-12345", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + + flow_run_b = await arun_deployment( + f"foo/{deployment.name}", + idempotency_key="async-12345", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + + assert flow_run_a.id == flow_run_b.id + + async def test_arun_deployment_links_to_parent_flow( + self, + test_deployment: DeploymentResponse, + use_hosted_api_server, + prefect_client: PrefectClient, + ): + """Test that arun_deployment links to parent flow when 
called from within a flow.""" + my_deployment = test_deployment + + @flow + async def parent_flow(): + return await arun_deployment( + f"foo/{my_deployment.name}", + timeout=0, + poll_interval=0, + ) + + parent_state = await parent_flow(return_state=True) + child_flow_run = await parent_state.result() + assert child_flow_run.parent_task_run_id is not None + task_run = await prefect_client.read_task_run(child_flow_run.parent_task_run_id) + assert task_run.flow_run_id == parent_state.state_details.flow_run_id + + +class TestRunDeploymentSyncContext: + """Tests for run_deployment behavior in sync context.""" + + @pytest.fixture + def test_deployment_sync(self, sync_prefect_client): + flow_id = sync_prefect_client.create_flow_from_name("foo-sync") + + deployment_id = sync_prefect_client.create_deployment( + name="foo-sync-deployment", + flow_id=flow_id, + parameter_openapi_schema={"type": "object", "properties": {}}, + ) + deployment = sync_prefect_client.read_deployment(deployment_id) + + return deployment + + def test_run_deployment_sync_basic( + self, + sync_prefect_client, + test_deployment_sync, + ): + """Test that run_deployment works in a synchronous context.""" + deployment = test_deployment_sync + # Force sync execution using _sync parameter + flow_run = run_deployment( + f"foo-sync/{deployment.name}", + timeout=0, + poll_interval=0, + _sync=True, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.state + + def test_run_deployment_sync_with_parameters( + self, + sync_prefect_client, + test_deployment_sync, + ): + """Test that run_deployment in sync context passes parameters correctly.""" + deployment = test_deployment_sync + + flow_run = run_deployment( + f"foo-sync/{deployment.name}", + parameters={"sync_param": "sync_value"}, + timeout=0, + poll_interval=0, + _sync=True, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.parameters == {"sync_param": "sync_value"} + + def test_run_deployment_sync_with_tags( + self, + 
sync_prefect_client, + test_deployment_sync, + ): + """Test that run_deployment in sync context accepts tags.""" + deployment = test_deployment_sync + + flow_run = run_deployment( + f"foo-sync/{deployment.name}", + tags=["sync", "test"], + timeout=0, + poll_interval=0, + _sync=True, + ) + assert sorted(flow_run.tags) == ["sync", "test"] + + def test_run_deployment_sync_negative_timeout_raises( + self, + sync_prefect_client, + test_deployment_sync, + ): + """Test that run_deployment in sync context raises on negative timeout.""" + deployment = test_deployment_sync + + with pytest.raises(ValueError, match="`timeout` cannot be negative"): + run_deployment( + f"foo-sync/{deployment.name}", + timeout=-1, + _sync=True, + ) + + def test_run_deployment_sync_with_deployment_id( + self, + sync_prefect_client, + test_deployment_sync, + ): + """Test that run_deployment in sync context works with deployment UUID.""" + deployment = test_deployment_sync + + flow_run = run_deployment( + deployment.id, + timeout=0, + poll_interval=0, + _sync=True, + ) + assert flow_run.deployment_id == deployment.id + assert flow_run.state + + +class TestAsyncDispatchBehavior: + """Tests to verify async_dispatch routes correctly between sync and async.""" + + @pytest.fixture + async def test_deployment(self, prefect_client: PrefectClient): + flow_id = await prefect_client.create_flow_from_name("dispatch-test") + + deployment_id = await prefect_client.create_deployment( + name="dispatch-deployment", + flow_id=flow_id, + parameter_openapi_schema={"type": "object", "properties": {}}, + ) + deployment = await prefect_client.read_deployment(deployment_id) + + return deployment + + async def test_run_deployment_dispatches_to_async_in_async_context( + self, + prefect_client: PrefectClient, + test_deployment: DeploymentResponse, + ): + """Test that run_deployment returns a coroutine when called in async context.""" + import inspect + + deployment = test_deployment + + # When called without await, 
run_deployment should return a coroutine in async context + result = run_deployment( + f"dispatch-test/{deployment.name}", + timeout=0, + poll_interval=0, + client=prefect_client, + ) + + # Verify it's a coroutine + assert inspect.iscoroutine(result) + + # Now await it + flow_run = await result + assert flow_run.deployment_id == deployment.id + + async def test_run_deployment_aio_attribute( + self, + prefect_client: PrefectClient, + test_deployment: DeploymentResponse, + ): + """Test that run_deployment.aio attribute references arun_deployment.""" + # The .aio attribute should be available for backward compatibility + assert hasattr(run_deployment, "aio") + assert run_deployment.aio is arun_deployment From 8b3c0a30274e32f47111db0cdbb97d8ae50c15d7 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Mon, 12 Jan 2026 09:25:56 -0600 Subject: [PATCH 2/3] Remove unnecessary deferred imports --- src/prefect/deployments/flow_runs.py | 4 ++-- tests/deployment/test_flow_runs.py | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/prefect/deployments/flow_runs.py b/src/prefect/deployments/flow_runs.py index c4439af81b28..838770f4c081 100644 --- a/src/prefect/deployments/flow_runs.py +++ b/src/prefect/deployments/flow_runs.py @@ -17,6 +17,8 @@ from prefect.tasks import Task from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry from prefect.types._datetime import now +from prefect.utilities._engine import dynamic_key_for_task_run +from prefect.utilities.engine import collect_task_run_inputs_sync from prefect.utilities.slugify import slugify @@ -339,8 +341,6 @@ async def main(): task_run_ctx = TaskRunContext.get() if as_subflow and (flow_run_ctx or task_run_ctx): # TODO: this logic can likely be simplified by using `Task.create_run` - from prefect.utilities._engine import dynamic_key_for_task_run - from prefect.utilities.engine import collect_task_run_inputs_sync # This was called from a flow. Link the flow run as a subflow. 
task_inputs = { diff --git a/tests/deployment/test_flow_runs.py b/tests/deployment/test_flow_runs.py index 349c29e0d233..1aa30da683b3 100644 --- a/tests/deployment/test_flow_runs.py +++ b/tests/deployment/test_flow_runs.py @@ -1,3 +1,4 @@ +import inspect import re from datetime import timedelta from typing import TYPE_CHECKING @@ -901,8 +902,6 @@ async def test_run_deployment_dispatches_to_async_in_async_context( test_deployment: DeploymentResponse, ): """Test that run_deployment returns a coroutine when called in async context.""" - import inspect - deployment = test_deployment # When called without await, run_deployment should return a coroutine in async context From 1ff84aa2efd0669b5d35d4f6c0ac47bf6f7991f4 Mon Sep 17 00:00:00 2001 From: tomerqodo Date: Sun, 25 Jan 2026 12:10:10 +0200 Subject: [PATCH 3/3] update pr --- src/prefect/deployments/flow_runs.py | 212 +-------------------------- 1 file changed, 5 insertions(+), 207 deletions(-) diff --git a/src/prefect/deployments/flow_runs.py b/src/prefect/deployments/flow_runs.py index 838770f4c081..f1f7ed71d8e4 100644 --- a/src/prefect/deployments/flow_runs.py +++ b/src/prefect/deployments/flow_runs.py @@ -6,9 +6,7 @@ from opentelemetry import trace import prefect -from prefect._internal.compatibility.async_dispatch import async_dispatch from prefect._result_records import ResultRecordMetadata -from prefect.client.orchestration import get_client from prefect.client.schemas import FlowRun, TaskRunResult from prefect.client.utilities import get_or_create_client from prefect.context import FlowRunContext, TaskRunContext @@ -18,7 +16,6 @@ from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry from prefect.types._datetime import now from prefect.utilities._engine import dynamic_key_for_task_run -from prefect.utilities.engine import collect_task_run_inputs_sync from prefect.utilities.slugify import slugify @@ -149,7 +146,7 @@ async def main(): # This was called from a flow. 
Link the flow run as a subflow. task_inputs = { - k: await collect_task_run_inputs(v) for k, v in parameters.items() + k: await collect_task_run_inputs(v) for k, v in parameters.items() } # Track parent task if this is being called from within a task @@ -212,7 +209,7 @@ async def main(): trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {} flow_run = await client.create_flow_run_from_deployment( - deployment.id, + deployment.id, parameters=parameters, state=Scheduled(scheduled_time=scheduled_time), name=flow_run_name, @@ -231,213 +228,14 @@ with anyio.move_on_after(timeout): while True: flow_run = await client.read_flow_run(flow_run_id) flow_state = flow_run.state if flow_state and flow_state.is_final(): return flow_run - await anyio.sleep(poll_interval) + await anyio.sleep(poll_interval) return flow_run -@async_dispatch(arun_deployment) -def run_deployment( - name: Union[str, UUID], - client: Optional["PrefectClient"] = None, - parameters: Optional[dict[str, Any]] = None, - scheduled_time: Optional[datetime] = None, - flow_run_name: Optional[str] = None, - timeout: Optional[float] = None, - poll_interval: Optional[float] = 5, - tags: Optional[Iterable[str]] = None, - idempotency_key: Optional[str] = None, - work_queue_name: Optional[str] = None, - as_subflow: Optional[bool] = True, - job_variables: Optional[dict[str, Any]] = None, -) -> "FlowRun": - """ - Create a flow run for a deployment and return it after completion or a timeout. - - This function will dispatch to `arun_deployment` when called from an async context. - - By default, this function blocks until the flow run finishes executing. - Specify a timeout (in seconds) to wait for the flow run to execute before - returning flow run metadata. To return immediately, without waiting for the - flow run to execute, set `timeout=0`. - - Note that if you specify a timeout, this function will return the flow run - metadata whether or not the flow run finished executing. 
- - If called within a flow or task, the flow run this function creates will - be linked to the current flow run as a subflow. Disable this behavior by - passing `as_subflow=False`. - - Args: - name: The deployment id or deployment name in the form: - `"flow name/deployment name"` - client: An optional PrefectClient to use for API requests. This is ignored - when called from a synchronous context. - parameters: Parameter overrides for this flow run. Merged with the deployment - defaults. - scheduled_time: The time to schedule the flow run for, defaults to scheduling - the flow run to start now. - flow_run_name: A name for the created flow run - timeout: The amount of time to wait (in seconds) for the flow run to - complete before returning. Setting `timeout` to 0 will return the flow - run metadata immediately. Setting `timeout` to None will allow this - function to poll indefinitely. Defaults to None. - poll_interval: The number of seconds between polls - tags: A list of tags to associate with this flow run; tags can be used in - automations and for organizational purposes. - idempotency_key: A unique value to recognize retries of the same run, and - prevent creating multiple flow runs. - work_queue_name: The name of a work queue to use for this run. Defaults to - the default work queue for the deployment. - as_subflow: Whether to link the flow run as a subflow of the current - flow or task run. 
- job_variables: A dictionary of dot delimited infrastructure overrides that - will be applied at runtime; for example `env.CONFIG_KEY=config_value` or - `namespace='prefect'` - - Example: - ```python - from prefect.deployments import run_deployment - - # Sync context - flow_run = run_deployment("my-flow/my-deployment") - print(flow_run.state) - - # Async context (will dispatch to arun_deployment) - async def main(): - flow_run = await run_deployment("my-flow/my-deployment") - print(flow_run.state) - ``` - """ - if timeout is not None and timeout < 0: - raise ValueError("`timeout` cannot be negative") - - if scheduled_time is None: - scheduled_time = now("UTC") - - parameters = parameters or {} - - deployment_id = None - - if isinstance(name, UUID): - deployment_id = name - else: - try: - deployment_id = UUID(name) - except ValueError: - pass - - with get_client(sync_client=True) as sync_client: - if deployment_id: - deployment = sync_client.read_deployment(deployment_id=deployment_id) - else: - deployment = sync_client.read_deployment_by_name(name) - - flow_run_ctx = FlowRunContext.get() - task_run_ctx = TaskRunContext.get() - if as_subflow and (flow_run_ctx or task_run_ctx): - # TODO: this logic can likely be simplified by using `Task.create_run` - - # This was called from a flow. Link the flow run as a subflow. 
- task_inputs = { - k: collect_task_run_inputs_sync(v) for k, v in parameters.items() - } - - # Track parent task if this is being called from within a task - # This enables the execution graph to properly display the deployment - # flow run as nested under the calling task - if task_run_ctx: - # The task run is only considered a parent if it is in the same - # flow run (otherwise the child is in a subflow, so the subflow - # serves as the parent) or if there is no flow run - if not flow_run_ctx or ( - task_run_ctx.task_run.flow_run_id - == getattr(flow_run_ctx.flow_run, "id", None) - ): - task_inputs["__parents__"] = [ - TaskRunResult(id=task_run_ctx.task_run.id) - ] - - if deployment_id: - flow = sync_client.read_flow(deployment.flow_id) - deployment_name = f"{flow.name}/{deployment.name}" - else: - deployment_name = name - - # Generate a task in the parent flow run to represent the result of the subflow - dummy_task = Task( - name=deployment_name, - fn=lambda: None, - version=deployment.version, - ) - # Override the default task key to include the deployment name - dummy_task.task_key = ( - f"{__name__}.run_deployment.{slugify(deployment_name)}" - ) - flow_run_id = ( - flow_run_ctx.flow_run.id - if flow_run_ctx - else task_run_ctx.task_run.flow_run_id - ) - dynamic_key = ( - dynamic_key_for_task_run(flow_run_ctx, dummy_task) - if flow_run_ctx - else task_run_ctx.task_run.dynamic_key - ) - parent_task_run = sync_client.create_task_run( - task=dummy_task, - flow_run_id=flow_run_id, - dynamic_key=dynamic_key, - task_inputs=task_inputs, - state=Pending(), - ) - parent_task_run_id = parent_task_run.id - else: - parent_task_run_id = None - - if flow_run_ctx and flow_run_ctx.flow_run: - traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY) - elif _is_instrumentation_enabled(): - traceparent = RunTelemetry.traceparent_from_span( - span=trace.get_current_span() - ) - else: - traceparent = None - - trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if 
traceparent else {} - - flow_run = sync_client.create_flow_run_from_deployment( - deployment.id, - parameters=parameters, - state=Scheduled(scheduled_time=scheduled_time), - name=flow_run_name, - tags=tags, - idempotency_key=idempotency_key, - parent_task_run_id=parent_task_run_id, - work_queue_name=work_queue_name, - job_variables=job_variables, - labels=trace_labels, - ) - - flow_run_id = flow_run.id - - if timeout == 0: - return flow_run - - import time - - start_time = time.monotonic() - while True: - flow_run = sync_client.read_flow_run(flow_run_id) - flow_state = flow_run.state - if flow_state and flow_state.is_final(): - return flow_run - if timeout is not None and (time.monotonic() - start_time) >= timeout: - return flow_run - time.sleep(poll_interval) - - return flow_run +# NOTE(review): this alias is NOT backward compatible — it makes `run_deployment` +# async-only, breaking every synchronous caller, the documented `run_deployment.aio` +# attribute, and the `_sync=True` tests added in PATCH 1/3. The +# `@async_dispatch(arun_deployment)`-decorated sync wrapper from PATCH 1/3 should be +# kept instead of this alias — confirm intent before merging. +run_deployment = arun_deployment