From b1b238e9b5ccce238fed9860347e938fe1651b71 Mon Sep 17 00:00:00 2001 From: Thomas VINCENT Date: Thu, 19 Dec 2024 16:14:47 +0100 Subject: [PATCH] Add few helper to create task inputs and execute/submit one task --- src/ewoks/task_utils.py | 123 +++++++++++++++++++++++++++++ src/ewoks/tests/test_task_utils.py | 21 +++++ 2 files changed, 144 insertions(+) create mode 100644 src/ewoks/task_utils.py create mode 100644 src/ewoks/tests/test_task_utils.py diff --git a/src/ewoks/task_utils.py b/src/ewoks/task_utils.py new file mode 100644 index 0000000..f37c887 --- /dev/null +++ b/src/ewoks/task_utils.py @@ -0,0 +1,123 @@ +from collections.abc import Mapping +from typing import List, Optional, Union +from ewokscore import Task +from ewoksutils.import_utils import qualname + +try: + from ewoksjob.client import submit +except ImportError: + submit = None + +from .bindings import execute_graph, load_graph + +__all__ = ["task_inputs", "execute_task", "submit_task"] + + +def task_inputs( + id: Optional[str] = None, + label: Optional[str] = None, + task_identifier: Optional[str] = None, + inputs: Optional[Mapping] = None, +) -> List[dict]: + """Convert a {name: value} dict of inputs to a list of workflow inputs for given tasks. + + Provide one of ``id``, ``label`` and ``task_identifier`` to select the targeted tasks. + + .. code:: python + + inputs = task_inputs(task_identifier="SumTask", inputs={"a": 1, "b": 1}) + + """ + if inputs is None: + return [] + + task_selector = {} + if id is not None: + task_selector["id"] = id + if label is not None: + task_selector["label"] = label + if task_identifier is not None: + task_selector["task_identifier"] = task_identifier + + return [{**task_selector, "name": k, "value": v} for k, v in inputs.items()] + + +def _task_graph( + task_identifier: str, + task_type: str, +): + return { + "graph": {"id": task_identifier}, + "nodes": [ + { + "id": task_identifier, + "task_type": task_type, + "task_identifier": task_identifier, + }, + ], + "links": [], + } + + +def _convert_inputs( + task_identifier: str, + inputs: Optional[Union[Mapping, List[Mapping]]], +): + if inputs is None: + return [] + if isinstance(inputs, Mapping): + return task_inputs(task_identifier=task_identifier, inputs=inputs) + return inputs + + +def execute_task( + task: Union[str, Task], + inputs: Optional[Union[Mapping, List[Mapping]]] = None, + task_type: str = "class", + **options, +): + """Execute a workflow with a unique task and return its output. + + :param task: Task identifier or Task class + :param inputs: + Task inputs as a {name: value} mapping or a list of workflow inputs + :param task_type: The kind of task + :param options: Options passed to :func:`ewoks.bindings.execute_graph` + """ + task_identifier = task if isinstance(task, str) else qualname(task) + + return execute_graph( + _task_graph(task_identifier, task_type), + inputs=_convert_inputs(task_identifier, inputs), + **options, + ) + + +def submit_task( + task_identifier: str, + inputs: Optional[Union[Mapping, List[Mapping]]] = None, + task_type: str = "class", + _celery_options=None, + **options, +): + """Submit a workflow containing a unique task to be executed remotely. + + :param task: Task identifier + :param inputs: + Task inputs as a {name: value} mapping or a list of workflow inputs + :param task_type: The kind of task + :param _celery_options: Option passed to :func:`ewoksjob.client.submit` + :param options: Options passed as kwargs to :func:`ewoksjob.client.submit` + """ + if submit is None: + raise RuntimeError("requires the 'ewoksjob' package") + if _celery_options is None: + _celery_options = dict() + + graph = load_graph( + graph=_task_graph(task_identifier, task_type), + inputs=_convert_inputs(task_identifier, inputs), + ).serialize() + print(graph) + + return submit(args=(graph,), kwargs=options, **_celery_options) diff --git a/src/ewoks/tests/test_task_utils.py b/src/ewoks/tests/test_task_utils.py new file mode 100644 index 0000000..fda9c6f --- /dev/null +++ b/src/ewoks/tests/test_task_utils.py @@ -0,0 +1,21 @@ +import pytest + +from ewoks.task_utils import execute_task, task_inputs +from ewokscore.tests.examples.tasks.sumtask import SumTask + + +@pytest.mark.parametrize("selector", ["id", "label", "task_identifier"]) +def test_task_inputs(selector): + inputs = task_inputs(**{selector: "task"}, inputs={"a": 1, "b": "test"}) + assert inputs == [ + {selector: "task", "name": "a", "value": 1}, + {selector: "task", "name": "b", "value": "test"}, + ] + + +@pytest.mark.parametrize( + "task", [SumTask, "ewokscore.tests.examples.tasks.sumtask.SumTask"] +) +def test_execute_task(task): + result = execute_task(task, inputs={"a": 1, "b": 1}) + assert result == {"result": 2}