From 1ca2b0f0b3ac246cc50671a93cfd2519f2edeca8 Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Thu, 26 Feb 2026 17:03:56 +0100 Subject: [PATCH 1/4] This commit resolves an issue where unbound input ports with empty tokens lacked persistent id, leading to invalid entries in the provenance database. The commit introduces an `InputInjectorStep` to persist `None` tokens and adds a new step, `BroadcastTransformer`, to handle distributing a single token across multiple output ports. --- streamflow/cwl/transformer.py | 14 +++++++++++++- streamflow/cwl/translator.py | 22 ++++++++++++++++++++-- streamflow/workflow/step.py | 7 ++++++- streamflow/workflow/transformer.py | 27 ++++++++++++++++++++++----- 4 files changed, 61 insertions(+), 9 deletions(-) diff --git a/streamflow/cwl/transformer.py b/streamflow/cwl/transformer.py index fa7a9fc37..ce105429d 100644 --- a/streamflow/cwl/transformer.py +++ b/streamflow/cwl/transformer.py @@ -18,7 +18,11 @@ from streamflow.cwl.step import build_token from streamflow.cwl.workflow import CWLWorkflow from streamflow.workflow.token import ListToken, TerminationToken -from streamflow.workflow.transformer import ManyToOneTransformer, OneToOneTransformer +from streamflow.workflow.transformer import ( + ManyToOneTransformer, + OneToManyTransformer, + OneToOneTransformer, +) from streamflow.workflow.utils import get_token_value @@ -37,6 +41,14 @@ async def transform( return {self.get_output_name(): self._transform(*next(iter(inputs.items())))} +class BroadcastTransformer(OneToManyTransformer): + async def transform( + self, inputs: MutableMapping[str, Token] + ) -> MutableMapping[str, Token | MutableSequence[Token]]: + token = list(inputs.values()).pop() + return {name: token.update(token.value) for name in self.output_ports.keys()} + + class CartesianProductSizeTransformer(ManyToOneTransformer): async def transform( self, inputs: MutableMapping[str, Token] diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 01f690ba8..4b727b146 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -75,6 +75,7 @@ ) from streamflow.cwl.transformer import ( AllNonNullTransformer, + BroadcastTransformer, CartesianProductSizeTransformer, CloneTransformer, CWLTokenTransformer, @@ -1910,10 +1911,27 @@ def _inject_inputs(self, workflow: Workflow) -> None: value=self.cwl_inputs[port_name], ) # Search empty unbound input ports + empty_ports = [] for input_port in workflow.ports.values(): if input_port.empty() and not input_port.get_input_steps(): - input_port.put(Token(value=None, recoverable=True)) - input_port.put(TerminationToken()) + empty_ports.append(input_port) + if empty_ports: + step = workflow.create_step( + cls=BroadcastTransformer, + name="/__empty_unbound_inputs-bcast__", + ) + upstream_port = workflow.create_port() + step.add_input_port("__empty_unbound__", upstream_port) + for downstream_port in empty_ports: + step.add_output_port(downstream_port.name, downstream_port) + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=upstream_port, + output_directory=output_directory, + value=None, + ) def _recursive_translate( self, diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py index c24b29e98..44d0b66b9 100644 --- a/streamflow/workflow/step.py +++ b/streamflow/workflow/step.py @@ -149,10 +149,15 @@ async def _persist_token( ) -> Token: if token.persistent_id: raise WorkflowDefinitionException( - f"Token already has an id: {token.persistent_id}" + f"Step {self.name} failed to save token: Token already has a persistent id: {token.persistent_id}" ) await token.save(self.workflow.context.database, port_id=port.persistent_id) if input_token_ids: + if any(id_ is None for id_ in input_token_ids): + raise WorkflowExecutionException( + f"Step {self.name} cannot establish provenance: " + f"One or more input tokens have not been persisted: {input_token_ids}" + ) await self.workflow.context.database.add_provenance( inputs=input_token_ids, token=token.persistent_id ) diff --git a/streamflow/workflow/transformer.py b/streamflow/workflow/transformer.py index 8fbfaf988..72936fb52 100644 --- a/streamflow/workflow/transformer.py +++ b/streamflow/workflow/transformer.py @@ -11,7 +11,7 @@ def add_output_port(self, name: str, port: Port) -> None: super().add_output_port(name, port) else: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." ) def get_output_name(self) -> str: @@ -20,7 +20,24 @@ def get_output_name(self) -> str: async def run(self) -> None: if len(self.output_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." + ) + await super().run() + + +class OneToManyTransformer(Transformer, ABC): + def add_input_port(self, name: str, port: Port) -> None: + if not self.input_ports: + super().add_input_port(name, port) + else: + raise WorkflowDefinitionException( + f"Step {self.name} must contain a single input port." + ) + + async def run(self) -> None: + if len(self.input_ports) != 1: + raise WorkflowDefinitionException( + f"Step {self.name} must contain a single input port." ) await super().run() @@ -31,16 +48,16 @@ def add_input_port(self, name: str, port: Port) -> None: super().add_input_port(name, port) else: raise WorkflowDefinitionException( - f"{self.name} step must contain a single input port." + f"Step {self.name} must contain a single input port." ) async def run(self) -> None: if len(self.input_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single input port." + f"Step {self.name} must contain a single input port." ) if len(self.output_ports) != 1: raise WorkflowDefinitionException( - f"{self.name} step must contain a single output port." + f"Step {self.name} must contain a single output port." ) await super().run() From ef7493f3e8b8c2bdfb15b65d192b2b3f03b5aadb Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Thu, 26 Feb 2026 17:11:08 +0100 Subject: [PATCH 2/4] Refactor --- streamflow/cwl/translator.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 4b727b146..1cef4e47c 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1915,15 +1915,24 @@ def _inject_inputs(self, workflow: Workflow) -> None: for input_port in workflow.ports.values(): if input_port.empty() and not input_port.get_input_steps(): empty_ports.append(input_port) - if empty_ports: + if len(empty_ports) == 1: + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=empty_ports[0], + output_directory=output_directory, + value=None, + ) + elif len(empty_ports) > 1: step = workflow.create_step( cls=BroadcastTransformer, - name="/__empty_unbound_inputs-bcast__", + name="/__empty_unbound_inputs__-bcast", ) upstream_port = workflow.create_port() - step.add_input_port("__empty_unbound__", upstream_port) - for downstream_port in empty_ports: - step.add_output_port(downstream_port.name, downstream_port) + step.add_input_port("__upstream__", upstream_port) + for i, downstream_port in enumerate(empty_ports): + step.add_output_port(f"__downstream_{i}__", downstream_port) self._inject_input( workflow=workflow, global_name="/__empty_unbound_inputs__", From 25a11c108fc3dd4680231811f120e03c7e3e557b Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 20 May 2026 10:51:31 +0200 Subject: [PATCH 3/4] Refactor --- streamflow/cwl/translator.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index 1cef4e47c..f118eafa5 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1915,16 +1915,7 @@ def _inject_inputs(self, workflow: Workflow) -> None: for input_port in workflow.ports.values(): if input_port.empty() and not input_port.get_input_steps(): empty_ports.append(input_port) - if len(empty_ports) == 1: - self._inject_input( - workflow=workflow, - global_name="/__empty_unbound_inputs__", - port_name="__empty_unbound_inputs__", - port=empty_ports[0], - output_directory=output_directory, - value=None, - ) - elif len(empty_ports) > 1: + if len(empty_ports) > 1: step = workflow.create_step( cls=BroadcastTransformer, name="/__empty_unbound_inputs__-bcast", @@ -1933,14 +1924,16 @@ def _inject_inputs(self, workflow: Workflow) -> None: step.add_input_port("__upstream__", upstream_port) for i, downstream_port in enumerate(empty_ports): step.add_output_port(f"__downstream_{i}__", downstream_port) - self._inject_input( - workflow=workflow, - global_name="/__empty_unbound_inputs__", - port_name="__empty_unbound_inputs__", - port=upstream_port, - output_directory=output_directory, - value=None, - ) + else: + upstream_port = empty_ports[0] + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=upstream_port, + output_directory=output_directory, + value=None, + ) def _recursive_translate( self, From 443fbf44faac55c7341be45f8f55fe0a2c7d2b78 Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 20 May 2026 10:55:12 +0200 Subject: [PATCH 4/4] Update `CHANGELOG.md` --- CHANGELOG.md | 1 + streamflow/cwl/translator.py | 39 ++++++++++++++++++------------------ 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38093aefd..94b908f16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fix provenance of empty unbound inputs ([#968](https://github.com/alpha-unito/streamflow/pull/968)) - Fix bind mount inspection in `SingularityConnector` ([#1058](https://github.com/alpha-unito/streamflow/pull/1058)) - Fix `--wait` flag in `Helm4Connector` for Cobra parser ([#1050](https://github.com/alpha-unito/streamflow/pull/1050)) - Fix shell reuse collision across execution locations ([#1045](https://github.com/alpha-unito/streamflow/pull/1045)) diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py index f118eafa5..fac5d83fb 100644 --- a/streamflow/cwl/translator.py +++ b/streamflow/cwl/translator.py @@ -1915,25 +1915,26 @@ def _inject_inputs(self, workflow: Workflow) -> None: for input_port in workflow.ports.values(): if input_port.empty() and not input_port.get_input_steps(): empty_ports.append(input_port) - if len(empty_ports) > 1: - step = workflow.create_step( - cls=BroadcastTransformer, - name="/__empty_unbound_inputs__-bcast", - ) - upstream_port = workflow.create_port() - step.add_input_port("__upstream__", upstream_port) - for i, downstream_port in enumerate(empty_ports): - step.add_output_port(f"__downstream_{i}__", downstream_port) - else: - upstream_port = empty_ports[0] - self._inject_input( - workflow=workflow, - global_name="/__empty_unbound_inputs__", - port_name="__empty_unbound_inputs__", - port=upstream_port, - output_directory=output_directory, - value=None, - ) + if len(empty_ports) > 0: + if len(empty_ports) > 1: + step = workflow.create_step( + cls=BroadcastTransformer, + name="/__empty_unbound_inputs__-bcast", + ) + upstream_port = workflow.create_port() + step.add_input_port("__upstream__", upstream_port) + for i, downstream_port in enumerate(empty_ports): + step.add_output_port(f"__downstream_{i}__", downstream_port) + else: + upstream_port = empty_ports[0] + self._inject_input( + workflow=workflow, + global_name="/__empty_unbound_inputs__", + port_name="__empty_unbound_inputs__", + port=upstream_port, + output_directory=output_directory, + value=None, + ) def _recursive_translate( self,