Skip to content
30 changes: 26 additions & 4 deletions axis/interfaces/event_instances.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,45 @@
"""Event service and action service APIs available in Axis network device."""

from typing import Any
from typing import TYPE_CHECKING

from ..models.event_instance import (
EventInstance,
ListEventInstancesRequest,
ListEventInstancesResponse,
)
from .api_handler import ApiHandler
from .event_manager import BLACK_LISTED_TOPICS

if TYPE_CHECKING:
from ..models.event import Event

class EventInstanceHandler(ApiHandler[Any]):

class EventInstanceHandler(ApiHandler[EventInstance]):
"""Event instances for Axis devices."""

async def _api_request(self) -> dict[str, Any]:
async def _api_request(self) -> dict[str, EventInstance]:
"""Get default data of API discovery."""
return await self.get_event_instances()

async def get_event_instances(self) -> dict[str, Any]:
async def get_event_instances(self) -> dict[str, EventInstance]:
"""List all event instances."""
bytes_data = await self.vapix.api_request(ListEventInstancesRequest())
response = ListEventInstancesResponse.decode(bytes_data)
return response.data

def get_expected_events_per_topic(
self,
include_internal_topics: bool = False,
) -> dict[str, list[Event]]:
"""Return expected startup events grouped by topic.

Event instances are the protocol-agnostic bootstrap source for startup
predeclaration. Returned events are synthesized from schema data and represent
expected event identity/state (operation=Initialized), not live stream updates.
"""
grouped: dict[str, list[Event]] = {}
for item in self.values():
if not include_internal_topics and item.topic in BLACK_LISTED_TOPICS:
continue
grouped[item.topic] = item.to_events()
return grouped
136 changes: 127 additions & 9 deletions axis/models/event_instance.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
"""Event service and action service APIs available in Axis network device."""

from dataclasses import dataclass
import enum
from typing import Any, Self

import xmltodict

from .api import ApiItem, ApiRequest, ApiResponse
from .event import traverse
from .event import (
EVENT_OPERATION,
EVENT_SOURCE,
EVENT_SOURCE_IDX,
EVENT_TOPIC,
EVENT_VALUE,
Event,
EventOperation,
EventTopic,
traverse,
)

EVENT_INSTANCE = (
"http://www.w3.org/2003/05/soap-envelope:Envelope",
Expand Down Expand Up @@ -50,6 +61,85 @@ def get_events(data: dict[str, Any]) -> list[dict[str, Any]]:
return events


def _as_simple_item_list(
data: object,
) -> list[dict[str, Any]]:
"""Return a list representation for a simple-item payload."""
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
return []


def _as_dict(data: object) -> dict[str, Any]:
"""Return dict for mapping-like payloads and normalize other values to empty."""
if isinstance(data, dict):
return data
return {}


def _extract_source_values(
source: dict[str, Any] | list[dict[str, Any]],
) -> tuple[str, list[str]]:
"""Extract the source name and source values.

Keep behavior aligned with event stream parsing by selecting the first source item
when multiple source items exist.
"""
source_items = _as_simple_item_list(source)
if not source_items:
return "", [""]

source_item = source_items[0]
source_name = str(source_item.get("@Name", ""))
values = source_item.get("Value", "")
if isinstance(values, list):
source_values = [str(value) for value in values]
return source_name, source_values or [""]
if values in (None, ""):
return source_name, [""]
return source_name, [str(values)]


def _extract_data_value(data: dict[str, Any] | list[dict[str, Any]]) -> str:
"""Extract a representative state value from data definition.

Prefer the "active" item when available to align with Event._decode_from_bytes().
"""
data_items = _as_simple_item_list(data)
if not data_items:
return ""

data_item = next(
(item for item in data_items if item.get("@Name", "") == "active"),
data_items[0],
)
value = data_item.get("Value", "")
if isinstance(value, list):
return str(value[0]) if value else ""
return "" if value is None else str(value)


TOPIC_TO_INACTIVE_STATE = {
EventTopic.LIGHT_STATUS.value: "OFF",
EventTopic.RELAY.value: "inactive",
}


class EventProtocol(enum.StrEnum):
"""Protocols that consume normalized expected events."""

METADATA_STREAM = "metadata_stream"
WEBSOCKET = "websocket"
MQTT = "mqtt"


def _default_inactive_state(topic: str) -> str:
"""Return a default inactive state for expected event synthesis."""
return TOPIC_TO_INACTIVE_STATE.get(topic, "0")


@dataclass(frozen=True)
class EventInstance(ApiItem):
"""Events are emitted when the Axis product detects an occurrence of some kind.
Expand Down Expand Up @@ -108,22 +198,50 @@ class EventInstance(ApiItem):
@classmethod
def decode(cls, data: dict[str, Any]) -> Self:
"""Decode dict to class object."""
message = data["data"]["MessageInstance"]
event_data = _as_dict(data.get("data"))
message = _as_dict(event_data.get("MessageInstance"))
source_instance = _as_dict(message.get("SourceInstance"))
data_instance = _as_dict(message.get("DataInstance"))

return cls(
id=data["topic"],
topic=data["topic"],
topic_filter=data["topic"]
.replace("tns1", "onvif")
.replace("tnsaxis", "axis"),
is_available=data["data"]["@topic"] == "true",
is_application_data=data["data"].get("@isApplicationData") == "true",
name=data["data"].get("@NiceName", ""),
stateful=data["data"]["MessageInstance"].get("@isProperty") == "true",
stateless=data["data"]["MessageInstance"].get("@isProperty") != "true",
source=message.get("SourceInstance", {}).get("SimpleItemInstance", {}),
data=message.get("DataInstance", {}).get("SimpleItemInstance", {}),
is_available=event_data.get("@topic") == "true",
is_application_data=event_data.get("@isApplicationData") == "true",
name=event_data.get("@NiceName", ""),
stateful=message.get("@isProperty") == "true",
stateless=message.get("@isProperty") != "true",
source=source_instance.get("SimpleItemInstance", {}),
data=data_instance.get("SimpleItemInstance", {}),
)

def to_events(self) -> list[Event]:
"""Synthesize normalized expected events from event-instance schema data.

Topics are preserved exactly as they are declared by event instances so topic
representation stays identical to emitted event topics.
"""
source_name, source_values = _extract_source_values(self.source)
state_value = _extract_data_value(self.data)
if state_value == "":
state_value = _default_inactive_state(self.topic)

return [
Event.decode(
{
EVENT_OPERATION: EventOperation.INITIALIZED,
EVENT_TOPIC: self.topic,
EVENT_SOURCE: source_name,
EVENT_SOURCE_IDX: source_value,
EVENT_VALUE: state_value,
}
)
for source_value in source_values
]


@dataclass
class ListEventInstancesRequest(ApiRequest):
Expand Down
Loading