Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions src/ewoks/task_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from collections.abc import Mapping
from typing import List, Optional, Union
from ewokscore import Task
from ewoksutils.import_utils import qualname

try:
from ewoksjob.client import submit
except ImportError:
submit = None

from .bindings import execute_graph, load_graph

__all__ = ["task_inputs", "execute_task", "submit_task"]


def task_inputs(
id: Optional[str] = None,
label: Optional[str] = None,
task_identifier: Optional[str] = None,
inputs: Optional[Mapping] = None,
) -> List[dict]:
"""Convert a {name: value} dict of inputs to a list of workflow inputs for given tasks.

Provide one of ``id``, ``label`` and ``task_identifier`` to select the targeted tasks.

.. code:: python

inputs = task_inputs(task_identifier="SumTask", inputs={"a": 1, "b": 1})

"""
if inputs is None:
return []

task_selector = {}
if id is not None:
task_selector["id"] = id
if label is not None:
task_selector["label"] = label
if task_identifier is not None:
task_selector["task_identifier"] = task_identifier

return [{**task_selector, "name": k, "value": v} for k, v in inputs.items()]

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @woutdenolf on Dec 21, 2024, 15:49 GMT+1:

Since task_inputs is public, in which situation would you use it?

Note that ewoks is not installed in the bliss side. If you need it in blissoda then ewoksutils is the best option (dependency of ewoksjob[client] which is installed on the bliss side).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @tvincent on Jan 6, 2025, 10:31 GMT+1:

That would be useful in blissoda and scripts/GUI like in https://gitlab.esrf.fr/workflow/workflowhub/id31workflows.

I'll make a PR with it on ewoksutils.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @tvincent on Jan 6, 2025, 13:54 GMT+1:

PR in ewoksutils: https://gitlab.esrf.fr/workflow/ewoks/ewoksutils/-/merge_requests/32



def _task_graph(
task_identifier: str,
task_type: str,
):
return {
"graph": {"id": task_identifier},
"nodes": [
{
"id": task_identifier,
"task_type": task_type,
"task_identifier": task_identifier,
},
],
"links": [],
}


def _convert_inputs(
task_identifier: str,
inputs: Optional[Union[Mapping, List[Mapping]]],
):
if inputs is None:
return []
if isinstance(inputs, Mapping):
return task_inputs(task_identifier=task_identifier, inputs=inputs)
return inputs


def execute_task(
task: Union[str, Task],
inputs: Optional[Union[Mapping, List[Mapping]]] = None,
task_type: str = "class",
**options,
):

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @woutdenolf on Dec 21, 2024, 15:42 GMT+1:

Suppose we want to run this function with ewoks

# mytasks.py

def sumfunc(a, b):
    return a+b

If the idea is to provide an API that is as close to calling a normal python function, how about this?

sumab = execute_task("mytasks.sumfunc", task_type="method")(a=10, b=20)

And if the default task_type is "method"

sumab = execute_task("mytasks.sumfunc")(a=10, b=20)

And since we can handle positional arguments as well

sumab = execute_task("mytasks.sumfunc")(10, 20)

which is as close as it gets to

sumab = sumfunc(10, 20)

essentially

sumfunc = execute_task("mytasks.sumfunc")

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @woutdenolf on Dec 21, 2024, 15:49 GMT+1:

Same goes for submit_task.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @tvincent on Jan 6, 2025, 10:31 GMT+1:

Makes sense!

As it is, I made it similar to execute_graph and submit_graph API but it would be more pythonic this way like the first attempt for this (https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/merge_requests/263).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @tvincent on Jan 6, 2025, 14:21 GMT+1:

And if the default task_type is "method"

I used "class" type by default since for execute_task at least it is of limited interest to call method task.

Best IMO would be to see if a "auto" task_type is possible: It sound possible for class, method, graph, script and notebook types at least.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GitLab by @tvincent on Jan 6, 2025, 14:26 GMT+1:

Using positional only args can also be considered:

def execute_task(task, task_type: str = "class", options: Optional[dict] = None, /, **kwargs):
    ...

sumab = execute_task("mytasks.sumfunc", "method", a=10, b=20)

sumab = execute_task("mytasks.sumtask", a=10, b=20)

though it's a bit cumbersome to pass extra options (here the task_type MUST also be provided)

"""Execute a workflow with a unique task and return its output.

:param task: Task identifier or Task class
:param inputs:
Task inputs as a {name: value} mapping or a list of workflow inputs
:param task_type: The kind of task
:param options: Options passed to :func:`ewoks.bindings.execute_graph`
"""
task_identifier = task if isinstance(task, str) else qualname(task)

return execute_graph(
_task_graph(task_identifier, task_type),
inputs=_convert_inputs(task_identifier, inputs),
**options,
)


def submit_task(
task_identifier: str,
inputs: Optional[Union[Mapping, List[Mapping]]] = None,
task_type: str = "class",
_celery_options=None,
**options,
):
"""Submit a workflow containing a unique task to be executed remotely.

:param task: Task identifier
:param inputs:
Task inputs as a {name: value} mapping or a list of workflow inputs
:param task_type: The kind of task
:param _celery_options: Option passed to :func:`ewoksjob.client.submit`
:param options: Options passed as kwargs to :func:`ewoksjob.client.submit`
"""
if submit is None:
raise RuntimeError("requires the 'ewoksjob' package")
if _celery_options is None:
_celery_options = dict()

graph = load_graph(
graph=_task_graph(task_identifier, task_type),
inputs=_convert_inputs(task_identifier, inputs),
).serialize()
print(graph)

return submit(args=(graph,), kwargs=options, **_celery_options)
21 changes: 21 additions & 0 deletions src/ewoks/tests/test_task_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import pytest

from ewoks.task_utils import execute_task, task_inputs
from ewokscore.tests.examples.tasks.sumtask import SumTask


@pytest.mark.parametrize("selector", ["id", "label", "task_identifier"])
def test_task_inputs(selector):
inputs = task_inputs(**{selector: "task"}, inputs={"a": 1, "b": "test"})
assert inputs == [
{selector: "task", "name": "a", "value": 1},
{selector: "task", "name": "b", "value": "test"},
]


@pytest.mark.parametrize(
"task", [SumTask, "ewokscore.tests.examples.tasks.sumtask.SumTask"]
)
def test_execute_task(task):
result = execute_task(task, inputs={"a": 1, "b": 1})
assert result == {"result": 2}