Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions daemon/src/broker_daemon/daemon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,18 @@ async def _monitor_loop(self) -> None:
Event(topic=EventTopic.RISK, payload={"event": "halt", "reason": "heartbeat_timeout"})
)

if self._provider.is_connected:
health_ok = await self._provider.check_health()
if not health_ok:
self._connection_loss.on_disconnected()
await self._broadcast_event(
Event(
topic=EventTopic.CONNECTION,
payload={"event": "disconnected", "reason": "health_check_failed"},
)
)
continue

if self._provider.is_connected:
try:
balance = await self._provider.balance()
Expand Down
4 changes: 4 additions & 0 deletions daemon/src/broker_daemon/providers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ def status(self) -> ConnectionStatus:
def is_connected(self) -> bool:
raise NotImplementedError

async def check_health(self) -> bool:
"""Active connectivity check. Default: delegates to is_connected."""
return self.is_connected

@abstractmethod
async def quote(self, symbols: list[str], *, intent: QuoteIntent = "best_effort") -> list[Quote]:
raise NotImplementedError
Expand Down
33 changes: 33 additions & 0 deletions daemon/src/broker_daemon/providers/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,39 @@ async def ensure_connected(self) -> None:
suggestion="Verify IB Gateway/TWS is running and check [gateway] config host/port/client_id.",
)

async def check_health(self) -> bool:
"""Ping IB Gateway to verify the session is alive.

Returns True if the session responded. On failure forces disconnect
and schedules reconnect, then returns False.
"""
if not self.is_connected:
return False
if self._reconnect_task and not self._reconnect_task.done():
return False
try:
await asyncio.wait_for(self._ib.reqCurrentTimeAsync(), timeout=5)
return True
except Exception:
logger.warning("IB Gateway health check failed; forcing disconnect")
await self._log_connection(
"auto_logoff_detected",
{"host": self._cfg.host, "port": self._cfg.port},
)
self._force_disconnect()
return False

def _force_disconnect(self) -> None:
"""Force-close the IB connection, reset state, and trigger reconnect."""
if self._ib is not None:
try:
self._ib.disconnect()
except Exception:
pass
self._connected_at = None
self._listeners_registered = False
self._schedule_reconnect()

def _register_event_handlers(self) -> None:
if not self._ib or self._listeners_registered:
return
Expand Down
112 changes: 112 additions & 0 deletions daemon/tests/test_daemon/test_connection_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def disconnect(self) -> None:
def managedAccounts(self) -> list[str]:
return ["DU123456"]

async def reqCurrentTimeAsync(self) -> int:
return 1700000000


@pytest.mark.asyncio
async def test_connect_registers_event_handlers_after_reconnect(monkeypatch: pytest.MonkeyPatch) -> None:
Expand Down Expand Up @@ -100,3 +103,112 @@ async def test_exposure_rejects_unknown_group() -> None:
with pytest.raises(BrokerError) as exc:
await manager.exposure("invalid-group")
assert exc.value.code == ErrorCode.INVALID_ARGS


# --- Health check tests ---


def _make_connected_manager(monkeypatch: pytest.MonkeyPatch) -> IBConnectionManager:
"""Return an IBConnectionManager with a fake IB client that appears connected."""
_FakeIB.instances.clear()
fake_module = types.SimpleNamespace(IB=_FakeIB)
original_import = builtins.__import__

def _fake_import(name: str, *args: object, **kwargs: object) -> object:
if name == "ib_async":
return fake_module
return original_import(name, *args, **kwargs)

monkeypatch.setattr(builtins, "__import__", _fake_import)
manager = IBConnectionManager(GatewayConfig())
# Directly set up internal state to simulate a connected session.
fake_ib = _FakeIB()
fake_ib.connected = True
manager._ib = fake_ib # noqa: SLF001
from datetime import UTC, datetime

manager._connected_at = datetime.now(UTC) # noqa: SLF001
return manager


@pytest.mark.asyncio
async def test_check_health_returns_true_when_alive(monkeypatch: pytest.MonkeyPatch) -> None:
manager = _make_connected_manager(monkeypatch)
assert manager.is_connected
result = await manager.check_health()
assert result is True


@pytest.mark.asyncio
async def test_check_health_forces_disconnect_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None:
manager = _make_connected_manager(monkeypatch)

async def _hang() -> None:
await asyncio.sleep(60)

manager._ib.reqCurrentTimeAsync = _hang # noqa: SLF001

reconnect_called = asyncio.Event()

async def _fake_reconnect() -> None:
reconnect_called.set()

monkeypatch.setattr(manager, "_reconnect_loop", _fake_reconnect)

result = await manager.check_health()
assert result is False
assert manager._connected_at is None # noqa: SLF001
if manager._reconnect_task is not None: # noqa: SLF001
await manager._reconnect_task # noqa: SLF001
assert reconnect_called.is_set()


@pytest.mark.asyncio
async def test_check_health_forces_disconnect_on_exception(monkeypatch: pytest.MonkeyPatch) -> None:
manager = _make_connected_manager(monkeypatch)

async def _raise() -> None:
raise ConnectionError("session dead")

manager._ib.reqCurrentTimeAsync = _raise # noqa: SLF001

reconnect_called = asyncio.Event()

async def _fake_reconnect() -> None:
reconnect_called.set()

monkeypatch.setattr(manager, "_reconnect_loop", _fake_reconnect)

result = await manager.check_health()
assert result is False
assert manager._connected_at is None # noqa: SLF001
if manager._reconnect_task is not None: # noqa: SLF001
await manager._reconnect_task # noqa: SLF001
assert reconnect_called.is_set()


@pytest.mark.asyncio
async def test_check_health_skips_when_not_connected() -> None:
manager = IBConnectionManager(GatewayConfig())
assert not manager.is_connected
result = await manager.check_health()
assert result is False


@pytest.mark.asyncio
async def test_check_health_skips_when_reconnect_in_progress(monkeypatch: pytest.MonkeyPatch) -> None:
manager = _make_connected_manager(monkeypatch)

# Simulate a reconnect task that is still running.
never_done: asyncio.Future[None] = asyncio.get_event_loop().create_future()
manager._reconnect_task = asyncio.ensure_future(never_done) # noqa: SLF001

result = await manager.check_health()
assert result is False

# Clean up the dangling future.
never_done.cancel()
try:
await manager._reconnect_task # noqa: SLF001
except asyncio.CancelledError:
pass