From 306e1107fa39ba29d249ac340ba81c6f8b6f81af Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Fri, 6 Mar 2026 17:24:03 +0100 Subject: [PATCH 01/15] support caching of inputs from non-required links --- CHANGELOG.md | 5 + src/ewoksppf/bindings.py | 90 ++++++-- src/ewoksppf/tests/test_ppf_workflow25.py | 266 ++++++++++++++++++++++ 3 files changed, 339 insertions(+), 22 deletions(-) create mode 100644 src/ewoksppf/tests/test_ppf_workflow25.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 140d9bf..259a420 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Support non-required link caching. +- Support conditional links that do not have conditions and are not error handlers. + ## [2.0.2] - 2026-02-15 ### Fixed diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 5044d84..6941014 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -172,6 +172,7 @@ def __init__( name="Name mapper", trigger_on_error=False, required=False, + cache_non_required=False, **kw, ): super().__init__(name=name, **kw) @@ -179,11 +180,12 @@ def __init__( self.map_all_data = map_all_data self.trigger_on_error = trigger_on_error self.required = required + self.cache_non_required = cache_non_required def connect(self, actor): super().connect(actor) if isinstance(actor, InputMergeActor): - actor.require_input_from_actor(self) + actor.register_input_actor(self) def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None: is_error = "WorkflowExceptionInstance" in inData and inData.get( @@ -224,47 +226,81 @@ class InputMergeActor(AbstractActor): def __init__(self, parent=None, name="Input merger", **kw): super().__init__(parent=parent, name=name, **kw) - self.startInData = list() - self.requiredInData = dict() - self.nonrequiredInData = dict() - def require_input_from_actor(self, actor): + # List of input dict's provided by the graph startargs + self._start_inputs = list() + + # Inputs dict provided by the last non-required actor + self._non_required_last_inputs = dict() + + # Map required actor to inputs dict provided by that actor + self._required_inputs = dict() + + # Map non-required but cached actor to inputs dict provided by that actor + self._cache_non_required_inputs = dict() + + def register_input_actor(self, actor): if actor.required: - self.requiredInData[actor] = None + info = "(required): cache inputs" + self._required_inputs[actor] = None + elif actor.cache_non_required: + info = "(non-required): cache inputs" + self._cache_non_required_inputs[actor] = None + else: + info = "(non-required): use inputs only once (do not cache)" + # Inputs provided by this actor are not cache and will be used only once + # See: self._non_required_last_inputs + self.logger.info("%s %s", actor.name, info) def _execute( self, inData: dict, _scope_id: Optional[str] = None, source=None ) -> None: self.setStarted() self.setFinished() + if source is None: - self.startInData.append(inData) + self._start_inputs.append(inData) else: - if source in self.requiredInData: - self.requiredInData[source] = inData + if source in self._required_inputs: + self._required_inputs[source] = inData + elif source in self._cache_non_required_inputs: + self._cache_non_required_inputs[source] = inData else: - self.nonrequiredInData = inData - missing = {k: v for k, v in self.requiredInData.items() if v is None} + self._non_required_last_inputs = inData + + missing = {k: v for k, v in self._required_inputs.items() if v is None} if missing: self.logger.info( "not triggering downstream actors because missing inputs from actors %s", [actor.name for actor in missing], ) return + self.logger.info( - "triggering downstream actors (%d start inputs, %d required inputs, %d optional inputs)", - len(self.startInData), - len(self.requiredInData), - int(bool(self.nonrequiredInData)), + "triggering downstream actors with input merger if %d graph start, %d required actors, %d non-required cached actors and %d non-required actors", + len(self._start_inputs), + len(self._required_inputs), + len(self._cache_non_required_inputs), + int(bool(self._non_required_last_inputs)), ) - newInData = dict() - for data in self.startInData: - newInData.update(data) - for data in self.requiredInData.values(): - newInData.update(data) - newInData.update(self.nonrequiredInData) + + merged_inputs = dict() + for data in self._start_inputs: + merged_inputs.update(data) + + for data in self._required_inputs.values(): + merged_inputs.update(data) + + for data in self._cache_non_required_inputs.values(): + if data is None: + # Non-required branch not triggered yet + continue + merged_inputs.update(data) + + merged_inputs.update(self._non_required_last_inputs) + for actor in self.listDownStreamActor: - actor.trigger(newInData) + actor.trigger(merged_inputs) class EwoksWorkflow(Workflow): @@ -444,25 +480,35 @@ def _create_name_mapper( self, taskgraph: TaskGraph, source_id: NodeIdType, target_id: NodeIdType ) -> NameMapperActor: link_attrs = taskgraph.graph[source_id][target_id] + + # Data mapping map_all_data = link_attrs.get("map_all_data", False) data_mapping = link_attrs.get("data_mapping", list()) data_mapping = { item["target_input"]: item["source_output"] for item in data_mapping } + + # Conditional link on_error = link_attrs.get("on_error", False) + cache_non_required = link_attrs.get("cache_non_required", False) + + # Required link required = analysis.link_is_required(taskgraph.graph, source_id, target_id) + source_label = ppfname(source_id) target_label = ppfname(target_id) if on_error: name = f"Name mapper <{source_label} -only on error- {target_label}>" else: name = f"Name mapper <{source_label} - {target_label}>" + return NameMapperActor( name=name, namemap=data_mapping, map_all_data=map_all_data, trigger_on_error=on_error, required=required, + cache_non_required=cache_non_required, **self._actor_arguments, ) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py new file mode 100644 index 0000000..1e2b8ce --- /dev/null +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -0,0 +1,266 @@ +import time + +from ewokscore.task import Task +from ewoksutils.import_utils import qualname + +from ..bindings import execute_graph + + +class Required(Task, input_names=["compute_time"], output_names=["param1", "param2"]): + def run(self): + time.sleep(self.inputs.compute_time) + self.outputs.param1 = -1 + self.outputs.param2 = -2 + + +class Metric(Task, input_names=["compute_time", "metric"], output_names=["metric"]): + def run(self): + time.sleep(self.inputs.compute_time) + self.outputs.metric = self.inputs.metric + + +class Timeout(Task, input_names=["timeout"], output_names=["timeout"]): + def run(self): + time.sleep(self.inputs.timeout) + self.outputs.timeout = True + + +class Decider( + Task, + input_names=["param1", "param2", "metric_threshold"], + optional_input_names=[ + "metric1", + "metric2", + "metric3", + "metric4", + "timeout", + "disable", + ], + output_names=["good_enough", "reason", "set_disable"], +): + def run(self): + print("\nDecider inputs:", self.get_input_values()) + print("Metric threshold:", self.inputs.metric_threshold) + if self.inputs.param1 != -1: + raise ValueError(f"param1 is {self.inputs.param1} intead of -1") + + if self.inputs.param2 != -2: + raise ValueError(f"param2 is {self.inputs.param2} intead of -2") + + if self.inputs.disable: + print("IGNORE") + self.outputs.good_enough = None + self.outputs.reason = None + self.outputs.set_disable = False + return + + good_metrics = [] + nmetrics = 0 + for name in ["metric1", "metric2", "metric3", "metric4"]: + value = self.get_input_value(name, None) + if value is None: + continue + nmetrics += 1 + if value >= self.inputs.metric_threshold: + good_metrics.append(name) + print("Metrics that are good:", good_metrics) + + if any(good_metrics): + print("TRIGGER GOOD") + self.outputs.good_enough = True + self.outputs.reason = ", ".join(good_metrics) + self.outputs.set_disable = True + return + + if self.get_input_value("timeout"): + print("TRIGGER BAD") + self.outputs.good_enough = False + self.outputs.reason = "timeout" + self.outputs.set_disable = True + return + + if nmetrics == 4: + print("TRIGGER BAD") + self.outputs.good_enough = False + self.outputs.reason = "none" + self.outputs.set_disable = True + return + + print("TRIGGER NOTHING") + self.outputs.good_enough = None + self.outputs.reason = None + self.outputs.set_disable = False + + +class Good(Task, input_names=["reason"], output_names=["state", "reason"]): + def run(self): + print("\nGOOD:", self.inputs.reason) + self.outputs.state = "GOOD" + self.outputs.reason = self.inputs.reason + + +class Bad(Task, input_names=["reason"], output_names=["state", "reason"]): + def run(self): + print("\nBAD:", self.inputs.reason) + self.outputs.state = "BAD" + self.outputs.reason = self.inputs.reason + + +def workflow(): + nodes = [ + {"id": "required", "task_type": "class", "task_identifier": qualname(Required)}, + {"id": "metric1", "task_type": "class", "task_identifier": qualname(Metric)}, + {"id": "metric2", "task_type": "class", "task_identifier": qualname(Metric)}, + {"id": "metric3", "task_type": "class", "task_identifier": qualname(Metric)}, + {"id": "metric4", "task_type": "class", "task_identifier": qualname(Metric)}, + {"id": "timeout", "task_type": "class", "task_identifier": qualname(Timeout)}, + {"id": "decider", "task_type": "class", "task_identifier": qualname(Decider)}, + {"id": "good", "task_type": "class", "task_identifier": qualname(Good)}, + {"id": "bad", "task_type": "class", "task_identifier": qualname(Bad)}, + ] + links = [ + { + "source": "required", + "target": "decider", + "map_all_data": True, + }, + { + "source": "metric1", + "target": "decider", + "conditional": True, + "cache_non_required": True, + "data_mapping": [{"source_output": "metric", "target_input": "metric1"}], + }, + { + "source": "metric2", + "target": "decider", + "conditional": True, + "cache_non_required": True, + "data_mapping": [{"source_output": "metric", "target_input": "metric2"}], + }, + { + "source": "metric3", + "target": "decider", + "conditional": True, + "cache_non_required": True, + "data_mapping": [{"source_output": "metric", "target_input": "metric3"}], + }, + { + "source": "metric4", + "target": "decider", + "conditional": True, + "cache_non_required": True, + "data_mapping": [{"source_output": "metric", "target_input": "metric4"}], + }, + { + "source": "timeout", + "target": "decider", + "conditional": True, + "cache_non_required": True, + "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], + }, + { + "source": "decider", + "target": "decider", + "cache_non_required": True, + "data_mapping": [ + {"source_output": "set_disable", "target_input": "disable"} + ], + "conditions": [{"source_output": "set_disable", "value": True}], + }, + { + "source": "decider", + "target": "good", + "data_mapping": [{"source_output": "reason", "target_input": "reason"}], + "conditions": [{"source_output": "good_enough", "value": True}], + }, + { + "source": "decider", + "target": "bad", + "data_mapping": [{"source_output": "reason", "target_input": "reason"}], + "conditions": [{"source_output": "good_enough", "value": False}], + }, + ] + return {"graph": {"id": "workflow"}, "nodes": nodes, "links": links} + + +def create_inputs(metric_threshold: float): + tm = 0.1 + return [ + {"id": "required", "name": "compute_time", "value": 0.1 * tm}, + {"id": "metric1", "name": "compute_time", "value": 1 * tm}, + {"id": "metric1", "name": "metric", "value": 10}, + {"id": "metric2", "name": "compute_time", "value": 2 * tm}, + {"id": "metric2", "name": "metric", "value": 20}, + {"id": "metric3", "name": "compute_time", "value": 3 * tm}, + {"id": "metric3", "name": "metric", "value": 30}, + {"id": "timeout", "name": "timeout", "value": 4 * tm}, + {"id": "metric4", "name": "compute_time", "value": 5 * tm}, + {"id": "metric4", "name": "metric", "value": 40}, + {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, + ] + + +def create_inputs_timeout_last(metric_threshold: float): + tm = 0.1 + return [ + {"id": "required", "name": "compute_time", "value": 0.1 * tm}, + {"id": "metric1", "name": "compute_time", "value": 1 * tm}, + {"id": "metric1", "name": "metric", "value": 10}, + {"id": "metric2", "name": "compute_time", "value": 2 * tm}, + {"id": "metric2", "name": "metric", "value": 20}, + {"id": "metric3", "name": "compute_time", "value": 3 * tm}, + {"id": "metric3", "name": "metric", "value": 30}, + {"id": "metric4", "name": "compute_time", "value": 4 * tm}, + {"id": "metric4", "name": "metric", "value": 40}, + {"id": "timeout", "name": "timeout", "value": 5 * tm}, + {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, + ] + + +def test_ppf_workflow25_metric1(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: metric1, metric2, metric3 and metric4 + inputs = create_inputs(5) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "metric1", "state": "GOOD"} + + +def test_ppf_workflow25_metric2(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: metric2, metric3 and metric4 + inputs = create_inputs(15) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "metric2", "state": "GOOD"} + + +def test_ppf_workflow25_metric3(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: metric3 and metric4 + inputs = create_inputs(25) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "metric3", "state": "GOOD"} + + +def test_ppf_workflow25_timeout1(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: metric4 + inputs = create_inputs(35) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "timeout", "state": "BAD"} + + +def test_ppf_workflow25_timeout2(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: none + inputs = create_inputs(45) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "timeout", "state": "BAD"} + + +def test_ppf_workflow25_none(ppf_log_config): + """test 'cache_non_required' links""" + # Metrics that pass: none + inputs = create_inputs_timeout_last(45) + result = execute_graph(workflow(), inputs=inputs) + assert result == {"reason": "none", "state": "BAD"} From 5923719e0e8fa02e4474efa9f73346248485181a Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Mon, 9 Mar 2026 09:18:42 +0100 Subject: [PATCH 02/15] rename to cache_if_not_required --- src/ewoksppf/bindings.py | 22 ++++++++++----------- src/ewoksppf/tests/test_ppf_workflow25.py | 24 +++++++++++------------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 6941014..8e3f9b8 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -172,7 +172,7 @@ def __init__( name="Name mapper", trigger_on_error=False, required=False, - cache_non_required=False, + cache_if_not_required=False, **kw, ): super().__init__(name=name, **kw) @@ -180,7 +180,7 @@ def __init__( self.map_all_data = map_all_data self.trigger_on_error = trigger_on_error self.required = required - self.cache_non_required = cache_non_required + self.cache_if_not_required = cache_if_not_required def connect(self, actor): super().connect(actor) @@ -237,15 +237,15 @@ def __init__(self, parent=None, name="Input merger", **kw): self._required_inputs = dict() # Map non-required but cached actor to inputs dict provided by that actor - self._cache_non_required_inputs = dict() + self._cache_if_not_required_inputs = dict() def register_input_actor(self, actor): if actor.required: info = "(required): cache inputs" self._required_inputs[actor] = None - elif actor.cache_non_required: + elif actor.cache_if_not_required: info = "(non-required): cache inputs" - self._cache_non_required_inputs[actor] = None + self._cache_if_not_required_inputs[actor] = None else: info = "(non-required): use inputs only once (do not cache)" # Inputs provided by this actor are not cache and will be used only once @@ -263,8 +263,8 @@ def _execute( else: if source in self._required_inputs: self._required_inputs[source] = inData - elif source in self._cache_non_required_inputs: - self._cache_non_required_inputs[source] = inData + elif source in self._cache_if_not_required_inputs: + self._cache_if_not_required_inputs[source] = inData else: self._non_required_last_inputs = inData @@ -280,7 +280,7 @@ def _execute( "triggering downstream actors with input merger if %d graph start, %d required actors, %d non-required cached actors and %d non-required actors", len(self._start_inputs), len(self._required_inputs), - len(self._cache_non_required_inputs), + len(self._cache_if_not_required_inputs), int(bool(self._non_required_last_inputs)), ) @@ -291,7 +291,7 @@ def _execute( for data in self._required_inputs.values(): merged_inputs.update(data) - for data in self._cache_non_required_inputs.values(): + for data in self._cache_if_not_required_inputs.values(): if data is None: # Non-required branch not triggered yet continue @@ -490,7 +490,7 @@ def _create_name_mapper( # Conditional link on_error = link_attrs.get("on_error", False) - cache_non_required = link_attrs.get("cache_non_required", False) + cache_if_not_required = link_attrs.get("cache_if_not_required", False) # Required link required = analysis.link_is_required(taskgraph.graph, source_id, target_id) @@ -508,7 +508,7 @@ def _create_name_mapper( map_all_data=map_all_data, trigger_on_error=on_error, required=required, - cache_non_required=cache_non_required, + cache_if_not_required=cache_if_not_required, **self._actor_arguments, ) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index 1e2b8ce..7c860b6 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -128,41 +128,41 @@ def workflow(): "source": "metric1", "target": "decider", "conditional": True, - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric1"}], }, { "source": "metric2", "target": "decider", "conditional": True, - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric2"}], }, { "source": "metric3", "target": "decider", "conditional": True, - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric3"}], }, { "source": "metric4", "target": "decider", "conditional": True, - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric4"}], }, { "source": "timeout", "target": "decider", "conditional": True, - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], }, { "source": "decider", "target": "decider", - "cache_non_required": True, + "cache_if_not_required": True, "data_mapping": [ {"source_output": "set_disable", "target_input": "disable"} ], @@ -219,7 +219,7 @@ def create_inputs_timeout_last(metric_threshold: float): def test_ppf_workflow25_metric1(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: metric1, metric2, metric3 and metric4 inputs = create_inputs(5) result = execute_graph(workflow(), inputs=inputs) @@ -227,7 +227,7 @@ def test_ppf_workflow25_metric1(ppf_log_config): def test_ppf_workflow25_metric2(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: metric2, metric3 and metric4 inputs = create_inputs(15) result = execute_graph(workflow(), inputs=inputs) @@ -235,7 +235,7 @@ def test_ppf_workflow25_metric2(ppf_log_config): def test_ppf_workflow25_metric3(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: metric3 and metric4 inputs = create_inputs(25) result = execute_graph(workflow(), inputs=inputs) @@ -243,7 +243,7 @@ def test_ppf_workflow25_metric3(ppf_log_config): def test_ppf_workflow25_timeout1(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: metric4 inputs = create_inputs(35) result = execute_graph(workflow(), inputs=inputs) @@ -251,7 +251,7 @@ def test_ppf_workflow25_timeout1(ppf_log_config): def test_ppf_workflow25_timeout2(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: none inputs = create_inputs(45) result = execute_graph(workflow(), inputs=inputs) @@ -259,7 +259,7 @@ def test_ppf_workflow25_timeout2(ppf_log_config): def test_ppf_workflow25_none(ppf_log_config): - """test 'cache_non_required' links""" + """test 'cache_if_not_required' links""" # Metrics that pass: none inputs = create_inputs_timeout_last(45) result = execute_graph(workflow(), inputs=inputs) From aedb4360c76c748be19f88711ebd8657c7fefc03 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Wed, 11 Mar 2026 14:03:25 +0100 Subject: [PATCH 03/15] conditional=True was replaced by required=False in ewokscore --- src/ewoksppf/tests/test_ppf_workflow25.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index 7c860b6..c0e5e97 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -127,35 +127,35 @@ def workflow(): { "source": "metric1", "target": "decider", - "conditional": True, + "required": False, "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric1"}], }, { "source": "metric2", "target": "decider", - "conditional": True, + "required": False, "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric2"}], }, { "source": "metric3", "target": "decider", - "conditional": True, + "required": False, "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric3"}], }, { "source": "metric4", "target": "decider", - "conditional": True, + "required": False, "cache_if_not_required": True, "data_mapping": [{"source_output": "metric", "target_input": "metric4"}], }, { "source": "timeout", "target": "decider", - "conditional": True, + "required": False, "cache_if_not_required": True, "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], }, From 4320dab09f85421121bc8b9f60f57ac662b3e4fa Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Wed, 11 Mar 2026 14:04:41 +0100 Subject: [PATCH 04/15] fix test to handle change in 'required' graph analysis --- src/ewoksppf/tests/test_ppf_workflow21.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ewoksppf/tests/test_ppf_workflow21.py b/src/ewoksppf/tests/test_ppf_workflow21.py index 1ad15d7..be010da 100644 --- a/src/ewoksppf/tests/test_ppf_workflow21.py +++ b/src/ewoksppf/tests/test_ppf_workflow21.py @@ -99,8 +99,10 @@ def submodel21_on_error(): def workflow21(on_error): if on_error: submodel21 = submodel21_on_error + out1_required = False else: submodel21 = submodel21_conditions + out1_required = None nodes = [ {"id": "in", "task_type": "method", "task_identifier": qualname(passthrough)}, @@ -132,6 +134,7 @@ def workflow21(on_error): { "source": "out1", "target": "out", + "required": out1_required, "data_mapping": [{"source_output": "return_value", "target_input": "a"}], }, { From a85709c2757a02659bf8a9ac643816afd024acff Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Thu, 2 Apr 2026 17:38:35 +0200 Subject: [PATCH 05/15] buffer inputs from optional links that arrive before the first execution and rename cache_if_not_required to cache_if_optional --- CHANGELOG.md | 8 +- pyproject.toml | 4 +- src/ewoksppf/bindings.py | 159 +++++++++++++++------- src/ewoksppf/tests/test_ppf_workflow25.py | 24 ++-- 4 files changed, 131 insertions(+), 64 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 259a420..722f029 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Support non-required link caching. -- Support conditional links that do not have conditions and are not error handlers. +- Support links that are explicitely optional (`required=False`). +- Support input caching from optional links (`cache_if_optional=True`). + +### Changed + +- Buffer inputs from optional links that arrive before the first execution. ## [2.0.2] - 2026-02-15 diff --git a/pyproject.toml b/pyproject.toml index 01fbe80..5b6881c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,8 +16,8 @@ classifiers = [ ] requires-python = ">=3.8" dependencies = [ - "ewokscore >=4.0.1", - "pypushflow >=2.0.0rc1" + "ewokscore >=5.0.0rc1", + "pypushflow >=2.0.0rc1", ] [project.urls] diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 8e3f9b8..1e2afad 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -172,7 +172,7 @@ def __init__( name="Name mapper", trigger_on_error=False, required=False, - cache_if_not_required=False, + cache_if_optional=False, **kw, ): super().__init__(name=name, **kw) @@ -180,7 +180,7 @@ def __init__( self.map_all_data = map_all_data self.trigger_on_error = trigger_on_error self.required = required - self.cache_if_not_required = cache_if_not_required + self.cache_if_optional = cache_if_optional def connect(self, actor): super().connect(actor) @@ -217,39 +217,39 @@ def _execute(self, inData: dict, _scope_id: Optional[str] = None) -> None: class InputMergeActor(AbstractActor): - """Requires triggers from some input actors before triggering - the downstream actors. - - It remembers the last input from the required uptstream actors. - Only the last non-required input is remembered. + """Requires triggers from some input actors before triggering the downstream actors. + Optional triggers are cached or buffered (before first execution) and the last one is retained. """ def __init__(self, parent=None, name="Input merger", **kw): super().__init__(parent=parent, name=name, **kw) - # List of input dict's provided by the graph startargs - self._start_inputs = list() + # List of input dicts provided by the graph startargs (not part of the Ewoks SPEC) + self._cached_start_triggers = list() - # Inputs dict provided by the last non-required actor - self._non_required_last_inputs = dict() + # Map actor to input dict provided by that actor + self._cached_required_triggers = dict() + self._cached_optional_triggers = dict() - # Map required actor to inputs dict provided by that actor - self._required_inputs = dict() + # List of input dicts provided by optional links without caching + # that arrived before all required triggers arrived + self._buffer_optional_triggers = list() + self._no_buffering = False - # Map non-required but cached actor to inputs dict provided by that actor - self._cache_if_not_required_inputs = dict() + # Retain only one input dict provided by optional links without caching + # after all required triggers arrived + self._retained_optional_trigger = None def register_input_actor(self, actor): if actor.required: info = "(required): cache inputs" - self._required_inputs[actor] = None - elif actor.cache_if_not_required: - info = "(non-required): cache inputs" - self._cache_if_not_required_inputs[actor] = None + self._cached_required_triggers[actor] = None + elif actor.cache_if_optional: + info = "(optional): cache inputs" + self._cached_optional_triggers[actor] = None else: - info = "(non-required): use inputs only once (do not cache)" - # Inputs provided by this actor are not cache and will be used only once - # See: self._non_required_last_inputs + info = "(optional): buffer inputs before first execution and then retain the last one" + # see self._buffer_optional_triggers self.logger.info("%s %s", actor.name, info) def _execute( @@ -258,49 +258,112 @@ def _execute( self.setStarted() self.setFinished() - if source is None: - self._start_inputs.append(inData) + self._cache_inputs(source, inData) + + if not self._has_all_required_triggers(): + return + + if self._no_buffering: + # Execute with the retained inputs from the last trigger + # of an optional link without caching. Might be `None` + # when there is none. + buffer = [self._retained_optional_trigger] else: - if source in self._required_inputs: - self._required_inputs[source] = inData - elif source in self._cache_if_not_required_inputs: - self._cache_if_not_required_inputs[source] = inData + if self._buffer_optional_triggers: + # Execute for each retained inputs from optional links without caching. + buffer = list(self._buffer_optional_triggers) + else: + # Execute once without any retained inputs. + buffer = [None] + + for i, retained_inputs in enumerate(buffer): + try: + self._trigger_downstream(retained_inputs) + except Exception: + if not self._no_buffering: + # Keep the inputs not successfully propagated. + self._buffer_optional_triggers = buffer[i:] + raise + + if not self._no_buffering: + if buffer: + # Retain the last one for the next trigger. + # Might be `None` when there is none. + self._retained_optional_trigger = buffer[-1] else: - self._non_required_last_inputs = inData + self._retained_optional_trigger = None + + # No more buffering, only retain one. + self._no_buffering = True + + # No longer needed so do not keep references. + self._buffer_optional_triggers.clear() + + def _cache_inputs(self, source, inData: dict) -> None: + if source is None: + self._cached_start_triggers.append(inData) + return - missing = {k: v for k, v in self._required_inputs.items() if v is None} - if missing: + if source in self._cached_required_triggers: + # Cache inputs from required link + self._cached_required_triggers[source] = inData + elif source in self._cached_optional_triggers: + # Cache inputs from optional link + self._cached_optional_triggers[source] = inData + elif self._no_buffering: + # Executed at least once + self._retained_optional_trigger = inData + else: + # Did not execute yet + self._buffer_optional_triggers.append(inData) + + def _has_all_required_triggers(self) -> bool: + missing_required = { + k: v for k, v in self._cached_required_triggers.items() if v is None + } + if missing_required: self.logger.info( "not triggering downstream actors because missing inputs from actors %s", - [actor.name for actor in missing], + [actor.name for actor in missing_required], ) - return + return False + return True - self.logger.info( - "triggering downstream actors with input merger if %d graph start, %d required actors, %d non-required cached actors and %d non-required actors", - len(self._start_inputs), - len(self._required_inputs), - len(self._cache_if_not_required_inputs), - int(bool(self._non_required_last_inputs)), + def _trigger_downstream(self, retained_inputs: Optional[dict]): + merged_inputs = self._downstream_inputs(retained_inputs) + for actor in self.listDownStreamActor: + actor.trigger(merged_inputs) + + def _downstream_inputs(self, retained_inputs: Optional[dict]) -> dict: + self.logger.debug( + "Trigger downstream actor with merged inputs from\n " + "%d graph start triggers\n " + "%d cached required links\n " + "%d cached optional links\n " + "%d retained optional links", + len(self._cached_start_triggers), + len(self._cached_required_triggers), + len(self._cached_optional_triggers), + int(retained_inputs is not None), ) merged_inputs = dict() - for data in self._start_inputs: + for data in self._cached_start_triggers: merged_inputs.update(data) - for data in self._required_inputs.values(): + for data in self._cached_required_triggers.values(): merged_inputs.update(data) - for data in self._cache_if_not_required_inputs.values(): + for data in self._cached_optional_triggers.values(): if data is None: - # Non-required branch not triggered yet + # Optional link not triggered yet continue merged_inputs.update(data) - merged_inputs.update(self._non_required_last_inputs) + if retained_inputs: + merged_inputs.update(retained_inputs) - for actor in self.listDownStreamActor: - actor.trigger(merged_inputs) + return merged_inputs class EwoksWorkflow(Workflow): @@ -490,7 +553,7 @@ def _create_name_mapper( # Conditional link on_error = link_attrs.get("on_error", False) - cache_if_not_required = link_attrs.get("cache_if_not_required", False) + cache_if_optional = link_attrs.get("cache_if_optional", False) # Required link required = analysis.link_is_required(taskgraph.graph, source_id, target_id) @@ -508,7 +571,7 @@ def _create_name_mapper( map_all_data=map_all_data, trigger_on_error=on_error, required=required, - cache_if_not_required=cache_if_not_required, + cache_if_optional=cache_if_optional, **self._actor_arguments, ) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index c0e5e97..4d7ac53 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -128,41 +128,41 @@ def workflow(): "source": "metric1", "target": "decider", "required": False, - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [{"source_output": "metric", "target_input": "metric1"}], }, { "source": "metric2", "target": "decider", "required": False, - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [{"source_output": "metric", "target_input": "metric2"}], }, { "source": "metric3", "target": "decider", "required": False, - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [{"source_output": "metric", "target_input": "metric3"}], }, { "source": "metric4", "target": "decider", "required": False, - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [{"source_output": "metric", "target_input": "metric4"}], }, { "source": "timeout", "target": "decider", "required": False, - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], }, { "source": "decider", "target": "decider", - "cache_if_not_required": True, + "cache_if_optional": True, "data_mapping": [ {"source_output": "set_disable", "target_input": "disable"} ], @@ -219,7 +219,7 @@ def create_inputs_timeout_last(metric_threshold: float): def test_ppf_workflow25_metric1(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: metric1, metric2, metric3 and metric4 inputs = create_inputs(5) result = execute_graph(workflow(), inputs=inputs) @@ -227,7 +227,7 @@ def test_ppf_workflow25_metric1(ppf_log_config): def test_ppf_workflow25_metric2(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: metric2, metric3 and metric4 inputs = create_inputs(15) result = execute_graph(workflow(), inputs=inputs) @@ -235,7 +235,7 @@ def test_ppf_workflow25_metric2(ppf_log_config): def test_ppf_workflow25_metric3(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: metric3 and metric4 inputs = create_inputs(25) result = execute_graph(workflow(), inputs=inputs) @@ -243,7 +243,7 @@ def test_ppf_workflow25_metric3(ppf_log_config): def test_ppf_workflow25_timeout1(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: metric4 inputs = create_inputs(35) result = execute_graph(workflow(), inputs=inputs) @@ -251,7 +251,7 @@ def test_ppf_workflow25_timeout1(ppf_log_config): def test_ppf_workflow25_timeout2(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: none inputs = create_inputs(45) result = execute_graph(workflow(), inputs=inputs) @@ -259,7 +259,7 @@ def test_ppf_workflow25_timeout2(ppf_log_config): def test_ppf_workflow25_none(ppf_log_config): - """test 'cache_if_not_required' links""" + """test 'cache_if_optional' links""" # Metrics that pass: none inputs = create_inputs_timeout_last(45) result = execute_graph(workflow(), inputs=inputs) From c50c41e3a6485eb6847490ded3e0403647357c99 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Fri, 3 Apr 2026 19:05:53 +0200 Subject: [PATCH 06/15] workflow25: stop triggering based on cache and not link-to-self --- src/ewoksppf/tests/test_ppf_workflow25.py | 186 +++++++++++++--------- 1 file changed, 113 insertions(+), 73 deletions(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index 4d7ac53..0b2d3f3 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -1,5 +1,9 @@ +import json import time +from pathlib import Path +from typing import Dict +import pytest from ewokscore.task import Task from ewoksutils.import_utils import qualname @@ -28,82 +32,93 @@ def run(self): class Decider( Task, input_names=["param1", "param2", "metric_threshold"], - optional_input_names=[ - "metric1", - "metric2", - "metric3", - "metric4", - "timeout", - "disable", - ], - output_names=["good_enough", "reason", "set_disable"], + optional_input_names=["metric1", "metric2", "metric3", "metric4", "timeout"], + output_names=["good_enough", "reason"], ): def run(self): - print("\nDecider inputs:", self.get_input_values()) - print("Metric threshold:", self.inputs.metric_threshold) + print("\nDecider executed with inputs:", self.get_input_values()) + + # Check that we have all required inputs if self.inputs.param1 != -1: raise ValueError(f"param1 is {self.inputs.param1} intead of -1") if self.inputs.param2 != -2: raise ValueError(f"param2 is {self.inputs.param2} intead of -2") - if self.inputs.disable: - print("IGNORE") - self.outputs.good_enough = None - self.outputs.reason = None - self.outputs.set_disable = False - return - + # Check the state of all metrics good_metrics = [] - nmetrics = 0 - for name in ["metric1", "metric2", "metric3", "metric4"]: + bad_metrics = [] + unknown_metrics = [] + metric_names = ["metric1", "metric2", "metric3", "metric4"] + for name in metric_names: value = self.get_input_value(name, None) if value is None: - continue - nmetrics += 1 - if value >= self.inputs.metric_threshold: + unknown_metrics.append(name) + elif value >= self.inputs.metric_threshold: good_metrics.append(name) - print("Metrics that are good:", good_metrics) + else: + bad_metrics.append(name) - if any(good_metrics): + print("Metrics that are good:", good_metrics) + print("Metrics that are bad:", bad_metrics) + print("Metrics that are unknown:", unknown_metrics) + + # Check timeout + has_timeout = bool(self.get_input_value("timeout")) + print("Timeout:", "yes" if has_timeout else "no") + + # Trigger downstream reasons + trigger_good = len(good_metrics) == 1 + trigger_bad_timeout = has_timeout + trigger_bad_none_good_enough = len(unknown_metrics) == 0 + + # Ensure good/bad is trigger only once + n = sum((bool(good_metrics), trigger_bad_timeout, trigger_bad_none_good_enough)) + if n > 1: + trigger_good = False + trigger_bad_timeout = False + trigger_bad_none_good_enough = False + + # Trigger good, bas or nothing + if trigger_good: print("TRIGGER GOOD") self.outputs.good_enough = True self.outputs.reason = ", ".join(good_metrics) - self.outputs.set_disable = True - return - - if self.get_input_value("timeout"): - print("TRIGGER BAD") + elif trigger_bad_timeout: + print("TRIGGER BAD: TIMEOUT") self.outputs.good_enough = False self.outputs.reason = "timeout" - self.outputs.set_disable = True - return - - if nmetrics == 4: - print("TRIGGER BAD") + elif trigger_bad_none_good_enough: + print("TRIGGER BAD: ALL METRIC BAD") self.outputs.good_enough = False - self.outputs.reason = "none" - self.outputs.set_disable = True - return + self.outputs.reason = "no metric good enough" + else: + print("TRIGGER NOTHING") + self.outputs.good_enough = None + self.outputs.reason = None - print("TRIGGER NOTHING") - self.outputs.good_enough = None - self.outputs.reason = None - self.outputs.set_disable = False +class RecordCalls(Task, input_names=["call_record_file"]): + def run(self): + calls = json.loads(self.inputs.call_record_file.read_text()) + calls.append(self.get_output_values()) + self.inputs.call_record_file.write_text(json.dumps(calls)) -class Good(Task, input_names=["reason"], output_names=["state", "reason"]): + +class Good(RecordCalls, input_names=["reason"], output_names=["state", "reason"]): def run(self): print("\nGOOD:", self.inputs.reason) self.outputs.state = "GOOD" self.outputs.reason = self.inputs.reason + super().run() -class Bad(Task, input_names=["reason"], output_names=["state", "reason"]): +class Bad(RecordCalls, input_names=["reason"], output_names=["state", "reason"]): def run(self): print("\nBAD:", self.inputs.reason) self.outputs.state = "BAD" self.outputs.reason = self.inputs.reason + super().run() def workflow(): @@ -159,15 +174,6 @@ def workflow(): "cache_if_optional": True, "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], }, - { - "source": "decider", - "target": "decider", - "cache_if_optional": True, - "data_mapping": [ - {"source_output": "set_disable", "target_input": "disable"} - ], - "conditions": [{"source_output": "set_disable", "value": True}], - }, { "source": "decider", "target": "good", @@ -184,7 +190,7 @@ def workflow(): return {"graph": {"id": "workflow"}, "nodes": nodes, "links": links} -def create_inputs(metric_threshold: float): +def create_inputs(call_record_file: Path, metric_threshold: float): tm = 0.1 return [ {"id": "required", "name": "compute_time", "value": 0.1 * tm}, @@ -198,10 +204,12 @@ def create_inputs(metric_threshold: float): {"id": "metric4", "name": "compute_time", "value": 5 * tm}, {"id": "metric4", "name": "metric", "value": 40}, {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, + {"id": "good", "name": "call_record_file", "value": call_record_file}, + {"id": "bad", "name": "call_record_file", "value": call_record_file}, ] -def create_inputs_timeout_last(metric_threshold: float): +def create_inputs_timeout_last(call_record_file: Path, metric_threshold: float): tm = 0.1 return [ {"id": "required", "name": "compute_time", "value": 0.1 * tm}, @@ -215,52 +223,84 @@ def create_inputs_timeout_last(metric_threshold: float): {"id": "metric4", "name": "metric", "value": 40}, {"id": "timeout", "name": "timeout", "value": 5 * tm}, {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, + {"id": "good", "name": "call_record_file", "value": call_record_file}, + {"id": "bad", "name": "call_record_file", "value": call_record_file}, ] -def test_ppf_workflow25_metric1(ppf_log_config): +@pytest.fixture +def call_record_file(tmp_path: Path): + file = tmp_path / "calls.json" + file.write_text(json.dumps([])) + return file + + +def test_ppf_workflow25_metric1(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: metric1, metric2, metric3 and metric4 - inputs = create_inputs(5) + inputs = create_inputs(call_record_file, 5) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "metric1", "state": "GOOD"} + + expected = {"state": "GOOD", "reason": "metric1"} + calls = json.loads(call_record_file.read_text()) + assert calls == [expected] + + assert result == expected -def test_ppf_workflow25_metric2(ppf_log_config): +def test_ppf_workflow25_metric2(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: metric2, metric3 and metric4 - inputs = create_inputs(15) + inputs = create_inputs(call_record_file, 15) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "metric2", "state": "GOOD"} + expected = {"state": "GOOD", "reason": "metric2"} + _assert_result(call_record_file, result, expected) -def test_ppf_workflow25_metric3(ppf_log_config): + +def test_ppf_workflow25_metric3(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: metric3 and metric4 - inputs = create_inputs(25) + inputs = create_inputs(call_record_file, 25) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "metric3", "state": "GOOD"} + + expected = {"state": "GOOD", "reason": "metric3"} + _assert_result(call_record_file, result, expected) -def test_ppf_workflow25_timeout1(ppf_log_config): +def test_ppf_workflow25_timeout1(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: metric4 - inputs = create_inputs(35) + inputs = create_inputs(call_record_file, 35) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "timeout", "state": "BAD"} + expected = {"state": "BAD", "reason": "timeout"} + _assert_result(call_record_file, result, expected) -def test_ppf_workflow25_timeout2(ppf_log_config): + +def test_ppf_workflow25_timeout2(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: none - inputs = create_inputs(45) + inputs = create_inputs(call_record_file, 45) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "timeout", "state": "BAD"} + + expected = {"state": "BAD", "reason": "timeout"} + _assert_result(call_record_file, result, expected) -def test_ppf_workflow25_none(ppf_log_config): +def test_ppf_workflow25_timeout3(ppf_log_config, call_record_file: Path): """test 'cache_if_optional' links""" # Metrics that pass: none - inputs = create_inputs_timeout_last(45) + inputs = create_inputs_timeout_last(call_record_file, 45) result = execute_graph(workflow(), inputs=inputs) - assert result == {"reason": "none", "state": "BAD"} + + expected = {"state": "BAD", "reason": "no metric good enough"} + _assert_result(call_record_file, result, expected) + + +def _assert_result( + call_record_file: Path, result: Dict[str, str], expected: Dict[str, str] +) -> None: + calls = json.loads(call_record_file.read_text()) + assert calls == [expected] + assert result == expected From 2e56f290f7b00af16e1b2ed293390e21648398e2 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Mon, 13 Apr 2026 18:05:50 +0200 Subject: [PATCH 07/15] trigger farther apart in time --- src/ewoksppf/tests/test_ppf_workflow25.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index 0b2d3f3..6eb1954 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -191,7 +191,9 @@ def workflow(): def create_inputs(call_record_file: Path, metric_threshold: float): - tm = 0.1 + tm = 0.5 # Too short and the test fails. If that happens it is not a bug. + # The Ewoks workflow SPECS have nothing to guarantee that downstream + # execution follows the trigger order in time. return [ {"id": "required", "name": "compute_time", "value": 0.1 * tm}, {"id": "metric1", "name": "compute_time", "value": 1 * tm}, @@ -210,7 +212,9 @@ def create_inputs(call_record_file: Path, metric_threshold: float): def create_inputs_timeout_last(call_record_file: Path, metric_threshold: float): - tm = 0.1 + tm = 0.5 # Too short and the test fails. If that happens it is not a bug. + # The Ewoks workflow SPECS have nothing to guarantee that downstream + # execution follows the trigger order in time. return [ {"id": "required", "name": "compute_time", "value": 0.1 * tm}, {"id": "metric1", "name": "compute_time", "value": 1 * tm}, From a590ae6900dd1176ba4d730a62ff8dce21de05a4 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Tue, 14 Apr 2026 11:00:47 +0200 Subject: [PATCH 08/15] different approach to test_ppf_workflow25 --- src/ewoksppf/tests/test_ppf_workflow25.py | 352 ++++++---------------- 1 file changed, 97 insertions(+), 255 deletions(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index 6eb1954..f37b91b 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -1,7 +1,5 @@ -import json +import itertools import time -from pathlib import Path -from typing import Dict import pytest from ewokscore.task import Task @@ -10,301 +8,145 @@ from ..bindings import execute_graph -class Required(Task, input_names=["compute_time"], output_names=["param1", "param2"]): +class Required(Task, input_names=["compute_time"], output_names=["required"]): def run(self): time.sleep(self.inputs.compute_time) - self.outputs.param1 = -1 - self.outputs.param2 = -2 + self.outputs.required = True -class Metric(Task, input_names=["compute_time", "metric"], output_names=["metric"]): +class Optional(Task, input_names=["compute_time"], output_names=["optional"]): def run(self): time.sleep(self.inputs.compute_time) - self.outputs.metric = self.inputs.metric + self.outputs.optional = True -class Timeout(Task, input_names=["timeout"], output_names=["timeout"]): - def run(self): - time.sleep(self.inputs.timeout) - self.outputs.timeout = True - - -class Decider( +class Gather( Task, - input_names=["param1", "param2", "metric_threshold"], - optional_input_names=["metric1", "metric2", "metric3", "metric4", "timeout"], - output_names=["good_enough", "reason"], + input_names=["required1", "required2"], + optional_input_names=["optional1", "optional2", "retained1", "retained2"], + output_names=["cached"], ): def run(self): - print("\nDecider executed with inputs:", self.get_input_values()) - - # Check that we have all required inputs - if self.inputs.param1 != -1: - raise ValueError(f"param1 is {self.inputs.param1} intead of -1") - - if self.inputs.param2 != -2: - raise ValueError(f"param2 is {self.inputs.param2} intead of -2") - - # Check the state of all metrics - good_metrics = [] - bad_metrics = [] - unknown_metrics = [] - metric_names = ["metric1", "metric2", "metric3", "metric4"] - for name in metric_names: - value = self.get_input_value(name, None) - if value is None: - unknown_metrics.append(name) - elif value >= self.inputs.metric_threshold: - good_metrics.append(name) - else: - bad_metrics.append(name) - - print("Metrics that are good:", good_metrics) - print("Metrics that are bad:", bad_metrics) - print("Metrics that are unknown:", unknown_metrics) - - # Check timeout - has_timeout = bool(self.get_input_value("timeout")) - print("Timeout:", "yes" if has_timeout else "no") - - # Trigger downstream reasons - trigger_good = len(good_metrics) == 1 - trigger_bad_timeout = has_timeout - trigger_bad_none_good_enough = len(unknown_metrics) == 0 - - # Ensure good/bad is trigger only once - n = sum((bool(good_metrics), trigger_bad_timeout, trigger_bad_none_good_enough)) - if n > 1: - trigger_good = False - trigger_bad_timeout = False - trigger_bad_none_good_enough = False - - # Trigger good, bas or nothing - if trigger_good: - print("TRIGGER GOOD") - self.outputs.good_enough = True - self.outputs.reason = ", ".join(good_metrics) - elif trigger_bad_timeout: - print("TRIGGER BAD: TIMEOUT") - self.outputs.good_enough = False - self.outputs.reason = "timeout" - elif trigger_bad_none_good_enough: - print("TRIGGER BAD: ALL METRIC BAD") - self.outputs.good_enough = False - self.outputs.reason = "no metric good enough" - else: - print("TRIGGER NOTHING") - self.outputs.good_enough = None - self.outputs.reason = None - - -class RecordCalls(Task, input_names=["call_record_file"]): - def run(self): - calls = json.loads(self.inputs.call_record_file.read_text()) - calls.append(self.get_output_values()) - self.inputs.call_record_file.write_text(json.dumps(calls)) - - -class Good(RecordCalls, input_names=["reason"], output_names=["state", "reason"]): - def run(self): - print("\nGOOD:", self.inputs.reason) - self.outputs.state = "GOOD" - self.outputs.reason = self.inputs.reason - super().run() - - -class Bad(RecordCalls, input_names=["reason"], output_names=["state", "reason"]): - def run(self): - print("\nBAD:", self.inputs.reason) - self.outputs.state = "BAD" - self.outputs.reason = self.inputs.reason - super().run() + cached = self.get_input_values() + print("\nDecider executed with inputs:", cached) + self.outputs.cached = cached def workflow(): nodes = [ - {"id": "required", "task_type": "class", "task_identifier": qualname(Required)}, - {"id": "metric1", "task_type": "class", "task_identifier": qualname(Metric)}, - {"id": "metric2", "task_type": "class", "task_identifier": qualname(Metric)}, - {"id": "metric3", "task_type": "class", "task_identifier": qualname(Metric)}, - {"id": "metric4", "task_type": "class", "task_identifier": qualname(Metric)}, - {"id": "timeout", "task_type": "class", "task_identifier": qualname(Timeout)}, - {"id": "decider", "task_type": "class", "task_identifier": qualname(Decider)}, - {"id": "good", "task_type": "class", "task_identifier": qualname(Good)}, - {"id": "bad", "task_type": "class", "task_identifier": qualname(Bad)}, - ] - links = [ { - "source": "required", - "target": "decider", - "map_all_data": True, + "id": "required1", + "task_type": "class", + "task_identifier": qualname(Required), }, { - "source": "metric1", - "target": "decider", - "required": False, - "cache_if_optional": True, - "data_mapping": [{"source_output": "metric", "target_input": "metric1"}], + "id": "required2", + "task_type": "class", + "task_identifier": qualname(Required), }, { - "source": "metric2", - "target": "decider", - "required": False, - "cache_if_optional": True, - "data_mapping": [{"source_output": "metric", "target_input": "metric2"}], + "id": "optional1", + "task_type": "class", + "task_identifier": qualname(Optional), }, { - "source": "metric3", - "target": "decider", - "required": False, - "cache_if_optional": True, - "data_mapping": [{"source_output": "metric", "target_input": "metric3"}], + "id": "optional2", + "task_type": "class", + "task_identifier": qualname(Optional), + }, + { + "id": "retained1", + "task_type": "class", + "task_identifier": qualname(Optional), + }, + { + "id": "retained2", + "task_type": "class", + "task_identifier": qualname(Optional), }, { - "source": "metric4", - "target": "decider", + "id": "gather", + "task_type": "class", + "task_identifier": qualname(Gather), + }, + ] + links = [ + { + "source": "required1", + "target": "gather", + "data_mapping": [ + {"source_output": "required", "target_input": "required1"} + ], + }, + { + "source": "required2", + "target": "gather", + "data_mapping": [ + {"source_output": "required", "target_input": "required2"} + ], + }, + { + "source": "optional1", + "target": "gather", "required": False, "cache_if_optional": True, - "data_mapping": [{"source_output": "metric", "target_input": "metric4"}], + "data_mapping": [ + {"source_output": "optional", "target_input": "optional1"} + ], }, { - "source": "timeout", - "target": "decider", + "source": "optional2", + "target": "gather", "required": False, "cache_if_optional": True, - "data_mapping": [{"source_output": "timeout", "target_input": "timeout"}], + "data_mapping": [ + {"source_output": "optional", "target_input": "optional2"} + ], }, { - "source": "decider", - "target": "good", - "data_mapping": [{"source_output": "reason", "target_input": "reason"}], - "conditions": [{"source_output": "good_enough", "value": True}], + "source": "retained1", + "target": "gather", + "required": False, + "cache_if_optional": False, + "data_mapping": [ + {"source_output": "optional", "target_input": "retained1"} + ], }, { - "source": "decider", - "target": "bad", - "data_mapping": [{"source_output": "reason", "target_input": "reason"}], - "conditions": [{"source_output": "good_enough", "value": False}], + "source": "retained2", + "target": "gather", + "required": False, + "cache_if_optional": False, + "data_mapping": [ + {"source_output": "optional", "target_input": "retained2"} + ], }, ] return {"graph": {"id": "workflow"}, "nodes": nodes, "links": links} -def create_inputs(call_record_file: Path, metric_threshold: float): - tm = 0.5 # Too short and the test fails. If that happens it is not a bug. - # The Ewoks workflow SPECS have nothing to guarantee that downstream - # execution follows the trigger order in time. - return [ - {"id": "required", "name": "compute_time", "value": 0.1 * tm}, - {"id": "metric1", "name": "compute_time", "value": 1 * tm}, - {"id": "metric1", "name": "metric", "value": 10}, - {"id": "metric2", "name": "compute_time", "value": 2 * tm}, - {"id": "metric2", "name": "metric", "value": 20}, - {"id": "metric3", "name": "compute_time", "value": 3 * tm}, - {"id": "metric3", "name": "metric", "value": 30}, - {"id": "timeout", "name": "timeout", "value": 4 * tm}, - {"id": "metric4", "name": "compute_time", "value": 5 * tm}, - {"id": "metric4", "name": "metric", "value": 40}, - {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, - {"id": "good", "name": "call_record_file", "value": call_record_file}, - {"id": "bad", "name": "call_record_file", "value": call_record_file}, - ] - - -def create_inputs_timeout_last(call_record_file: Path, metric_threshold: float): - tm = 0.5 # Too short and the test fails. If that happens it is not a bug. - # The Ewoks workflow SPECS have nothing to guarantee that downstream - # execution follows the trigger order in time. +def get_inputs(required, optional, retained): return [ - {"id": "required", "name": "compute_time", "value": 0.1 * tm}, - {"id": "metric1", "name": "compute_time", "value": 1 * tm}, - {"id": "metric1", "name": "metric", "value": 10}, - {"id": "metric2", "name": "compute_time", "value": 2 * tm}, - {"id": "metric2", "name": "metric", "value": 20}, - {"id": "metric3", "name": "compute_time", "value": 3 * tm}, - {"id": "metric3", "name": "metric", "value": 30}, - {"id": "metric4", "name": "compute_time", "value": 4 * tm}, - {"id": "metric4", "name": "metric", "value": 40}, - {"id": "timeout", "name": "timeout", "value": 5 * tm}, - {"id": "decider", "name": "metric_threshold", "value": metric_threshold}, - {"id": "good", "name": "call_record_file", "value": call_record_file}, - {"id": "bad", "name": "call_record_file", "value": call_record_file}, + {"id": "required1", "name": "compute_time", "value": required}, + {"id": "required2", "name": "compute_time", "value": required}, + {"id": "optional1", "name": "compute_time", "value": optional}, + {"id": "optional2", "name": "compute_time", "value": optional}, + {"id": "retained1", "name": "compute_time", "value": retained}, + {"id": "retained2", "name": "compute_time", "value": retained}, ] -@pytest.fixture -def call_record_file(tmp_path: Path): - file = tmp_path / "calls.json" - file.write_text(json.dumps([])) - return file - - -def test_ppf_workflow25_metric1(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: metric1, metric2, metric3 and metric4 - inputs = create_inputs(call_record_file, 5) - result = execute_graph(workflow(), inputs=inputs) - - expected = {"state": "GOOD", "reason": "metric1"} - calls = json.loads(call_record_file.read_text()) - assert calls == [expected] - - assert result == expected - - -def test_ppf_workflow25_metric2(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: metric2, metric3 and metric4 - inputs = create_inputs(call_record_file, 15) - result = execute_graph(workflow(), inputs=inputs) - - expected = {"state": "GOOD", "reason": "metric2"} - _assert_result(call_record_file, result, expected) - - -def test_ppf_workflow25_metric3(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: metric3 and metric4 - inputs = create_inputs(call_record_file, 25) - result = execute_graph(workflow(), inputs=inputs) - - expected = {"state": "GOOD", "reason": "metric3"} - _assert_result(call_record_file, result, expected) - - -def test_ppf_workflow25_timeout1(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: metric4 - inputs = create_inputs(call_record_file, 35) - result = execute_graph(workflow(), inputs=inputs) - - expected = {"state": "BAD", "reason": "timeout"} - _assert_result(call_record_file, result, expected) - - -def test_ppf_workflow25_timeout2(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: none - inputs = create_inputs(call_record_file, 45) - result = execute_graph(workflow(), inputs=inputs) - - expected = {"state": "BAD", "reason": "timeout"} - _assert_result(call_record_file, result, expected) - - -def test_ppf_workflow25_timeout3(ppf_log_config, call_record_file: Path): - """test 'cache_if_optional' links""" - # Metrics that pass: none - inputs = create_inputs_timeout_last(call_record_file, 45) - result = execute_graph(workflow(), inputs=inputs) +_ORDER = list(itertools.permutations(["required", "optional", "retained"])) - expected = {"state": "BAD", "reason": "no metric good enough"} - _assert_result(call_record_file, result, expected) +@pytest.mark.parametrize("order", _ORDER, ids=["-".join(keys) for keys in _ORDER]) +def test_ppf_workflow25(ppf_log_config, order): + """Test input caching for different types of links executed in different orders.""" + compute_times = [0, 0.5, 1] + inputs = get_inputs(**dict(zip(order, compute_times))) -def _assert_result( - call_record_file: Path, result: Dict[str, str], expected: Dict[str, str] -) -> None: - calls = json.loads(call_record_file.read_text()) - assert calls == [expected] - assert result == expected + result = execute_graph(workflow(), pool_type="thread", inputs=inputs) + cached = set(result["cached"]) + cached1 = {"required1", "required2", "optional1", "optional2", "retained1"} + cached2 = {"required1", "required2", "optional1", "optional2", "retained2"} + assert cached == cached1 or cached == cached2, cached From a36416876013aae41d3bb6c843ff95ae24c6610e Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Tue, 14 Apr 2026 11:26:04 +0200 Subject: [PATCH 09/15] pypushflow actor trigger state needs to be protected --- src/ewoksppf/bindings.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 1e2afad..4d84972 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -1,4 +1,5 @@ import os +import threading import warnings from contextlib import contextmanager from typing import Generator @@ -240,6 +241,8 @@ def __init__(self, parent=None, name="Input merger", **kw): # after all required triggers arrived self._retained_optional_trigger = None + self._lock = threading.Lock() + def register_input_actor(self, actor): if actor.required: info = "(required): cache inputs" @@ -255,14 +258,19 @@ def register_input_actor(self, actor): def _execute( self, inData: dict, _scope_id: Optional[str] = None, source=None ) -> None: - self.setStarted() - self.setFinished() + with self._lock: + self.setStarted() + try: + self._cache_inputs(source, inData) + finally: + self.setFinished() - self._cache_inputs(source, inData) + if not self._has_all_required_triggers(): + return - if not self._has_all_required_triggers(): - return + self._propagate_cached_inputs() + def _propagate_cached_inputs(self) -> None: if self._no_buffering: # Execute with the retained inputs from the last trigger # of an optional link without caching. Might be `None` From 93fcb74c77c7d25c045d6af9bde8fe26752be3ec Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Tue, 14 Apr 2026 11:46:35 +0200 Subject: [PATCH 10/15] fix parallel print --- src/ewoksppf/tests/test_ppf_workflow25.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index f37b91b..d28b21e 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -28,7 +28,7 @@ class Gather( ): def run(self): cached = self.get_input_values() - print("\nDecider executed with inputs:", cached) + print(f"\nDecider executed with inputs: {cached}") self.outputs.cached = cached From fae606c01d59fa210a89dd8936f51ae832d8dd10 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Tue, 14 Apr 2026 11:58:53 +0200 Subject: [PATCH 11/15] fix test_ppf_workflow25 --- src/ewoksppf/tests/test_ppf_workflow25.py | 24 +++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/src/ewoksppf/tests/test_ppf_workflow25.py b/src/ewoksppf/tests/test_ppf_workflow25.py index d28b21e..f4d5ceb 100644 --- a/src/ewoksppf/tests/test_ppf_workflow25.py +++ b/src/ewoksppf/tests/test_ppf_workflow25.py @@ -27,7 +27,9 @@ class Gather( output_names=["cached"], ): def run(self): + global _GATHER_CACHE cached = self.get_input_values() + _GATHER_CACHE = cached print(f"\nDecider executed with inputs: {cached}") self.outputs.cached = cached @@ -142,11 +144,29 @@ def get_inputs(required, optional, retained): @pytest.mark.parametrize("order", _ORDER, ids=["-".join(keys) for keys in _ORDER]) def test_ppf_workflow25(ppf_log_config, order): """Test input caching for different types of links executed in different orders.""" + global _GATHER_CACHE + _GATHER_CACHE = None compute_times = [0, 0.5, 1] inputs = get_inputs(**dict(zip(order, compute_times))) - result = execute_graph(workflow(), pool_type="thread", inputs=inputs) - cached = set(result["cached"]) + # result = execute_graph(workflow(), inputs=inputs) + # cached = set(result["cached"]) + # + # When + # + # order = ('retained', 'required', 'optional') + # + # the last two calls to "Gather" could be for example + # + # {'required1': True, 'required2': True, 'optional1': True, 'retained2': True} + # {'required1': True, 'required2': True, 'optional1': True, 'optional2': True, 'retained2': True} + # + # Since these calls happen in parallel and there is nothing in the workflow + # that guarantees we get one or the other as the final workflow result we + # cannot use the result to test the caching. + + _ = execute_graph(workflow(), pool_type="thread", inputs=inputs) + cached = set(_GATHER_CACHE) cached1 = {"required1", "required2", "optional1", "optional2", "retained1"} cached2 = {"required1", "required2", "optional1", "optional2", "retained2"} assert cached == cached1 or cached == cached2, cached From d4025febeff27301f35de108689569a8d2f392ec Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Sun, 19 Apr 2026 13:25:33 +0200 Subject: [PATCH 12/15] modify changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 722f029..0d52a24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Buffer inputs from optional links that arrive before the first execution. +### Fixed + +- Protect input cache against parallel execution of the same `InputMergeActor` instance. + ## [2.0.2] - 2026-02-15 ### Fixed From de5320e069b14e3013c1a0d92a0a00b15bf92175 Mon Sep 17 00:00:00 2001 From: Wout De Nolf Date: Thu, 23 Apr 2026 21:47:14 +0200 Subject: [PATCH 13/15] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Loïc Huder <42204205+loichuder@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d52a24..589c82b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- Support links that are explicitely optional (`required=False`). +- Support links that are explicitly optional (`required=False`). - Support input caching from optional links (`cache_if_optional=True`). ### Changed From c72f6a91505b36ef9c0ef089e691fd569c3d252f Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Thu, 23 Apr 2026 21:50:36 +0200 Subject: [PATCH 14/15] rename attribute --- src/ewoksppf/bindings.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 4d84972..1eeae2d 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -235,7 +235,7 @@ def __init__(self, parent=None, name="Input merger", **kw): # List of input dicts provided by optional links without caching # that arrived before all required triggers arrived self._buffer_optional_triggers = list() - self._no_buffering = False + self._buffering = True # Retain only one input dict provided by optional links without caching # after all required triggers arrived @@ -271,7 +271,7 @@ def _execute( self._propagate_cached_inputs() def _propagate_cached_inputs(self) -> None: - if self._no_buffering: + if not self._buffering: # Execute with the retained inputs from the last trigger # of an optional link without caching. Might be `None` # when there is none. @@ -288,12 +288,12 @@ def _propagate_cached_inputs(self) -> None: try: self._trigger_downstream(retained_inputs) except Exception: - if not self._no_buffering: + if self._buffering: # Keep the inputs not successfully propagated. self._buffer_optional_triggers = buffer[i:] raise - if not self._no_buffering: + if self._buffering: if buffer: # Retain the last one for the next trigger. # Might be `None` when there is none. @@ -302,7 +302,7 @@ def _propagate_cached_inputs(self) -> None: self._retained_optional_trigger = None # No more buffering, only retain one. - self._no_buffering = True + self._buffering = False # No longer needed so do not keep references. self._buffer_optional_triggers.clear() @@ -318,12 +318,12 @@ def _cache_inputs(self, source, inData: dict) -> None: elif source in self._cached_optional_triggers: # Cache inputs from optional link self._cached_optional_triggers[source] = inData - elif self._no_buffering: - # Executed at least once - self._retained_optional_trigger = inData - else: + elif self._buffering: # Did not execute yet self._buffer_optional_triggers.append(inData) + else: + # Executed at least once + self._retained_optional_trigger = inData def _has_all_required_triggers(self) -> bool: missing_required = { From d3749a9bc3786acb1a3aacd1e8a2d3d45460cf99 Mon Sep 17 00:00:00 2001 From: woutdenolf Date: Thu, 23 Apr 2026 21:52:49 +0200 Subject: [PATCH 15/15] add type annotations --- src/ewoksppf/bindings.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/ewoksppf/bindings.py b/src/ewoksppf/bindings.py index 1eeae2d..a6f454d 100644 --- a/src/ewoksppf/bindings.py +++ b/src/ewoksppf/bindings.py @@ -2,6 +2,7 @@ import threading import warnings from contextlib import contextmanager +from typing import Dict from typing import Generator from typing import List from typing import Optional @@ -226,24 +227,24 @@ def __init__(self, parent=None, name="Input merger", **kw): super().__init__(parent=parent, name=name, **kw) # List of input dicts provided by the graph startargs (not part of the Ewoks SPEC) - self._cached_start_triggers = list() + self._cached_start_triggers: List[dict] = list() # Map actor to input dict provided by that actor - self._cached_required_triggers = dict() - self._cached_optional_triggers = dict() + self._cached_required_triggers: Dict[AbstractActor, dict] = dict() + self._cached_optional_triggers: Dict[AbstractActor, dict] = dict() # List of input dicts provided by optional links without caching # that arrived before all required triggers arrived - self._buffer_optional_triggers = list() + self._buffer_optional_triggers: List[dict] = list() self._buffering = True # Retain only one input dict provided by optional links without caching # after all required triggers arrived - self._retained_optional_trigger = None + self._retained_optional_trigger: Optional[dict] = None self._lock = threading.Lock() - def register_input_actor(self, actor): + def register_input_actor(self, actor: Optional[AbstractActor]): if actor.required: info = "(required): cache inputs" self._cached_required_triggers[actor] = None @@ -256,7 +257,10 @@ def register_input_actor(self, actor): self.logger.info("%s %s", actor.name, info) def _execute( - self, inData: dict, _scope_id: Optional[str] = None, source=None + self, + inData: dict, + _scope_id: Optional[str] = None, + source: Optional[AbstractActor] = None, ) -> None: with self._lock: self.setStarted() @@ -307,7 +311,7 @@ def _propagate_cached_inputs(self) -> None: # No longer needed so do not keep references. self._buffer_optional_triggers.clear() - def _cache_inputs(self, source, inData: dict) -> None: + def _cache_inputs(self, source: Optional[AbstractActor], inData: dict) -> None: if source is None: self._cached_start_triggers.append(inData) return