Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lghorizon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .lghorizon_api import LGHorizonApi
from .lghorizon_device import LGHorizonDevice
from .lghorizon_models import (
LGHorizonAdBreak,
LGHorizonAuth,
LGHorizonChannel,
LGHorizonCustomer,
Expand Down Expand Up @@ -46,6 +47,7 @@
from .const import COUNTRY_SETTINGS

__all__ = [
"LGHorizonAdBreak",
"LGHorizonApi",
"LGHorizonDevice",
"LGHorizonAuth",
Expand Down
34 changes: 34 additions & 0 deletions lghorizon/lghorizon_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import asyncio
import json
import logging
import time
from typing import Any, Callable, Coroutine, Dict, Optional
from .lghorizon_models import (
LGHorizonAdBreak,
LGHorizonRunningState,
LGHorizonStatusMessage,
LGHorizonUIStatusMessage,
Expand Down Expand Up @@ -296,6 +298,38 @@ async def set_player_position(self, position: int) -> None:
f"{self._auth.household_id}/{self.device_id}", payload_str
)

def get_current_ad_break(self) -> Optional[LGHorizonAdBreak]:
"""Return the ad break at the current estimated playback position, or None.

Calculates real-time position based on last known position and elapsed time.
"""
ds = self._device_state
if not ds.ad_breaks or ds.position is None or ds.last_position_update is None:
return None

# Calculate real-time position in seconds
elapsed = time.time() - ds.last_position_update
speed = ds.speed if ds.speed is not None else 1
current_position_s = ds.position + (elapsed * speed)
current_position_ms = int(current_position_s * 1000)

for ab in ds.ad_breaks:
if ab.start_ms <= current_position_ms < ab.end_ms:
return ab
return None

async def skip_ad_break(self) -> bool:
"""Skip to the end of the current ad break. Returns True if skipped.

Calculates the real-time playback position and, if currently within
an ad break, seeks to the end of that break.
"""
ad_break = self.get_current_ad_break()
if ad_break is None:
return False
await self.set_player_position(ad_break.end_ms)
return True

async def display_message(self, sourceType: str, message: str) -> None:
"""Display a message on the set-top box and repeat it for longer visibility.

Expand Down
1 change: 1 addition & 0 deletions lghorizon/lghorizon_device_state_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ async def _process_ndvr_state(
player_state.last_speed_change_time
)
device_state.position = int(player_state.relative_position / 1000)
device_state.ad_breaks = source.ad_manifest
parsed_start = self._parse_timestamp(recording.start_time)
parsed_end = self._parse_timestamp(recording.end_time)
if parsed_start is not None:
Expand Down
62 changes: 62 additions & 0 deletions lghorizon/lghorizon_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,32 @@ def source_type(self) -> LGHorizonSourceType:
return LGHorizonSourceType.REVIEWBUFFER


@dataclass
class LGHorizonAdBreak:
"""Represent an ad break within an nDVR recording."""

start_ms: int
end_ms: int
ad_type: str
is_skippable: bool
has_counter: bool

@property
def duration_ms(self) -> int:
"""Return the duration of this ad break in milliseconds."""
return self.end_ms - self.start_ms

@property
def start_s(self) -> float:
"""Return the start position in seconds."""
return self.start_ms / 1000

@property
def end_s(self) -> float:
"""Return the end position in seconds."""
return self.end_ms / 1000


class LGHorizonNDVRSource(LGHorizonSource):
"""Represent the Network Digital Video Recorder (NDVR) Source of an LG Horizon device."""

Expand All @@ -232,6 +258,21 @@ def channel_id(self) -> str:
"""Return the channel ID."""
return self._raw_json.get("channelId", "")

@property
def ad_manifest(self) -> List[LGHorizonAdBreak]:
"""Return the list of ad breaks from the ad manifest."""
raw_manifest = self._raw_json.get("adManifest", [])
breaks = []
for entry in raw_manifest:
breaks.append(LGHorizonAdBreak(
start_ms=entry.get("dStart", 0),
end_ms=entry.get("dEnd", 0),
ad_type=entry.get("adType", "UNKNOWN"),
is_skippable=entry.get("isSkippable", False),
has_counter=entry.get("adCounter", False),
))
return breaks

@property
def source_type(self) -> LGHorizonSourceType:
return LGHorizonSourceType.NDVR
Expand Down Expand Up @@ -931,6 +972,7 @@ class LGHorizonDeviceState:
start_time: Optional[int] = None
end_time: Optional[int] = None
last_position_update: Optional[int] = None
ad_breaks: List[LGHorizonAdBreak] = field(default_factory=list)

@property
def paused(self) -> bool:
Expand All @@ -939,6 +981,25 @@ def paused(self) -> bool:
return False
return self.speed == 0

@property
def is_in_ad_break(self) -> bool:
"""Return True if current position is within an ad break."""
if not self.ad_breaks or self.position is None:
return False
position_ms = int(self.position * 1000)
return any(ab.start_ms <= position_ms < ab.end_ms for ab in self.ad_breaks)

@property
def current_ad_break_end(self) -> Optional[float]:
"""Return the end position (in seconds) of the current ad break, or None."""
if not self.ad_breaks or self.position is None:
return None
position_ms = int(self.position * 1000)
for ab in self.ad_breaks:
if ab.start_ms <= position_ms < ab.end_ms:
return ab.end_s
return None

def reset_progress(self) -> None:
"""Reset the progress-related attributes."""
self.position = None
Expand All @@ -962,6 +1023,7 @@ def reset(self) -> None:
self.source_type = LGHorizonSourceType.UNKNOWN
self.ui_state_type = LGHorizonUIStateType.UNKNOWN
self.media_type = LGHorizonMediaType.UNKNOWN
self.ad_breaks = []
self.reset_progress()


Expand Down
11 changes: 11 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,17 @@ async def device_callback(device_id: str):
bar = "█" * filled + "░" * (bar_len - filled)
print(f" Progress: [{bar}] {pct:.0%}")

# Ad break info
if s.ad_breaks:
print(f" {SEPARATOR}")
print(f" 🚫 Ad Breaks: {len(s.ad_breaks)} detected")
for i, ab in enumerate(s.ad_breaks, 1):
marker = " ◀ NOW" if (s.position and ab.start_ms <= s.position * 1000 < ab.end_ms) else ""
print(f" {i}. {format_duration(ab.start_ms // 1000)} - {format_duration(ab.end_ms // 1000)} ({format_duration(ab.duration_ms // 1000)}){marker}")
ad_break = device.get_current_ad_break()
if ad_break:
print(f" >>> IN AD BREAK - ends at {format_duration(int(ad_break.end_s))}")

# EPG now/next (simulates HA media_player extra_state_attributes)
epg = epg_cache.get("epg")
if epg and s.channel_id:
Expand Down
167 changes: 167 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from unittest.mock import AsyncMock, MagicMock, patch, call

from lghorizon.lghorizon_models import (
LGHorizonAdBreak,
LGHorizonAppsState,
LGHorizonChannel,
LGHorizonCustomer,
Expand Down Expand Up @@ -1966,3 +1967,169 @@ def test_none_returns_none(self):
def test_invalid_string_returns_none(self):
proc = self._make_processor()
assert proc._parse_timestamp("not-a-date") is None


# ---------------------------------------------------------------------------
# TestAdBreaks
# ---------------------------------------------------------------------------

# Fixture ad manifest from captured MQTT data
_AD_MANIFEST_RAW = [
{"dStart": 0, "dEnd": 15000, "adType": "IP_OTHER", "adCounter": True, "isSkippable": False},
{"dStart": 527760, "dEnd": 1012640, "adType": "IP_OTHER", "adCounter": True, "isSkippable": False},
{"dStart": 1815360, "dEnd": 1846360, "adType": "IP_OTHER", "adCounter": True, "isSkippable": False},
]


class TestAdBreaks:
# ------------------------------------------------------------------
# LGHorizonAdBreak dataclass properties
# ------------------------------------------------------------------

def test_duration_ms(self):
ab = LGHorizonAdBreak(start_ms=527760, end_ms=1012640, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.duration_ms == 1012640 - 527760

def test_start_s(self):
ab = LGHorizonAdBreak(start_ms=527760, end_ms=1012640, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.start_s == 527760 / 1000

def test_end_s(self):
ab = LGHorizonAdBreak(start_ms=527760, end_ms=1012640, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.end_s == 1012640 / 1000

def test_duration_ms_first_break(self):
ab = LGHorizonAdBreak(start_ms=0, end_ms=15000, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.duration_ms == 15000

def test_start_s_zero(self):
ab = LGHorizonAdBreak(start_ms=0, end_ms=15000, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.start_s == 0.0

def test_end_s_first_break(self):
ab = LGHorizonAdBreak(start_ms=0, end_ms=15000, ad_type="IP_OTHER", is_skippable=False, has_counter=True)
assert ab.end_s == 15.0

# ------------------------------------------------------------------
# LGHorizonNDVRSource.ad_manifest property
# ------------------------------------------------------------------

def test_ad_manifest_parses_breaks(self):
src = LGHorizonNDVRSource({"recordingId": "rec-1", "adManifest": _AD_MANIFEST_RAW})
breaks = src.ad_manifest
assert len(breaks) == 3
for ab in breaks:
assert isinstance(ab, LGHorizonAdBreak)

def test_ad_manifest_correct_values(self):
src = LGHorizonNDVRSource({"adManifest": _AD_MANIFEST_RAW})
breaks = src.ad_manifest
assert breaks[0].start_ms == 0
assert breaks[0].end_ms == 15000
assert breaks[0].ad_type == "IP_OTHER"
assert breaks[0].is_skippable is False
assert breaks[0].has_counter is True
assert breaks[1].start_ms == 527760
assert breaks[1].end_ms == 1012640
assert breaks[2].start_ms == 1815360
assert breaks[2].end_ms == 1846360

def test_ad_manifest_no_key_returns_empty_list(self):
src = LGHorizonNDVRSource({"recordingId": "rec-1"})
assert src.ad_manifest == []

def test_ad_manifest_empty_array_returns_empty_list(self):
src = LGHorizonNDVRSource({"recordingId": "rec-1", "adManifest": []})
assert src.ad_manifest == []

# ------------------------------------------------------------------
# LGHorizonDeviceState.is_in_ad_break
# ------------------------------------------------------------------

def _make_state_with_breaks(self) -> LGHorizonDeviceState:
ds = LGHorizonDeviceState()
ds.ad_breaks = [
LGHorizonAdBreak(start_ms=0, end_ms=15000, ad_type="IP_OTHER", is_skippable=False, has_counter=True),
LGHorizonAdBreak(start_ms=527760, end_ms=1012640, ad_type="IP_OTHER", is_skippable=False, has_counter=True),
LGHorizonAdBreak(start_ms=1815360, end_ms=1846360, ad_type="IP_OTHER", is_skippable=False, has_counter=True),
]
return ds

def test_is_in_ad_break_no_breaks(self):
ds = LGHorizonDeviceState()
ds.position = 10.0
assert ds.is_in_ad_break is False

def test_is_in_ad_break_position_none(self):
ds = self._make_state_with_breaks()
ds.position = None
assert ds.is_in_ad_break is False

def test_is_in_ad_break_within_first_break(self):
ds = self._make_state_with_breaks()
ds.position = 7.5 # 7500 ms, within [0, 15000)
assert ds.is_in_ad_break is True

def test_is_in_ad_break_within_second_break(self):
ds = self._make_state_with_breaks()
ds.position = 600.0 # 600000 ms, within [527760, 1012640)
assert ds.is_in_ad_break is True

def test_is_in_ad_break_at_break_start(self):
ds = self._make_state_with_breaks()
ds.position = 527.760 # exactly 527760 ms
assert ds.is_in_ad_break is True

def test_is_in_ad_break_at_break_end_exclusive(self):
ds = self._make_state_with_breaks()
ds.position = 1012.640 # exactly 1012640 ms — end is exclusive
assert ds.is_in_ad_break is False

def test_is_in_ad_break_between_breaks(self):
ds = self._make_state_with_breaks()
ds.position = 1100.0 # 1100000 ms, between second [527760,1012640) and third [1815360,1846360)
assert ds.is_in_ad_break is False

def test_is_in_ad_break_after_all_breaks(self):
ds = self._make_state_with_breaks()
ds.position = 2000.0 # 2000000 ms, after all breaks
assert ds.is_in_ad_break is False

# ------------------------------------------------------------------
# LGHorizonDeviceState.current_ad_break_end
# ------------------------------------------------------------------

def test_current_ad_break_end_no_breaks(self):
ds = LGHorizonDeviceState()
ds.position = 10.0
assert ds.current_ad_break_end is None

def test_current_ad_break_end_position_none(self):
ds = self._make_state_with_breaks()
ds.position = None
assert ds.current_ad_break_end is None

def test_current_ad_break_end_in_first_break(self):
ds = self._make_state_with_breaks()
ds.position = 7.5 # within [0, 15000)
assert ds.current_ad_break_end == 15.0 # end_s = 15000 / 1000

def test_current_ad_break_end_in_second_break(self):
ds = self._make_state_with_breaks()
ds.position = 600.0 # within [527760, 1012640)
assert ds.current_ad_break_end == pytest.approx(1012.640)

def test_current_ad_break_end_not_in_break(self):
ds = self._make_state_with_breaks()
ds.position = 1100.0 # between breaks
assert ds.current_ad_break_end is None

# ------------------------------------------------------------------
# LGHorizonDeviceState.reset() clears ad_breaks
# ------------------------------------------------------------------

def test_reset_clears_ad_breaks(self):
ds = self._make_state_with_breaks()
assert len(ds.ad_breaks) == 3
ds.reset()
assert ds.ad_breaks == []