From dd11d0e36b19e96376fa1a2df5e3bb26a16025ae Mon Sep 17 00:00:00 2001 From: loren Date: Thu, 13 Nov 2025 10:38:30 +0100 Subject: [PATCH] lint stuff --- .docker/Dockerfile | 2 +- .gitignore | 2 +- .pre-commit-config.yaml | 2 +- src/moco/roaml_generator/main.py | 9 +- .../ros_helpers/ros_action_handler.py | 2 +- .../ros_helpers/ros_service_handler.py | 2 +- .../scxml_helpers/scxml_event.py | 8 +- .../scxml_helpers/scxml_event_processor.py | 149 ---------- .../scxml_helpers/scxml_expression.py | 278 ------------------ .../scxml_helpers/top_level_interpreter.py | 19 +- 10 files changed, 23 insertions(+), 450 deletions(-) delete mode 100644 src/moco/roaml_generator/scxml_helpers/scxml_event_processor.py delete mode 100644 src/moco/roaml_generator/scxml_helpers/scxml_expression.py diff --git a/.docker/Dockerfile b/.docker/Dockerfile index 96b35f8f..c8887f83 100644 --- a/.docker/Dockerfile +++ b/.docker/Dockerfile @@ -11,7 +11,7 @@ RUN apt update && \ RUN mkdir -p /colcon_ws/src COPY . /colcon_ws/src/moco #TODO fix package name once release is up -RUN pip3 install --break-system-packages /colcon_ws/src/moco[dev] +RUN pip3 install --break-system-packages /colcon_ws/src/moco[dev] # Make ROS workspace and build it diff --git a/.gitignore b/.gitignore index 2979d496..be53b30b 100644 --- a/.gitignore +++ b/.gitignore @@ -372,4 +372,4 @@ MigrationBackup/ .ionide/ # Fody - auto-generated XML schema -FodyWeavers.xsd \ No newline at end of file +FodyWeavers.xsd diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 9b99b203..756b8a64 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -33,7 +33,7 @@ repos: rev: 25.1.0 hooks: - id: black - language_version: python3.10 + language_version: python3.12 # - repo: https://github.com/pycqa/pylint # rev: v3.3.5 # hooks: diff --git a/src/moco/roaml_generator/main.py b/src/moco/roaml_generator/main.py index cfcb6a16..57be64e2 100644 --- a/src/moco/roaml_generator/main.py +++ b/src/moco/roaml_generator/main.py @@ -37,14 +37,16 @@ def main_roaml_to_scxml(_args: Optional[Sequence[str]] = None) -> None: :param args: The arguments to parse. If None, sys.argv is used. :return: None """ - parser = argparse.ArgumentParser(description="Convert RoaML robot system models to plain SCXML.") + parser = argparse.ArgumentParser( + description="Convert RoaML robot system models to plain SCXML." + ) parser.add_argument( "--generated-scxml-dir", type=str, default="./output", help="Path to the folder containing the generated plain-SCXML files.", ) - + parser.add_argument("main_xml", type=str, help="The path to the main XML file to interpret.") args = parser.parse_args(_args) @@ -55,7 +57,7 @@ def main_roaml_to_scxml(_args: Optional[Sequence[str]] = None) -> None: # Process additional, optional parameters scxml_out_dir = args.generated_scxml_dir scxml_out_dir = None if len(scxml_out_dir) == 0 else scxml_out_dir - + print("MOCO - RoAML to SCXML.\n") print(f"Loading model from {main_xml_file}.") @@ -63,6 +65,7 @@ def main_roaml_to_scxml(_args: Optional[Sequence[str]] = None) -> None: print(f"SCXML model saved to {scxml_out_dir}") + if __name__ == "__main__": # for testing purposes only import sys diff --git a/src/moco/roaml_generator/ros_helpers/ros_action_handler.py b/src/moco/roaml_generator/ros_helpers/ros_action_handler.py index c426ee20..9826fdaf 100644 --- a/src/moco/roaml_generator/ros_helpers/ros_action_handler.py +++ b/src/moco/roaml_generator/ros_helpers/ros_action_handler.py @@ -19,7 +19,6 @@ from typing import Callable, Dict, List, Tuple -from moco.roaml_generator.ros_helpers.ros_communication_handler import RosCommunicationHandler from moco.roaml_converter.scxml_entries import ( ScxmlAssign, ScxmlData, @@ -51,6 +50,7 @@ PLAIN_SCXML_EVENT_DATA_PREFIX, ROS_FIELD_PREFIX, ) +from moco.roaml_generator.ros_helpers.ros_communication_handler import RosCommunicationHandler class RosActionHandler(RosCommunicationHandler): diff --git a/src/moco/roaml_generator/ros_helpers/ros_service_handler.py b/src/moco/roaml_generator/ros_helpers/ros_service_handler.py index ef1c913b..ae447078 100644 --- a/src/moco/roaml_generator/ros_helpers/ros_service_handler.py +++ b/src/moco/roaml_generator/ros_helpers/ros_service_handler.py @@ -19,7 +19,6 @@ from typing import Dict, List -from moco.roaml_generator.ros_helpers.ros_communication_handler import RosCommunicationHandler from moco.roaml_converter.scxml_entries import ( ScxmlAssign, ScxmlDataModel, @@ -38,6 +37,7 @@ sanitize_ros_interface_name, ) from moco.roaml_converter.scxml_entries.utils import PLAIN_FIELD_EVENT_PREFIX, ROS_FIELD_PREFIX +from moco.roaml_generator.ros_helpers.ros_communication_handler import RosCommunicationHandler class RosServiceHandler(RosCommunicationHandler): diff --git a/src/moco/roaml_generator/scxml_helpers/scxml_event.py b/src/moco/roaml_generator/scxml_helpers/scxml_event.py index 9a378a4c..057ffd62 100644 --- a/src/moco/roaml_generator/scxml_helpers/scxml_event.py +++ b/src/moco/roaml_generator/scxml_helpers/scxml_event.py @@ -21,10 +21,6 @@ from dataclasses import dataclass from typing import Dict, List, MutableSequence, Optional, Type, Union -from moco.roaml_generator.ros_helpers.ros_timer import ( - ROS_TIMER_RATE_EVENT_PREFIX, - is_global_timer_event, -) from moco.roaml_converter.scxml_entries.bt_utils import ( is_bt_halt_event, is_bt_halt_response_event, @@ -37,6 +33,10 @@ is_action_thread_event, is_srv_event, ) +from moco.roaml_generator.ros_helpers.ros_timer import ( + ROS_TIMER_RATE_EVENT_PREFIX, + is_global_timer_event, +) class EventSender: diff --git a/src/moco/roaml_generator/scxml_helpers/scxml_event_processor.py b/src/moco/roaml_generator/scxml_helpers/scxml_event_processor.py deleted file mode 100644 index 1f42cc7f..00000000 --- a/src/moco/roaml_generator/scxml_helpers/scxml_event_processor.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright (c) 2024 - for information on the respective copyright owner -# see the NOTICE file - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Module to process events from scxml and implement them as syncs between jani automata. -""" - -from typing import Dict, List, MutableSequence, Optional, Tuple - -from moco.moco_common.array_type import ArrayInfo, is_array_type -from moco.roaml_generator.jani_entries import ( - JaniAutomaton, - JaniComposition, - JaniEdge, - JaniModel, - JaniVariable, -) -from moco.roaml_generator.jani_entries.jani_expression_generator import array_create_operator -from moco.roaml_generator.ros_helpers.ros_timer import ( - GLOBAL_TIMER_AUTOMATON, - GLOBAL_TIMER_TICK_ACTION, - GLOBAL_TIMER_TICK_EVENT, - ROS_TIMER_RATE_EVENT_PREFIX, -) -from moco.roaml_generator.scxml_helpers.scxml_event import Event, EventsHolder -from moco.roaml_generator.scxml_helpers.scxml_expression import get_array_length_var_name -from moco.roaml_converter.data_types.type_utils import MEMBER_ACCESS_SUBSTITUTION - -JANI_TIMER_ENABLE_ACTION = "global_timer_enable" - - -def _generate_event_action_names(event_obj: Event) -> Tuple[str, str]: - """Get the action names related to sending and receiving the event.""" - return (f"{event_obj.name}_on_send", f"{event_obj.name}_on_receive") - - -def _generate_event_edge(source_state: str, target_state: str, action_name: str) -> JaniEdge: - """Generate a generic edge with one destination and no assignments.""" - return JaniEdge( - { - "location": source_state, - "destinations": [ - {"location": target_state, "probability": {"exp": 1.0}, "assignments": []} - ], - "action": action_name, - } - ) - - -def _generate_event_automaton(event_obj: Event, add_timer_sync: bool) -> Optional[JaniAutomaton]: - """Generate a JaniAutomaton out of a (non-timer) event.""" - assert not event_obj.is_timer_event(), "This function cannot be used on timer events." - if event_obj.must_be_skipped_in_jani_conversion(): - return None - event_send_action, event_recv_action = _generate_event_action_names(event_obj) - waiting_str = "waiting" - received_str = "received" - assert event_obj.has_senders(), f"Event {event_obj.name} must have at least one sender" - event_automaton = JaniAutomaton() - event_automaton.set_name(event_obj.name) - event_automaton.add_location(waiting_str, is_initial=True) - if event_obj.has_receivers(): - event_automaton.add_location(received_str) - event_automaton.add_edge(_generate_event_edge(waiting_str, received_str, event_send_action)) - event_automaton.add_edge(_generate_event_edge(received_str, waiting_str, event_recv_action)) - if add_timer_sync: - # Additional self-loop in the waiting state - # used to enable the global timer to tick only if all other events have been processed - event_automaton.add_edge( - _generate_event_edge(waiting_str, waiting_str, JANI_TIMER_ENABLE_ACTION) - ) - else: - # In this case, we have only a self-loop since no receive sync is needed - event_automaton.add_edge(_generate_event_edge(waiting_str, waiting_str, event_send_action)) - return event_automaton - - -def _generate_event_variables(event_obj: Event, max_array_size: int) -> List[JaniVariable]: - """Generate the variables required for handling a provided event.""" - jani_vars: List[JaniVariable] = [] - jani_vars.append(JaniVariable(f"{event_obj.name}.valid", bool, False)) - for param_name, param_info in event_obj.get_data_structure().items(): - var_name = f"{event_obj.name}{MEMBER_ACCESS_SUBSTITUTION}{param_name}" - if is_array_type(param_info.p_type): - ar_type = param_info.p_array_type - assert ar_type is not None, "None array type found: this is unexpected." - array_dims = param_info.p_dimensions - array_info = ArrayInfo(ar_type, array_dims, [max_array_size] * array_dims) - array_init = array_create_operator(array_info) - jani_vars.append( - JaniVariable(var_name, param_info.p_type, array_init, False, array_info) - ) - for dim in range(array_dims): - len_var_name = get_array_length_var_name(var_name, dim + 1) - if dim == 0: - jani_vars.append(JaniVariable(len_var_name, int, 0)) - else: - len_array_info = ArrayInfo(int, dim, [max_array_size] * dim) - len_array_init = array_create_operator(len_array_info) - jani_vars.append( - JaniVariable( - len_var_name, MutableSequence, len_array_init, False, len_array_info - ) - ) - else: - jani_vars.append(JaniVariable(var_name, param_info.p_type)) - return jani_vars - - -def _preprocess_global_timer_automaton(timer_automaton: JaniAutomaton): - """ - Modify the global timer automaton to meet different assumptions. - - - We expect no assignments to timer_name.valid variables. - - Similarly, no assignments for the {GLOBAL_TIMER_TICK_EVENT}.valid variable. - - The {GLOBAL_TIMER_TICK_EVENT}_on_send action is treated as a non-sync action. - - The {GLOBAL_TIMER_TICK_EVENT}_on_receive action the global timer step, and is renamed. - """ - global_timer_edges = timer_automaton.get_edges() - for jani_edge in global_timer_edges: - action_name = jani_edge.get_action() - if action_name is not None: - if action_name.startswith(ROS_TIMER_RATE_EVENT_PREFIX): - assert ( - len(jani_edge.destinations) == 1 - ), f"Unexpected n. of destination for timer edge '{action_name}'" - assert ( - len(jani_edge.destinations[0]["assignments"]) == 1 - ), f"Unexpected n. of assignments for timer edge '{action_name}'" - # Get rid of the assignment - jani_edge.destinations[0]["assignments"] = [] - elif action_name == f"{GLOBAL_TIMER_TICK_EVENT}_on_receive": - jani_edge.set_action(GLOBAL_TIMER_TICK_ACTION) - elif action_name == f"{GLOBAL_TIMER_TICK_EVENT}_on_send": - jani_edge.set_action(GLOBAL_TIMER_TICK_EVENT) - jani_edge.destinations[0]["assignments"] = [] - diff --git a/src/moco/roaml_generator/scxml_helpers/scxml_expression.py b/src/moco/roaml_generator/scxml_helpers/scxml_expression.py deleted file mode 100644 index 2531153a..00000000 --- a/src/moco/roaml_generator/scxml_helpers/scxml_expression.py +++ /dev/null @@ -1,278 +0,0 @@ -# Copyright (c) 2024 - for information on the respective copyright owner -# see the NOTICE file - -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Module producing jani expressions from ecmascript. -""" - -# TODO: Rename this file to something more meaningful, like ecmascript_to_jani.py - -from typing import List, Optional, Type, Union - -import esprima -from esprima.syntax import Syntax -from lxml.etree import _Element as XmlElement - -from moco.moco_common.ecmascript_interpretation_functions import ( - get_ast_expression_type, - get_list_from_array_expr, -) -from moco.moco_common.logging import check_assertion, get_error_msg -from moco.roaml_generator.jani_entries.jani_convince_expression_expansion import ( - CALLABLE_OPERATORS_MAP, - OPERATORS_TO_JANI_MAP, - UNARY_OPERATORS_MAP, -) -from moco.roaml_generator.jani_entries.jani_expression import JaniExpression -from moco.roaml_generator.jani_entries.jani_expression_generator import ( - array_access_operator, - array_value_operator, -) -from moco.roaml_generator.jani_entries.jani_value import JaniValue -from moco.roaml_converter.data_types.type_utils import ARRAY_LENGTH_SUFFIX, ArrayInfo - -JS_CALLABLE_PREFIX = "Math" - - -def get_array_length_var_name(array_name: str, dimension: int) -> str: - """ - Generate the name of the variable holding the lengths for a specific dimension. - - :param array_name: The name of the array this variable refers to. - :param dimension: to which dimension the length variable refers to [1-N]. - :return: The variable name - """ - assert isinstance(array_name, str) and len(array_name) > 0 - assert isinstance(dimension, int) and dimension > 0 - return f"{array_name}.d{dimension}_len" - - -def convert_array_access_to_length_access(jani_expr: JaniExpression) -> JaniExpression: - """ - Substitute the array_access ('aa') expression to access the related array dimension variable. - """ - return __convert_array_access_to_length_access(jani_expr, 1) - - -def __convert_array_access_to_length_access( - jani_expr: JaniExpression, dim_level: int -) -> JaniExpression: - """ - Implementation of the `convert_array_access_to_length_access` function. - - :param jani_expr: The expression to process, must be an `aa` operator. - :param dim_level: The level of the previous jani_expr. - :return: The new expression to use for accessing the length. - """ - jani_operator, jani_operands = jani_expr.as_operator() - assert jani_operator == "aa", f"Expected JANI operator `aa`, found `{jani_operator}`." - assert jani_operands is not None, "Expected jani_operands to be not None." - curr_dim_level = dim_level + 1 - array_expr = jani_operands["exp"] - index_expr = jani_operands["index"] - assert isinstance(array_expr, JaniExpression), f"Unexpected value of {array_expr=}." - array_expr_id = array_expr.as_identifier() - if array_expr_id is not None: - array_length_var = get_array_length_var_name(array_expr_id, curr_dim_level) - return array_access_operator(array_length_var, index_expr) - return array_access_operator( - __convert_array_access_to_length_access(array_expr, curr_dim_level), index_expr - ) - - -def parse_ecmascript_to_jani_expression( - ecmascript: str, - elem: Optional[XmlElement], - target_array_info: Optional[ArrayInfo] = None, -) -> JaniExpression: - """ - Parse ecmascript to jani expression. - - :param ecmascript: The ecmascript to parse. - :param elem: The xml element associated to the expression, for error logging. - :param target_array_info: The array info required by the target variable (if any). - - :return: The jani expression. - """ - check_assertion(isinstance(ecmascript, str), elem, f"Unexpected type {type(ecmascript)}.") - try: - ast = esprima.parseScript(ecmascript) - except esprima.error_handler.Error as e: - raise RuntimeError( - get_error_msg(elem, f"Failed parsing ecmascript: {ecmascript}. Error: {e}.") - ) - assert len(ast.body) == 1, get_error_msg( - elem, "The ecmascript must contain exactly one expression." - ) - ast = ast.body[0] - try: - jani_expression = _parse_ecmascript_to_jani_expression(ast, target_array_info) - except NotImplementedError as e: - raise RuntimeError(get_error_msg(elem, f"Unsupported ecmascript '{ecmascript}': {e}")) - except AssertionError as e: - raise RuntimeError(get_error_msg(elem, f"Assertion from ecmascript '{ecmascript}': {e}")) - return jani_expression - - -def _add_padding_to_array( - in_list: List[Union[int, float, List]], array_info: ArrayInfo -) -> List[Union[int, float, List]]: - """ - Add elements to the input list, such that the number of elements match the expected size. - - :param in_list: The list that requires padding. - :array_info: The information required to determine the expected sizes. - """ - expected_shape: List[int] = array_info.array_max_sizes - expected_type: Type[Union[int, float]] = array_info.array_type - return _pad_list_recursive(in_list, expected_shape, expected_type) - - -def _pad_list_recursive( - in_list: List[Union[int, float, List]], shape: List[int], entry_type: Type[Union[int, float]] -): - """The recursive implementation of the padding functionality.""" - curr_size = shape[0] - assert isinstance(curr_size, int), f"Unexpected array size found: {curr_size} is not an int." - assert isinstance(in_list, list), f"Unexpected input list: {in_list} is not a list." - len_padding = curr_size - len(in_list) - assert len_padding >= 0, "Input list is larger than the expected max. dimension." - if len(shape) == 1: - return in_list + [entry_type(0)] * len_padding - # If here, we have a multi-dimensional array: just append empty lists and fill them afterwards - padded_list = in_list + [[]] * len_padding - # Now, extend this list! - return [_pad_list_recursive(lst_entry, shape[1:], entry_type) for lst_entry in padded_list] - - -def _parse_ecmascript_to_jani_expression( - ast: esprima.nodes.Node, - target_array_info: Optional[ArrayInfo], -) -> JaniExpression: - """ - Parse ecmascript to jani expression. - - :param ast: The AST expression to convert. - :param target_array_info: ArrayInfo related to target variable (if defined) - :return: The jani expression. - """ - if ast.type == Syntax.ExpressionStatement: - # This is the highest level for each esprima script - return _parse_ecmascript_to_jani_expression(ast.expression, target_array_info) - elif ast.type == Syntax.Literal: - if isinstance(ast.value, str): - raise RuntimeError("This should not contain string expressions any more.") - return JaniExpression(JaniValue(ast.value)) - elif ast.type == Syntax.Identifier: - # If it is an identifier, we do not need to expand further - assert ast.name not in ("True", "False"), ( - f"Boolean {ast.name} mistaken for an identifier. " - "Did you mean to use 'true' or 'false' instead?" - ) - return JaniExpression(ast.name) - elif ast.type == Syntax.UnaryExpression: - assert ast.prefix is True, "only prefixes are supported" - assert ast.operator in UNARY_OPERATORS_MAP, ( - f"Operator {ast.operator} is not supported. " - + f"Only {UNARY_OPERATORS_MAP.keys()} are supported." - ) - return UNARY_OPERATORS_MAP[ast.operator]( - _parse_ecmascript_to_jani_expression(ast.argument, target_array_info) - ) - elif ast.type == Syntax.BinaryExpression or ast.type == Syntax.LogicalExpression: - # It is a more complex expression - assert ( - ast.operator in OPERATORS_TO_JANI_MAP - ), f"ecmascript to jani expression: unknown operator {ast.operator}" - return JaniExpression( - { - "op": OPERATORS_TO_JANI_MAP[ast.operator], - "left": _parse_ecmascript_to_jani_expression(ast.left, target_array_info), - "right": _parse_ecmascript_to_jani_expression(ast.right, target_array_info), - } - ) - elif ast.type == Syntax.ArrayExpression: - # This is an Array literal, will result in an array_value operator - array_entries_info = get_ast_expression_type(ast, {}) - assert isinstance(array_entries_info, ArrayInfo), "Unexpected type extracted from AST expr." - assert array_entries_info.array_type in (int, float, None), "Unexpected array type." - expected_type: Optional[Type[Union[int, float]]] = array_entries_info.array_type - if target_array_info is not None: - # If target type is float, we are ok with everything - # If entries type is int or None, we are OK with everything as well - assert target_array_info.array_type is float or array_entries_info.array_type in ( - int, - None, - ), "The target var and the expr. type are not compatible." - expected_type = target_array_info.array_type - extracted_list = get_list_from_array_expr(ast, expected_type) - if target_array_info is not None: - extracted_list = _add_padding_to_array(extracted_list, target_array_info) - return array_value_operator(extracted_list) - elif ast.type == Syntax.MemberExpression: - object_expr = _parse_ecmascript_to_jani_expression(ast.object, target_array_info) - property_expr = _parse_ecmascript_to_jani_expression(ast.property, target_array_info) - if ast.computed: - # This is an array access, like `array[0]` - return array_access_operator(object_expr, property_expr) - else: - # Access to the member of an object through dot notation. Two cases: - # 1: Generic dot notation, like `object.member` - # 2: Length access, like `array.length` or `array[2].length` - object_expr_str = object_expr.as_identifier() - property_expr_str = property_expr.as_identifier() - assert ( - property_expr_str is not None - ), f"Unexpected value for property of {ast}. Shall be an Identifier." - is_array_length = property_expr_str == ARRAY_LENGTH_SUFFIX - if is_array_length: - # We are accessing the array length information, some renaming needs to be done - if object_expr_str is not None: - # Accessing array dimension at level 1 - return JaniExpression(get_array_length_var_name(object_expr_str, 1)) - else: - # We need to count how many levels deep we need to go (n. of ArrayAccess) - return convert_array_access_to_length_access(object_expr) - else: - # We are accessing a generic sub-field, just re-assemble the variable name - assert ( - object_expr_str is not None - ), "Only identifiers can be accessed through dot notation." - return JaniExpression(f"{object_expr_str}.{property_expr_str}") - elif ast.type == Syntax.CallExpression: - # We expect function calls to be of the form Math.function_name(args) (JavaScript-like) - # The "." operator is represented as a MemberExpression - assert ( - ast.callee.type == Syntax.MemberExpression - ), f"Functions callee is expected to be MemberExpressions, found {ast.callee}." - assert ( - ast.callee.object.type == Syntax.Identifier - ), f"Callee object is expected to be an Identifier, found {ast.callee.object}." - assert ( - ast.callee.property.type == Syntax.Identifier - ), f"Callee property is expected to be an Identifier, found {ast.callee.property}." - assert ( - ast.callee.object.name == JS_CALLABLE_PREFIX - ), f"Function calls prefix is expected to be 'Math', found {ast.callee.object.name}." - function_name: str = ast.callee.property.name - assert ( - function_name in CALLABLE_OPERATORS_MAP - ), f"Unsupported function call {function_name}." - expression_args: List[JaniExpression] = [] - for arg in ast.arguments: - expression_args.append(_parse_ecmascript_to_jani_expression(arg, target_array_info)) - return CALLABLE_OPERATORS_MAP[function_name](*expression_args) - else: - raise NotImplementedError(f"Unsupported ecmascript type: {ast.type}") diff --git a/src/moco/roaml_generator/scxml_helpers/top_level_interpreter.py b/src/moco/roaml_generator/scxml_helpers/top_level_interpreter.py index 8e3a1c83..624b4092 100644 --- a/src/moco/roaml_generator/scxml_helpers/top_level_interpreter.py +++ b/src/moco/roaml_generator/scxml_helpers/top_level_interpreter.py @@ -20,6 +20,13 @@ from copy import deepcopy from typing import Dict, List, Optional +from moco.roaml_converter.bt_converter import ( + bt_converter, + generate_blackboard_scxml, + get_blackboard_variables_from_models, +) +from moco.roaml_converter.data_types.struct_definition import StructDefinition +from moco.roaml_converter.scxml_entries import EventsToAutomata, ScxmlRoot from moco.roaml_generator.ros_helpers.ros_action_handler import RosActionHandler from moco.roaml_generator.ros_helpers.ros_communication_handler import ( RosCommunicationHandler, @@ -29,13 +36,6 @@ from moco.roaml_generator.ros_helpers.ros_service_handler import RosServiceHandler from moco.roaml_generator.ros_helpers.ros_timer import RosTimer, make_global_timer_scxml from moco.roaml_generator.scxml_helpers.roaml_model import FullModel, RoamlDataStructures, RoamlMain -from moco.roaml_converter.bt_converter import ( - bt_converter, - generate_blackboard_scxml, - get_blackboard_variables_from_models, -) -from moco.roaml_converter.data_types.struct_definition import StructDefinition -from moco.roaml_converter.scxml_entries import EventsToAutomata, ScxmlRoot def generate_plain_scxml_models_and_timers(model: FullModel) -> List[ScxmlRoot]: @@ -137,9 +137,7 @@ def export_plain_scxml_models( f.write(scxml_model.as_xml_string(data_type_as_attribute=False)) -def interpret_top_level_xml( - xml_path: str, scxmls_dir: Optional[str] = None -): +def interpret_top_level_xml(xml_path: str, scxmls_dir: Optional[str] = None): """ Interpret the top-level XML file as a Jani model. And write it to a file. The generated Jani model is written to the same directory as the input XML file under the @@ -159,4 +157,3 @@ def interpret_top_level_xml( if scxmls_dir is not None: plain_scxml_dir = os.path.join(model_dir, scxmls_dir) export_plain_scxml_models(plain_scxml_dir, plain_scxml_models) -