diff --git a/.pylintrc b/.pylintrc index 1a254ee..0c2006b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -10,4 +10,4 @@ ignore-paths=src/s2python/generated/ # avoid hangs. jobs=1 -disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long +disable=missing-class-docstring,missing-module-docstring,too-few-public-methods,missing-function-docstring,no-member,unsubscriptable-object,line-too-long,duplicate-code diff --git a/examples/async_frbc_rm.py b/examples/async_frbc_rm.py new file mode 100644 index 0000000..4a111e5 --- /dev/null +++ b/examples/async_frbc_rm.py @@ -0,0 +1,202 @@ +import argparse +import asyncio +import logging +import sys +import uuid +import signal +import datetime + +from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunAsync +from s2python.common import ( + Duration, + Role, + RoleType, + Commodity, + Currency, + NumberRange, + PowerRange, + CommodityQuantity, +) +from s2python.frbc import ( + FRBCInstruction, + FRBCSystemDescription, + FRBCActuatorDescription, + FRBCStorageDescription, + FRBCOperationMode, + FRBCOperationModeElement, + FRBCFillLevelTargetProfile, + FRBCFillLevelTargetProfileElement, + FRBCStorageStatus, + FRBCActuatorStatus, +) +from s2python.connection import AssetDetails +from s2python.connection.async_ import S2AsyncConnection, WebsocketClientMedium +from s2python.connection.async_.control_type.class_based import ( + FRBCControlType, + NoControlControlType, + ResourceManagerHandler, +) + +logger = logging.getLogger("s2python") +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +class MyFRBCControlType(FRBCControlType): + async def handle_instruction( + self, + connection: S2AsyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: + if not isinstance(msg, FRBCInstruction): + raise RuntimeError( + f"Expected an FRBCInstruction but received a message of type {type(msg)}." + ) + print(f"I have received the message {msg} from {connection}") + + async def activate(self, connection: S2AsyncConnection) -> None: + print("The control type FRBC is now activated.") + + print("Time to send a FRBC SystemDescription") + actuator_id = uuid.uuid4() + operation_mode_id = uuid.uuid4() + await connection.send_msg_and_await_reception_status( + FRBCSystemDescription( + message_id=uuid.uuid4(), + valid_from=datetime.datetime.now(tz=datetime.timezone.utc), + actuators=[ + FRBCActuatorDescription( + id=actuator_id, + operation_modes=[ + FRBCOperationMode( + id=operation_mode_id, + elements=[ + FRBCOperationModeElement( + fill_level_range=NumberRange( + start_of_range=0.0, end_of_range=100.0 + ), + fill_rate=NumberRange( + start_of_range=-5.0, end_of_range=5.0 + ), + power_ranges=[ + PowerRange( + start_of_range=-200.0, + end_of_range=200.0, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1, + ) + ], + ) + ], + diagnostic_label="Load & unload battery", + abnormal_condition_only=False, + ) + ], + transitions=[], + timers=[], + supported_commodities=[Commodity.ELECTRICITY], + ) + ], + storage=FRBCStorageDescription( + fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0), + fill_level_label="%", + diagnostic_label="Imaginary battery", + provides_fill_level_target_profile=True, + provides_leakage_behaviour=False, + provides_usage_forecast=False, + ), + ) + ) + print("Also send the target profile") + + await connection.send_msg_and_await_reception_status( + FRBCFillLevelTargetProfile( + message_id=uuid.uuid4(), + start_time=datetime.datetime.now(tz=datetime.timezone.utc), + elements=[ + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(30_000), + fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0), + ), + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(300_000), + fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0), + ), + ], + ) + ) + + print("Also send the storage status.") + await connection.send_msg_and_await_reception_status( + FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) + ) + + print("Also send the actuator status.") + await connection.send_msg_and_await_reception_status( + FRBCActuatorStatus( + message_id=uuid.uuid4(), + actuator_id=actuator_id, + active_operation_mode_id=operation_mode_id, + operation_mode_factor=0.5, + ) + ) + + async def deactivate(self, connection: S2AsyncConnection) -> None: + print("The control type FRBC is now deactivated.") + + +class MyNoControlControlType(NoControlControlType): + async def activate(self, connection: S2AsyncConnection) -> None: + print("The control type NoControl is now activated.") + + async def deactivate(self, connection: S2AsyncConnection) -> None: + print("The control type NoControl is now deactivated.") + + +async def start_s2_session(url, rm_id: uuid.UUID): + # Configure a resource manager + rm_handler = ResourceManagerHandler( + asset_details=AssetDetails( + resource_id=rm_id, + name="Some asset", + instruction_processing_delay=Duration.from_milliseconds(20), + roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], + currency=Currency.EUR, + provides_forecast=False, + provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], + ), + control_types=[MyFRBCControlType(), MyNoControlControlType()], + ) + + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + await ws_medium.connect() + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2AsyncConnection(medium=ws_medium) + rm_handler.register_handlers(s2_conn) + + eventloop = asyncio.get_running_loop() + + async def stop(): + print("Received signal. Will stop S2 connection.") + await s2_conn.stop() + + eventloop.add_signal_handler(signal.SIGINT, lambda: eventloop.create_task(stop())) + eventloop.add_signal_handler(signal.SIGTERM, lambda: eventloop.create_task(stop())) + await s2_conn.run() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + RM_ID = uuid.uuid4() + parser.add_argument( + "--endpoint", + type=str, + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}", + default=f"ws://localhost:8003/ws/{RM_ID}", + ) + args = parser.parse_args() + + asyncio.run(start_s2_session(args.endpoint, RM_ID)) diff --git a/examples/example_frbc_rm.py b/examples/quickstart_websocket_rm.py similarity index 72% rename from examples/example_frbc_rm.py rename to examples/quickstart_websocket_rm.py index d69473d..187c3cf 100644 --- a/examples/example_frbc_rm.py +++ b/examples/quickstart_websocket_rm.py @@ -1,14 +1,12 @@ import argparse -from functools import partial import logging import sys import uuid import signal import datetime -from typing import Callable +from typing import Callable, Optional from s2python.common import ( - EnergyManagementRole, Duration, Role, RoleType, @@ -18,6 +16,7 @@ PowerRange, CommodityQuantity, ) +from s2python.connection.types import S2ConnectionEventsAndMessages from s2python.frbc import ( FRBCInstruction, FRBCSystemDescription, @@ -30,9 +29,9 @@ FRBCStorageStatus, FRBCActuatorStatus, ) -from s2python.s2_connection import S2Connection, AssetDetails -from s2python.s2_control_type import FRBCControlType, NoControlControlType -from s2python.message import S2Message +from s2python.connection import AssetDetails, BlockingWebsocketClientRM +from s2python.connection.sync import S2SyncConnection +from s2python.connection.sync.control_type.class_based import FRBCControlType, NoControlControlType logger = logging.getLogger("s2python") logger.addHandler(logging.StreamHandler(sys.stdout)) @@ -41,21 +40,23 @@ class MyFRBCControlType(FRBCControlType): def handle_instruction( - self, conn: S2Connection, msg: S2Message, send_okay: Callable[[], None] + self, connection: S2SyncConnection, msg: S2ConnectionEventsAndMessages, send_okay: Optional[Callable[[], None]] ) -> None: + assert send_okay if not isinstance(msg, FRBCInstruction): raise RuntimeError( f"Expected an FRBCInstruction but received a message of type {type(msg)}." ) - print(f"I have received the message {msg} from {conn}") + print(f"I have received the message {msg} from {connection}") + send_okay() - def activate(self, conn: S2Connection) -> None: + def activate(self, connection: S2SyncConnection) -> None: print("The control type FRBC is now activated.") print("Time to send a FRBC SystemDescription") actuator_id = uuid.uuid4() operation_mode_id = uuid.uuid4() - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCSystemDescription( message_id=uuid.uuid4(), valid_from=datetime.datetime.now(tz=datetime.timezone.utc), @@ -103,7 +104,7 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the target profile") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCFillLevelTargetProfile( message_id=uuid.uuid4(), start_time=datetime.datetime.now(tz=datetime.timezone.utc), @@ -121,12 +122,12 @@ def activate(self, conn: S2Connection) -> None: ) print("Also send the storage status.") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) ) print("Also send the actuator status.") - conn.send_msg_and_await_reception_status_sync( + connection.send_msg_and_await_reception_status( FRBCActuatorStatus( message_id=uuid.uuid4(), actuator_id=actuator_id, @@ -135,54 +136,56 @@ def activate(self, conn: S2Connection) -> None: ) ) - def deactivate(self, conn: S2Connection) -> None: + def deactivate(self, connection: S2SyncConnection) -> None: print("The control type FRBC is now deactivated.") class MyNoControlControlType(NoControlControlType): - def activate(self, conn: S2Connection) -> None: + def activate(self, connection: S2SyncConnection) -> None: print("The control type NoControl is now activated.") - def deactivate(self, conn: S2Connection) -> None: + def deactivate(self, connection: S2SyncConnection) -> None: print("The control type NoControl is now deactivated.") -def stop(s2_connection, signal_num, _current_stack_frame): - print(f"Received signal {signal_num}. Will stop S2 connection.") - s2_connection.stop() - - -def start_s2_session(url, client_node_id=str(uuid.uuid4())): - s2_conn = S2Connection( - url=url, - role=EnergyManagementRole.RM, - control_types=[MyFRBCControlType(), MyNoControlControlType()], - asset_details=AssetDetails( - resource_id=client_node_id, +def start_s2_session(url, rm_id: uuid.UUID): + # Configure a resource manager + asset_details = AssetDetails( + resource_id=rm_id, name="Some asset", instruction_processing_delay=Duration.from_milliseconds(20), roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], currency=Currency.EUR, provides_forecast=False, provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], - ), - reconnect=True, - verify_certificate=False, - ) - signal.signal(signal.SIGINT, partial(stop, s2_conn)) - signal.signal(signal.SIGTERM, partial(stop, s2_conn)) + ) + + # Configure the S2 connection on top of the websocket connection + s2_conn = BlockingWebsocketClientRM(url=url, asset_details=asset_details, control_types=[MyFRBCControlType(), MyNoControlControlType()]) + + def stop(signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + s2_conn.stop() + + signal.signal(signal.SIGINT, stop) + signal.signal(signal.SIGTERM, stop) - s2_conn.start_as_rm() + print('Starting s2 connection') + s2_conn.start() + print('S2 connection stopped') + s2_conn.wait_till_done() if __name__ == "__main__": parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + RM_ID = uuid.uuid4() parser.add_argument( - "endpoint", + "--endpoint", type=str, - help="WebSocket endpoint uri for the server (CEM) e.g. " - "ws://localhost:8080/backend/rm/s2python-frbc/cem/dummy_model/ws", + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}", + default=f"ws://localhost:8003/ws/{RM_ID}", ) args = parser.parse_args() - start_s2_session(args.endpoint) + start_s2_session(args.endpoint, RM_ID) diff --git a/examples/sync_frbc_rm.py b/examples/sync_frbc_rm.py new file mode 100644 index 0000000..500de3b --- /dev/null +++ b/examples/sync_frbc_rm.py @@ -0,0 +1,208 @@ +import argparse +import asyncio +import logging +import sys +import uuid +import signal +import datetime + +from s2python.common import ( + Duration, + Role, + RoleType, + Commodity, + Currency, + NumberRange, + PowerRange, + CommodityQuantity, +) +from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunSync +from s2python.frbc import ( + FRBCInstruction, + FRBCSystemDescription, + FRBCActuatorDescription, + FRBCStorageDescription, + FRBCOperationMode, + FRBCOperationModeElement, + FRBCFillLevelTargetProfile, + FRBCFillLevelTargetProfileElement, + FRBCStorageStatus, + FRBCActuatorStatus, +) +from s2python.connection import AssetDetails +from s2python.connection.sync import S2SyncConnection +from s2python.connection.async_ import WebsocketClientMedium +from s2python.connection.sync.control_type.class_based import ( + FRBCControlType, + NoControlControlType, + ResourceManagerHandler, +) + +logger = logging.getLogger("s2python") +logger.addHandler(logging.StreamHandler(sys.stdout)) +logger.setLevel(logging.DEBUG) + + +class MyFRBCControlType(FRBCControlType): + def handle_instruction( + self, + connection: S2SyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: + if not isinstance(msg, FRBCInstruction): + raise RuntimeError( + f"Expected an FRBCInstruction but received a message of type {type(msg)}." + ) + print(f"I have received the message {msg} from {connection}") + + def activate(self, connection: S2SyncConnection) -> None: + print("The control type FRBC is now activated.") + + print("Time to send a FRBC SystemDescription") + actuator_id = uuid.uuid4() + operation_mode_id = uuid.uuid4() + connection.send_msg_and_await_reception_status( + FRBCSystemDescription( + message_id=uuid.uuid4(), + valid_from=datetime.datetime.now(tz=datetime.timezone.utc), + actuators=[ + FRBCActuatorDescription( + id=actuator_id, + operation_modes=[ + FRBCOperationMode( + id=operation_mode_id, + elements=[ + FRBCOperationModeElement( + fill_level_range=NumberRange( + start_of_range=0.0, end_of_range=100.0 + ), + fill_rate=NumberRange( + start_of_range=-5.0, end_of_range=5.0 + ), + power_ranges=[ + PowerRange( + start_of_range=-200.0, + end_of_range=200.0, + commodity_quantity=CommodityQuantity.ELECTRIC_POWER_L1, + ) + ], + ) + ], + diagnostic_label="Load & unload battery", + abnormal_condition_only=False, + ) + ], + transitions=[], + timers=[], + supported_commodities=[Commodity.ELECTRICITY], + ) + ], + storage=FRBCStorageDescription( + fill_level_range=NumberRange(start_of_range=0.0, end_of_range=100.0), + fill_level_label="%", + diagnostic_label="Imaginary battery", + provides_fill_level_target_profile=True, + provides_leakage_behaviour=False, + provides_usage_forecast=False, + ), + ) + ) + print("Also send the target profile") + + connection.send_msg_and_await_reception_status( + FRBCFillLevelTargetProfile( + message_id=uuid.uuid4(), + start_time=datetime.datetime.now(tz=datetime.timezone.utc), + elements=[ + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(30_000), + fill_level_range=NumberRange(start_of_range=20.0, end_of_range=30.0), + ), + FRBCFillLevelTargetProfileElement( + duration=Duration.from_milliseconds(300_000), + fill_level_range=NumberRange(start_of_range=40.0, end_of_range=50.0), + ), + ], + ) + ) + + print("Also send the storage status.") + connection.send_msg_and_await_reception_status( + FRBCStorageStatus(message_id=uuid.uuid4(), present_fill_level=10.0) + ) + + print("Also send the actuator status.") + connection.send_msg_and_await_reception_status( + FRBCActuatorStatus( + message_id=uuid.uuid4(), + actuator_id=actuator_id, + active_operation_mode_id=operation_mode_id, + operation_mode_factor=0.5, + ) + ) + + def deactivate(self, connection: S2SyncConnection) -> None: + print("The control type FRBC is now deactivated.") + + +class MyNoControlControlType(NoControlControlType): + def activate(self, connection: S2SyncConnection) -> None: + print("The control type NoControl is now activated.") + + def deactivate(self, connection: S2SyncConnection) -> None: + print("The control type NoControl is now deactivated.") + + +def start_s2_session(url, rm_id: uuid.UUID): + # Configure a resource manager + rm_handler = ResourceManagerHandler( + asset_details=AssetDetails( + resource_id=rm_id, + name="Some asset", + instruction_processing_delay=Duration.from_milliseconds(20), + roles=[Role(role=RoleType.ENERGY_CONSUMER, commodity=Commodity.ELECTRICITY)], + currency=Currency.EUR, + provides_forecast=False, + provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1], + ), + control_types=[MyFRBCControlType(), MyNoControlControlType()], + ) + + # Setup the underlying websocket connection + ws_medium = WebsocketClientMedium(url=url, verify_certificate=False) + + eventloop = asyncio.get_event_loop() + print("Before connecting to websocket") + eventloop.run_until_complete(ws_medium.connect()) + print("After connecting to websocket") + + # Configure the S2 connection on top of the websocket connection + s2_conn = S2SyncConnection(medium=ws_medium, eventloop=eventloop) + rm_handler.register_handlers(s2_conn) + + def stop(signal_num, _current_stack_frame): + print(f"Received signal {signal_num}. Will stop S2 connection.") + s2_conn.stop() + + signal.signal(signal.SIGINT, stop) + signal.signal(signal.SIGTERM, stop) + + print("Starting s2 connection") + s2_conn.run() + print("S2 connection stopped") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="A simple S2 reseource manager example.") + RM_ID = uuid.uuid4() + parser.add_argument( + "--endpoint", + type=str, + required=False, + help=f"WebSocket endpoint uri for the server (CEM) e.g. ws://localhost:8003/ws/{RM_ID}", + default=f"ws://localhost:8003/ws/{RM_ID}", + ) + args = parser.parse_args() + + start_s2_session(args.endpoint, RM_ID) diff --git a/src/s2python/connection/__init__.py b/src/s2python/connection/__init__.py new file mode 100644 index 0000000..c7a0d92 --- /dev/null +++ b/src/s2python/connection/__init__.py @@ -0,0 +1,15 @@ +from s2python.connection.asset_details import AssetDetails +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped, S2ConnectionEvent +from s2python.connection.control_type import RoleHandler +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.connection.quickstarts import BlockingWebsocketClientRM + +__all__ = [ + "AssetDetails", + "ConnectionStarted", + "ConnectionStopped", + "S2ConnectionEvent", + "RoleHandler", + "S2ConnectionEventsAndMessages", + "BlockingWebsocketClientRM" +] diff --git a/src/s2python/s2_asset_details.py b/src/s2python/connection/asset_details.py similarity index 85% rename from src/s2python/s2_asset_details.py rename to src/s2python/connection/asset_details.py index cf05539..297a1b2 100644 --- a/src/s2python/s2_asset_details.py +++ b/src/s2python/connection/asset_details.py @@ -1,19 +1,21 @@ -import logging +import typing import uuid from dataclasses import dataclass from typing import Optional, List + from s2python.common import ( Role, ResourceManagerDetails, Duration, Currency, ) -from s2python.generated.gen_s2 import CommodityQuantity -from s2python.s2_control_type import S2ControlType +from s2python.common import CommodityQuantity, ControlType -logger = logging.getLogger("s2python") +class HasProtocolControlType(typing.Protocol): + def get_protocol_control_type(self) -> ControlType: + ... @dataclass class AssetDetails: # pylint: disable=too-many-instance-attributes @@ -33,7 +35,7 @@ class AssetDetails: # pylint: disable=too-many-instance-attributes serial_number: Optional[str] = None def to_resource_manager_details( - self, control_types: List[S2ControlType] + self, control_types: typing.Sequence[HasProtocolControlType] ) -> ResourceManagerDetails: return ResourceManagerDetails( available_control_types=[ diff --git a/src/s2python/connection/async_/__init__.py b/src/s2python/connection/async_/__init__.py new file mode 100644 index 0000000..52ca6dc --- /dev/null +++ b/src/s2python/connection/async_/__init__.py @@ -0,0 +1,13 @@ +from s2python.connection.async_.connection import S2AsyncConnection, CouldNotReceiveStatusReceptionError +from s2python.connection.async_.message_handlers import S2EventHandlerAsync +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_.medium.websocket import WebsocketClientMedium + + +__all__ = [ + "S2AsyncConnection", + "CouldNotReceiveStatusReceptionError", + "S2EventHandlerAsync", + "S2MediumConnection", + "WebsocketClientMedium" +] diff --git a/src/s2python/connection/async_/connection.py b/src/s2python/connection/async_/connection.py new file mode 100644 index 0000000..24c3b1d --- /dev/null +++ b/src/s2python/connection/async_/connection.py @@ -0,0 +1,238 @@ +import asyncio +import json +import logging +import uuid +from typing import Optional, Type + +from s2python.connection.connection_events import ConnectionStopped +from s2python.connection.async_.medium.s2_medium import S2MediumConnection, MediumClosedConnectionError, \ + S2AsyncMediumConnection, S2SyncToAsyncMediumConnection, S2SyncMediumConnection +from s2python.common import ( + ReceptionStatusValues, + ReceptionStatus, +) +from s2python.connection.async_.message_handlers import MessageHandlers, S2EventHandlerAsync +from s2python.connection.errors import PermanentConnectionError, CouldNotReceiveStatusReceptionError +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.reception_status_awaiter import ReceptionStatusAwaiter +from s2python.s2_parser import S2Parser +from s2python.s2_validation_error import S2ValidationError +from s2python.message import S2Message, S2MessageWithID +from s2python.connection.connection_events import ConnectionStarted + +logger = logging.getLogger("s2python") + + +class S2AsyncConnection: + _eventloop: asyncio.AbstractEventLoop + _stop_event: asyncio.Event + """Stop the S2 connection permanently.""" + _received_messages: asyncio.Queue + + _reception_status_awaiter: ReceptionStatusAwaiter + _medium: S2AsyncMediumConnection + _s2_parser: S2Parser + _handlers: MessageHandlers + + def __init__( + self, + medium: S2MediumConnection, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._eventloop = eventloop if eventloop is not None else asyncio.get_event_loop() + self._stop_event = asyncio.Event() + + self._reception_status_awaiter = ReceptionStatusAwaiter() + if isinstance(medium, S2AsyncMediumConnection): + self._medium = medium + elif isinstance(medium, S2SyncMediumConnection): + self._medium = S2SyncToAsyncMediumConnection(medium) + else: + raise RuntimeError(f'Unexpected medium type {type(medium)}. Medium must be either an ' + 'S2AsyncMediumConnection or S2SyncMediumConnection.') + self._s2_parser = S2Parser() + self._handlers = MessageHandlers() + + async def stop(self) -> None: + """Stop the S2 connection gracefully. + + Note: Does not stop the underlying medium! + """ + logger.info("Will stop the S2 connection at the earliest moment.") + self._stop_event.set() + + async def _wait_till_stop(self) -> None: + await self._stop_event.wait() + + async def run(self) -> None: + logger.debug('Starting S2 connection on eventloop %s.', id(self._eventloop)) + self._received_messages = asyncio.Queue() + + if not await self._medium.is_connected(): + raise MediumClosedConnectionError("Cannot start the S2 connection if the underlying medium is closed.") + + background_tasks = [ + self._eventloop.create_task(self._receive_messages()), + self._eventloop.create_task(self._wait_till_stop()), + self._eventloop.create_task(self._handle_received_messages()), + ] + + await self._handlers.handle_event(self, ConnectionStarted()) + + (done, pending) = await asyncio.wait( + background_tasks, return_when=asyncio.FIRST_COMPLETED + ) + + await self._handlers.handle_event(self, ConnectionStopped()) + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): # pylint: disable=broad-exception-caught + pass + + for task in done: + try: + await task + except asyncio.CancelledError: + pass + except MediumClosedConnectionError: + logger.info("The other party closed the websocket connection.") + except Exception: # pylint: disable=broad-exception-caught + logger.exception("An error occurred in the S2 connection. Terminating current connection.") + + async def _handle_received_messages(self) -> None: + while not self._stop_event.is_set(): + msg = await self._received_messages.get() + logger.debug('Handling received message %s', msg.to_json()) + await self._handlers.handle_event(self, msg) + + async def _receive_messages(self) -> None: + """Receives all incoming messages in the form of a generator. + + Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed + to any calls of `send_msg_and_await_reception_status`. + """ + logger.info("S2 connection has started to receive messages.") + + async for message in self._medium.messages(): + try: + s2_msg: S2Message = self._s2_parser.parse_as_any_message(message) + except json.JSONDecodeError: + await self.send_and_forget( + ReceptionStatus( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Not valid json.", + ) + ) + except S2ValidationError as e: + json_msg = json.loads(message) + message_id = json_msg.get("message_id") + if message_id: + await self.respond_with_reception_status( + subject_message_id=message_id, + status=ReceptionStatusValues.INVALID_MESSAGE, + diagnostic_label=str(e), + ) + else: + await self.respond_with_reception_status( + subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), + status=ReceptionStatusValues.INVALID_DATA, + diagnostic_label="Message appears valid json but could not find a message_id field.", + ) + else: + logger.debug("Received message %s", s2_msg.to_json()) + + if isinstance(s2_msg, ReceptionStatus): + logger.debug( + "Message is a reception status for %s so registering in cache.", + s2_msg.subject_message_id, + ) + await self._reception_status_awaiter.receive_reception_status(s2_msg) + else: + logger.debug('Message is not a reception status, putting it in the received messages queue.') + await self._received_messages.put(s2_msg) + + def register_handler(self, event_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerAsync) -> None: + """Register a handler for a specific S2 message type. + + :param event_type: The S2 connection event to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + self._handlers.register_handler(event_type, handler) + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages]) -> None: + self._handlers.unregister_handler(s2_message_type) + + async def send_and_forget(self, s2_msg: S2Message) -> None: + json_msg = s2_msg.to_json() + logger.debug("Sending message %s", json_msg) + try: + await self._medium.send(json_msg) + except MediumClosedConnectionError as e: + logger.error("Unable to send message %s due to %s", s2_msg, str(e)) + raise + + async def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + logger.debug( + "Responding to message %s with status %s", subject_message_id, status + ) + await self.send_and_forget( + ReceptionStatus( + subject_message_id=subject_message_id, + status=status, + diagnostic_label=diagnostic_label, + ) + ) + + async def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + await self.send_and_forget(s2_msg) + logger.debug( + "Waiting for ReceptionStatus for %s %s seconds", + s2_msg.message_id, + timeout_reception_status, + ) + reception_status_task = self._eventloop.create_task(self._reception_status_awaiter.wait_for_reception_status( + s2_msg.message_id, timeout_reception_status + )) + stop_event_task = self._eventloop.create_task(self._wait_till_stop()) + + (done, pending) = await asyncio.wait([reception_status_task, stop_event_task], return_when=asyncio.FIRST_COMPLETED) + + for task in pending: + try: + task.cancel() + await task + except (asyncio.CancelledError, Exception): # pylint: disable=broad-exception-caught + pass + + if reception_status_task in done: + try: + reception_status = await reception_status_task + except TimeoutError: + logger.error("Did not receive a reception status on time for %s", s2_msg.message_id) + self._stop_event.set() + raise + else: + #stop_event_task in done + await stop_event_task + raise CouldNotReceiveStatusReceptionError(f"Connection stopped while waiting for ReceptionStatus for message {s2_msg.message_id}") + + if raise_on_error: + if reception_status.status == ReceptionStatusValues.PERMANENT_ERROR: + error = f"Received a permanent error for message {s2_msg.message_id} with diagnostic label: {reception_status.diagnostic_label}" + logger.error(error) + raise PermanentConnectionError(error) + if reception_status.status != ReceptionStatusValues.OK and raise_on_error: + raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}") + + return reception_status diff --git a/src/s2python/connection/async_/control_type/__init__.py b/src/s2python/connection/async_/control_type/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/async_/control_type/class_based.py b/src/s2python/connection/async_/control_type/class_based.py new file mode 100644 index 0000000..a9d49dd --- /dev/null +++ b/src/s2python/connection/async_/control_type/class_based.py @@ -0,0 +1,280 @@ +import abc +import logging +import uuid +from typing import Optional, List + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.async_.connection import S2AsyncConnection +from s2python.connection.types import ( + S2ConnectionEventsAndMessages, + SendOkayRunAsync, +) +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction +from s2python.pebc import PEBCInstruction + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + asset_details: AssetDetails + + def set_asset_details(self, asset_details: AssetDetails) -> None: + self.asset_details = asset_details + + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + for control_type in self._control_types: + control_type.set_asset_details(asset_details) + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) + + async def _on_connection_started( + self, connection: S2AsyncConnection, _: S2ConnectionEventsAndMessages, __: SendOkayRunAsync + ) -> None: + await connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.") + + async def _on_handshake( + self, + _: S2AsyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + await send_okay() + + async def _on_handshake_response( + self, + connection: S2AsyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug("CEM selected to use version %s", event.selected_protocol_version) + await send_okay() + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + await connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + async def _on_select_control_type( + self, + connection: S2AsyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + await send_okay() + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + await self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + await self._current_control_type.activate(connection) + + async def _on_connection_stop( + self, + connection: S2AsyncConnection, + __: S2ConnectionEventsAndMessages, + ___: SendOkayRunAsync, + ) -> None: + if self._current_control_type: + await self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, + connection: S2AsyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual activation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, + connection: S2AsyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual activation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, + connection: S2AsyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual activation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2AsyncConnection) -> None: + connection.register_handler(PEBCInstruction, self.handle_instruction) + + @abc.abstractmethod + async def handle_instruction( + self, + connection: S2AsyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: ... + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2AsyncConnection) -> None: + pass + + @abc.abstractmethod + async def activate(self, connection: S2AsyncConnection) -> None: ... + + @abc.abstractmethod + async def deactivate(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/async_/medium/__init__.py b/src/s2python/connection/async_/medium/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/async_/medium/s2_medium.py b/src/s2python/connection/async_/medium/s2_medium.py new file mode 100644 index 0000000..08a7ea7 --- /dev/null +++ b/src/s2python/connection/async_/medium/s2_medium.py @@ -0,0 +1,72 @@ +import abc +import asyncio +from asyncio import AbstractEventLoop +import typing +from typing import AsyncGenerator, Union +from typing_extensions import override + +UnparsedMediumData = Union[str, bytes] + + +class S2MediumException(Exception): ... + + +class MediumClosedConnectionError(S2MediumException): ... + + +class MediumCouldNotConnectError(S2MediumException): ... + + +class S2AsyncMediumConnection(abc.ABC): + @abc.abstractmethod + async def is_connected(self) -> bool: ... + + @abc.abstractmethod + def messages(self) -> AsyncGenerator[UnparsedMediumData, None]: ... + + @abc.abstractmethod + async def send(self, message: str) -> None: ... + + +class S2SyncMediumConnection(abc.ABC): + @abc.abstractmethod + def is_connected(self) -> bool: ... + + @abc.abstractmethod + def messages(self) -> typing.Generator[UnparsedMediumData, None, None]: ... + + @abc.abstractmethod + def send(self, message: str) -> None: ... + + +S2MediumConnection = Union[S2AsyncMediumConnection, S2SyncMediumConnection] + + +class S2SyncToAsyncMediumConnection(S2AsyncMediumConnection): + _sync_medium: S2SyncMediumConnection + _eventloop: AbstractEventLoop + + def __init__( + self, + sync_medium: S2SyncMediumConnection, + eventloop: typing.Optional[AbstractEventLoop] = None, + ) -> None: + self._sync_medium = sync_medium + self._eventloop = asyncio.get_event_loop() if eventloop is None else eventloop + + @override + async def is_connected(self) -> bool: + return await self._eventloop.run_in_executor(None, self._sync_medium.is_connected) + + @override + async def messages( # pylint: disable=invalid-overridden-method + self, + ) -> AsyncGenerator[UnparsedMediumData, None]: + generator = await self._eventloop.run_in_executor(None, self._sync_medium.messages) + + while True: + yield await self._eventloop.run_in_executor(None, generator.__next__) + + @override + async def send(self, message: str) -> None: + await self._eventloop.run_in_executor(None, self._sync_medium.send, message) diff --git a/src/s2python/connection/async_/medium/websocket.py b/src/s2python/connection/async_/medium/websocket.py new file mode 100644 index 0000000..88bdd94 --- /dev/null +++ b/src/s2python/connection/async_/medium/websocket.py @@ -0,0 +1,95 @@ +import logging +import ssl +from typing import AsyncGenerator, Optional, Dict, Any +from typing_extensions import override +from websockets import Data + +from s2python.connection.async_.medium.s2_medium import ( + MediumClosedConnectionError, + MediumCouldNotConnectError, + S2AsyncMediumConnection, + UnparsedMediumData, +) + +try: + import websockets + from websockets.asyncio.client import ( + ClientConnection as WSConnection, + connect as ws_connect, + ) +except ImportError as exc: + raise ImportError( + "The 'websockets' package is required. Run 'pip install s2-python[ws]' to use this feature." + ) from exc + +logger = logging.getLogger("s2python") + + +class WebsocketClientMedium(S2AsyncMediumConnection): + url: str + + _ws: Optional[WSConnection] + _verify_certificate: bool + _bearer_token: Optional[str] + _closed: bool + + def __init__( + self, url: str, verify_certificate: bool = True, bearer_token: Optional[str] = None + ) -> None: + self.url = url + + self._ws = None + self._verify_certificate = verify_certificate + self._bearer_token = bearer_token + self._closed = False + + async def connect(self) -> None: + try: + # set up connection arguments for SSL and bearer token, if required + connection_kwargs: Dict[str, Any] = {} + if self.url.startswith("wss://") and not self._verify_certificate: + connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + connection_kwargs["ssl"].check_hostname = False + connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE + + if self._bearer_token: + connection_kwargs["additional_headers"] = { + "Authorization": f"Bearer {self._bearer_token}" + } + + self._ws = await ws_connect(uri=self.url, **connection_kwargs) + except (EOFError, OSError, websockets.WebSocketException) as e: + self._closed = True + message = f"Could not connect due to: {e}" + logger.error(message) + raise MediumCouldNotConnectError(message) from e + + @override + async def is_connected(self) -> bool: + return self._ws is not None and not self._closed + + @override + async def messages( # pylint: disable=invalid-overridden-method + self, + ) -> AsyncGenerator[UnparsedMediumData, None]: + if self._ws is None: + raise RuntimeError("Websocket is not connected") + try: + message: Data + async for message in self._ws: + yield message + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError( + f"Could not receive more messages on websocket connection {self.url}" + ) from e + + @override + async def send(self, message: str) -> None: + if self._ws is None: + raise RuntimeError("Websocket is not connected") + try: + await self._ws.send(message) + except websockets.WebSocketException as e: + self._closed = True + raise MediumClosedConnectionError(f"Could not send message {message}") from e diff --git a/src/s2python/connection/async_/message_handlers.py b/src/s2python/connection/async_/message_handlers.py new file mode 100644 index 0000000..f904a9d --- /dev/null +++ b/src/s2python/connection/async_/message_handlers.py @@ -0,0 +1,129 @@ +import asyncio +import logging +import uuid +from typing import Any, Coroutine, Optional, Type, Dict, Callable, TYPE_CHECKING, cast + +from s2python.common import ReceptionStatusValues +from s2python.connection.types import S2ConnectionEventsAndMessages +from s2python.message import S2Message, S2MessageWithID +from s2python.connection.errors import PermanentConnectionError + +if TYPE_CHECKING: + from s2python.connection.async_.connection import S2AsyncConnection + + +logger = logging.getLogger("s2python") + +S2EventHandlerAsync = Callable[ + [ + "S2AsyncConnection", + S2ConnectionEventsAndMessages, + Optional[Callable[[], Coroutine[Any, Any, None]]], + ], + Coroutine[Any, Any, None], +] + + +class SendOkay: + _status_is_send: asyncio.Event + _connection: "S2AsyncConnection" + _subject_message_id: uuid.UUID + + def __init__(self, connection: "S2AsyncConnection", subject_message_id: uuid.UUID): + self._status_is_send = asyncio.Event() + self._connection = connection + self._subject_message_id = subject_message_id + + async def run(self) -> None: + """Send the ReceptionStatus OK asynchronously and register it.""" + self._status_is_send.set() + + await self._connection.respond_with_reception_status( # pylint: disable=protected-access + subject_message_id=self._subject_message_id, + status=ReceptionStatusValues.OK, + diagnostic_label="Processed okay.", + ) + + async def ensure_send(self, type_msg: Type[S2Message]) -> None: + """Ensure that the ReceptionStatus has been send. + + Send the ReceptionStatus OK if it hasn't been send yet. + + :param type_msg: The type of S2 message for which the ReceptionStatus should have been send. Logging purposes. + """ + if not self._status_is_send.is_set(): + logger.warning( + "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " + "Sending it now.", + type_msg, + self._subject_message_id, + ) + await self.run() + + +class MessageHandlers: + handlers: Dict[Type[S2ConnectionEventsAndMessages], S2EventHandlerAsync] + + def __init__(self) -> None: + self.handlers = {} + + async def handle_event( + self, connection: "S2AsyncConnection", event: S2ConnectionEventsAndMessages + ) -> None: + """Handle the S2 message using the registered handler. + + :param connection: The S2 conncetion the `msg` is received from. + :param msg: The S2 message + """ + handler = self.handlers.get(type(event)) + if handler is not None: + if hasattr(event, "message_id"): + msg_event = cast(S2MessageWithID, event) + logger.debug( + "Handling S2 message with message id %s using handler %s", + msg_event.message_id, + handler, + ) + send_okay = SendOkay(connection, msg_event.message_id) + try: + await handler(connection, event, send_okay.run) + except PermanentConnectionError: + logger.error( + "While processing message %s a permanent connection error occurred. Stopping the connection." + ) + raise + except Exception: + if not send_okay._status_is_send.is_set(): # pylint: disable=protected-access + await connection.respond_with_reception_status( + subject_message_id=msg_event.message_id, + status=ReceptionStatusValues.PERMANENT_ERROR, + diagnostic_label=f"While processing message {msg_event.message_id} " + f"an unrecoverable error occurred.", + ) + raise + await send_okay.ensure_send(type(msg_event)) + else: + logger.debug( + "Handling S2 connection event (without message id) using handler %s", + handler, + ) + await handler(connection, event, None) + else: + logger.warning( + "Received an event of type %s but no handler is registered. Ignoring the event.", + type(event), + ) + + def register_handler( + self, event_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerAsync + ) -> None: + """Register a coroutine function or a normal function as the handler for a specific S2 message type. + + :param msg_type: The S2 message type to attach the handler to. + :param handler: The function (asynchronuous or normal) which should handle the S2 message. + """ + self.handlers[event_type] = handler + + def unregister_handler(self, s2_message_type: Type[S2ConnectionEventsAndMessages]) -> None: + if s2_message_type in self.handlers: + del self.handlers[s2_message_type] diff --git a/src/s2python/connection/connection_events.py b/src/s2python/connection/connection_events.py new file mode 100644 index 0000000..9f65d40 --- /dev/null +++ b/src/s2python/connection/connection_events.py @@ -0,0 +1,12 @@ +import abc + + +class S2ConnectionEvent(abc.ABC): + pass + + +class ConnectionStarted(S2ConnectionEvent): + pass + +class ConnectionStopped(S2ConnectionEvent): + pass diff --git a/src/s2python/connection/control_type.py b/src/s2python/connection/control_type.py new file mode 100644 index 0000000..09f6252 --- /dev/null +++ b/src/s2python/connection/control_type.py @@ -0,0 +1,11 @@ +import abc + +from s2python.common import EnergyManagementRole +from s2python.connection.async_.connection import S2AsyncConnection + +class RoleHandler(abc.ABC): + @abc.abstractmethod + def get_s2_role(self) -> EnergyManagementRole: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2AsyncConnection) -> None: ... diff --git a/src/s2python/connection/errors.py b/src/s2python/connection/errors.py new file mode 100644 index 0000000..8389a76 --- /dev/null +++ b/src/s2python/connection/errors.py @@ -0,0 +1,5 @@ +class CouldNotReceiveStatusReceptionError(Exception): + ... + +class PermanentConnectionError(Exception): + ... diff --git a/src/s2python/connection/quickstarts.py b/src/s2python/connection/quickstarts.py new file mode 100644 index 0000000..3f85d21 --- /dev/null +++ b/src/s2python/connection/quickstarts.py @@ -0,0 +1,68 @@ +import asyncio +import logging +import threading + +from s2python.connection.asset_details import AssetDetails +from s2python.connection.async_ import WebsocketClientMedium +from s2python.connection.sync import S2SyncConnection +from s2python.connection.sync.control_type.class_based import ResourceManagerHandler, S2ControlType + +logger = logging.getLogger("s2python") + + +class BlockingWebsocketClientRM: + _thread: threading.Thread + _eventloop: asyncio.AbstractEventLoop + _control_types: list[S2ControlType] + _s2_connection: S2SyncConnection + + url: str + asset_details: AssetDetails + + def __init__( + self, asset_details: AssetDetails, url: str, control_types: list[S2ControlType] + ) -> None: + self.url = url + self.asset_details = asset_details + self._thread = threading.Thread(target=self._run) + self._control_types = control_types + + def _run(self) -> None: + self._eventloop = asyncio.new_event_loop() + + rm_handler = ResourceManagerHandler( + asset_details=self.asset_details, control_types=self._control_types + ) + + ws_medium = WebsocketClientMedium(url=self.url, verify_certificate=False) + self._eventloop.run_until_complete(ws_medium.connect()) + + # Configure the S2 connection on top of the websocket connection + self._s2_connection = S2SyncConnection(medium=ws_medium, eventloop=self._eventloop) + rm_handler.register_handlers(self._s2_connection) + logger.debug( + "Starting synchronous S2 connection event loop in thread %s", self._thread.name + ) + self._s2_connection.run() + logger.debug( + "Synchronous S2 connection event loop in thread %s has stopped", self._thread.name + ) + + def start(self) -> None: + self._thread.start() + + def wait_till_done(self) -> None: + self._thread.join() + + def stop(self) -> None: + """Stops the S2 connection. + + Note: Ensure this method is called from a different thread than the thread running the S2 connection. + Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run + the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. + """ + logger.info("Stopping the S2 connection...") + self._s2_connection.stop() + self._eventloop.stop() + self.wait_till_done() + logger.info("Stopped the S2 connection.") diff --git a/src/s2python/connection/sync/__init__.py b/src/s2python/connection/sync/__init__.py new file mode 100644 index 0000000..9e4f98b --- /dev/null +++ b/src/s2python/connection/sync/__init__.py @@ -0,0 +1,7 @@ +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.sync.connection import S2EventHandlerSync + +__all__ = [ + "S2SyncConnection", + "S2EventHandlerSync" +] diff --git a/src/s2python/connection/sync/connection.py b/src/s2python/connection/sync/connection.py new file mode 100644 index 0000000..050a229 --- /dev/null +++ b/src/s2python/connection/sync/connection.py @@ -0,0 +1,113 @@ +import asyncio +import logging +import uuid +from typing import Optional, Type, Callable + +from s2python.common import ( + ReceptionStatusValues, +) +from s2python.connection.types import ( + S2ConnectionEventsAndMessages, + SendOkayRunAsync, + SendOkayRunSync, +) +from s2python.message import S2Message + +from s2python.common import ReceptionStatus +from s2python.connection.async_.medium.s2_medium import S2MediumConnection +from s2python.connection.async_ import S2AsyncConnection +from s2python.message import S2MessageWithID + +logger = logging.getLogger("s2python") + +S2EventHandlerSync = Callable[ + ["S2SyncConnection", S2ConnectionEventsAndMessages, SendOkayRunSync], None +] + + +class S2SyncConnection: + _eventloop: asyncio.AbstractEventLoop + _async_s2_connection: S2AsyncConnection + + def __init__( + self, + medium: S2MediumConnection, + eventloop: Optional[asyncio.AbstractEventLoop] = None, + ) -> None: + self._eventloop = asyncio.new_event_loop() if eventloop is None else eventloop + self._async_s2_connection = self._eventloop.run_until_complete( + S2SyncConnection._create_async_s2_connection(medium, self._eventloop) + ) + + @staticmethod + async def _create_async_s2_connection( + medium: S2MediumConnection, eventloop: asyncio.AbstractEventLoop + ) -> S2AsyncConnection: + return S2AsyncConnection(medium, eventloop) + + def run(self) -> None: + self._eventloop.run_until_complete(self._async_s2_connection.run()) + + def stop(self) -> None: + """Gracefully stops the S2 connection.""" + asyncio.run_coroutine_threadsafe(self._async_s2_connection.stop(), self._eventloop).result() + + def register_handler( + self, s2_message_type: Type[S2ConnectionEventsAndMessages], handler: S2EventHandlerSync + ) -> None: + """Register a handler for a specific S2 message type. + + :param s2_message_type: The S2 message type to register the handler for. + :param handler: The handler function (asynchronous or normal) which will handle the message. + """ + + async def handle_s2_message_async_wrapper( + _: S2AsyncConnection, + s2_msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunAsync, + ) -> None: + await self._eventloop.run_in_executor( + None, + handler, + self, + s2_msg, + lambda: ( + asyncio.run_coroutine_threadsafe(send_okay(), self._eventloop).result() + if send_okay + else None + ), + ) + + self._async_s2_connection.register_handler(s2_message_type, handle_s2_message_async_wrapper) + + def unregister_handler(self, s2_message_type: Type[S2MessageWithID]) -> None: + self._async_s2_connection.unregister_handler(s2_message_type) + + def send_and_forget(self, s2_msg: S2Message) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_and_forget(s2_msg), + self._eventloop, + ).result() + + def respond_with_reception_status( + self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str + ) -> None: + asyncio.run_coroutine_threadsafe( + self._async_s2_connection.respond_with_reception_status( + subject_message_id, status, diagnostic_label + ), + self._eventloop, + ).result() + + def send_msg_and_await_reception_status( + self, + s2_msg: S2MessageWithID, + timeout_reception_status: float = 5.0, + raise_on_error: bool = True, + ) -> ReceptionStatus: + return asyncio.run_coroutine_threadsafe( + self._async_s2_connection.send_msg_and_await_reception_status( + s2_msg, timeout_reception_status, raise_on_error + ), + self._eventloop, + ).result() diff --git a/src/s2python/connection/sync/control_type/__init__.py b/src/s2python/connection/sync/control_type/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/s2python/connection/sync/control_type/class_based.py b/src/s2python/connection/sync/control_type/class_based.py new file mode 100644 index 0000000..2d38338 --- /dev/null +++ b/src/s2python/connection/sync/control_type/class_based.py @@ -0,0 +1,263 @@ +import abc +import logging +import uuid +from typing import Optional, List + +from s2python.connection.asset_details import AssetDetails +from s2python.common import ( + Handshake, + EnergyManagementRole, + HandshakeResponse, + SelectControlType, +) +from s2python.connection.sync.connection import S2SyncConnection +from s2python.connection.types import S2ConnectionEventsAndMessages, SendOkayRunSync +from s2python.version import S2_VERSION + +from s2python.connection.connection_events import ConnectionStarted, ConnectionStopped + +from s2python.common import ControlType as ProtocolControlType +from s2python.frbc import FRBCInstruction +from s2python.ppbc import PPBCScheduleInstruction +from s2python.ombc import OMBCInstruction + +logger = logging.getLogger("s2python") + + +class S2ControlType(abc.ABC): + @abc.abstractmethod + def get_protocol_control_type(self) -> ProtocolControlType: ... + + @abc.abstractmethod + def register_handlers(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + +class ResourceManagerHandler: + asset_details: AssetDetails + _current_control_type: Optional[S2ControlType] + _control_types: List[S2ControlType] + + def __init__(self, control_types: List[S2ControlType], asset_details: AssetDetails) -> None: + self.asset_details = asset_details + self._current_control_type = None + self._control_types = control_types + + def get_s2_role(self) -> EnergyManagementRole: + return EnergyManagementRole.RM + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(ConnectionStarted, self._on_connection_started) + connection.register_handler(Handshake, self._on_handshake) + connection.register_handler(HandshakeResponse, self._on_handshake_response) + connection.register_handler(SelectControlType, self._on_select_control_type) + connection.register_handler(ConnectionStopped, self._on_connection_stop) + + def _on_connection_started( + self, + connection: S2SyncConnection, + _: S2ConnectionEventsAndMessages, + __: SendOkayRunSync, + ) -> None: + connection.send_msg_and_await_reception_status( + Handshake( + message_id=uuid.uuid4(), + role=self.get_s2_role(), + supported_protocol_versions=[S2_VERSION], + ) + ) + logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.") + + def _on_handshake( + self, + _: S2SyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: + assert send_okay is not None + if not isinstance(event, Handshake): + logger.error( + "Handler for Handshake received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug( + "%s supports S2 protocol versions: %s", + event.role, + event.supported_protocol_versions, + ) + send_okay() + + def _on_handshake_response( + self, + connection: S2SyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: + assert send_okay is not None + if not isinstance(event, HandshakeResponse): + logger.error( + "Handler for HandshakeResponse received a message of the wrong type: %s", + type(event), + ) + return + + logger.debug("Received HandshakeResponse %s", event.to_json()) + logger.debug("CEM selected to use version %s", event.selected_protocol_version) + send_okay() + logger.debug("Handshake complete. Sending first ResourceManagerDetails.") + + connection.send_msg_and_await_reception_status( + self.asset_details.to_resource_manager_details(self._control_types) + ) + + def _on_select_control_type( + self, + connection: S2SyncConnection, + event: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: + assert send_okay is not None + if not isinstance(event, SelectControlType): + logger.error( + "Handler for SelectControlType received a message of the wrong type: %s", + type(event), + ) + return + + send_okay() + + logger.debug( + "CEM selected control type %s. Activating control type.", + event.control_type, + ) + + control_types_by_protocol_name = { + c.get_protocol_control_type(): c for c in self._control_types + } + selected_control_type = control_types_by_protocol_name.get(event.control_type) + + if self._current_control_type is not None: + self._current_control_type.deactivate(connection) + + self._current_control_type = selected_control_type + + if self._current_control_type is not None: + self._current_control_type.register_handlers(connection) + self._current_control_type.activate(connection) + + def _on_connection_stop( + self, + connection: S2SyncConnection, + __: S2ConnectionEventsAndMessages, + ___: SendOkayRunSync, + ) -> None: + if self._current_control_type: + self._current_control_type.deactivate(connection) + self._current_control_type = None + + +class FRBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.FILL_RATE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(FRBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, + connection: S2SyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PPBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_PROFILE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(PPBCScheduleInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, + connection: S2SyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class OMBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.OPERATION_MODE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + connection.register_handler(OMBCInstruction, self.handle_instruction) + + @abc.abstractmethod + def handle_instruction( + self, + connection: S2SyncConnection, + msg: S2ConnectionEventsAndMessages, + send_okay: SendOkayRunSync, + ) -> None: ... + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: + """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" + + +class PEBCControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... + + +class NoControlControlType(S2ControlType): + def get_protocol_control_type(self) -> ProtocolControlType: + return ProtocolControlType.NOT_CONTROLABLE + + def register_handlers(self, connection: S2SyncConnection) -> None: + pass + + @abc.abstractmethod + def activate(self, connection: S2SyncConnection) -> None: ... + + @abc.abstractmethod + def deactivate(self, connection: S2SyncConnection) -> None: ... diff --git a/src/s2python/connection/types.py b/src/s2python/connection/types.py new file mode 100644 index 0000000..9e10cbc --- /dev/null +++ b/src/s2python/connection/types.py @@ -0,0 +1,9 @@ +from typing import Union, Optional, Callable, Coroutine, Any + +from s2python.connection.connection_events import S2ConnectionEvent +from s2python.message import S2MessageWithID + + +S2ConnectionEventsAndMessages = Union[S2MessageWithID, S2ConnectionEvent] +SendOkayRunAsync = Optional[Callable[[], Coroutine[Any, Any, None]]] +SendOkayRunSync = Optional[Callable[[], None]] diff --git a/src/s2python/s2_connection.py b/src/s2python/s2_connection.py deleted file mode 100644 index 1c1e7b6..0000000 --- a/src/s2python/s2_connection.py +++ /dev/null @@ -1,442 +0,0 @@ -__all__ = [ - "AssetDetails", - "S2MessageHandler", - "SendOkay", - "MessageHandlers", - "S2Connection" -] # re-export for backward compatibility - -try: - import websockets -except ImportError as exc: - raise ImportError( - "The 'websockets' package is required. Run 'pip install s2-python[ws]' to use this feature." - ) from exc - -import asyncio -import json -import logging -import time -import threading -import uuid -import ssl -from typing import Any, Optional, List, Dict, Awaitable - -from websockets.asyncio.client import ( - ClientConnection as WSConnection, - connect as ws_connect, -) - -from s2python.common import ( - ReceptionStatusValues, - ReceptionStatus, - Handshake, - EnergyManagementRole, - HandshakeResponse, - SelectControlType, -) -from s2python.reception_status_awaiter import ReceptionStatusAwaiter -from s2python.s2_control_type import S2ControlType -from s2python.s2_parser import S2Parser -from s2python.s2_validation_error import S2ValidationError -from s2python.s2_asset_details import AssetDetails -from s2python.s2_message_handlers import S2MessageHandler, SendOkay, MessageHandlers -from s2python.message import S2Message -from s2python.version import S2_VERSION - -logger = logging.getLogger("s2python") - - -class S2Connection: # pylint: disable=too-many-instance-attributes - url: str - reconnect: bool - reception_status_awaiter: ReceptionStatusAwaiter - ws: Optional[WSConnection] - s2_parser: S2Parser - control_types: List[S2ControlType] - role: EnergyManagementRole - asset_details: AssetDetails - - _thread: threading.Thread - - _handlers: MessageHandlers - _current_control_type: Optional[S2ControlType] - _received_messages: asyncio.Queue - - _eventloop: asyncio.AbstractEventLoop - _stop_event: asyncio.Event - _restart_connection_event: asyncio.Event - _verify_certificate: bool - _bearer_token: Optional[str] - - def __init__( # pylint: disable=too-many-arguments - self, - url: str, - role: EnergyManagementRole, - control_types: List[S2ControlType], - asset_details: AssetDetails, - reconnect: bool = False, - verify_certificate: bool = True, - bearer_token: Optional[str] = None, - ) -> None: - self.url = url - self.reconnect = reconnect - self.reception_status_awaiter = ReceptionStatusAwaiter() - self.ws = None - self.s2_parser = S2Parser() - - self._handlers = MessageHandlers() - self._current_control_type = None - - self._eventloop = asyncio.new_event_loop() - - self.control_types = control_types - self.role = role - self.asset_details = asset_details - self._verify_certificate = verify_certificate - - self._handlers.register_handler( - SelectControlType, self._handle_select_control_type_as_rm - ) - self._handlers.register_handler(Handshake, self._handle_handshake) - self._handlers.register_handler(HandshakeResponse, self._handle_handshake_response_as_rm) - self._bearer_token = bearer_token - - def start_as_rm(self) -> None: - self._run_eventloop(self._run_as_rm()) - - def _run_eventloop(self, main_task: Awaitable[None]) -> None: - self._thread = threading.current_thread() - logger.debug("Starting eventloop") - try: - self._eventloop.run_until_complete(main_task) - except asyncio.CancelledError: - pass - logger.debug("S2 connection thread has stopped.") - - def stop(self) -> None: - """Stops the S2 connection. - - Note: Ensure this method is called from a different thread than the thread running the S2 connection. - Otherwise it will block waiting on the coroutine _do_stop to terminate successfully but it can't run - the coroutine. A `RuntimeError` will be raised to prevent the indefinite block. - """ - if threading.current_thread() == self._thread: - raise RuntimeError( - "Do not call stop from the thread running the S2 connection. This results in an infinite block!" - ) - if self._eventloop.is_running(): - asyncio.run_coroutine_threadsafe(self._do_stop(), self._eventloop).result() - self._thread.join() - logger.info("Stopped the S2 connection.") - - async def _do_stop(self) -> None: - logger.info("Will stop the S2 connection.") - self._stop_event.set() - - async def _run_as_rm(self) -> None: - logger.debug("Connecting as S2 resource manager.") - - self._stop_event = asyncio.Event() - - first_run = True - - while (first_run or self.reconnect) and not self._stop_event.is_set(): - first_run = False - self._restart_connection_event = asyncio.Event() - await self._connect_and_run() - time.sleep(1) - - logger.debug("Finished S2 connection eventloop.") - - async def _connect_and_run(self) -> None: - self._received_messages = asyncio.Queue() - await self._connect_ws() - if self.ws: - - async def wait_till_stop() -> None: - await self._stop_event.wait() - - async def wait_till_connection_restart() -> None: - await self._restart_connection_event.wait() - - background_tasks = [ - self._eventloop.create_task(self._receive_messages()), - self._eventloop.create_task(wait_till_stop()), - self._eventloop.create_task(self._connect_as_rm()), - self._eventloop.create_task(wait_till_connection_restart()), - ] - - (done, pending) = await asyncio.wait( - background_tasks, return_when=asyncio.FIRST_COMPLETED - ) - if self._current_control_type: - self._current_control_type.deactivate(self) - self._current_control_type = None - - for task in done: - try: - await task - except asyncio.CancelledError: - pass - except ( - websockets.ConnectionClosedError, - websockets.ConnectionClosedOK, - ): - logger.info("The other party closed the websocket connection.") - - for task in pending: - try: - task.cancel() - await task - except asyncio.CancelledError: - pass - - await self.ws.close() - await self.ws.wait_closed() - - async def _connect_ws(self) -> None: - try: - # set up connection arguments for SSL and bearer token, if required - connection_kwargs: Dict[str, Any] = {} - if self.url.startswith("wss://") and not self._verify_certificate: - connection_kwargs["ssl"] = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - connection_kwargs["ssl"].check_hostname = False - connection_kwargs["ssl"].verify_mode = ssl.CERT_NONE - - if self._bearer_token: - connection_kwargs["additional_headers"] = { - "Authorization": f"Bearer {self._bearer_token}" - } - - self.ws = await ws_connect(uri=self.url, **connection_kwargs) - except (EOFError, OSError) as e: - logger.info("Could not connect due to: %s", str(e)) - - async def _connect_as_rm(self) -> None: - await self._send_msg_and_await_reception_status_async( - Handshake( - message_id=uuid.uuid4(), - role=self.role, - supported_protocol_versions=[S2_VERSION], - ) - ) - logger.debug( - "Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM." - ) - - await self._handle_received_messages() - - async def _handle_handshake( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, Handshake): - logger.error( - "Handler for Handshake received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug( - "%s supports S2 protocol versions: %s", - message.role, - message.supported_protocol_versions, - ) - await send_okay - - async def _handle_handshake_response_as_rm( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, HandshakeResponse): - logger.error( - "Handler for HandshakeResponse received a message of the wrong type: %s", - type(message), - ) - return - - logger.debug("Received HandshakeResponse %s", message.to_json()) - - logger.debug( - "CEM selected to use version %s", message.selected_protocol_version - ) - await send_okay - logger.debug("Handshake complete. Sending first ResourceManagerDetails.") - - await self._send_msg_and_await_reception_status_async( - self.asset_details.to_resource_manager_details(self.control_types) - ) - - async def _handle_select_control_type_as_rm( - self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None] - ) -> None: - if not isinstance(message, SelectControlType): - logger.error( - "Handler for SelectControlType received a message of the wrong type: %s", - type(message), - ) - return - - await send_okay - - logger.debug( - "CEM selected control type %s. Activating control type.", - message.control_type, - ) - - control_types_by_protocol_name = { - c.get_protocol_control_type(): c for c in self.control_types - } - selected_control_type: Optional[S2ControlType] = ( - control_types_by_protocol_name.get(message.control_type) - ) - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.deactivate, self - ) - - self._current_control_type = selected_control_type - - if self._current_control_type is not None: - await self._eventloop.run_in_executor( - None, self._current_control_type.activate, self - ) - self._current_control_type.register_handlers(self._handlers) - - async def _receive_messages(self) -> None: - """Receives all incoming messages in the form of a generator. - - Will also receive the ReceptionStatus messages but instead of yielding these messages, they are routed - to any calls of `send_msg_and_await_reception_status`. - """ - if self.ws is None: - raise RuntimeError( - "Cannot receive messages if websocket connection is not yet established." - ) - - logger.info("S2 connection has started to receive messages.") - - async for message in self.ws: - try: - s2_msg: S2Message = self.s2_parser.parse_as_any_message(message) - except json.JSONDecodeError: - await self._send_and_forget( - ReceptionStatus( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Not valid json.", - ) - ) - except S2ValidationError as e: - json_msg = json.loads(message) - message_id = json_msg.get("message_id") - if message_id: - await self._respond_with_reception_status( - subject_message_id=message_id, - status=ReceptionStatusValues.INVALID_MESSAGE, - diagnostic_label=str(e), - ) - else: - await self._respond_with_reception_status( - subject_message_id=uuid.UUID("00000000-0000-0000-0000-000000000000"), - status=ReceptionStatusValues.INVALID_DATA, - diagnostic_label="Message appears valid json but could not find a message_id field.", - ) - else: - logger.debug("Received message %s", s2_msg.to_json()) - - if isinstance(s2_msg, ReceptionStatus): - logger.debug( - "Message is a reception status for %s so registering in cache.", - s2_msg.subject_message_id, - ) - await self.reception_status_awaiter.receive_reception_status(s2_msg) - else: - await self._received_messages.put(s2_msg) - - async def _send_and_forget(self, s2_msg: S2Message) -> None: - if self.ws is None: - raise RuntimeError( - "Cannot send messages if websocket connection is not yet established." - ) - - json_msg = s2_msg.to_json() - logger.debug("Sending message %s", json_msg) - try: - await self.ws.send(json_msg) - except websockets.ConnectionClosedError as e: - logger.error("Unable to send message %s due to %s", s2_msg, str(e)) - self._restart_connection_event.set() - - async def _respond_with_reception_status( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str - ) -> None: - logger.debug( - "Responding to message %s with status %s", subject_message_id, status - ) - await self._send_and_forget( - ReceptionStatus( - subject_message_id=subject_message_id, - status=status, - diagnostic_label=diagnostic_label, - ) - ) - - def respond_with_reception_status_sync( - self, subject_message_id: uuid.UUID, status: ReceptionStatusValues, diagnostic_label: str - ) -> None: - asyncio.run_coroutine_threadsafe( - self._respond_with_reception_status( - subject_message_id, status, diagnostic_label - ), - self._eventloop, - ).result() - - async def _send_msg_and_await_reception_status_async( - self, - s2_msg: S2Message, - timeout_reception_status: float = 5.0, - raise_on_error: bool = True, - ) -> ReceptionStatus: - await self._send_and_forget(s2_msg) - logger.debug( - "Waiting for ReceptionStatus for %s %s seconds", - s2_msg.message_id, # type: ignore[attr-defined, union-attr] - timeout_reception_status, - ) - try: - reception_status = await self.reception_status_awaiter.wait_for_reception_status( - s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined, union-attr] - ) - except TimeoutError: - logger.error( - "Did not receive a reception status on time for %s", - s2_msg.message_id, # type: ignore[attr-defined, union-attr] - ) - self._stop_event.set() - raise - - if reception_status.status != ReceptionStatusValues.OK and raise_on_error: - raise RuntimeError( - f"ReceptionStatus was not OK but rather {reception_status.status}" - ) - - return reception_status - - def send_msg_and_await_reception_status_sync( - self, - s2_msg: S2Message, - timeout_reception_status: float = 5.0, - raise_on_error: bool = True, - ) -> ReceptionStatus: - return asyncio.run_coroutine_threadsafe( - self._send_msg_and_await_reception_status_async( - s2_msg, timeout_reception_status, raise_on_error - ), - self._eventloop, - ).result() - - async def _handle_received_messages(self) -> None: - while True: - msg = await self._received_messages.get() - await self._handlers.handle_message(self, msg) diff --git a/src/s2python/s2_control_type.py b/src/s2python/s2_control_type.py deleted file mode 100644 index 135f775..0000000 --- a/src/s2python/s2_control_type.py +++ /dev/null @@ -1,116 +0,0 @@ -import abc -import typing - -from s2python.common import ControlType as ProtocolControlType -from s2python.frbc import FRBCInstruction -from s2python.ppbc import PPBCScheduleInstruction -from s2python.ombc import OMBCInstruction -from s2python.message import S2Message - -if typing.TYPE_CHECKING: - from s2python.s2_connection import S2Connection, MessageHandlers - - -class S2ControlType(abc.ABC): - @abc.abstractmethod - def get_protocol_control_type(self) -> ProtocolControlType: ... - - @abc.abstractmethod - def register_handlers(self, handlers: "MessageHandlers") -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class FRBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.FILL_RATE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(FRBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PPBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_PROFILE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(PPBCScheduleInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class OMBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.OPERATION_MODE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - handlers.register_handler(OMBCInstruction, self.handle_instruction) - - @abc.abstractmethod - def handle_instruction( - self, conn: "S2Connection", msg: S2Message, send_okay: typing.Callable[[], None] - ) -> None: ... - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: - """Overwrite with the actual dctivation logic of your Resource Manager for this particular control type.""" - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: - """Overwrite with the actual deactivation logic of your Resource Manager for this particular control type.""" - - -class PEBCControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.POWER_ENVELOPE_BASED_CONTROL - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... - - -class NoControlControlType(S2ControlType): - def get_protocol_control_type(self) -> ProtocolControlType: - return ProtocolControlType.NOT_CONTROLABLE - - def register_handlers(self, handlers: "MessageHandlers") -> None: - pass - - @abc.abstractmethod - def activate(self, conn: "S2Connection") -> None: ... - - @abc.abstractmethod - def deactivate(self, conn: "S2Connection") -> None: ... diff --git a/src/s2python/s2_message_handlers.py b/src/s2python/s2_message_handlers.py deleted file mode 100644 index 4b62544..0000000 --- a/src/s2python/s2_message_handlers.py +++ /dev/null @@ -1,122 +0,0 @@ -import asyncio -import logging -import threading -import uuid -from typing import Type, Dict, Callable, Awaitable, Union, TYPE_CHECKING - -from s2python.common import ReceptionStatusValues -from s2python.message import S2Message - -if TYPE_CHECKING: - from s2python.s2_connection import S2Connection - -logger = logging.getLogger("s2python") - - -S2MessageHandler = Union[ - Callable[["S2Connection", S2Message, Callable[[], None]], None], - Callable[["S2Connection", S2Message, Awaitable[None]], Awaitable[None]], -] - - -class SendOkay: - status_is_send: threading.Event - connection: "S2Connection" - subject_message_id: uuid.UUID - - def __init__(self, connection: "S2Connection", subject_message_id: uuid.UUID): - self.status_is_send = threading.Event() - self.connection = connection - self.subject_message_id = subject_message_id - - async def run_async(self) -> None: - self.status_is_send.set() - - await self.connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - def run_sync(self) -> None: - self.status_is_send.set() - - self.connection.respond_with_reception_status_sync( - subject_message_id=self.subject_message_id, - status=ReceptionStatusValues.OK, - diagnostic_label="Processed okay.", - ) - - async def ensure_send_async(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - await self.run_async() - - def ensure_send_sync(self, type_msg: Type[S2Message]) -> None: - if not self.status_is_send.is_set(): - logger.warning( - "Handler for message %s %s did not call send_okay / function to send the ReceptionStatus. " - "Sending it now.", - type_msg, - self.subject_message_id, - ) - self.run_sync() - - -class MessageHandlers: - handlers: Dict[Type[S2Message], S2MessageHandler] - - def __init__(self) -> None: - self.handlers = {} - - async def handle_message(self, connection: "S2Connection", msg: S2Message) -> None: - """Handle the S2 message using the registered handler. - - :param connection: The S2 conncetion the `msg` is received from. - :param msg: The S2 message - """ - handler = self.handlers.get(type(msg)) - if handler is not None: - send_okay = SendOkay(connection, msg.message_id) # type: ignore[attr-defined, union-attr] - - try: - if asyncio.iscoroutinefunction(handler): - await handler(connection, msg, send_okay.run_async()) # type: ignore[arg-type] - await send_okay.ensure_send_async(type(msg)) - else: - - def do_message() -> None: - handler(connection, msg, send_okay.run_sync) # type: ignore[arg-type] - send_okay.ensure_send_sync(type(msg)) - - eventloop = asyncio.get_event_loop() - await eventloop.run_in_executor(executor=None, func=do_message) - except Exception: - if not send_okay.status_is_send.is_set(): - await connection._respond_with_reception_status( # pylint: disable=protected-access - subject_message_id=msg.message_id, # type: ignore[attr-defined, union-attr] - status=ReceptionStatusValues.PERMANENT_ERROR, - diagnostic_label=f"While processing message {msg.message_id} " # type: ignore[attr-defined, union-attr] # pylint: disable=line-too-long - f"an unrecoverable error occurred.", - ) - raise - else: - logger.warning( - "Received a message of type %s but no handler is registered. Ignoring the message.", - type(msg), - ) - - def register_handler( - self, msg_type: Type[S2Message], handler: S2MessageHandler - ) -> None: - """Register a coroutine function or a normal function as the handler for a specific S2 message type. - - :param msg_type: The S2 message type to attach the handler to. - :param handler: The function (asynchronuous or normal) which should handle the S2 message. - """ - self.handlers[msg_type] = handler diff --git a/src/s2python/s2_parser.py b/src/s2python/s2_parser.py index ed2e957..b67c14e 100644 --- a/src/s2python/s2_parser.py +++ b/src/s2python/s2_parser.py @@ -61,6 +61,9 @@ M = TypeVar("M", bound=S2MessageComponent) +UnparsedS2Message = Union[dict[Any, Any], str, bytes] + + # May be generated with development_utilities/generate_s2_message_type_to_class.py TYPE_TO_MESSAGE_CLASS: Dict[str, Type[S2Message]] = { 'DDBC.ActuatorStatus': DDBCActuatorStatus, @@ -103,13 +106,13 @@ class S2Parser: @staticmethod - def _parse_json_if_required(unparsed_message: Union[dict[Any, Any], str, bytes]) -> dict: + def _parse_json_if_required(unparsed_message: UnparsedS2Message) -> dict: if isinstance(unparsed_message, (str, bytes)): return json.loads(unparsed_message) return unparsed_message @staticmethod - def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> S2Message: + def parse_as_any_message(unparsed_message: UnparsedS2Message) -> S2Message: """Parse the message as any S2 python message regardless of message type. :param unparsed_message: The message as a JSON-formatted string or as a json-parsed dictionary. @@ -130,7 +133,7 @@ def parse_as_any_message(unparsed_message: Union[dict[Any, Any], str, bytes]) -> @staticmethod def parse_as_message( - unparsed_message: Union[dict[Any, Any], str, bytes], as_message: Type[M] + unparsed_message: UnparsedS2Message, as_message: Type[M] ) -> M: """Parse the message to a specific S2 python message. @@ -144,7 +147,7 @@ def parse_as_message( @staticmethod def parse_message_type( - unparsed_message: Union[dict[Any, Any], str, bytes], + unparsed_message: UnparsedS2Message, ) -> Optional[S2MessageType]: """Parse only the message type from the unparsed message.