diff --git a/axis/interfaces/event_instances.py b/axis/interfaces/event_instances.py index 36d0447b..142e88a6 100644 --- a/axis/interfaces/event_instances.py +++ b/axis/interfaces/event_instances.py @@ -1,23 +1,45 @@ """Event service and action service APIs available in Axis network device.""" -from typing import Any +from typing import TYPE_CHECKING from ..models.event_instance import ( + EventInstance, ListEventInstancesRequest, ListEventInstancesResponse, ) from .api_handler import ApiHandler +from .event_manager import BLACK_LISTED_TOPICS +if TYPE_CHECKING: + from ..models.event import Event -class EventInstanceHandler(ApiHandler[Any]): + +class EventInstanceHandler(ApiHandler[EventInstance]): """Event instances for Axis devices.""" - async def _api_request(self) -> dict[str, Any]: + async def _api_request(self) -> dict[str, EventInstance]: """Get default data of API discovery.""" return await self.get_event_instances() - async def get_event_instances(self) -> dict[str, Any]: + async def get_event_instances(self) -> dict[str, EventInstance]: """List all event instances.""" bytes_data = await self.vapix.api_request(ListEventInstancesRequest()) response = ListEventInstancesResponse.decode(bytes_data) return response.data + + def get_expected_events_per_topic( + self, + include_internal_topics: bool = False, + ) -> dict[str, list[Event]]: + """Return expected startup events grouped by topic. + + Event instances are the protocol-agnostic bootstrap source for startup + predeclaration. Returned events are synthesized from schema data and represent + expected event identity/state (operation=Initialized), not live stream updates. + """ + grouped: dict[str, list[Event]] = {} + for item in self.values(): + if not include_internal_topics and item.topic in BLACK_LISTED_TOPICS: + continue + grouped[item.topic] = item.to_events() + return grouped diff --git a/axis/models/event_instance.py b/axis/models/event_instance.py index 486132bb..f389ba4d 100644 --- a/axis/models/event_instance.py +++ b/axis/models/event_instance.py @@ -1,12 +1,23 @@ """Event service and action service APIs available in Axis network device.""" from dataclasses import dataclass +import enum from typing import Any, Self import xmltodict from .api import ApiItem, ApiRequest, ApiResponse -from .event import traverse +from .event import ( + EVENT_OPERATION, + EVENT_SOURCE, + EVENT_SOURCE_IDX, + EVENT_TOPIC, + EVENT_VALUE, + Event, + EventOperation, + EventTopic, + traverse, +) EVENT_INSTANCE = ( "http://www.w3.org/2003/05/soap-envelope:Envelope", @@ -50,6 +61,85 @@ def get_events(data: dict[str, Any]) -> list[dict[str, Any]]: return events +def _as_simple_item_list( + data: object, +) -> list[dict[str, Any]]: + """Return a list representation for a simple-item payload.""" + if isinstance(data, list): + return data + if isinstance(data, dict): + return [data] + return [] + + +def _as_dict(data: object) -> dict[str, Any]: + """Return dict for mapping-like payloads and normalize other values to empty.""" + if isinstance(data, dict): + return data + return {} + + +def _extract_source_values( + source: dict[str, Any] | list[dict[str, Any]], +) -> tuple[str, list[str]]: + """Extract the source name and source values. + + Keep behavior aligned with event stream parsing by selecting the first source item + when multiple source items exist. + """ + source_items = _as_simple_item_list(source) + if not source_items: + return "", [""] + + source_item = source_items[0] + source_name = str(source_item.get("@Name", "")) + values = source_item.get("Value", "") + if isinstance(values, list): + source_values = [str(value) for value in values] + return source_name, source_values or [""] + if values in (None, ""): + return source_name, [""] + return source_name, [str(values)] + + +def _extract_data_value(data: dict[str, Any] | list[dict[str, Any]]) -> str: + """Extract a representative state value from data definition. + + Prefer the "active" item when available to align with Event._decode_from_bytes(). + """ + data_items = _as_simple_item_list(data) + if not data_items: + return "" + + data_item = next( + (item for item in data_items if item.get("@Name", "") == "active"), + data_items[0], + ) + value = data_item.get("Value", "") + if isinstance(value, list): + return str(value[0]) if value else "" + return "" if value is None else str(value) + + +TOPIC_TO_INACTIVE_STATE = { + EventTopic.LIGHT_STATUS.value: "OFF", + EventTopic.RELAY.value: "inactive", +} + + +class EventProtocol(enum.StrEnum): + """Protocols that consume normalized expected events.""" + + METADATA_STREAM = "metadata_stream" + WEBSOCKET = "websocket" + MQTT = "mqtt" + + +def _default_inactive_state(topic: str) -> str: + """Return a default inactive state for expected event synthesis.""" + return TOPIC_TO_INACTIVE_STATE.get(topic, "0") + + @dataclass(frozen=True) class EventInstance(ApiItem): """Events are emitted when the Axis product detects an occurrence of some kind. @@ -108,22 +198,50 @@ class EventInstance(ApiItem): @classmethod def decode(cls, data: dict[str, Any]) -> Self: """Decode dict to class object.""" - message = data["data"]["MessageInstance"] + event_data = _as_dict(data.get("data")) + message = _as_dict(event_data.get("MessageInstance")) + source_instance = _as_dict(message.get("SourceInstance")) + data_instance = _as_dict(message.get("DataInstance")) + return cls( id=data["topic"], topic=data["topic"], topic_filter=data["topic"] .replace("tns1", "onvif") .replace("tnsaxis", "axis"), - is_available=data["data"]["@topic"] == "true", - is_application_data=data["data"].get("@isApplicationData") == "true", - name=data["data"].get("@NiceName", ""), - stateful=data["data"]["MessageInstance"].get("@isProperty") == "true", - stateless=data["data"]["MessageInstance"].get("@isProperty") != "true", - source=message.get("SourceInstance", {}).get("SimpleItemInstance", {}), - data=message.get("DataInstance", {}).get("SimpleItemInstance", {}), + is_available=event_data.get("@topic") == "true", + is_application_data=event_data.get("@isApplicationData") == "true", + name=event_data.get("@NiceName", ""), + stateful=message.get("@isProperty") == "true", + stateless=message.get("@isProperty") != "true", + source=source_instance.get("SimpleItemInstance", {}), + data=data_instance.get("SimpleItemInstance", {}), ) + def to_events(self) -> list[Event]: + """Synthesize normalized expected events from event-instance schema data. + + Topics are preserved exactly as they are declared by event instances so topic + representation stays identical to emitted event topics. + """ + source_name, source_values = _extract_source_values(self.source) + state_value = _extract_data_value(self.data) + if state_value == "": + state_value = _default_inactive_state(self.topic) + + return [ + Event.decode( + { + EVENT_OPERATION: EventOperation.INITIALIZED, + EVENT_TOPIC: self.topic, + EVENT_SOURCE: source_name, + EVENT_SOURCE_IDX: source_value, + EVENT_VALUE: state_value, + } + ) + for source_value in source_values + ] + @dataclass class ListEventInstancesRequest(ApiRequest): diff --git a/tests/test_event_instances.py b/tests/test_event_instances.py index 9379fcd1..16490d6a 100644 --- a/tests/test_event_instances.py +++ b/tests/test_event_instances.py @@ -7,13 +7,17 @@ import pytest -from axis.models.event_instance import get_events +from axis.models.event import Event +from axis.models.event_instance import EventInstance, get_events from .event_fixtures import ( EVENT_INSTANCE_PIR_SENSOR, EVENT_INSTANCE_STORAGE_ALERT, EVENT_INSTANCE_VMD4_PROFILE1, EVENT_INSTANCES, + LIGHT_STATUS_INIT, + PIR_INIT, + VMD4_C1P1_INIT, ) if TYPE_CHECKING: @@ -269,3 +273,198 @@ async def test_single_event_instance( def test_get_events(input: dict, output: list): """Verify expected output of get_events.""" assert get_events(input) == output + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_event_instance_synthesized_event_matches_stream_content( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """Synthesize events from instances and verify stream-content parity fields.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + per_topic = event_instances.get_expected_events_per_topic() + actual = next( + event + for event in per_topic[expected.topic] + if event.source == expected.source and event.id == expected.id + ) + + assert actual.topic == expected.topic + assert actual.source == expected.source + assert actual.id == expected.id + assert actual.state == expected.state + assert actual.group == expected.group + + +async def test_event_instance_synthesizes_unknown_topics( + http_route_mock, event_instances +): + """Synthesis should include topics not represented in EventTopic enum.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + per_topic = event_instances.get_expected_events_per_topic() + assert "tns1:Media/ProfileChanged" in per_topic + assert ( + per_topic["tns1:Media/ProfileChanged"][0].topic == "tns1:Media/ProfileChanged" + ) + + +async def test_expected_events_protocol_normalization(http_route_mock, event_instances): + """Expected-event discovery is protocol-agnostic and deterministic.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + topics_first = set(event_instances.get_expected_events_per_topic()) + topics_second = set(event_instances.get_expected_events_per_topic()) + + assert topics_first == topics_second + + +def test_expected_events_internal_topic_filtering(event_instances): + """Internal-only topics are excluded by default and available on request.""" + internal_topic = "tnsaxis:CameraApplicationPlatform/VMD/xinternal_data" + normal_topic = "tns1:Device/tnsaxis:Sensor/PIR" + + event_instances._items = { + internal_topic: EventInstance( + id=internal_topic, + topic=internal_topic, + topic_filter="axis:CameraApplicationPlatform/VMD/xinternal_data", + is_available=True, + is_application_data=False, + name="internal", + stateful=True, + stateless=False, + source={}, + data={}, + ), + normal_topic: EventInstance( + id=normal_topic, + topic=normal_topic, + topic_filter="onvif:Device/axis:Sensor/PIR", + is_available=True, + is_application_data=False, + name="pir", + stateful=True, + stateless=False, + source={}, + data={}, + ), + } + + filtered = event_instances.get_expected_events_per_topic() + unfiltered = event_instances.get_expected_events_per_topic( + include_internal_topics=True + ) + + assert internal_topic not in filtered + assert internal_topic in unfiltered + assert normal_topic in filtered + + +@pytest.mark.parametrize( + "raw_event", + [ + { + "topic": "tns1:Configuration/tnsaxis:Intercom/Changed", + "data": { + "@topic": "true", + "@NiceName": "Intercom Configuration changed", + "MessageInstance": None, + }, + }, + { + "topic": "tns1:Device/Trigger/Relay", + "data": { + "@topic": "true", + "MessageInstance": { + "@isProperty": "true", + "SourceInstance": None, + "DataInstance": None, + }, + }, + }, + { + "topic": "tns1:Device/Trigger/Relay", + "data": { + "@topic": "true", + "MessageInstance": { + "@isProperty": "true", + "SourceInstance": { + "SimpleItemInstance": { + "@Name": "RelayToken", + "Value": "3", + } + }, + "DataInstance": None, + }, + }, + }, + ], +) +def test_event_instance_decode_handles_none_shapes(raw_event: dict) -> None: + """EventInstance.decode should normalize None-shaped nested objects safely.""" + event = EventInstance.decode(raw_event) + + assert event.topic == raw_event["topic"] + assert isinstance(event.source, (dict, list)) + assert isinstance(event.data, (dict, list)) + + +async def test_event_instances_empty_message_instance_xml( + http_route_mock, event_instances +): + """Empty MessageInstance XML nodes should not crash event instance parsing.""" + response = """ + + + + + + + + + + + + + + + +""" + http_route_mock.post("/vapix/services").respond( + text=response, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + topic = "tns1:Configuration/tnsaxis:Intercom/Changed" + assert topic in event_instances + event = event_instances[topic] + assert event.source == {} + assert event.data == {} diff --git a/tests/test_event_instances_protocol_parity.py b/tests/test_event_instances_protocol_parity.py new file mode 100644 index 00000000..3fa1ad87 --- /dev/null +++ b/tests/test_event_instances_protocol_parity.py @@ -0,0 +1,115 @@ +"""Protocol parity tests for event-instance expected-event discovery. + +pytest --cov-report term-missing --cov=axis.event_instances tests/test_event_instances_protocol_parity.py +""" + +from typing import TYPE_CHECKING + +import orjson +import pytest + +from axis.interfaces.mqtt import mqtt_json_to_event +from axis.models.event import Event +from axis.websocket import _parse_ws_notification + +from .event_fixtures import EVENT_INSTANCES, LIGHT_STATUS_INIT, PIR_INIT, VMD4_C1P1_INIT + +if TYPE_CHECKING: + from axis.interfaces.event_instances import EventInstanceHandler + + +@pytest.fixture +def event_instances(axis_device) -> EventInstanceHandler: + """Return event_instances handler from mocked device.""" + return axis_device.vapix.event_instances + + +def _mqtt_topic(topic: str) -> str: + """Convert internal topic format to MQTT topic namespace format.""" + return topic.replace("tns1", "onvif").replace("tnsaxis", "axis") + + +def _event_identity(event: Event) -> tuple[str, str, str, str, str]: + """Return identity fields used for cross-protocol parity assertions.""" + return (event.topic, event.source, event.id, event.state, event.group.value) + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_expected_events_match_websocket_shape( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """Websocket notify payloads should align with expected-event identities.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + notification = { + "topic": expected.topic, + "message": { + "source": ( + {expected.source: expected.id} + if expected.source and expected.id != "" + else ({expected.source: expected.id} if expected.source else {}) + ), + "key": {}, + "data": {expected.data.get("type", "state"): expected.state}, + }, + } + + parsed = Event.decode(_parse_ws_notification(notification)) + expected_events = event_instances.get_expected_events_per_topic() + + expected_identities = { + _event_identity(event) for event in expected_events[expected.topic] + } + assert _event_identity(parsed) in expected_identities + + +@pytest.mark.parametrize( + "event_stream_bytes", + [PIR_INIT, LIGHT_STATUS_INIT, VMD4_C1P1_INIT], +) +async def test_expected_events_match_mqtt_shape( + http_route_mock, + event_instances: EventInstanceHandler, + event_stream_bytes: bytes, +) -> None: + """MQTT notify payloads should align with expected-event identities.""" + http_route_mock.post("/vapix/services").respond( + text=EVENT_INSTANCES, + headers={"Content-Type": "application/soap+xml; charset=utf-8"}, + ) + + await event_instances.update() + + expected = Event.decode(event_stream_bytes) + mqtt_msg = { + "timestamp": 0, + "topic": _mqtt_topic(expected.topic), + "message": { + "source": ( + {expected.source: expected.id} + if expected.source and expected.id != "" + else ({expected.source: expected.id} if expected.source else {}) + ), + "key": {}, + "data": {expected.data.get("type", "state"): expected.state}, + }, + } + + parsed = Event.decode(mqtt_json_to_event(orjson.dumps(mqtt_msg))) + expected_events = event_instances.get_expected_events_per_topic() + + expected_identities = { + _event_identity(event) for event in expected_events[expected.topic] + } + assert _event_identity(parsed) in expected_identities diff --git a/tests/test_vapix.py b/tests/test_vapix.py index aa920617..887ffffa 100644 --- a/tests/test_vapix.py +++ b/tests/test_vapix.py @@ -528,6 +528,7 @@ async def test_initialize_event_instances(http_route_mock, vapix: Vapix): assert vapix.event_instances assert len(vapix.event_instances) == 44 + assert "tns1:Device/tnsaxis:Sensor/PIR" in vapix.event_instances async def test_applications_dont_load_without_params(http_route_mock, vapix: Vapix):