Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Support links that are explicitly optional (`required=False`).
- Support input caching from optional links (`cache_if_optional=True`).

### Changed

- Buffer inputs from optional links that arrive before the first execution.

### Fixed

- Protect input cache against parallel execution of the same `InputMergeActor` instance.

## [2.0.2] - 2026-02-15

### Fixed
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ classifiers = [
]
requires-python = ">=3.8"
dependencies = [
"ewokscore >=4.0.1",
"pypushflow >=2.0.0rc1"
"ewokscore >=5.0.0rc1",
"pypushflow >=2.0.0rc1",
]

[project.urls]
Expand Down
193 changes: 157 additions & 36 deletions src/ewoksppf/bindings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import os
import threading
import warnings
from contextlib import contextmanager
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
Expand Down Expand Up @@ -172,18 +174,20 @@ def __init__(
name="Name mapper",
trigger_on_error=False,
required=False,
cache_if_optional=False,
**kw,
):
super().__init__(name=name, **kw)
self.namemap = namemap
self.map_all_data = map_all_data
self.trigger_on_error = trigger_on_error
self.required = required
self.cache_if_optional = cache_if_optional

def connect(self, actor):
super().connect(actor)
if isinstance(actor, InputMergeActor):
actor.require_input_from_actor(self)
actor.register_input_actor(self)

def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:
is_error = "WorkflowExceptionInstance" in inData and inData.get(
Expand Down Expand Up @@ -215,56 +219,163 @@ def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None:


class InputMergeActor(AbstractActor):
"""Requires triggers from some input actors before triggering
the downstream actors.

It remembers the last input from the required uptstream actors.
Only the last non-required input is remembered.
"""Requires triggers from some input actors before triggering the downstream actors.
Optional triggers are cached or buffered (before first execution) and the last one is retained.
"""

def __init__(self, parent=None, name="Input merger", **kw):
super().__init__(parent=parent, name=name, **kw)
self.startInData = list()
self.requiredInData = dict()
self.nonrequiredInData = dict()

def require_input_from_actor(self, actor):
# List of input dicts provided by the graph startargs (not part of the Ewoks SPEC)
self._cached_start_triggers: List[dict] = list()

# Map actor to input dict provided by that actor
self._cached_required_triggers: Dict[AbstractActor, dict] = dict()
self._cached_optional_triggers: Dict[AbstractActor, dict] = dict()

# List of input dicts provided by optional links without caching
# that arrived before all required triggers arrived
self._buffer_optional_triggers: List[dict] = list()
self._buffering = True

# Retain only one input dict provided by optional links without caching
# after all required triggers arrived
self._retained_optional_trigger: Optional[dict] = None

self._lock = threading.Lock()

def register_input_actor(self, actor: Optional[AbstractActor]):
if actor.required:
self.requiredInData[actor] = None
info = "(required): cache inputs"
self._cached_required_triggers[actor] = None
elif actor.cache_if_optional:
info = "(optional): cache inputs"
self._cached_optional_triggers[actor] = None
else:
info = "(optional): buffer inputs before first execution and then retain the last one"
# see self._buffer_optional_triggers
self.logger.info("%s %s", actor.name, info)

def _execute(
self, inData: dict, _scope_id: Optional[str] = None, source=None
self,
inData: dict,
_scope_id: Optional[str] = None,
source: Optional[AbstractActor] = None,
) -> None:
self.setStarted()
self.setFinished()
if source is None:
self.startInData.append(inData)
with self._lock:
self.setStarted()
try:
self._cache_inputs(source, inData)
finally:
self.setFinished()

if not self._has_all_required_triggers():
return

self._propagate_cached_inputs()

def _propagate_cached_inputs(self) -> None:
if not self._buffering:
# Execute with the retained inputs from the last trigger
# of an optional link without caching. Might be `None`
# when there is none.
buffer = [self._retained_optional_trigger]
else:
if source in self.requiredInData:
self.requiredInData[source] = inData
if self._buffer_optional_triggers:
# Execute for each retained inputs from optional links without caching.
buffer = list(self._buffer_optional_triggers)
else:
# Execute once without any retained inputs.
buffer = [None]

for i, retained_inputs in enumerate(buffer):
try:
self._trigger_downstream(retained_inputs)
except Exception:
if self._buffering:
# Keep the inputs not successfully propagated.
self._buffer_optional_triggers = buffer[i:]
raise

if self._buffering:
if buffer:
# Retain the last one for the next trigger.
# Might be `None` when there is none.
self._retained_optional_trigger = buffer[-1]
else:
self.nonrequiredInData = inData
missing = {k: v for k, v in self.requiredInData.items() if v is None}
if missing:
self._retained_optional_trigger = None

# No more buffering, only retain one.
self._buffering = False

# No longer needed so do not keep references.
self._buffer_optional_triggers.clear()

def _cache_inputs(self, source: Optional[AbstractActor], inData: dict) -> None:
if source is None:
self._cached_start_triggers.append(inData)
return

if source in self._cached_required_triggers:
# Cache inputs from required link
self._cached_required_triggers[source] = inData
elif source in self._cached_optional_triggers:
# Cache inputs from optional link
self._cached_optional_triggers[source] = inData
elif self._buffering:
# Did not execute yet
self._buffer_optional_triggers.append(inData)
else:
# Executed at least once
self._retained_optional_trigger = inData

def _has_all_required_triggers(self) -> bool:
missing_required = {
k: v for k, v in self._cached_required_triggers.items() if v is None
}
if missing_required:
self.logger.info(
"not triggering downstream actors because missing inputs from actors %s",
[actor.name for actor in missing],
[actor.name for actor in missing_required],
)
return
self.logger.info(
"triggering downstream actors (%d start inputs, %d required inputs, %d optional inputs)",
len(self.startInData),
len(self.requiredInData),
int(bool(self.nonrequiredInData)),
)
newInData = dict()
for data in self.startInData:
newInData.update(data)
for data in self.requiredInData.values():
newInData.update(data)
newInData.update(self.nonrequiredInData)
return False
return True

def _trigger_downstream(self, retained_inputs: Optional[dict]):
merged_inputs = self._downstream_inputs(retained_inputs)
for actor in self.listDownStreamActor:
actor.trigger(newInData)
actor.trigger(merged_inputs)

def _downstream_inputs(self, retained_inputs: Optional[dict]) -> dict:
self.logger.debug(
"Trigger downstream actor with merged inputs from\n "
"%d graph start triggers\n "
"%d cached required links\n "
"%d cached optional links\n "
"%d retained optional links",
len(self._cached_start_triggers),
len(self._cached_required_triggers),
len(self._cached_optional_triggers),
int(retained_inputs is not None),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the number of retained links is always 1 or 0 ?

What is a "retained link" by the way? 😅

@woutdenolf woutdenolf Apr 23, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no such thing as a "retained link". Inputs are cached, buffered or retained:

https://ewokscore.readthedocs.io/en/latest/reference/specs.html#summary

Buffered and retained is essentially the same from a link POV: inputs from optional non-cached links.

We buffer until the first execution (which happens when all required links are cached), after which the buffer is purged one-by-one (execute once for each item we purge) and the last one is retained.

@woutdenolf woutdenolf Apr 23, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the implementation I use two attributes (I added the type annotation in the code)

_buffer_optional_triggers:List[dict]
_retained_optional_trigger:Optional[dict]

In the beginning I had only _buffer_optional_triggers on which we append and pop the inputs from optional non-cached links.

This matches the description better but the implementation is horrible due to the appending and popping which also depends on when it happens.

Now we

  • use _buffer_optional_triggers until the first execution: keep many to use later (aka buffering)
  • purge _buffer_optional_triggers on first execution and put the last one in _retained_optional_trigger
  • after that only use _retained_optional_trigger: keep only 1 (aka it retains the last one)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the number of retained links is always 1 or 0 ?

The number of retained inputs is always 1 or 0.

)

merged_inputs = dict()
for data in self._cached_start_triggers:
merged_inputs.update(data)

for data in self._cached_required_triggers.values():
merged_inputs.update(data)

for data in self._cached_optional_triggers.values():
if data is None:
# Optional link not triggered yet
continue
merged_inputs.update(data)

if retained_inputs:
merged_inputs.update(retained_inputs)

return merged_inputs


class EwoksWorkflow(Workflow):
Expand Down Expand Up @@ -444,25 +555,35 @@ def _create_name_mapper(
self, taskgraph: TaskGraph, source_id: NodeIdType, target_id: NodeIdType
) -> NameMapperActor:
link_attrs = taskgraph.graph[source_id][target_id]

# Data mapping
map_all_data = link_attrs.get("map_all_data", False)
data_mapping = link_attrs.get("data_mapping", list())
data_mapping = {
item["target_input"]: item["source_output"] for item in data_mapping
}

# Conditional link
on_error = link_attrs.get("on_error", False)
cache_if_optional = link_attrs.get("cache_if_optional", False)

# Required link
required = analysis.link_is_required(taskgraph.graph, source_id, target_id)

source_label = ppfname(source_id)
target_label = ppfname(target_id)
if on_error:
name = f"Name mapper <{source_label} -only on error- {target_label}>"
else:
name = f"Name mapper <{source_label} - {target_label}>"

return NameMapperActor(
name=name,
namemap=data_mapping,
map_all_data=map_all_data,
trigger_on_error=on_error,
required=required,
cache_if_optional=cache_if_optional,
**self._actor_arguments,
)

Expand Down
3 changes: 3 additions & 0 deletions src/ewoksppf/tests/test_ppf_workflow21.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ def submodel21_on_error():
def workflow21(on_error):
if on_error:
submodel21 = submodel21_on_error
out1_required = False

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI this test fails in the main branch when on_error=True with ARG_FAILURE since ewokscore 5.0.0rc1

flowchart TD

    n1["in"]
    n2["raise_not_greater_than"]
    n3["out1"]
    n4["out2"]
    n5["out"]

    n1 --> n2
    n2 --> n3
    n2 -->|on error| n4
    n3 -->|"required=False" fixes the test| n5
    n4 --> n5
Loading

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Graph analysis before ewokscore 5.0.0rc1

in → gt     required=True
gt → out1   required=True
gt → out2   required=False
out1 → out  required=False
out2 → out  required=False

Graph analysis since ewokscore 5.0.0rc1

in → gt     required=True
gt → out1   required=True
gt → out2   required=False
out1 → out  required=True
out2 → out  required=False

With ARG_FAILURE the out1 node does not get executed. So if the graph analysis says it is required then node out never gets executed and the output of the workflow is empty which causes the test to fail.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before ewokscore 5.0.0rc1

def link_is_required(
    graph: networkx.DiGraph, source_id: NodeIdType, target_id: NodeIdType
) -> bool:
    if link_has_required(graph, source_id, target_id):
        return True
    if link_is_conditional(graph, source_id, target_id):
        return False

    not_required = node_has_ancestors(
        graph, source_id, link_has_required=False, link_is_conditional=True
    )
    not_required |= node_has_ancestors(graph, source_id, node_has_error_handlers=True)
    return not not_required

Since ewokscore 5.0.0rc1

def link_is_required(
    graph: networkx.DiGraph, source_id: NodeIdType, target_id: NodeIdType
) -> bool:
    # Explicitly required or optional
    if link_is_explicitly_required(graph, source_id, target_id):
        return True
    if link_is_explicitly_optional(graph, source_id, target_id):
        return False

    # By default, conditional links are optional
    if link_is_conditional(graph, source_id, target_id):
        return False

    # By default, links with at least one non-required link upstream become non-required
    return not node_has_ancestors(graph, source_id, link_is_required=False)

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition makes the difference (which is now removed since 5.0.0rc1)

optional = node_has_ancestors(graph, source_id, node_has_error_handlers=True)

It makes out1 → out optional because an ancestor node of out1 has an error handler ( that would be the gt node).

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no right or wrong answer here. We just need to be sure the ewoks SPEC is clear.

New SPEC of the link attribute `required

https://ewokscore.readthedocs.io/en/latest/reference/specs.html#link-attributes

required (optional): The link is required when set to True. The link is optional if False. A target node can only be executed after all its required predecessors have executed successfully. If a target has multiple required incoming links, it will be scheduled once all corresponding source tasks have completed (and may be scheduled multiple times as additional inputs from optional links arrive). See Node execution section for more details.
If the attribute is not explicitly specified (default behavior), the link is considered required when it is unconditional (i.e. has no conditions nor on_error=True) and all ancestors of the source node are connected through required links. Otherwise, the link is treated as optional.

Old SPEC of the link attribute `required

https://ewokscore.readthedocs.io/en/v4.0.2/reference/specs.html#link-attributes

required (optional): setting this to True marks the link as required. When a target receives multiple links, it will be executed (perhaps multiple times) when all the sources connected to the target with required links have been executed. A link is required when it is either “marked as required” (link attribute required=True) or “unconditional and all ancestors of the source node are required”.

Conclusion

Note that the old description does not actually describe the way the graph analysis considers ancestors to be required when required != True. Note also that required=False was ignored, so the same as required=None or not provided.

So ewokscore 5.0.0rc1 has modified the graph analysis result for link_is_required that was not documented before.

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So before ewokscore 5.0.0rc1 the graph analysis provides the desired required value of out1-out2 in this examples which after ewokscore 5.0.0rc1 it does not

flowchart TD

    n1["in"]
    n2["raise_not_greater_than"]
    n3["out1"]
    n4["out2"]
    n5["out"]

    n1 --> n2
    n2 --> n3
    n2 -->|on error| n4
    n3 -->|"required=False" from old analysis which is desired| n5
    n4 --> n5
Loading

Before ewokscore 5.0.0rc1 the graph analysis does NOT provide the desired required value of out1-out3 in this example while after ewokscore 5.0.0rc1 it does

flowchart TD

    n1["in1"]
    n2["raise_not_greater_than"]
    n3["out1"]
    n4["out2"]
    n5["out3"]
    n6["out4"]
    n7["needed input"]

    n1 --> n2
    n2 --> n3
    n2 -->|on error| n4
    n3 -->|"required=False" from old analysis which is NOT desired| n5
    n4 --> n6
    n1 --> n7
    n7 --> n5
Loading

When raise_not_greater_than raises we do not want out3 to be executed.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course the long term goal is to make the ewokscore graph analysis more intelligent (discussion: https://confluence.esrf.fr/display/AAWWK/Graph+analysis) so that it looks at forking and merging of branches, not just looking at ancestor links being required or not.

@woutdenolf woutdenolf Apr 19, 2026

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Workflows that will be affected but the graph analysis change:

  • Daiquiri workflows: they want the new behavior (error handlers for mimosa never merge back into the main branch)
  • BES workflows: sub-workflows tend to merge to a single output node so this change might require some explicit required=False.

else:
submodel21 = submodel21_conditions
out1_required = None

nodes = [
{"id": "in", "task_type": "method", "task_identifier": qualname(passthrough)},
Expand Down Expand Up @@ -132,6 +134,7 @@ def workflow21(on_error):
{
"source": "out1",
"target": "out",
"required": out1_required,
"data_mapping": [{"source_output": "return_value", "target_input": "a"}],
},
{
Expand Down
Loading
Loading