Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 7 additions & 27 deletions swirlc/compiler/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
sock.close()
"""

exec_function = """def _exec(step_name: str, step_display_name: str, input_port_names: MutableSequence[str], output_port_name: str, data_type: str, glob_regex: str | None, cmd: str, args: MutableSequence[str], args_from: MutableSequence[tuple[str, str]]):
exec_function = """def _exec(step_name: str, step_display_name: str, input_port_names: MutableSequence[str], output_port_name: str, data_type: str, glob_regex: str | None, cmd: str, args: MutableSequence[tuple[str,bool]]):
# Wait all the data
for port_name in input_port_names:
available_port_data[port_name].wait()
Expand All @@ -89,26 +89,8 @@
os.mkdir(workdir)
for port_name in input_port_names:
os.symlink(os.path.abspath(ports[port_name]), os.path.join(workdir, os.path.basename(ports[port_name])))
# Populate the arguments
arguments = []
if (len_args := len(args)) > 0:
args = iter(args)
if (len_args_from := len(args_from)) > 0:
args_from = iter(args_from)
elem = next(args_from)
next_pos, next_port = elem
else:
next_pos, next_port = -1, None
for i in range(len_args + len_args_from):
if i == next_pos:
arguments.append(ports[next_port])
if i < len_args_from - 1:
next_pos, next_port = next(args_from)
else:
next_pos, next_port = -1, None
else:
arguments.append(next(args))
cmd = " ".join((cmd, *arguments))
# Execute command
cmd = " ".join([cmd, *(ports[elem] if is_data else elem for elem, is_data in args)])
if logger.isEnabledFor(logging.INFO):
logger.info(f"Step {step_display_name}-{step_name} executes command '{cmd}'")
result = subprocess.run(cmd, capture_output=True, shell=True, cwd=workdir)
Expand Down Expand Up @@ -456,11 +438,9 @@ def exec(
flow: tuple[set[tuple[str, str]], set[tuple[str, str]]],
mapping: set[str],
):
arguments = [arg for arg in step.arguments if not isinstance(arg, Port)]
arguments_from_port = [
(i, arg.name)
for i, arg in enumerate(step.arguments)
if isinstance(arg, Port)
arguments = [
(arg.name if isinstance(arg, Port) else arg, isinstance(arg, Port))
for arg in step.arguments
]

outputs = flow[1]
Expand All @@ -471,7 +451,7 @@ def exec(
{self._get_indentation()}input_port_names = {[port_name for port_name, _ in flow[0]]}
{self._get_indentation()}for port_name in input_port_names:
{self._get_indentation()} available_port_data.setdefault(port_name, Event())
{self._get_indentation()}_exec("{step.name}", "{step.display_name}", input_port_names, "{output_port_name}", "{step.processors[output_port_name].type if output_port_name else ""}", "{step.processors[output_port_name].glob if output_port_name else ""}", "{step.command}", {arguments}, {arguments_from_port})"""
{self._get_indentation()}_exec("{step.name}", "{step.display_name}", input_port_names, "{output_port_name}", "{step.processors[output_port_name].type if output_port_name else ""}", "{step.processors[output_port_name].glob if output_port_name else ""}", "{step.command}", {arguments})"""
)

def par(self) -> None:
Expand Down