diff --git a/py/host-emulator/src/__init__.py b/py/host-emulator/src/__init__.py index e69de29..3285105 100644 --- a/py/host-emulator/src/__init__.py +++ b/py/host-emulator/src/__init__.py @@ -0,0 +1,16 @@ +"""Host emulator for embedded C++ applications.""" + +from .common import Status, UnhandledMessageError +from .emulator import DeviceEmulator +from .i2c import I2C +from .pin import Pin +from .uart import Uart + +__all__ = [ + "DeviceEmulator", + "Pin", + "Uart", + "I2C", + "Status", + "UnhandledMessageError", +] diff --git a/py/host-emulator/src/common.py b/py/host-emulator/src/common.py new file mode 100644 index 0000000..c5267fc --- /dev/null +++ b/py/host-emulator/src/common.py @@ -0,0 +1,19 @@ +"""Common types and exceptions for the host emulator.""" + +from enum import Enum + + +class UnhandledMessageError(Exception): + """Exception raised when a message cannot be handled.""" + + pass + + +class Status(Enum): + """Status codes for emulator responses.""" + + Ok = "Ok" + Unknown = "Unknown" + InvalidArgument = "InvalidArgument" + InvalidState = "InvalidState" + InvalidOperation = "InvalidOperation" diff --git a/py/host-emulator/src/emulator.py b/py/host-emulator/src/emulator.py index 1f42540..81f7ed9 100755 --- a/py/host-emulator/src/emulator.py +++ b/py/host-emulator/src/emulator.py @@ -2,317 +2,14 @@ import json import sys -from enum import Enum from threading import Thread import zmq - -class UnhandledMessageError(Exception): - pass - - -class Status(Enum): - Ok = "Ok" - Unknown = "Unknown" - InvalidArgument = "InvalidArgument" - InvalidState = "InvalidState" - InvalidOperation = "InvalidOperation" - - -class Pin: - direction = Enum("direction", ["IN", "OUT"]) - state = Enum("state", ["Low", "High", "Hi_Z"]) - - def __init__(self, name, direction, state, to_device_socket): - self.name = name - self.direction = direction - self.state = state - self.to_device_socket = to_device_socket - self.on_response = None - self.on_request = None - - def handle_request(self, message): - response = { - "type": "Response", - "object": "Pin", - "name": self.name, - "state": self.state.name, - "status": Status.InvalidOperation.name, - } - if message["operation"] == "Get": - response.update( - { - "status": Status.Ok.name, - } - ) - elif message["operation"] == "Set": - self.state = Pin.state[message["state"]] - response.update( - { - "state": self.state.name, - "status": Status.Ok.name, - } - ) - else: - pass - # default response status is InvalidOperation - if self.on_request: - self.on_request(message) - return json.dumps(response) - - def set_state(self, state): - self.state = state - request = { - "type": "Request", - "object": "Pin", - "name": self.name, - "operation": "Set", - "state": self.state.name, - } - print(f"[Pin Set] Sending request: {request}") - self.to_device_socket.send_string(json.dumps(request)) - reply = self.to_device_socket.recv() - print(f"[Pin Set] Received response: {reply}") - self.handle_response(json.loads(reply)) - return json.loads(reply) - - def get_state(self): - request = { - "type": "Request", - "object": "Pin", - "name": self.name, - "operation": "Get", - "state": self.state.Hi_Z.name, - } - print(f"[Pin Get] Sending request: {request}") - self.to_device_socket.send_string(json.dumps(request)) - reply = self.to_device_socket.recv() - print(f"[Pin Get] Received response: {reply}") - self.handle_response(json.loads(reply)) - return json.loads(reply) - - def handle_response(self, message): - print(f"[Pin Handler] Received response: {message}") - if self.on_response: - self.on_response(message) - return None - - def set_on_request(self, on_request): - print(f"[Pin Handler] Setting on_request for {self.name}: {on_request}") - self.on_request = on_request - - def set_on_response(self, on_response): - print(f"[Pin Handler] Setting on_response for {self.name}: {on_response}") - self.on_response = on_response - - def handle_message(self, message): - if message["object"] != "Pin": - return None - if message["name"] != self.name: - return None - if message["type"] == "Request": - return self.handle_request(message) - if message["type"] == "Response": - return self.handle_response(message) - - -class Uart: - def __init__(self, name, to_device_socket): - self.name = name - self.to_device_socket = to_device_socket - self.rx_buffer = bytearray() # Data waiting to be read - self.on_response = None - self.on_request = None - - def handle_request(self, message): - response = { - "type": "Response", - "object": "Uart", - "name": self.name, - "data": [], - "bytes_transferred": 0, - "status": Status.InvalidOperation.name, - } - - if message["operation"] == "Init": - # Initialize UART with given configuration - print(f"[UART {self.name}] Initialized") - response.update({"status": Status.Ok.name}) - - elif message["operation"] == "Send": - # Receive data from the device and store in RX buffer - data = message.get("data", []) - self.rx_buffer.extend(data) - response.update( - { - "bytes_transferred": len(data), - "status": Status.Ok.name, - } - ) - print(f"[UART {self.name}] Received {len(data)} bytes: {bytes(data)}") - - elif message["operation"] == "Receive": - # Send buffered data back to the device - size = message.get("size", 0) - bytes_to_send = min(size, len(self.rx_buffer)) - data = list(self.rx_buffer[:bytes_to_send]) - self.rx_buffer = self.rx_buffer[bytes_to_send:] - response.update( - { - "data": data, - "bytes_transferred": bytes_to_send, - "status": Status.Ok.name, - } - ) - print(f"[UART {self.name}] Sent {bytes_to_send} bytes: {bytes(data)}") - - if self.on_request: - self.on_request(message) - return json.dumps(response) - - def send_data(self, data): - """Send data to the device (emulator -> device)""" - request = { - "type": "Request", - "object": "Uart", - "name": self.name, - "operation": "Receive", - "data": list(data), - "size": len(data), - "timeout_ms": 0, - } - print(f"[UART {self.name}] Sending data to device: {data}") - self.to_device_socket.send_string(json.dumps(request)) - reply = self.to_device_socket.recv() - print(f"[UART {self.name}] Received response: {reply}") - return json.loads(reply) - - def handle_response(self, message): - print(f"[UART {self.name}] Received response: {message}") - if self.on_response: - self.on_response(message) - return None - - def set_on_request(self, on_request): - self.on_request = on_request - - def set_on_response(self, on_response): - self.on_response = on_response - - def handle_message(self, message): - if message["object"] != "Uart": - return None - if message["name"] != self.name: - return None - if message["type"] == "Request": - return self.handle_request(message) - if message["type"] == "Response": - return self.handle_response(message) - - -class I2C: - def __init__(self, name): - self.name = name - # Store data for each I2C address (address -> bytearray) - self.device_buffers = {} - self.on_response = None - self.on_request = None - - def handle_request(self, message): - response = { - "type": "Response", - "object": "I2C", - "name": self.name, - "address": message.get("address", 0), - "data": [], - "bytes_transferred": 0, - "status": Status.InvalidOperation.name, - } - - address = message.get("address", 0) - - if message["operation"] == "Send": - # Device is sending data to I2C peripheral - # Store the data in the buffer for this address - data = message.get("data", []) - if address not in self.device_buffers: - self.device_buffers[address] = bytearray() - self.device_buffers[address] = bytearray(data) - response.update( - { - "bytes_transferred": len(data), - "status": Status.Ok.name, - } - ) - print( - f"[I2C {self.name}] Wrote {len(data)} bytes to address " - f"0x{address:02X}: {bytes(data)}" - ) - - elif message["operation"] == "Receive": - # Device is receiving data from I2C peripheral - # Return data from the buffer for this address - size = message.get("size", 0) - if address in self.device_buffers: - bytes_to_send = min(size, len(self.device_buffers[address])) - data = list(self.device_buffers[address][:bytes_to_send]) - else: - # No data available, return empty - bytes_to_send = 0 - data = [] - response.update( - { - "data": data, - "bytes_transferred": bytes_to_send, - "status": Status.Ok.name, - } - ) - print( - f"[I2C {self.name}] Read {bytes_to_send} bytes from address " - f"0x{address:02X}: {bytes(data)}" - ) - - if self.on_request: - self.on_request(message) - return json.dumps(response) - - def handle_response(self, message): - print(f"[I2C {self.name}] Received response: {message}") - if self.on_response: - self.on_response(message) - return None - - def set_on_request(self, on_request): - self.on_request = on_request - - def set_on_response(self, on_response): - self.on_response = on_response - - def handle_message(self, message): - if message["object"] != "I2C": - return None - if message["name"] != self.name: - return None - if message["type"] == "Request": - return self.handle_request(message) - if message["type"] == "Response": - return self.handle_response(message) - - def write_to_device(self, address, data): - """Write data to a simulated I2C device (for testing)""" - if address not in self.device_buffers: - self.device_buffers[address] = bytearray() - self.device_buffers[address] = bytearray(data) - print( - f"[I2C {self.name}] Device buffer at 0x{address:02X} set to: {bytes(data)}" - ) - - def read_from_device(self, address): - """Read data from a simulated I2C device (for testing)""" - if address in self.device_buffers: - return bytes(self.device_buffers[address]) - return b"" +from .common import UnhandledMessageError +from .i2c import I2C +from .pin import Pin +from .uart import Uart class DeviceEmulator: diff --git a/py/host-emulator/src/i2c.py b/py/host-emulator/src/i2c.py new file mode 100644 index 0000000..b9260df --- /dev/null +++ b/py/host-emulator/src/i2c.py @@ -0,0 +1,111 @@ +"""I2C emulation for the host emulator.""" + +import json + +from .common import Status + + +class I2C: + """Emulates an I2C controller/peripheral.""" + + def __init__(self, name): + self.name = name + # Store data for each I2C address (address -> bytearray) + self.device_buffers = {} + self.on_response = None + self.on_request = None + + def handle_request(self, message): + response = { + "type": "Response", + "object": "I2C", + "name": self.name, + "address": message.get("address", 0), + "data": [], + "bytes_transferred": 0, + "status": Status.InvalidOperation.name, + } + + address = message.get("address", 0) + + if message["operation"] == "Send": + # Device is sending data to I2C peripheral + # Store the data in the buffer for this address + data = message.get("data", []) + if address not in self.device_buffers: + self.device_buffers[address] = bytearray() + self.device_buffers[address] = bytearray(data) + response.update( + { + "bytes_transferred": len(data), + "status": Status.Ok.name, + } + ) + print( + f"[I2C {self.name}] Wrote {len(data)} bytes to address " + f"0x{address:02X}: {bytes(data)}" + ) + + elif message["operation"] == "Receive": + # Device is receiving data from I2C peripheral + # Return data from the buffer for this address + size = message.get("size", 0) + if address in self.device_buffers: + bytes_to_send = min(size, len(self.device_buffers[address])) + data = list(self.device_buffers[address][:bytes_to_send]) + else: + # No data available, return empty + bytes_to_send = 0 + data = [] + response.update( + { + "data": data, + "bytes_transferred": bytes_to_send, + "status": Status.Ok.name, + } + ) + print( + f"[I2C {self.name}] Read {bytes_to_send} bytes from address " + f"0x{address:02X}: {bytes(data)}" + ) + + if self.on_request: + self.on_request(message) + return json.dumps(response) + + def handle_response(self, message): + print(f"[I2C {self.name}] Received response: {message}") + if self.on_response: + self.on_response(message) + return None + + def set_on_request(self, on_request): + self.on_request = on_request + + def set_on_response(self, on_response): + self.on_response = on_response + + def handle_message(self, message): + if message["object"] != "I2C": + return None + if message["name"] != self.name: + return None + if message["type"] == "Request": + return self.handle_request(message) + if message["type"] == "Response": + return self.handle_response(message) + + def write_to_device(self, address, data): + """Write data to a simulated I2C device (for testing)""" + if address not in self.device_buffers: + self.device_buffers[address] = bytearray() + self.device_buffers[address] = bytearray(data) + print( + f"[I2C {self.name}] Device buffer at 0x{address:02X} set to: {bytes(data)}" + ) + + def read_from_device(self, address): + """Read data from a simulated I2C device (for testing)""" + if address in self.device_buffers: + return bytes(self.device_buffers[address]) + return b"" diff --git a/py/host-emulator/src/pin.py b/py/host-emulator/src/pin.py new file mode 100644 index 0000000..539b223 --- /dev/null +++ b/py/host-emulator/src/pin.py @@ -0,0 +1,105 @@ +"""Pin emulation for the host emulator.""" + +import json +from enum import Enum + +from .common import Status + + +class Pin: + """Emulates a digital pin (input/output).""" + + direction = Enum("direction", ["IN", "OUT"]) + state = Enum("state", ["Low", "High", "Hi_Z"]) + + def __init__(self, name, direction, state, to_device_socket): + self.name = name + self.direction = direction + self.state = state + self.to_device_socket = to_device_socket + self.on_response = None + self.on_request = None + + def handle_request(self, message): + response = { + "type": "Response", + "object": "Pin", + "name": self.name, + "state": self.state.name, + "status": Status.InvalidOperation.name, + } + if message["operation"] == "Get": + response.update( + { + "status": Status.Ok.name, + } + ) + elif message["operation"] == "Set": + self.state = Pin.state[message["state"]] + response.update( + { + "state": self.state.name, + "status": Status.Ok.name, + } + ) + else: + pass + # default response status is InvalidOperation + if self.on_request: + self.on_request(message) + return json.dumps(response) + + def set_state(self, state): + self.state = state + request = { + "type": "Request", + "object": "Pin", + "name": self.name, + "operation": "Set", + "state": self.state.name, + } + print(f"[Pin Set] Sending request: {request}") + self.to_device_socket.send_string(json.dumps(request)) + reply = self.to_device_socket.recv() + print(f"[Pin Set] Received response: {reply}") + self.handle_response(json.loads(reply)) + return json.loads(reply) + + def get_state(self): + request = { + "type": "Request", + "object": "Pin", + "name": self.name, + "operation": "Get", + "state": self.state.Hi_Z.name, + } + print(f"[Pin Get] Sending request: {request}") + self.to_device_socket.send_string(json.dumps(request)) + reply = self.to_device_socket.recv() + print(f"[Pin Get] Received response: {reply}") + self.handle_response(json.loads(reply)) + return json.loads(reply) + + def handle_response(self, message): + print(f"[Pin Handler] Received response: {message}") + if self.on_response: + self.on_response(message) + return None + + def set_on_request(self, on_request): + print(f"[Pin Handler] Setting on_request for {self.name}: {on_request}") + self.on_request = on_request + + def set_on_response(self, on_response): + print(f"[Pin Handler] Setting on_response for {self.name}: {on_response}") + self.on_response = on_response + + def handle_message(self, message): + if message["object"] != "Pin": + return None + if message["name"] != self.name: + return None + if message["type"] == "Request": + return self.handle_request(message) + if message["type"] == "Response": + return self.handle_response(message) diff --git a/py/host-emulator/src/uart.py b/py/host-emulator/src/uart.py new file mode 100644 index 0000000..aaec5d6 --- /dev/null +++ b/py/host-emulator/src/uart.py @@ -0,0 +1,101 @@ +"""UART emulation for the host emulator.""" + +import json + +from .common import Status + + +class Uart: + """Emulates a UART peripheral.""" + + def __init__(self, name, to_device_socket): + self.name = name + self.to_device_socket = to_device_socket + self.rx_buffer = bytearray() # Data waiting to be read + self.on_response = None + self.on_request = None + + def handle_request(self, message): + response = { + "type": "Response", + "object": "Uart", + "name": self.name, + "data": [], + "bytes_transferred": 0, + "status": Status.InvalidOperation.name, + } + + if message["operation"] == "Init": + # Initialize UART with given configuration + print(f"[UART {self.name}] Initialized") + response.update({"status": Status.Ok.name}) + + elif message["operation"] == "Send": + # Receive data from the device and store in RX buffer + data = message.get("data", []) + self.rx_buffer.extend(data) + response.update( + { + "bytes_transferred": len(data), + "status": Status.Ok.name, + } + ) + print(f"[UART {self.name}] Received {len(data)} bytes: {bytes(data)}") + + elif message["operation"] == "Receive": + # Send buffered data back to the device + size = message.get("size", 0) + bytes_to_send = min(size, len(self.rx_buffer)) + data = list(self.rx_buffer[:bytes_to_send]) + self.rx_buffer = self.rx_buffer[bytes_to_send:] + response.update( + { + "data": data, + "bytes_transferred": bytes_to_send, + "status": Status.Ok.name, + } + ) + print(f"[UART {self.name}] Sent {bytes_to_send} bytes: {bytes(data)}") + + if self.on_request: + self.on_request(message) + return json.dumps(response) + + def send_data(self, data): + """Send data to the device (emulator -> device)""" + request = { + "type": "Request", + "object": "Uart", + "name": self.name, + "operation": "Receive", + "data": list(data), + "size": len(data), + "timeout_ms": 0, + } + print(f"[UART {self.name}] Sending data to device: {data}") + self.to_device_socket.send_string(json.dumps(request)) + reply = self.to_device_socket.recv() + print(f"[UART {self.name}] Received response: {reply}") + return json.loads(reply) + + def handle_response(self, message): + print(f"[UART {self.name}] Received response: {message}") + if self.on_response: + self.on_response(message) + return None + + def set_on_request(self, on_request): + self.on_request = on_request + + def set_on_response(self, on_response): + self.on_response = on_response + + def handle_message(self, message): + if message["object"] != "Uart": + return None + if message["name"] != self.name: + return None + if message["type"] == "Request": + return self.handle_request(message) + if message["type"] == "Response": + return self.handle_response(message) diff --git a/py/host-emulator/tests/conftest.py b/py/host-emulator/tests/conftest.py index 23f3eb1..335a066 100644 --- a/py/host-emulator/tests/conftest.py +++ b/py/host-emulator/tests/conftest.py @@ -1,9 +1,10 @@ import pathlib import subprocess -from emulator import DeviceEmulator from pytest import fixture +from src import DeviceEmulator + def pytest_addoption(parser): parser.addoption( diff --git a/py/host-emulator/tests/test_blinky.py b/py/host-emulator/tests/test_blinky.py index f2a5ee6..c71f3c0 100644 --- a/py/host-emulator/tests/test_blinky.py +++ b/py/host-emulator/tests/test_blinky.py @@ -1,6 +1,6 @@ from time import sleep -from emulator import Pin +from src import Pin pin_stats = {}