Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/prefect/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@


if TYPE_CHECKING:
from .flow_runs import run_deployment
from .flow_runs import arun_deployment, run_deployment
from .base import initialize_project
from .runner import deploy

_public_api: dict[str, tuple[str, str]] = {
"initialize_project": (__spec__.parent, ".base"),
"arun_deployment": (__spec__.parent, ".flow_runs"),
"run_deployment": (__spec__.parent, ".flow_runs"),
"deploy": (__spec__.parent, ".runner"),
}

# Declare API for type-checkers
__all__ = ["initialize_project", "deploy", "run_deployment"]
__all__ = ["initialize_project", "deploy", "arun_deployment", "run_deployment"]


def __getattr__(attr_name: str) -> object:
Expand Down
35 changes: 26 additions & 9 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
import prefect
from prefect._result_records import ResultRecordMetadata
from prefect.client.schemas import FlowRun, TaskRunResult
from prefect.client.utilities import inject_client
from prefect.client.utilities import get_or_create_client
from prefect.context import FlowRunContext, TaskRunContext
from prefect.logging import get_logger
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import LABELS_TRACEPARENT_KEY, RunTelemetry
from prefect.types._datetime import now
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities._engine import dynamic_key_for_task_run
from prefect.utilities.slugify import slugify


Expand Down Expand Up @@ -45,9 +45,7 @@ def _is_instrumentation_enabled() -> bool:
logger: "logging.Logger" = get_logger(__name__)


@sync_compatible
@inject_client
async def run_deployment(
async def arun_deployment(
name: Union[str, UUID],
client: Optional["PrefectClient"] = None,
parameters: Optional[dict[str, Any]] = None,
Expand All @@ -62,7 +60,7 @@ async def run_deployment(
job_variables: Optional[dict[str, Any]] = None,
) -> "FlowRun":
"""
Create a flow run for a deployment and return it after completion or a timeout.
Asynchronously create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing.
Specify a timeout (in seconds) to wait for the flow run to execute before
Expand All @@ -79,6 +77,7 @@ async def run_deployment(
Args:
name: The deployment id or deployment name in the form:
`"flow name/deployment name"`
client: An optional PrefectClient to use for API requests.
parameters: Parameter overrides for this flow run. Merged with the deployment
defaults.
scheduled_time: The time to schedule the flow run for, defaults to scheduling
Expand All @@ -100,6 +99,18 @@ async def run_deployment(
job_variables: A dictionary of dot delimited infrastructure overrides that
will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
`namespace='prefect'`

Example:
```python
import asyncio
from prefect.deployments import arun_deployment

async def main():
flow_run = await arun_deployment("my-flow/my-deployment")
print(flow_run.state)

asyncio.run(main())
```
"""
if timeout is not None and timeout < 0:
raise ValueError("`timeout` cannot be negative")
Expand All @@ -119,6 +130,8 @@ async def run_deployment(
except ValueError:
pass

client, _ = get_or_create_client(client)

if deployment_id:
deployment = await client.read_deployment(deployment_id=deployment_id)
else:
Expand All @@ -133,7 +146,7 @@ async def run_deployment(

# This was called from a flow. Link the flow run as a subflow.
task_inputs = {
k: await collect_task_run_inputs(v) for k, v in parameters.items()
k: collect_task_run_inputs(v) for k, v in parameters.items()
}

# Track parent task if this is being called from within a task
Expand Down Expand Up @@ -196,7 +209,7 @@ async def run_deployment(
trace_labels = {LABELS_TRACEPARENT_KEY: traceparent} if traceparent else {}

flow_run = await client.create_flow_run_from_deployment(
deployment.id,
deployment_id,
parameters=parameters,
state=Scheduled(scheduled_time=scheduled_time),
name=flow_run_name,
Expand All @@ -215,10 +228,14 @@ async def run_deployment(

with anyio.move_on_after(timeout):
while True:
await anyio.sleep(poll_interval)
flow_run = await client.read_flow_run(flow_run_id)
flow_state = flow_run.state
if flow_state and flow_state.is_final():
return flow_run
await anyio.sleep(poll_interval)

return flow_run


# Alias for backwards compatibility
run_deployment = arun_deployment
Loading
Loading