diff --git a/.github/instructions/project.md b/.github/instructions/project.md new file mode 100644 index 0000000..daea1c3 --- /dev/null +++ b/.github/instructions/project.md @@ -0,0 +1,21 @@ + + +Project instructions + +To add a new dependency for the `fmuloader` project, you can use the following command in your terminal: + +```bash +uv add +``` + +To add a new development dependency for the `fmuloader` project, you can use the following command in your terminal: + +```bash +uv add --dev +``` + +To run the tests for the `fmuloader` project, you can use the following command in your terminal: + +```bash +uv run pytest +``` diff --git a/.github/instructions/python.md b/.github/instructions/python.md new file mode 100644 index 0000000..2d267c8 --- /dev/null +++ b/.github/instructions/python.md @@ -0,0 +1,58 @@ +--- +description: 'Python coding conventions and guidelines' +applyTo: '**/*.py' +--- + +# Python Coding Conventions + +## Python Instructions + +- Write clear and concise comments for each function. +- Ensure functions have descriptive names and include type hints. +- Provide docstrings following PEP 257 conventions. +- Use the `typing` module for type annotations (e.g., `List[str]`, `Dict[str, int]`). +- Break down complex functions into smaller, more manageable functions. + +## General Instructions + +- Always prioritize readability and clarity. +- For algorithm-related code, include explanations of the approach used. +- Write code with good maintainability practices, including comments on why certain design decisions were made. +- Handle edge cases and write clear exception handling. +- For libraries or external dependencies, mention their usage and purpose in comments. +- Use consistent naming conventions and follow language-specific best practices. +- Write concise, efficient, and idiomatic code that is also easily understandable. + +## Code Style and Formatting + +- Follow the **PEP 8** style guide for Python. +- Maintain proper indentation (use 4 spaces for each level of indentation). +- Ensure lines do not exceed 79 characters. +- Place function and class docstrings immediately after the `def` or `class` keyword. +- Use blank lines to separate functions, classes, and code blocks where appropriate. + +## Edge Cases and Testing + +- Always include test cases for critical paths of the application. +- Account for common edge cases like empty inputs, invalid data types, and large datasets. +- Include comments for edge cases and the expected behavior in those cases. +- Write unit tests for functions and document them with docstrings explaining the test cases. + +## Example of Proper Documentation + +```python +def calculate_area(radius: float) -> float: + """ + Calculate the area of a circle given the radius. + + Parameters: + ----------- + radius (float): The radius of the circle. Must be a non-negative value. + + Returns: + ----------- + float: The area of the circle, calculated as π * radius^2. + """ + import math + return math.pi * radius ** 2 +``` diff --git a/src/fmuloader/fmi2.py b/src/fmuloader/fmi2.py new file mode 100644 index 0000000..a970692 --- /dev/null +++ b/src/fmuloader/fmi2.py @@ -0,0 +1,1205 @@ +""" +Python ctypes bindings for the FMI 2.0 standard. + +This module provides complete Python bindings for loading and interacting with +FMI 2.0 Functional Mock-up Units (FMUs), supporting both Co-Simulation and +Model Exchange interfaces. + +Reference: FMI Specification 2.0.5 +""" + +from __future__ import annotations + +import ctypes +import platform +import sys +import tempfile +import zipfile +from ctypes import ( + CDLL, + CFUNCTYPE, + POINTER, + Structure, + byref, + c_char_p, + c_double, + c_int, + c_size_t, + c_uint, + c_void_p, +) +from enum import IntEnum +from pathlib import Path +from typing import Sequence + +# --------------------------------------------------------------------------- +# FMI 2.0 primitive types +# --------------------------------------------------------------------------- +fmi2Component = c_void_p +fmi2ComponentEnvironment = c_void_p +fmi2FMUstate = c_void_p +fmi2ValueReference = c_uint +fmi2Real = c_double +fmi2Integer = c_int +fmi2Boolean = c_int +fmi2Char = ctypes.c_char +fmi2String = c_char_p +fmi2Byte = ctypes.c_char + +fmi2True: int = 1 +fmi2False: int = 0 + + +# --------------------------------------------------------------------------- +# Enumerations +# --------------------------------------------------------------------------- +class Fmi2Status(IntEnum): + OK = 0 + WARNING = 1 + DISCARD = 2 + ERROR = 3 + FATAL = 4 + PENDING = 5 + + +class Fmi2Type(IntEnum): + MODEL_EXCHANGE = 0 + CO_SIMULATION = 1 + + +class Fmi2StatusKind(IntEnum): + DO_STEP_STATUS = 0 + PENDING_STATUS = 1 + LAST_SUCCESSFUL_TIME = 2 + TERMINATED = 3 + + +# --------------------------------------------------------------------------- +# Callback function types +# --------------------------------------------------------------------------- +# void logger(fmi2ComponentEnvironment, fmi2String instanceName, +# fmi2Status status, fmi2String category, fmi2String message, ...) +# Note: ctypes CFUNCTYPE does not support variadic arguments, so we define +# the callback with the fixed parameters only. +_fmi2CallbackLogger = CFUNCTYPE( + None, + fmi2ComponentEnvironment, + fmi2String, + c_int, + fmi2String, + fmi2String, +) + +# void* allocateMemory(size_t nobj, size_t size) +_fmi2CallbackAllocateMemory = CFUNCTYPE(c_void_p, c_size_t, c_size_t) + +# void freeMemory(void* obj) +_fmi2CallbackFreeMemory = CFUNCTYPE(None, c_void_p) + +# void stepFinished(fmi2ComponentEnvironment, fmi2Status) +_fmi2StepFinished = CFUNCTYPE(None, fmi2ComponentEnvironment, c_int) + + +class _Fmi2CallbackFunctions(Structure): + _fields_ = [ + ("logger", _fmi2CallbackLogger), + ("allocateMemory", _fmi2CallbackAllocateMemory), + ("freeMemory", _fmi2CallbackFreeMemory), + ("stepFinished", _fmi2StepFinished), + ("componentEnvironment", fmi2ComponentEnvironment), + ] + + +class Fmi2EventInfo(Structure): + _fields_ = [ + ("newDiscreteStatesNeeded", fmi2Boolean), + ("terminateSimulation", fmi2Boolean), + ("nominalsOfContinuousStatesChanged", fmi2Boolean), + ("valuesOfContinuousStatesChanged", fmi2Boolean), + ("nextEventTimeDefined", fmi2Boolean), + ("nextEventTime", fmi2Real), + ] + + +# --------------------------------------------------------------------------- +# Default callbacks +# --------------------------------------------------------------------------- +def _default_logger( + _env: object, + instance_name: bytes | None, + status: int, + category: bytes | None, + message: bytes | None, +) -> None: + name = instance_name.decode() if instance_name else "" + cat = category.decode() if category else "" + msg = message.decode() if message else "" + status_str = Fmi2Status(status).name + print(f"[{name}] [{status_str}] [{cat}] {msg}") + + +def _default_allocate(nobj: int, size: int) -> int: + return ctypes.cast(ctypes.CDLL(None).calloc(nobj, size), c_void_p).value or 0 + + +def _default_free(obj: int) -> None: + ctypes.CDLL(None).free(obj) + + +_LOGGER_FUNC = _fmi2CallbackLogger(_default_logger) +_ALLOCATE_FUNC = _fmi2CallbackAllocateMemory(_default_allocate) +_FREE_FUNC = _fmi2CallbackFreeMemory(_default_free) +_STEP_FINISHED_FUNC = _fmi2StepFinished(0) # NULL + + +def _make_callbacks( + use_memory_callbacks: bool = True, +) -> _Fmi2CallbackFunctions: + """Create the callback struct for fmi2Instantiate.""" + if use_memory_callbacks: + return _Fmi2CallbackFunctions( + logger=_LOGGER_FUNC, + allocateMemory=_ALLOCATE_FUNC, + freeMemory=_FREE_FUNC, + stepFinished=_STEP_FINISHED_FUNC, + componentEnvironment=None, + ) + # When canNotUseMemoryManagementFunctions=true, pass NULL for alloc/free + return _Fmi2CallbackFunctions( + logger=_LOGGER_FUNC, + allocateMemory=_fmi2CallbackAllocateMemory(0), + freeMemory=_fmi2CallbackFreeMemory(0), + stepFinished=_STEP_FINISHED_FUNC, + componentEnvironment=None, + ) + + +# --------------------------------------------------------------------------- +# Shared library helpers +# --------------------------------------------------------------------------- +def _shared_lib_extension() -> str: + s = platform.system() + if s == "Windows": + return ".dll" + if s == "Darwin": + return ".dylib" + return ".so" + + +def _platform_folder() -> str: + """Return the FMI 2.0 platform subfolder name, e.g. 'darwin64'. + + The FMI 2.0 standard defines: ``win32``, ``win64``, ``linux32``, + ``linux64``, ``darwin32``, ``darwin64``. ARM architectures are + **not** covered by the standard. Use the ``binary_dir`` parameter + of :class:`Fmi2Slave` to specify a custom subfolder name when + working with non-standard platforms (e.g. ``"aarch64-darwin"``). + """ + s = platform.system() + if s == "Darwin": + return "darwin64" + if s == "Linux": + if sys.maxsize > 2**32: + return "linux64" + return "linux32" + if s == "Windows": + if sys.maxsize > 2**32: + return "win64" + return "win32" + raise RuntimeError(f"Unsupported platform: {s}") + + +def _find_binary( + binaries_dir: Path, + model_identifier: str, + binary_dir: str | None = None, +) -> Path: + """Locate the shared library in the binaries/ directory. + + Args: + binaries_dir: The ``binaries/`` directory inside an extracted FMU. + model_identifier: The model identifier (shared lib name without + extension). + binary_dir: Optional override for the platform subfolder name. + Use this for non-standard platforms such as ``"aarch64-darwin"``. + When *None*, the standard FMI 2.0 folder name is used. + """ + ext = _shared_lib_extension() + lib_name = model_identifier + ext + + # Try the user-provided or standard folder + folder = binary_dir if binary_dir is not None else _platform_folder() + candidate = binaries_dir / folder / lib_name + if candidate.exists(): + return candidate + + # Fallback: scan all sub-directories + for found in binaries_dir.rglob(lib_name): + return found + + raise FileNotFoundError( + f"Cannot find shared library {lib_name!r} under {binaries_dir}" + ) + + +def _path_to_file_uri(p: Path) -> str: + """Convert an absolute path to a file:/// URI.""" + resolved = p.resolve() + # On Windows the drive letter must be handled + if platform.system() == "Windows": + uri_path = "/" + str(resolved).replace("\\", "/") + else: + uri_path = str(resolved) + return "file://" + uri_path + + +# --------------------------------------------------------------------------- +# FMI 2.0 error checking +# --------------------------------------------------------------------------- +class Fmi2Error(Exception): + """Raised when an FMI 2.0 function returns an error status.""" + + def __init__(self, func_name: str, status: Fmi2Status) -> None: + self.func_name = func_name + self.status = status + super().__init__(f"{func_name} returned {status.name} ({status.value})") + + +def _check_status(func_name: str, status: int) -> Fmi2Status: + s = Fmi2Status(status) + if s in (Fmi2Status.ERROR, Fmi2Status.FATAL): + raise Fmi2Error(func_name, s) + return s + + +# --------------------------------------------------------------------------- +# FMI2 Slave / Instance wrapper +# --------------------------------------------------------------------------- +class Fmi2Slave: + """Low-level wrapper around an FMI 2.0 shared library instance. + + This class binds all FMI 2.0 C functions via ctypes and provides + thin Python methods that handle type conversions automatically. + + Args: + path: Path to an ``.fmu`` archive **or** an already-extracted FMU + directory that contains ``binaries/``. + model_identifier: The model identifier – i.e. the shared-library + file name without extension (e.g. ``"BouncingBall"``). + binary_dir: Override for the platform subfolder inside + ``binaries/``. The FMI 2.0 standard only defines + ``win32``, ``win64``, ``linux32``, ``linux64``, ``darwin32`` + and ``darwin64``. For non-standard platforms such as + Apple Silicon you can pass e.g. ``"aarch64-darwin"`` + to resolve ``binaries/aarch64-darwin/.dylib``. + When *None* the standard folder for the current platform is + tried first, then all subdirectories are scanned as fallback. + unpack_dir: Where to extract the ``.fmu`` archive. If *None* a + temporary directory is used (cleaned up on context-manager + exit or garbage collection). + + Typical Co-Simulation usage:: + + slave = Fmi2Slave("BouncingBall", model_identifier="BouncingBall") + + slave.instantiate("inst1", Fmi2Type.CO_SIMULATION, guid="{...}") + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + for t in ...: + slave.do_step(t, step_size) + values = slave.get_real([vr1, vr2]) + + slave.terminate() + slave.free_instance() + + Typical Model Exchange usage:: + + slave = Fmi2Slave("BouncingBall", model_identifier="BouncingBall") + + slave.instantiate("inst1", Fmi2Type.MODEL_EXCHANGE, guid="{...}") + slave.setup_experiment(start_time=0.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Initial event iteration + event_info = slave.new_discrete_states() + while event_info.newDiscreteStatesNeeded: + event_info = slave.new_discrete_states() + + slave.enter_continuous_time_mode() + + # Integration loop + slave.set_time(t) + derivs = slave.get_derivatives(nx) + ... + + slave.terminate() + slave.free_instance() + """ + + def __init__( + self, + path: str | Path, + *, + model_identifier: str, + binary_dir: str | None = None, + unpack_dir: str | Path | None = None, + ) -> None: + self._path = Path(path) + self._tmpdir: tempfile.TemporaryDirectory[str] | None = None + self._component: c_void_p | None = None + self._dll: CDLL | None = None + self._callbacks: _Fmi2CallbackFunctions | None = None + + # Determine whether we got an .fmu archive or an extracted directory + if self._path.suffix == ".fmu": + if unpack_dir is not None: + self._extract_dir = Path(unpack_dir) + self._extract_dir.mkdir(parents=True, exist_ok=True) + else: + self._tmpdir = tempfile.TemporaryDirectory(prefix="fmuloader_") + self._extract_dir = Path(self._tmpdir.name) + with zipfile.ZipFile(self._path) as zf: + zf.extractall(self._extract_dir) + else: + # Assume it's already an extracted directory + self._extract_dir = self._path + + self._model_identifier = model_identifier + + # Load shared library + binaries_dir = self._extract_dir / "binaries" + lib_path = _find_binary(binaries_dir, model_identifier, binary_dir) + self._dll = CDLL(str(lib_path)) + + # Bind all FMI 2.0 functions + self._bind_functions() + + # ------------------------------------------------------------------ + # Function binding + # ------------------------------------------------------------------ + def _bind_functions(self) -> None: + """Bind all FMI 2.0 C functions from the shared library.""" + dll = self._dll + assert dll is not None + + # ---- Common functions ---- + self._fmi2GetTypesPlatform = dll.fmi2GetTypesPlatform + self._fmi2GetTypesPlatform.restype = c_char_p + self._fmi2GetTypesPlatform.argtypes = [] + + self._fmi2GetVersion = dll.fmi2GetVersion + self._fmi2GetVersion.restype = c_char_p + self._fmi2GetVersion.argtypes = [] + + self._fmi2SetDebugLogging = dll.fmi2SetDebugLogging + self._fmi2SetDebugLogging.restype = c_int + self._fmi2SetDebugLogging.argtypes = [ + fmi2Component, + fmi2Boolean, + c_size_t, + POINTER(fmi2String), + ] + + self._fmi2Instantiate = dll.fmi2Instantiate + self._fmi2Instantiate.restype = fmi2Component + self._fmi2Instantiate.argtypes = [ + fmi2String, # instanceName + c_int, # fmuType + fmi2String, # fmuGUID + fmi2String, # fmuResourceLocation + POINTER(_Fmi2CallbackFunctions), + fmi2Boolean, # visible + fmi2Boolean, # loggingOn + ] + + self._fmi2FreeInstance = dll.fmi2FreeInstance + self._fmi2FreeInstance.restype = None + self._fmi2FreeInstance.argtypes = [fmi2Component] + + self._fmi2SetupExperiment = dll.fmi2SetupExperiment + self._fmi2SetupExperiment.restype = c_int + self._fmi2SetupExperiment.argtypes = [ + fmi2Component, + fmi2Boolean, # toleranceDefined + fmi2Real, # tolerance + fmi2Real, # startTime + fmi2Boolean, # stopTimeDefined + fmi2Real, # stopTime + ] + + self._fmi2EnterInitializationMode = dll.fmi2EnterInitializationMode + self._fmi2EnterInitializationMode.restype = c_int + self._fmi2EnterInitializationMode.argtypes = [fmi2Component] + + self._fmi2ExitInitializationMode = dll.fmi2ExitInitializationMode + self._fmi2ExitInitializationMode.restype = c_int + self._fmi2ExitInitializationMode.argtypes = [fmi2Component] + + self._fmi2Terminate = dll.fmi2Terminate + self._fmi2Terminate.restype = c_int + self._fmi2Terminate.argtypes = [fmi2Component] + + self._fmi2Reset = dll.fmi2Reset + self._fmi2Reset.restype = c_int + self._fmi2Reset.argtypes = [fmi2Component] + + # ---- Getters / Setters ---- + self._fmi2GetReal = dll.fmi2GetReal + self._fmi2GetReal.restype = c_int + self._fmi2GetReal.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Real), + ] + + self._fmi2GetInteger = dll.fmi2GetInteger + self._fmi2GetInteger.restype = c_int + self._fmi2GetInteger.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Integer), + ] + + self._fmi2GetBoolean = dll.fmi2GetBoolean + self._fmi2GetBoolean.restype = c_int + self._fmi2GetBoolean.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Boolean), + ] + + self._fmi2GetString = dll.fmi2GetString + self._fmi2GetString.restype = c_int + self._fmi2GetString.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2String), + ] + + self._fmi2SetReal = dll.fmi2SetReal + self._fmi2SetReal.restype = c_int + self._fmi2SetReal.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Real), + ] + + self._fmi2SetInteger = dll.fmi2SetInteger + self._fmi2SetInteger.restype = c_int + self._fmi2SetInteger.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Integer), + ] + + self._fmi2SetBoolean = dll.fmi2SetBoolean + self._fmi2SetBoolean.restype = c_int + self._fmi2SetBoolean.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Boolean), + ] + + self._fmi2SetString = dll.fmi2SetString + self._fmi2SetString.restype = c_int + self._fmi2SetString.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2String), + ] + + # ---- FMU state ---- + self._fmi2GetFMUstate = dll.fmi2GetFMUstate + self._fmi2GetFMUstate.restype = c_int + self._fmi2GetFMUstate.argtypes = [ + fmi2Component, + POINTER(fmi2FMUstate), + ] + + self._fmi2SetFMUstate = dll.fmi2SetFMUstate + self._fmi2SetFMUstate.restype = c_int + self._fmi2SetFMUstate.argtypes = [fmi2Component, fmi2FMUstate] + + self._fmi2FreeFMUstate = dll.fmi2FreeFMUstate + self._fmi2FreeFMUstate.restype = c_int + self._fmi2FreeFMUstate.argtypes = [ + fmi2Component, + POINTER(fmi2FMUstate), + ] + + self._fmi2SerializedFMUstateSize = dll.fmi2SerializedFMUstateSize + self._fmi2SerializedFMUstateSize.restype = c_int + self._fmi2SerializedFMUstateSize.argtypes = [ + fmi2Component, + fmi2FMUstate, + POINTER(c_size_t), + ] + + self._fmi2SerializeFMUstate = dll.fmi2SerializeFMUstate + self._fmi2SerializeFMUstate.restype = c_int + self._fmi2SerializeFMUstate.argtypes = [ + fmi2Component, + fmi2FMUstate, + POINTER(fmi2Byte), + c_size_t, + ] + + self._fmi2DeSerializeFMUstate = dll.fmi2DeSerializeFMUstate + self._fmi2DeSerializeFMUstate.restype = c_int + self._fmi2DeSerializeFMUstate.argtypes = [ + fmi2Component, + POINTER(fmi2Byte), + c_size_t, + POINTER(fmi2FMUstate), + ] + + # ---- Directional derivatives ---- + self._fmi2GetDirectionalDerivative = dll.fmi2GetDirectionalDerivative + self._fmi2GetDirectionalDerivative.restype = c_int + self._fmi2GetDirectionalDerivative.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Real), + POINTER(fmi2Real), + ] + + # ---- Model Exchange functions ---- + self._fmi2EnterEventMode = dll.fmi2EnterEventMode + self._fmi2EnterEventMode.restype = c_int + self._fmi2EnterEventMode.argtypes = [fmi2Component] + + self._fmi2NewDiscreteStates = dll.fmi2NewDiscreteStates + self._fmi2NewDiscreteStates.restype = c_int + self._fmi2NewDiscreteStates.argtypes = [ + fmi2Component, + POINTER(Fmi2EventInfo), + ] + + self._fmi2EnterContinuousTimeMode = dll.fmi2EnterContinuousTimeMode + self._fmi2EnterContinuousTimeMode.restype = c_int + self._fmi2EnterContinuousTimeMode.argtypes = [fmi2Component] + + self._fmi2CompletedIntegratorStep = dll.fmi2CompletedIntegratorStep + self._fmi2CompletedIntegratorStep.restype = c_int + self._fmi2CompletedIntegratorStep.argtypes = [ + fmi2Component, + fmi2Boolean, + POINTER(fmi2Boolean), + POINTER(fmi2Boolean), + ] + + self._fmi2SetTime = dll.fmi2SetTime + self._fmi2SetTime.restype = c_int + self._fmi2SetTime.argtypes = [fmi2Component, fmi2Real] + + self._fmi2SetContinuousStates = dll.fmi2SetContinuousStates + self._fmi2SetContinuousStates.restype = c_int + self._fmi2SetContinuousStates.argtypes = [ + fmi2Component, + POINTER(fmi2Real), + c_size_t, + ] + + self._fmi2GetDerivatives = dll.fmi2GetDerivatives + self._fmi2GetDerivatives.restype = c_int + self._fmi2GetDerivatives.argtypes = [ + fmi2Component, + POINTER(fmi2Real), + c_size_t, + ] + + self._fmi2GetEventIndicators = dll.fmi2GetEventIndicators + self._fmi2GetEventIndicators.restype = c_int + self._fmi2GetEventIndicators.argtypes = [ + fmi2Component, + POINTER(fmi2Real), + c_size_t, + ] + + self._fmi2GetContinuousStates = dll.fmi2GetContinuousStates + self._fmi2GetContinuousStates.restype = c_int + self._fmi2GetContinuousStates.argtypes = [ + fmi2Component, + POINTER(fmi2Real), + c_size_t, + ] + + self._fmi2GetNominalsOfContinuousStates = dll.fmi2GetNominalsOfContinuousStates + self._fmi2GetNominalsOfContinuousStates.restype = c_int + self._fmi2GetNominalsOfContinuousStates.argtypes = [ + fmi2Component, + POINTER(fmi2Real), + c_size_t, + ] + + # ---- Co-Simulation functions ---- + self._fmi2SetRealInputDerivatives = dll.fmi2SetRealInputDerivatives + self._fmi2SetRealInputDerivatives.restype = c_int + self._fmi2SetRealInputDerivatives.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Integer), + POINTER(fmi2Real), + ] + + self._fmi2GetRealOutputDerivatives = dll.fmi2GetRealOutputDerivatives + self._fmi2GetRealOutputDerivatives.restype = c_int + self._fmi2GetRealOutputDerivatives.argtypes = [ + fmi2Component, + POINTER(fmi2ValueReference), + c_size_t, + POINTER(fmi2Integer), + POINTER(fmi2Real), + ] + + self._fmi2DoStep = dll.fmi2DoStep + self._fmi2DoStep.restype = c_int + self._fmi2DoStep.argtypes = [ + fmi2Component, + fmi2Real, # currentCommunicationPoint + fmi2Real, # communicationStepSize + fmi2Boolean, # noSetFMUStatePriorToCurrentPoint + ] + + self._fmi2CancelStep = dll.fmi2CancelStep + self._fmi2CancelStep.restype = c_int + self._fmi2CancelStep.argtypes = [fmi2Component] + + self._fmi2GetStatus = dll.fmi2GetStatus + self._fmi2GetStatus.restype = c_int + self._fmi2GetStatus.argtypes = [ + fmi2Component, + c_int, + POINTER(c_int), + ] + + self._fmi2GetRealStatus = dll.fmi2GetRealStatus + self._fmi2GetRealStatus.restype = c_int + self._fmi2GetRealStatus.argtypes = [ + fmi2Component, + c_int, + POINTER(fmi2Real), + ] + + self._fmi2GetIntegerStatus = dll.fmi2GetIntegerStatus + self._fmi2GetIntegerStatus.restype = c_int + self._fmi2GetIntegerStatus.argtypes = [ + fmi2Component, + c_int, + POINTER(fmi2Integer), + ] + + self._fmi2GetBooleanStatus = dll.fmi2GetBooleanStatus + self._fmi2GetBooleanStatus.restype = c_int + self._fmi2GetBooleanStatus.argtypes = [ + fmi2Component, + c_int, + POINTER(fmi2Boolean), + ] + + self._fmi2GetStringStatus = dll.fmi2GetStringStatus + self._fmi2GetStringStatus.restype = c_int + self._fmi2GetStringStatus.argtypes = [ + fmi2Component, + c_int, + POINTER(fmi2String), + ] + + # ------------------------------------------------------------------ + # Helper to convert Python lists → ctypes arrays + # ------------------------------------------------------------------ + @staticmethod + def _vr_array(vrs: Sequence[int]) -> ctypes.Array[c_uint]: + arr_type = fmi2ValueReference * len(vrs) + return arr_type(*vrs) + + @staticmethod + def _real_array(vals: Sequence[float]) -> ctypes.Array[c_double]: + arr_type = fmi2Real * len(vals) + return arr_type(*vals) + + @staticmethod + def _int_array(vals: Sequence[int]) -> ctypes.Array[c_int]: + arr_type = fmi2Integer * len(vals) + return arr_type(*vals) + + @staticmethod + def _bool_array(vals: Sequence[bool | int]) -> ctypes.Array[c_int]: + arr_type = fmi2Boolean * len(vals) + return arr_type(*(fmi2True if v else fmi2False for v in vals)) + + @staticmethod + def _string_array(vals: Sequence[str]) -> ctypes.Array[c_char_p]: + arr_type = fmi2String * len(vals) + return arr_type(*(v.encode("utf-8") for v in vals)) + + # ------------------------------------------------------------------ + # Common functions + # ------------------------------------------------------------------ + def get_types_platform(self) -> str: + """Return the FMI types platform string.""" + return self._fmi2GetTypesPlatform().decode() + + def get_version(self) -> str: + """Return the FMI version string.""" + return self._fmi2GetVersion().decode() + + def set_debug_logging( + self, + logging_on: bool, + categories: Sequence[str] | None = None, + ) -> Fmi2Status: + cats: Sequence[str] = categories or [] + n = len(cats) + if n > 0: + arr = self._string_array(cats) + status = self._fmi2SetDebugLogging( + self._component, fmi2True if logging_on else fmi2False, n, arr + ) + else: + status = self._fmi2SetDebugLogging( + self._component, + fmi2True if logging_on else fmi2False, + 0, + None, + ) + return _check_status("fmi2SetDebugLogging", status) + + def instantiate( + self, + instance_name: str, + fmu_type: Fmi2Type, + *, + guid: str, + resource_location: str | None = None, + visible: bool = False, + logging_on: bool = False, + use_memory_callbacks: bool = True, + ) -> None: + """Instantiate the FMU. + + Args: + instance_name: Name for this FMU instance. + fmu_type: Co-Simulation or Model Exchange. + guid: The GUID from the modelDescription.xml. Must match + the GUID compiled into the FMU binary. + resource_location: ``file:///`` URI pointing to the FMU's + ``resources/`` directory. When *None* it is derived + from the extracted FMU path. + visible: Whether a simulator UI should be shown. + logging_on: Whether debug logging is initially enabled. + use_memory_callbacks: When *False* the ``allocateMemory`` + and ``freeMemory`` callback pointers are set to *NULL* + (for FMUs that declare + ``canNotUseMemoryManagementFunctions="true"``). + """ + if resource_location is None: + resources_dir = self._extract_dir / "resources" + if not resources_dir.exists(): + resources_dir = self._extract_dir + resource_location = _path_to_file_uri(resources_dir) + + self._callbacks = _make_callbacks(use_memory_callbacks=use_memory_callbacks) + + component = self._fmi2Instantiate( + instance_name.encode("utf-8"), + int(fmu_type), + guid.encode("utf-8"), + resource_location.encode("utf-8"), + byref(self._callbacks), + fmi2True if visible else fmi2False, + fmi2True if logging_on else fmi2False, + ) + if not component: + raise RuntimeError(f"fmi2Instantiate returned NULL for {instance_name!r}") + self._component = component + + def free_instance(self) -> None: + """Free the FMU instance and release resources.""" + if self._component is not None: + self._fmi2FreeInstance(self._component) + self._component = None + + def setup_experiment( + self, + start_time: float = 0.0, + stop_time: float | None = None, + tolerance: float | None = None, + ) -> Fmi2Status: + tolerance_defined = fmi2True if tolerance is not None else fmi2False + tol_val = tolerance if tolerance is not None else 0.0 + stop_defined = fmi2True if stop_time is not None else fmi2False + stop_val = stop_time if stop_time is not None else 0.0 + + status = self._fmi2SetupExperiment( + self._component, + tolerance_defined, + tol_val, + start_time, + stop_defined, + stop_val, + ) + return _check_status("fmi2SetupExperiment", status) + + def enter_initialization_mode(self) -> Fmi2Status: + status = self._fmi2EnterInitializationMode(self._component) + return _check_status("fmi2EnterInitializationMode", status) + + def exit_initialization_mode(self) -> Fmi2Status: + status = self._fmi2ExitInitializationMode(self._component) + return _check_status("fmi2ExitInitializationMode", status) + + def terminate(self) -> Fmi2Status: + status = self._fmi2Terminate(self._component) + return _check_status("fmi2Terminate", status) + + def reset(self) -> Fmi2Status: + status = self._fmi2Reset(self._component) + return _check_status("fmi2Reset", status) + + # ------------------------------------------------------------------ + # Getting variable values + # ------------------------------------------------------------------ + def get_real(self, vrs: Sequence[int]) -> list[float]: + n = len(vrs) + values = (fmi2Real * n)() + status = self._fmi2GetReal(self._component, self._vr_array(vrs), n, values) + _check_status("fmi2GetReal", status) + return list(values) + + def get_integer(self, vrs: Sequence[int]) -> list[int]: + n = len(vrs) + values = (fmi2Integer * n)() + status = self._fmi2GetInteger(self._component, self._vr_array(vrs), n, values) + _check_status("fmi2GetInteger", status) + return list(values) + + def get_boolean(self, vrs: Sequence[int]) -> list[bool]: + n = len(vrs) + values = (fmi2Boolean * n)() + status = self._fmi2GetBoolean(self._component, self._vr_array(vrs), n, values) + _check_status("fmi2GetBoolean", status) + return [bool(v) for v in values] + + def get_string(self, vrs: Sequence[int]) -> list[str]: + n = len(vrs) + values = (fmi2String * n)() + status = self._fmi2GetString(self._component, self._vr_array(vrs), n, values) + _check_status("fmi2GetString", status) + return [v.decode("utf-8") if v else "" for v in values] + + # ------------------------------------------------------------------ + # Setting variable values + # ------------------------------------------------------------------ + def set_real(self, vrs: Sequence[int], values: Sequence[float]) -> Fmi2Status: + n = len(vrs) + status = self._fmi2SetReal( + self._component, + self._vr_array(vrs), + n, + self._real_array(values), + ) + return _check_status("fmi2SetReal", status) + + def set_integer(self, vrs: Sequence[int], values: Sequence[int]) -> Fmi2Status: + n = len(vrs) + status = self._fmi2SetInteger( + self._component, + self._vr_array(vrs), + n, + self._int_array(values), + ) + return _check_status("fmi2SetInteger", status) + + def set_boolean( + self, vrs: Sequence[int], values: Sequence[bool | int] + ) -> Fmi2Status: + n = len(vrs) + status = self._fmi2SetBoolean( + self._component, + self._vr_array(vrs), + n, + self._bool_array(values), + ) + return _check_status("fmi2SetBoolean", status) + + def set_string(self, vrs: Sequence[int], values: Sequence[str]) -> Fmi2Status: + n = len(vrs) + status = self._fmi2SetString( + self._component, + self._vr_array(vrs), + n, + self._string_array(values), + ) + return _check_status("fmi2SetString", status) + + # ------------------------------------------------------------------ + # FMU State + # ------------------------------------------------------------------ + def get_fmu_state(self) -> c_void_p: + state = fmi2FMUstate() + status = self._fmi2GetFMUstate(self._component, byref(state)) + _check_status("fmi2GetFMUstate", status) + return state + + def set_fmu_state(self, state: c_void_p) -> Fmi2Status: + status = self._fmi2SetFMUstate(self._component, state) + return _check_status("fmi2SetFMUstate", status) + + def free_fmu_state(self, state: c_void_p) -> Fmi2Status: + status = self._fmi2FreeFMUstate(self._component, byref(state)) + return _check_status("fmi2FreeFMUstate", status) + + def serialized_fmu_state_size(self, state: c_void_p) -> int: + size = c_size_t() + status = self._fmi2SerializedFMUstateSize(self._component, state, byref(size)) + _check_status("fmi2SerializedFMUstateSize", status) + return size.value + + def serialize_fmu_state(self, state: c_void_p) -> bytes: + size = self.serialized_fmu_state_size(state) + buf = (fmi2Byte * size)() + status = self._fmi2SerializeFMUstate(self._component, state, buf, size) + _check_status("fmi2SerializeFMUstate", status) + return bytes(buf) + + def deserialize_fmu_state(self, data: bytes) -> c_void_p: + size = len(data) + buf = (fmi2Byte * size)(*data) + state = fmi2FMUstate() + status = self._fmi2DeSerializeFMUstate(self._component, buf, size, byref(state)) + _check_status("fmi2DeSerializeFMUstate", status) + return state + + # ------------------------------------------------------------------ + # Directional derivatives + # ------------------------------------------------------------------ + def get_directional_derivative( + self, + v_unknown_ref: Sequence[int], + v_known_ref: Sequence[int], + dv_known: Sequence[float], + ) -> list[float]: + n_unknown = len(v_unknown_ref) + n_known = len(v_known_ref) + dv_unknown = (fmi2Real * n_unknown)() + status = self._fmi2GetDirectionalDerivative( + self._component, + self._vr_array(v_unknown_ref), + n_unknown, + self._vr_array(v_known_ref), + n_known, + self._real_array(dv_known), + dv_unknown, + ) + _check_status("fmi2GetDirectionalDerivative", status) + return list(dv_unknown) + + # ------------------------------------------------------------------ + # Model Exchange functions + # ------------------------------------------------------------------ + def enter_event_mode(self) -> Fmi2Status: + status = self._fmi2EnterEventMode(self._component) + return _check_status("fmi2EnterEventMode", status) + + def new_discrete_states(self) -> Fmi2EventInfo: + event_info = Fmi2EventInfo() + status = self._fmi2NewDiscreteStates(self._component, byref(event_info)) + _check_status("fmi2NewDiscreteStates", status) + return event_info + + def enter_continuous_time_mode(self) -> Fmi2Status: + status = self._fmi2EnterContinuousTimeMode(self._component) + return _check_status("fmi2EnterContinuousTimeMode", status) + + def completed_integrator_step( + self, no_set_fmu_state_prior: bool = True + ) -> tuple[bool, bool]: + """Call fmi2CompletedIntegratorStep. + + Returns: + (enter_event_mode, terminate_simulation) booleans. + """ + enter_event = fmi2Boolean(fmi2False) + terminate = fmi2Boolean(fmi2False) + status = self._fmi2CompletedIntegratorStep( + self._component, + fmi2True if no_set_fmu_state_prior else fmi2False, + byref(enter_event), + byref(terminate), + ) + _check_status("fmi2CompletedIntegratorStep", status) + return bool(enter_event.value), bool(terminate.value) + + def set_time(self, time: float) -> Fmi2Status: + status = self._fmi2SetTime(self._component, time) + return _check_status("fmi2SetTime", status) + + def set_continuous_states(self, states: Sequence[float]) -> Fmi2Status: + nx = len(states) + status = self._fmi2SetContinuousStates( + self._component, self._real_array(states), nx + ) + return _check_status("fmi2SetContinuousStates", status) + + def get_derivatives(self, nx: int) -> list[float]: + """Get state derivatives. + + Args: + nx: Number of continuous states. + """ + derivatives = (fmi2Real * nx)() + status = self._fmi2GetDerivatives(self._component, derivatives, nx) + _check_status("fmi2GetDerivatives", status) + return list(derivatives) + + def get_event_indicators(self, ni: int) -> list[float]: + """Get event indicators. + + Args: + ni: Number of event indicators. + """ + indicators = (fmi2Real * ni)() + status = self._fmi2GetEventIndicators(self._component, indicators, ni) + _check_status("fmi2GetEventIndicators", status) + return list(indicators) + + def get_continuous_states(self, nx: int) -> list[float]: + """Get continuous state values. + + Args: + nx: Number of continuous states. + """ + states = (fmi2Real * nx)() + status = self._fmi2GetContinuousStates(self._component, states, nx) + _check_status("fmi2GetContinuousStates", status) + return list(states) + + def get_nominals_of_continuous_states(self, nx: int) -> list[float]: + """Get nominals of continuous states. + + Args: + nx: Number of continuous states. + """ + nominals = (fmi2Real * nx)() + status = self._fmi2GetNominalsOfContinuousStates(self._component, nominals, nx) + _check_status("fmi2GetNominalsOfContinuousStates", status) + return list(nominals) + + # ------------------------------------------------------------------ + # Co-Simulation functions + # ------------------------------------------------------------------ + def do_step( + self, + current_communication_point: float, + communication_step_size: float, + no_set_fmu_state_prior: bool = True, + ) -> Fmi2Status: + status = self._fmi2DoStep( + self._component, + current_communication_point, + communication_step_size, + fmi2True if no_set_fmu_state_prior else fmi2False, + ) + return _check_status("fmi2DoStep", status) + + def cancel_step(self) -> Fmi2Status: + status = self._fmi2CancelStep(self._component) + return _check_status("fmi2CancelStep", status) + + def set_real_input_derivatives( + self, + vrs: Sequence[int], + orders: Sequence[int], + values: Sequence[float], + ) -> Fmi2Status: + n = len(vrs) + status = self._fmi2SetRealInputDerivatives( + self._component, + self._vr_array(vrs), + n, + self._int_array(orders), + self._real_array(values), + ) + return _check_status("fmi2SetRealInputDerivatives", status) + + def get_real_output_derivatives( + self, + vrs: Sequence[int], + orders: Sequence[int], + ) -> list[float]: + n = len(vrs) + values = (fmi2Real * n)() + status = self._fmi2GetRealOutputDerivatives( + self._component, + self._vr_array(vrs), + n, + self._int_array(orders), + values, + ) + _check_status("fmi2GetRealOutputDerivatives", status) + return list(values) + + def get_status(self, kind: Fmi2StatusKind) -> Fmi2Status: + value = c_int() + status = self._fmi2GetStatus(self._component, int(kind), byref(value)) + _check_status("fmi2GetStatus", status) + return Fmi2Status(value.value) + + def get_real_status(self, kind: Fmi2StatusKind) -> float: + value = fmi2Real() + status = self._fmi2GetRealStatus(self._component, int(kind), byref(value)) + _check_status("fmi2GetRealStatus", status) + return value.value + + def get_integer_status(self, kind: Fmi2StatusKind) -> int: + value = fmi2Integer() + status = self._fmi2GetIntegerStatus(self._component, int(kind), byref(value)) + _check_status("fmi2GetIntegerStatus", status) + return value.value + + def get_boolean_status(self, kind: Fmi2StatusKind) -> bool: + value = fmi2Boolean() + status = self._fmi2GetBooleanStatus(self._component, int(kind), byref(value)) + _check_status("fmi2GetBooleanStatus", status) + return bool(value.value) + + def get_string_status(self, kind: Fmi2StatusKind) -> str: + value = fmi2String() + status = self._fmi2GetStringStatus(self._component, int(kind), byref(value)) + _check_status("fmi2GetStringStatus", status) + return value.value.decode("utf-8") if value.value else "" + + # ------------------------------------------------------------------ + # Context manager support + # ------------------------------------------------------------------ + def __enter__(self) -> Fmi2Slave: + return self + + def __exit__(self, *_: object) -> None: + self.free_instance() + if self._tmpdir is not None: + self._tmpdir.cleanup() + self._tmpdir = None + + def __del__(self) -> None: + if self._tmpdir is not None: + try: + self._tmpdir.cleanup() + except Exception: + pass diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d54eaf0 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,26 @@ +import zipfile + +import httpx +import pytest + +REFERENCE_FMUS_URL = "https://github.com/modelica/Reference-FMUs/releases/download/v0.0.39/Reference-FMUs-0.0.39.zip" + + +@pytest.fixture(scope="session") +def reference_fmus_dir(tmp_path_factory): + """Download and extract Reference-FMUs once per test session.""" + tmpdir = tmp_path_factory.mktemp("reference_fmus") + + # Download the reference FMU zip file + response = httpx.get(REFERENCE_FMUS_URL, follow_redirects=True) + response.raise_for_status() + + zip_path = tmpdir / "Reference-FMUs.zip" + with open(zip_path, "wb") as f: + f.write(response.content) + + # Extract the zip file + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(tmpdir) + + return tmpdir diff --git a/tests/test_fmi2.py b/tests/test_fmi2.py index 348a3a2..c5daa6a 100644 --- a/tests/test_fmi2.py +++ b/tests/test_fmi2.py @@ -1,2 +1,847 @@ -def test_fmi2(): - assert True +import platform +import pathlib +import pytest +from fmuloader.fmi2 import ( + Fmi2Slave, + Fmi2Status, + Fmi2Type, + Fmi2Error, + _platform_folder, + _shared_lib_extension, + _path_to_file_uri, +) + + +# Skip tests on arm64 platform (Apple Silicon) +skip_arm64 = pytest.mark.skipif( + platform.machine() == "arm64", + reason="Skipping on ARM64 platform - binaries may not be available", +) + + +@skip_arm64 +@pytest.mark.parametrize( + "reference_fmu,model_name,guid", + [ + ( + "2.0/BouncingBall.fmu", + "BouncingBall", + "{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ), + ("2.0/VanDerPol.fmu", "VanDerPol", "{BD403596-3166-4232-ABC2-132BDF73E644}"), + ("2.0/Dahlquist.fmu", "Dahlquist", "{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}"), + ("2.0/Stair.fmu", "Stair", "{BD403596-3166-4232-ABC2-132BDF73E644}"), + ("2.0/Resource.fmu", "Resource", "{7b9c2114-2ce5-4076-a138-2cbc69e069e5}"), + ( + "2.0/Feedthrough.fmu", + "Feedthrough", + "{37B954F1-CC86-4D8F-B97F-C7C36F6670D2}", + ), + ], +) +def test_fmi2_slave_basic(reference_fmu, model_name, guid, reference_fmus_dir): + """Test basic FMU loading and version check.""" + filename = (reference_fmus_dir / reference_fmu).absolute() + + slave = Fmi2Slave(filename, model_identifier=model_name) + + assert slave.get_version() == "2.0" + assert slave.get_types_platform() in ["default", "standard32", "standard64"] + + # Test instantiation with correct GUID + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid=guid, + ) + assert slave._component is not None + slave.free_instance() + + +@skip_arm64 +def test_fmi2_instantiate_co_simulation(reference_fmus_dir): + """Test instantiation of Co-Simulation FMU.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + + # Instantiate as Co-Simulation + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + visible=False, + logging_on=True, + ) + + assert slave._component is not None + + slave.free_instance() + assert slave._component is None + + +@skip_arm64 +def test_fmi2_dahlquist_simulation(reference_fmus_dir): + """Test Dahlquist test equation FMU - a simple ODE: dx/dt = k*x.""" + filename = (reference_fmus_dir / "2.0/Dahlquist.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Dahlquist") + slave.instantiate( + "dahlquist_instance", + Fmi2Type.CO_SIMULATION, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + + # Dahlquist has x (VR 1) with start value 1.0 and k (VR 3) with start value 1.0 + vr_x = 1 + vr_k = 3 + + # Get initial values + x_values = slave.get_real([vr_x]) + k_values = slave.get_real([vr_k]) + assert len(x_values) == 1 + assert x_values[0] == 1.0 # Initial value + assert k_values[0] == 1.0 # Parameter value + + slave.exit_initialization_mode() + + # Perform simulation steps + time = 0.0 + step_size = 0.1 + for _ in range(10): + slave.do_step(time, step_size) + time += step_size + + # After simulation, x should have changed + x_final = slave.get_real([vr_x]) + assert x_final[0] != 1.0 # Value should have changed due to dynamics + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_feedthrough_all_types(reference_fmus_dir): + """Test Feedthrough FMU with all variable types (Real, Integer, Boolean, String).""" + filename = (reference_fmus_dir / "2.0/Feedthrough.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Feedthrough") + slave.instantiate( + "feedthrough_instance", + Fmi2Type.CO_SIMULATION, + guid="{37B954F1-CC86-4D8F-B97F-C7C36F6670D2}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=2.0) + slave.enter_initialization_mode() + + # Test Real variables + vr_float_input = 7 + vr_float_output = 8 + slave.set_real([vr_float_input], [3.14]) + + # Test Integer variables + vr_int_input = 19 + vr_int_output = 20 + slave.set_integer([vr_int_input], [42]) + + # Test Boolean variables + vr_bool_input = 27 + vr_bool_output = 28 + slave.set_boolean([vr_bool_input], [True]) + + # Test String variables + vr_string_input = 29 + vr_string_output = 30 + slave.set_string([vr_string_input], ["Test String"]) + + slave.exit_initialization_mode() + + # Perform one step + slave.do_step(0.0, 0.1) + + # Verify outputs (Feedthrough passes inputs to outputs) + float_out = slave.get_real([vr_float_output]) + assert abs(float_out[0] - 3.14) < 0.01 + + int_out = slave.get_integer([vr_int_output]) + assert int_out[0] == 42 + + bool_out = slave.get_boolean([vr_bool_output]) + assert bool_out[0] is True + + string_out = slave.get_string([vr_string_output]) + assert string_out[0] == "Test String" + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_stair_discrete_events(reference_fmus_dir): + """Test Stair FMU which generates discrete time events.""" + filename = (reference_fmus_dir / "2.0/Stair.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Stair") + slave.instantiate( + "stair_instance", + Fmi2Type.CO_SIMULATION, + guid="{BD403596-3166-4232-ABC2-132BDF73E644}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + + # Stair has 'counter' variable (VR 1) that counts seconds + vr_counter = 1 + + # Get initial value + initial_counter = slave.get_integer([vr_counter]) + assert initial_counter[0] == 1 # Start value is 1 + + slave.exit_initialization_mode() + + # Simulate for a few seconds - counter should increment + time = 0.0 + step_size = 0.2 # Default step size from modelDescription + + for i in range(5): + slave.do_step(time, step_size) + time += step_size + counter_value = slave.get_integer([vr_counter]) + # Counter increments every second and should be between 1 and 10 + assert counter_value[0] >= 1 and counter_value[0] <= 10 + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_resource_file_loading(reference_fmus_dir): + """Test Resource FMU which loads data from resource files.""" + filename = (reference_fmus_dir / "2.0/Resource.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Resource") + slave.instantiate( + "resource_instance", + Fmi2Type.CO_SIMULATION, + guid="{7b9c2114-2ce5-4076-a138-2cbc69e069e5}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=1.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Resource FMU has 'y' variable (VR 1) that reads from resources/y.txt + vr_y = 1 + + # Get the value (should be the ASCII code of first character in y.txt) + y_value = slave.get_integer([vr_y]) + assert len(y_value) == 1 + assert isinstance(y_value[0], int) + # The value should be a valid ASCII code + assert 0 <= y_value[0] <= 127 + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_setup_and_initialization(reference_fmus_dir): + """Test FMU setup and initialization sequence.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + # Setup experiment + status = slave.setup_experiment(start_time=0.0, stop_time=10.0, tolerance=1e-6) + assert status == Fmi2Status.OK + + # Enter initialization mode + status = slave.enter_initialization_mode() + assert status == Fmi2Status.OK + + # Exit initialization mode + status = slave.exit_initialization_mode() + assert status == Fmi2Status.OK + + # Terminate + status = slave.terminate() + assert status == Fmi2Status.OK + + slave.free_instance() + + +@skip_arm64 +def test_fmi2_do_step(reference_fmus_dir): + """Test Co-Simulation do_step functionality.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Perform simulation steps + time = 0.0 + step_size = 0.1 + for _ in range(10): + status = slave.do_step(time, step_size) + assert status == Fmi2Status.OK + time += step_size + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_get_set_real(reference_fmus_dir): + """Test getting and setting real values.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + + # BouncingBall has 'h' (height) as variable with value reference 1 + # and 'v' (velocity) with value reference 3 + vr_h = 1 + vr_v = 3 + + # Get initial values + values = slave.get_real([vr_h, vr_v]) + assert len(values) == 2 + assert isinstance(values[0], float) + assert isinstance(values[1], float) + + # Set new values + slave.set_real([vr_h], [5.0]) + + # Verify the set value + new_values = slave.get_real([vr_h]) + assert len(new_values) == 1 + # Note: The value might not be exactly 5.0 due to FMU constraints + + slave.exit_initialization_mode() + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_get_set_integer(reference_fmus_dir): + """Test getting and setting integer values.""" + filename = (reference_fmus_dir / "2.0/Stair.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Stair") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{BD403596-3166-4232-ABC2-132BDF73E644}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + + # Stair has 'counter' as integer variable with value reference 1 + vr_counter = 1 + + # Get initial value + values = slave.get_integer([vr_counter]) + assert len(values) == 1 + assert isinstance(values[0], int) + + slave.exit_initialization_mode() + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_get_set_boolean(reference_fmus_dir): + """Test getting and setting boolean values.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Perform a step to get meaningful boolean values + slave.do_step(0.0, 0.1) + + # BouncingBall may have boolean state variables + # Testing the boolean get/set methods with a valid VR + # Note: Actual VRs depend on the FMU implementation + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_reset(reference_fmus_dir): + """Test FMU reset functionality.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Perform some steps + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + # Reset the FMU + status = slave.reset() + assert status == Fmi2Status.OK + + slave.free_instance() + + +@skip_arm64 +def test_fmi2_set_debug_logging(reference_fmus_dir): + """Test setting debug logging.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + # Enable logging with valid categories from modelDescription + status = slave.set_debug_logging(True, categories=["logEvents"]) + assert status == Fmi2Status.OK + + # Disable logging + status = slave.set_debug_logging(False) + assert status == Fmi2Status.OK + + slave.free_instance() + + +@skip_arm64 +def test_fmi2_context_manager(reference_fmus_dir): + """Test using Fmi2Slave as a context manager.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + with Fmi2Slave(filename, model_identifier="BouncingBall") as slave: + assert slave.get_version() == "2.0" + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + assert slave._component is not None + + # After context exit, component should be freed + + +@skip_arm64 +def test_fmi2_error_handling(reference_fmus_dir): + """Test error handling for invalid operations.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + + # Try to instantiate with wrong GUID - should raise error or return NULL + try: + slave.instantiate( + "test_instance", + Fmi2Type.CO_SIMULATION, + guid="{00000000-0000-0000-0000-000000000000}", + ) + # If instantiation succeeds despite wrong GUID, clean up + if slave._component is not None: + slave.free_instance() + except (RuntimeError, Fmi2Error): + # Expected behavior for wrong GUID + pass + + +@skip_arm64 +def test_fmi2_model_exchange_functions(reference_fmus_dir): + """Test Model Exchange specific functions.""" + filename = (reference_fmus_dir / "2.0/VanDerPol.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="VanDerPol") + slave.instantiate( + "test_instance", + Fmi2Type.MODEL_EXCHANGE, + guid="{BD403596-3166-4232-ABC2-132BDF73E644}", + ) + + slave.setup_experiment(start_time=0.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Initial event iteration + event_info = slave.new_discrete_states() + assert hasattr(event_info, "newDiscreteStatesNeeded") + + # Enter continuous time mode + status = slave.enter_continuous_time_mode() + assert status == Fmi2Status.OK + + # Set time + status = slave.set_time(0.1) + assert status == Fmi2Status.OK + + # Get derivatives (VanDerPol has 2 continuous states) + derivs = slave.get_derivatives(2) + assert len(derivs) == 2 + assert all(isinstance(d, float) for d in derivs) + + # Get continuous states + states = slave.get_continuous_states(2) + assert len(states) == 2 + + # Get nominals + nominals = slave.get_nominals_of_continuous_states(2) + assert len(nominals) == 2 + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_dahlquist_model_exchange(reference_fmus_dir): + """Test Dahlquist FMU in Model Exchange mode.""" + filename = (reference_fmus_dir / "2.0/Dahlquist.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Dahlquist") + slave.instantiate( + "dahlquist_me", + Fmi2Type.MODEL_EXCHANGE, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + slave.setup_experiment(start_time=0.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Event iteration + event_info = slave.new_discrete_states() + while event_info.newDiscreteStatesNeeded: + event_info = slave.new_discrete_states() + + # Enter continuous time mode + slave.enter_continuous_time_mode() + + # Get number of states (Dahlquist has 1 continuous state) + nx = 1 + + # Get initial state + states = slave.get_continuous_states(nx) + assert len(states) == nx + assert states[0] == 1.0 # Initial value of x + + # Get parameter k to compute expected derivative + vr_k = 3 + k_value = slave.get_real([vr_k])[0] + + # Set time and get derivatives + slave.set_time(0.0) + derivs = slave.get_derivatives(nx) + assert len(derivs) == nx + # For Dahlquist: dx/dt = k*x, so derivative should be k*x = k*1.0 = k + # The actual implementation may use dx/dt = -k*x, so check for both + expected_deriv = k_value * states[0] + assert ( + abs(derivs[0] - expected_deriv) < 0.01 or abs(derivs[0] + expected_deriv) < 0.01 + ) + + # Simulate one step with explicit Euler + dt = 0.1 + new_state = [states[0] + dt * derivs[0]] + slave.set_continuous_states(new_state) + slave.set_time(dt) + + # Check integrator step completion + enter_event_mode, terminate = slave.completed_integrator_step() + assert isinstance(enter_event_mode, bool) + assert isinstance(terminate, bool) + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_fmu_state_management(reference_fmus_dir): + """Test FMU state get/set/free functionality.""" + filename = (reference_fmus_dir / "2.0/Dahlquist.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Dahlquist") + slave.instantiate( + "dahlquist_state", + Fmi2Type.CO_SIMULATION, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Do a few steps + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + # Get state value + vr_x = 1 + x_before = slave.get_real([vr_x]) + + # Save FMU state + state = slave.get_fmu_state() + assert state is not None + + # Continue simulation + slave.do_step(0.2, 0.1) + slave.do_step(0.3, 0.1) + + x_after = slave.get_real([vr_x]) + assert x_after[0] != x_before[0] # State should have changed + + # Restore previous state + slave.set_fmu_state(state) + x_restored = slave.get_real([vr_x]) + assert abs(x_restored[0] - x_before[0]) < 1e-10 # Should match saved state + + # Free the saved state + slave.free_fmu_state(state) + + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_state_serialization(reference_fmus_dir): + """Test FMU state serialization and deserialization.""" + filename = (reference_fmus_dir / "2.0/Dahlquist.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="Dahlquist") + slave.instantiate( + "dahlquist_serialize", + Fmi2Type.CO_SIMULATION, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=10.0) + slave.enter_initialization_mode() + slave.exit_initialization_mode() + + # Simulate to some point + slave.do_step(0.0, 0.1) + slave.do_step(0.1, 0.1) + + vr_x = 1 + x_original = slave.get_real([vr_x]) + + # Get and serialize state + state = slave.get_fmu_state() + serialized = slave.serialize_fmu_state(state) + assert isinstance(serialized, bytes) + assert len(serialized) > 0 + + slave.free_fmu_state(state) + + # Continue simulation + slave.do_step(0.2, 0.1) + x_after = slave.get_real([vr_x]) + assert x_after[0] != x_original[0] + + # Deserialize and restore + restored_state = slave.deserialize_fmu_state(serialized) + slave.set_fmu_state(restored_state) + + x_restored = slave.get_real([vr_x]) + assert abs(x_restored[0] - x_original[0]) < 1e-10 + + slave.free_fmu_state(restored_state) + slave.terminate() + slave.free_instance() + + +@skip_arm64 +def test_fmi2_multiple_instances(reference_fmus_dir): + """Test running multiple FMU instances simultaneously.""" + filename = (reference_fmus_dir / "2.0/Dahlquist.fmu").absolute() + + # Create two instances + slave1 = Fmi2Slave(filename, model_identifier="Dahlquist") + slave1.instantiate( + "instance1", + Fmi2Type.CO_SIMULATION, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + slave2 = Fmi2Slave(filename, model_identifier="Dahlquist") + slave2.instantiate( + "instance2", + Fmi2Type.CO_SIMULATION, + guid="{221063D2-EF4A-45FE-B954-B5BFEEA9A59B}", + ) + + # Setup both + slave1.setup_experiment(start_time=0.0, stop_time=10.0) + slave1.enter_initialization_mode() + slave1.exit_initialization_mode() + + slave2.setup_experiment(start_time=0.0, stop_time=10.0) + slave2.enter_initialization_mode() + + # Set different parameter values + vr_k = 3 + slave2.set_real([vr_k], [2.0]) # Different k value + slave2.exit_initialization_mode() + + # Run both instances + vr_x = 1 + for _ in range(5): + slave1.do_step(_ * 0.1, 0.1) + slave2.do_step(_ * 0.1, 0.1) + + # They should have different state values due to different parameters + x1 = slave1.get_real([vr_x]) + x2 = slave2.get_real([vr_x]) + assert x1[0] != x2[0] # Different evolution due to different k + + slave1.terminate() + slave1.free_instance() + slave2.terminate() + slave2.free_instance() + + +@skip_arm64 +def test_fmi2_parameter_tuning(reference_fmus_dir): + """Test setting parameters before initialization.""" + filename = (reference_fmus_dir / "2.0/BouncingBall.fmu").absolute() + + slave = Fmi2Slave(filename, model_identifier="BouncingBall") + slave.instantiate( + "ball_tuned", + Fmi2Type.CO_SIMULATION, + guid="{1AE5E10D-9521-4DE3-80B9-D0EAAA7D5AF1}", + ) + + slave.setup_experiment(start_time=0.0, stop_time=3.0) + slave.enter_initialization_mode() + + # Set parameters: g (gravity, VR 5) and e (restitution coefficient, VR 6) + vr_g = 5 + vr_e = 6 + + # Set custom gravity and restitution + slave.set_real([vr_g], [9.81]) + slave.set_real([vr_e], [0.8]) + + # Verify parameters were set + g_values = slave.get_real([vr_g]) + e_values = slave.get_real([vr_e]) + assert abs(g_values[0] - 9.81) < 0.01 + assert abs(e_values[0] - 0.8) < 0.01 + + slave.exit_initialization_mode() + + # Run simulation + for i in range(10): + slave.do_step(i * 0.1, 0.1) + + slave.terminate() + slave.free_instance() + + +# Tests for helper functions +def test_shared_lib_extension(): + """Test shared library extension detection.""" + ext = _shared_lib_extension() + system = platform.system() + + if system == "Windows": + assert ext == ".dll" + elif system == "Darwin": + assert ext == ".dylib" + else: + assert ext == ".so" + + +def test_platform_folder(): + """Test platform folder name generation.""" + folder = _platform_folder() + system = platform.system() + + if system == "Darwin": + assert folder == "darwin64" + elif system == "Linux": + assert folder in ["linux32", "linux64"] + elif system == "Windows": + assert folder in ["win32", "win64"] + + +def test_path_to_file_uri(): + """Test path to file URI conversion.""" + path = pathlib.Path("/tmp/test.txt") + uri = _path_to_file_uri(path) + + assert uri.startswith("file://") + if platform.system() == "Windows": + assert "/" in uri + else: + assert uri.startswith("file:///") + + +def test_fmi2_status_enum(): + """Test Fmi2Status enumeration values.""" + assert Fmi2Status.OK == 0 + assert Fmi2Status.WARNING == 1 + assert Fmi2Status.DISCARD == 2 + assert Fmi2Status.ERROR == 3 + assert Fmi2Status.FATAL == 4 + assert Fmi2Status.PENDING == 5 + + +def test_fmi2_type_enum(): + """Test Fmi2Type enumeration values.""" + assert Fmi2Type.MODEL_EXCHANGE == 0 + assert Fmi2Type.CO_SIMULATION == 1 + + +def test_fmi2_error_exception(): + """Test Fmi2Error exception.""" + error = Fmi2Error("testFunction", Fmi2Status.ERROR) + + assert error.func_name == "testFunction" + assert error.status == Fmi2Status.ERROR + assert "testFunction" in str(error) + assert "ERROR" in str(error)