From 943d12445b2f07ce9b25b90d8ece535691ed2320 Mon Sep 17 00:00:00 2001 From: Bryce Bjork Date: Mon, 23 Feb 2026 00:07:18 -0500 Subject: [PATCH] Add IB Gateway auto-logoff detection with active health check polling When IB Gateway's auto-logoff timer fires, the TCP socket can linger without ib_async firing disconnectedEvent, leaving the daemon in a zombie-connected state with no reconnection triggered. This change adds a lightweight active health check (reqCurrentTimeAsync) to the monitor loop that detects stale sessions within seconds and forces reconnection. Changes: - IBProvider.check_health(): Pings gateway with 5s timeout, forces disconnect and reschedules reconnect on failure - IBProvider._force_disconnect(): Tears down stale connection and schedules exponential backoff reconnect - DaemonServer._monitor_loop: Calls check_health before drawdown check; on failure broadcasts disconnected event - BrokerProvider base class: Default no-op check_health() for non-IB providers - Tests: 5 new tests covering alive, timeout, exception, and edge cases Co-authored-by: Bryce Bjork --- daemon/src/broker_daemon/daemon/server.py | 12 ++ daemon/src/broker_daemon/providers/base.py | 4 + daemon/src/broker_daemon/providers/ib.py | 33 ++++++ .../test_daemon/test_connection_manager.py | 112 ++++++++++++++++++ 4 files changed, 161 insertions(+) diff --git a/daemon/src/broker_daemon/daemon/server.py b/daemon/src/broker_daemon/daemon/server.py index 85ac80c..af22bf1 100644 --- a/daemon/src/broker_daemon/daemon/server.py +++ b/daemon/src/broker_daemon/daemon/server.py @@ -778,6 +778,18 @@ async def _monitor_loop(self) -> None: Event(topic=EventTopic.RISK, payload={"event": "halt", "reason": "heartbeat_timeout"}) ) + if self._provider.is_connected: + health_ok = await self._provider.check_health() + if not health_ok: + self._connection_loss.on_disconnected() + await self._broadcast_event( + Event( + topic=EventTopic.CONNECTION, + payload={"event": "disconnected", "reason": "health_check_failed"}, + ) + ) + continue + if self._provider.is_connected: try: balance = await self._provider.balance() diff --git a/daemon/src/broker_daemon/providers/base.py b/daemon/src/broker_daemon/providers/base.py index d6ff978..205118a 100644 --- a/daemon/src/broker_daemon/providers/base.py +++ b/daemon/src/broker_daemon/providers/base.py @@ -61,6 +61,10 @@ def status(self) -> ConnectionStatus: def is_connected(self) -> bool: raise NotImplementedError + async def check_health(self) -> bool: + """Active connectivity check. Default: delegates to is_connected.""" + return self.is_connected + @abstractmethod async def quote(self, symbols: list[str], *, intent: QuoteIntent = "best_effort") -> list[Quote]: raise NotImplementedError diff --git a/daemon/src/broker_daemon/providers/ib.py b/daemon/src/broker_daemon/providers/ib.py index 720a9cf..1eb1316 100644 --- a/daemon/src/broker_daemon/providers/ib.py +++ b/daemon/src/broker_daemon/providers/ib.py @@ -181,6 +181,39 @@ async def ensure_connected(self) -> None: suggestion="Verify IB Gateway/TWS is running and check [gateway] config host/port/client_id.", ) + async def check_health(self) -> bool: + """Ping IB Gateway to verify the session is alive. + + Returns True if the session responded. On failure forces disconnect + and schedules reconnect, then returns False. + """ + if not self.is_connected: + return False + if self._reconnect_task and not self._reconnect_task.done(): + return False + try: + await asyncio.wait_for(self._ib.reqCurrentTimeAsync(), timeout=5) + return True + except Exception: + logger.warning("IB Gateway health check failed; forcing disconnect") + await self._log_connection( + "auto_logoff_detected", + {"host": self._cfg.host, "port": self._cfg.port}, + ) + self._force_disconnect() + return False + + def _force_disconnect(self) -> None: + """Force-close the IB connection, reset state, and trigger reconnect.""" + if self._ib is not None: + try: + self._ib.disconnect() + except Exception: + pass + self._connected_at = None + self._listeners_registered = False + self._schedule_reconnect() + def _register_event_handlers(self) -> None: if not self._ib or self._listeners_registered: return diff --git a/daemon/tests/test_daemon/test_connection_manager.py b/daemon/tests/test_daemon/test_connection_manager.py index f1be624..63d3a46 100644 --- a/daemon/tests/test_daemon/test_connection_manager.py +++ b/daemon/tests/test_daemon/test_connection_manager.py @@ -43,6 +43,9 @@ def disconnect(self) -> None: def managedAccounts(self) -> list[str]: return ["DU123456"] + async def reqCurrentTimeAsync(self) -> int: + return 1700000000 + @pytest.mark.asyncio async def test_connect_registers_event_handlers_after_reconnect(monkeypatch: pytest.MonkeyPatch) -> None: @@ -100,3 +103,112 @@ async def test_exposure_rejects_unknown_group() -> None: with pytest.raises(BrokerError) as exc: await manager.exposure("invalid-group") assert exc.value.code == ErrorCode.INVALID_ARGS + + +# --- Health check tests --- + + +def _make_connected_manager(monkeypatch: pytest.MonkeyPatch) -> IBConnectionManager: + """Return an IBConnectionManager with a fake IB client that appears connected.""" + _FakeIB.instances.clear() + fake_module = types.SimpleNamespace(IB=_FakeIB) + original_import = builtins.__import__ + + def _fake_import(name: str, *args: object, **kwargs: object) -> object: + if name == "ib_async": + return fake_module + return original_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", _fake_import) + manager = IBConnectionManager(GatewayConfig()) + # Directly set up internal state to simulate a connected session. + fake_ib = _FakeIB() + fake_ib.connected = True + manager._ib = fake_ib # noqa: SLF001 + from datetime import UTC, datetime + + manager._connected_at = datetime.now(UTC) # noqa: SLF001 + return manager + + +@pytest.mark.asyncio +async def test_check_health_returns_true_when_alive(monkeypatch: pytest.MonkeyPatch) -> None: + manager = _make_connected_manager(monkeypatch) + assert manager.is_connected + result = await manager.check_health() + assert result is True + + +@pytest.mark.asyncio +async def test_check_health_forces_disconnect_on_timeout(monkeypatch: pytest.MonkeyPatch) -> None: + manager = _make_connected_manager(monkeypatch) + + async def _hang() -> None: + await asyncio.sleep(60) + + manager._ib.reqCurrentTimeAsync = _hang # noqa: SLF001 + + reconnect_called = asyncio.Event() + + async def _fake_reconnect() -> None: + reconnect_called.set() + + monkeypatch.setattr(manager, "_reconnect_loop", _fake_reconnect) + + result = await manager.check_health() + assert result is False + assert manager._connected_at is None # noqa: SLF001 + if manager._reconnect_task is not None: # noqa: SLF001 + await manager._reconnect_task # noqa: SLF001 + assert reconnect_called.is_set() + + +@pytest.mark.asyncio +async def test_check_health_forces_disconnect_on_exception(monkeypatch: pytest.MonkeyPatch) -> None: + manager = _make_connected_manager(monkeypatch) + + async def _raise() -> None: + raise ConnectionError("session dead") + + manager._ib.reqCurrentTimeAsync = _raise # noqa: SLF001 + + reconnect_called = asyncio.Event() + + async def _fake_reconnect() -> None: + reconnect_called.set() + + monkeypatch.setattr(manager, "_reconnect_loop", _fake_reconnect) + + result = await manager.check_health() + assert result is False + assert manager._connected_at is None # noqa: SLF001 + if manager._reconnect_task is not None: # noqa: SLF001 + await manager._reconnect_task # noqa: SLF001 + assert reconnect_called.is_set() + + +@pytest.mark.asyncio +async def test_check_health_skips_when_not_connected() -> None: + manager = IBConnectionManager(GatewayConfig()) + assert not manager.is_connected + result = await manager.check_health() + assert result is False + + +@pytest.mark.asyncio +async def test_check_health_skips_when_reconnect_in_progress(monkeypatch: pytest.MonkeyPatch) -> None: + manager = _make_connected_manager(monkeypatch) + + # Simulate a reconnect task that is still running. + never_done: asyncio.Future[None] = asyncio.get_event_loop().create_future() + manager._reconnect_task = asyncio.ensure_future(never_done) # noqa: SLF001 + + result = await manager.check_health() + assert result is False + + # Clean up the dangling future. + never_done.cancel() + try: + await manager._reconnect_task # noqa: SLF001 + except asyncio.CancelledError: + pass