diff --git a/docs/architecture.md b/docs/architecture.md index 852657d..f8e860d 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -144,14 +144,21 @@ terminal_port = 8194 ## Real-Time Data -> **구현 예정** — WebSocket 실시간 데이터는 아직 구현되지 않았습니다. 현재는 REST polling만 지원합니다. +Real-time price (and news) streaming is provided by +`FinnhubWebSocketAdapter`, which implements the `StreamingProvider` +capability. It is enabled automatically by `qracer serve` when the +`finnhub` provider is enabled in `providers.toml` and the +`qracer[streaming]` extra is installed. Each trade message is +dispatched to `AlertMonitor.evaluate_price`, allowing threshold alerts +to trigger on the next tick instead of waiting for the next polling +interval. For Live Mode, qracer needs sub-second price data and streaming news: | Capability | Preferred Provider | Protocol | Fallback | |---|---|---|---| -| Real-time quotes (구현 예정) | Finnhub | WebSocket | REST polling (5s interval) | -| Streaming news (구현 예정) | Finnhub | WebSocket | REST polling (30s interval) | +| Real-time quotes | Finnhub | WebSocket | REST polling (5s interval) | +| Streaming news | Finnhub | WebSocket | REST polling (30s interval) | | Price/OHLCV | Finnhub | REST | yfinance | | Fundamental | Finnhub | REST | FMP, yfinance | | Macro | FRED | REST | World Bank | @@ -163,7 +170,9 @@ For Live Mode, qracer needs sub-second price data and streaming news: | Short interest (planned) | FINRA | REST | Ortex (plugin) | | ETF flows (planned) | ETF.com | REST | — (plugin) | -WebSocket connections are opened on session start during market hours and closed on session end. REST fallback activates automatically if WebSocket disconnects. +WebSocket connections are opened when `qracer serve` starts and closed +on shutdown. If the initial handshake fails, the server transparently +falls back to REST polling via `AlertMonitor.check()`. API key missing → adapter auto-skipped. Fallback kicks in transparently. Provider availability is controlled entirely by `providers.toml` — no code changes needed to toggle sources. diff --git a/pyproject.toml b/pyproject.toml index 652eb9b..772dbf8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ fred = ["fredapi>=0.5.0"] all-llm = ["openai>=1.0.0", "google-generativeai>=0.8.0"] web = ["fastapi>=0.104.0", "uvicorn>=0.24.0"] pdf = ["fpdf2>=2.7.0"] +streaming = ["websockets>=12.0"] [dependency-groups] dev = [ diff --git a/qracer/alert_monitor.py b/qracer/alert_monitor.py index 033e544..803323b 100644 --- a/qracer/alert_monitor.py +++ b/qracer/alert_monitor.py @@ -88,3 +88,22 @@ async def check(self) -> list[AlertResult]: logger.info(msg) return results + + def evaluate_price(self, ticker: str, price: float) -> list[AlertResult]: + """Evaluate alerts for *ticker* against a known *price*. + + Intended for push-based flows — for example, a WebSocket trade + callback that already carries the latest price. Unlike + :meth:`check`, this method does not fetch from the data + registry and only touches alerts for the given ticker. + """ + results: list[AlertResult] = [] + for alert in self._store.get_by_ticker(ticker): + if not alert.active: + continue + if alert.evaluate(price): + self._store.mark_triggered(alert.id, price) + msg = f"Alert triggered: {alert.describe()} (price: {price})" + results.append(AlertResult(alert=alert, triggered_price=price, message=msg)) + logger.info(msg) + return results diff --git a/qracer/cli.py b/qracer/cli.py index 7c3faf3..b4ccdba 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -14,6 +14,8 @@ import click if TYPE_CHECKING: + from qracer.config.models import QracerConfig + from qracer.data.providers import StreamingProvider from qracer.data.registry import DataRegistry from qracer.llm.registry import LLMRegistry @@ -270,6 +272,33 @@ def _write_toml(path: Path, data: dict[str, object]) -> None: # --------------------------------------------------------------------------- +def _build_streaming_adapter(config: QracerConfig) -> StreamingProvider | None: + """Return a Finnhub streaming adapter if enabled and available. + + The adapter is only constructed when the ``finnhub`` provider is + enabled in ``providers.toml`` *and* an API key is available *and* + the ``websockets`` package is importable. Any failure returns + ``None`` so the caller falls back to REST polling. + """ + finnhub_cfg = config.providers.providers.get("finnhub") + if finnhub_cfg is None or not finnhub_cfg.enabled: + return None + api_key_env = finnhub_cfg.api_key_env or "FINNHUB_API_KEY" + api_key = config.credentials.get(api_key_env) or os.environ.get(api_key_env) + if not api_key: + return None + try: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + return FinnhubWebSocketAdapter(api_key=api_key) + except ImportError: + logger.info("Streaming disabled — install 'qracer[streaming]' for WebSocket support") + return None + except Exception as exc: + logger.warning("Streaming adapter unavailable: %s", exc) + return None + + def _build_registries() -> tuple[LLMRegistry, DataRegistry, list[str]]: """Build LLM and data registries from providers.toml + provider catalog. @@ -1059,12 +1088,15 @@ def serve(check_interval: int) -> None: cooldown_minutes=app_cfg.alert_cooldown_minutes, ) + streaming_adapter = _build_streaming_adapter(config) + server = Server( alert_monitor, task_executor, notifications, autonomous_monitor=autonomous_monitor, telegram_poller=telegram_poller, + streaming_adapter=streaming_adapter, tick_interval=1.0, ) @@ -1086,6 +1118,8 @@ def _handle_signal(signum: int, _frame: object) -> None: ) if telegram_poller is not None: click.echo(" Telegram bot: receiving commands (try /help in chat)") + if streaming_adapter is not None: + click.echo(" Streaming: Finnhub WebSocket (real-time alerts)") click.echo(" Press Ctrl+C to stop.\n") try: diff --git a/qracer/data/__init__.py b/qracer/data/__init__.py index ac88ad9..a156c58 100644 --- a/qracer/data/__init__.py +++ b/qracer/data/__init__.py @@ -9,6 +9,7 @@ NewsArticle, NewsProvider, PriceProvider, + StreamingProvider, ) from qracer.data.registry import DataRegistry from qracer.data.yfinance_adapter import YfinanceAdapter @@ -25,5 +26,6 @@ "NewsArticle", "NewsProvider", "PriceProvider", + "StreamingProvider", "YfinanceAdapter", ] diff --git a/qracer/data/finnhub_ws.py b/qracer/data/finnhub_ws.py new file mode 100644 index 0000000..d5ea2b4 --- /dev/null +++ b/qracer/data/finnhub_ws.py @@ -0,0 +1,269 @@ +"""FinnhubWebSocketAdapter — real-time price and news streaming via Finnhub. + +Opens a persistent WebSocket connection to ``wss://ws.finnhub.io`` and +dispatches incoming ``trade`` and ``news`` messages to registered +callbacks. Intended to drive :class:`~qracer.alert_monitor.AlertMonitor` +without REST polling during a live session. + +Implements the :class:`~qracer.data.providers.StreamingProvider` capability. + +Example:: + + adapter = FinnhubWebSocketAdapter(api_key="...") + adapter.on_price(my_price_callback) + await adapter.connect() + await adapter.subscribe(["AAPL", "TSLA"]) + # ... receive callbacks as trades arrive ... + await adapter.disconnect() +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +import logging +from datetime import datetime +from typing import Any + +from qracer.data.providers import NewsArticle, NewsCallback, PriceCallback + +try: + import websockets # pyright: ignore[reportMissingImports] + + _HAS_WEBSOCKETS = True +except ImportError: + _HAS_WEBSOCKETS = False + +logger = logging.getLogger(__name__) + +_WS_URL = "wss://ws.finnhub.io?token={api_key}" + + +class FinnhubWebSocketAdapter: + """Real-time price and news streaming via Finnhub WebSocket. + + The adapter keeps an internal set of subscribed tickers so that it + can replay subscriptions after a reconnect. Callbacks registered + via :meth:`on_price` and :meth:`on_news` are invoked in order for + every matching message. + """ + + def __init__(self, api_key: str | None = None) -> None: + if not _HAS_WEBSOCKETS: + raise ImportError( + "websockets is not installed. Install it with: uv add 'qracer[streaming]'" + ) + if not api_key: + raise ValueError("FINNHUB_API_KEY is required. Get one at https://finnhub.io/register") + self._api_key = api_key + self._ws: Any = None # websockets client protocol — typed loosely to avoid stub issues. + self._receive_task: asyncio.Task[None] | None = None + self._subscribed: set[str] = set() + self._price_callbacks: list[PriceCallback] = [] + self._news_callbacks: list[NewsCallback] = [] + self._lock = asyncio.Lock() + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + async def connect(self) -> None: + """Open the WebSocket connection and start the receive loop. + + Raises: + ConnectionError: if the initial handshake fails. + """ + async with self._lock: + if self._ws is not None: + return + try: + self._ws = await websockets.connect(_WS_URL.format(api_key=self._api_key)) + except Exception as exc: + self._ws = None + raise ConnectionError(f"Finnhub WebSocket handshake failed: {exc}") from exc + logger.info("Finnhub WebSocket connected") + self._receive_task = asyncio.create_task(self._receive_loop()) + + # Re-subscribe any tickers that were registered before connect(). + if self._subscribed: + tickers = list(self._subscribed) + self._subscribed.clear() + await self._send_subscriptions(tickers) + + async def disconnect(self) -> None: + """Close the WebSocket connection and stop the receive loop.""" + async with self._lock: + if self._receive_task is not None: + self._receive_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await self._receive_task + self._receive_task = None + if self._ws is not None: + with contextlib.suppress(Exception): + await self._ws.close() + self._ws = None + logger.info("Finnhub WebSocket disconnected") + + # ------------------------------------------------------------------ + # Subscriptions + # ------------------------------------------------------------------ + + async def subscribe(self, tickers: list[str]) -> None: + """Subscribe to real-time updates for one or more tickers. + + Tickers registered before :meth:`connect` is called are remembered + and sent automatically once the connection is established. + """ + if not tickers: + return + if self._ws is None: + # Defer until connect() establishes the session. + self._subscribed.update(t.upper() for t in tickers) + return + new = [t.upper() for t in tickers if t.upper() not in self._subscribed] + if new: + await self._send_subscriptions(new) + + async def unsubscribe(self, tickers: list[str]) -> None: + """Unsubscribe from real-time updates for one or more tickers.""" + if not tickers or self._ws is None: + return + for ticker in tickers: + key = ticker.upper() + if key not in self._subscribed: + continue + with contextlib.suppress(Exception): + await self._ws.send(json.dumps({"type": "unsubscribe", "symbol": key})) + self._subscribed.discard(key) + + async def _send_subscriptions(self, tickers: list[str]) -> None: + """Send ``subscribe`` messages for the given tickers. + + Assumes ``self._ws`` is not ``None``. + """ + for ticker in tickers: + key = ticker.upper() + try: + await self._ws.send(json.dumps({"type": "subscribe", "symbol": key})) + self._subscribed.add(key) + except Exception as exc: + logger.warning("Finnhub WebSocket subscribe failed for %s: %s", key, exc) + + # ------------------------------------------------------------------ + # Callback registration + # ------------------------------------------------------------------ + + def on_price(self, callback: PriceCallback) -> None: + """Register a callback to be invoked on every trade update. + + The callback receives ``(ticker, price)`` and may be async. + """ + self._price_callbacks.append(callback) + + def on_news(self, callback: NewsCallback) -> None: + """Register a callback to be invoked on every news update. + + The callback receives a :class:`~qracer.data.providers.NewsArticle` + and may be async. + """ + self._news_callbacks.append(callback) + + # ------------------------------------------------------------------ + # Receive loop + # ------------------------------------------------------------------ + + async def _receive_loop(self) -> None: + """Read messages from the socket until the connection closes. + + Errors from individual callbacks are logged but do not stop the + loop. The loop exits when the underlying connection is closed. + """ + ws = self._ws + if ws is None: + return + try: + async for message in ws: + try: + await self._dispatch_message(message) + except Exception: + logger.exception("Finnhub WebSocket message dispatch failed") + except asyncio.CancelledError: + raise + except Exception: + logger.warning("Finnhub WebSocket receive loop ended unexpectedly", exc_info=True) + finally: + # Mark the connection as dead so a later connect() can reopen. + self._ws = None + + async def _dispatch_message(self, raw: str | bytes) -> None: + """Parse a raw message and dispatch it to registered callbacks.""" + if isinstance(raw, (bytes, bytearray)): + text = raw.decode("utf-8", errors="replace") + else: + text = raw + try: + payload = json.loads(text) + except json.JSONDecodeError: + logger.debug("Finnhub WebSocket sent non-JSON message: %r", text[:120]) + return + if not isinstance(payload, dict): + return + + msg_type = payload.get("type") + if msg_type == "trade": + for trade in payload.get("data", []) or []: + if not isinstance(trade, dict): + continue + ticker = trade.get("s") + price = trade.get("p") + if not isinstance(ticker, str) or not isinstance(price, (int, float)): + continue + await self._emit_price(ticker, float(price)) + elif msg_type == "news": + for item in payload.get("data", []) or []: + if not isinstance(item, dict): + continue + article = _parse_news(item) + if article is not None: + await self._emit_news(article) + elif msg_type == "ping": + # Finnhub pings periodically — nothing to do, ``websockets`` + # handles pong frames automatically. + return + elif msg_type == "error": + logger.warning("Finnhub WebSocket error: %s", payload.get("msg")) + + async def _emit_price(self, ticker: str, price: float) -> None: + for callback in list(self._price_callbacks): + try: + await callback(ticker, price) + except Exception: + logger.exception("Finnhub WebSocket price callback failed") + + async def _emit_news(self, article: NewsArticle) -> None: + for callback in list(self._news_callbacks): + try: + await callback(article) + except Exception: + logger.exception("Finnhub WebSocket news callback failed") + + +def _parse_news(item: dict[str, Any]) -> NewsArticle | None: + """Convert a raw Finnhub news message into a :class:`NewsArticle`.""" + headline = item.get("headline") + if not isinstance(headline, str) or not headline: + return None + ts = item.get("datetime", 0) + if isinstance(ts, (int, float)) and ts > 0: + published_at = datetime.fromtimestamp(float(ts)) + else: + published_at = datetime.now() + return NewsArticle( + title=headline, + source=str(item.get("source", "finnhub")), + published_at=published_at, + url=str(item.get("url", "")), + summary=str(item.get("summary", "")), + sentiment=None, + ) diff --git a/qracer/data/providers.py b/qracer/data/providers.py index f6dbb3a..3dc3d9d 100644 --- a/qracer/data/providers.py +++ b/qracer/data/providers.py @@ -6,10 +6,15 @@ from __future__ import annotations +from collections.abc import Awaitable, Callable from dataclasses import dataclass from datetime import date, datetime from typing import Protocol, runtime_checkable +# Callback signatures for streaming data updates. Both callbacks may be async. +PriceCallback = Callable[[str, float], Awaitable[None]] +NewsCallback = Callable[["NewsArticle"], Awaitable[None]] + @dataclass(frozen=True) class OHLCV: @@ -106,3 +111,26 @@ class AlternativeProvider(Protocol): """Capability: Alternative data retrieval (insider trades, etc.).""" async def get_alternative(self, ticker: str, record_type: str) -> list[AlternativeRecord]: ... + + +@runtime_checkable +class StreamingProvider(Protocol): + """Capability: real-time data streaming via WebSocket. + + Implementations maintain a persistent connection and dispatch price + and news updates to registered callbacks as messages arrive. The + caller is responsible for connecting before subscribing and + disconnecting on shutdown. + """ + + async def connect(self) -> None: ... + + async def disconnect(self) -> None: ... + + async def subscribe(self, tickers: list[str]) -> None: ... + + async def unsubscribe(self, tickers: list[str]) -> None: ... + + def on_price(self, callback: PriceCallback) -> None: ... + + def on_news(self, callback: NewsCallback) -> None: ... diff --git a/qracer/provider_catalog.py b/qracer/provider_catalog.py index 16033bf..2d03c0d 100644 --- a/qracer/provider_catalog.py +++ b/qracer/provider_catalog.py @@ -78,6 +78,7 @@ ("qracer.data.providers", "MacroProvider"), ("qracer.data.providers", "NewsProvider"), ("qracer.data.providers", "AlternativeProvider"), + ("qracer.data.providers", "StreamingProvider"), ] diff --git a/qracer/server.py b/qracer/server.py index c9fcb8f..9b8d2f0 100644 --- a/qracer/server.py +++ b/qracer/server.py @@ -13,6 +13,7 @@ from qracer.alert_monitor import AlertMonitor from qracer.alerts import AlertCondition from qracer.autonomous import AutonomousMonitor +from qracer.data.providers import StreamingProvider from qracer.notifications.providers import Notification, NotificationCategory from qracer.notifications.registry import NotificationRegistry from qracer.notifications.telegram_poller import BotCommand, TelegramBotPoller @@ -40,6 +41,7 @@ def __init__( *, autonomous_monitor: AutonomousMonitor | None = None, telegram_poller: TelegramBotPoller | None = None, + streaming_adapter: StreamingProvider | None = None, tick_interval: float = 1.0, ) -> None: self._alert_monitor = alert_monitor @@ -47,6 +49,7 @@ def __init__( self._autonomous_monitor = autonomous_monitor self._notifications = notifications or NotificationRegistry() self._telegram_poller = telegram_poller + self._streaming_adapter = streaming_adapter self._tick_interval = tick_interval self._shutdown_event = asyncio.Event() self._started_at: float | None = None @@ -56,18 +59,76 @@ async def run(self) -> None: logger.info("Server started (tick=%.1fs)", self._tick_interval) self._started_at = time.monotonic() - while not self._shutdown_event.is_set(): - await self._tick() - try: - await asyncio.wait_for( - self._shutdown_event.wait(), - timeout=self._tick_interval, - ) - except asyncio.TimeoutError: - pass + await self._start_streaming() + + try: + while not self._shutdown_event.is_set(): + await self._tick() + try: + await asyncio.wait_for( + self._shutdown_event.wait(), + timeout=self._tick_interval, + ) + except asyncio.TimeoutError: + pass + finally: + await self._stop_streaming() logger.info("Server stopped") + async def _start_streaming(self) -> None: + """Connect the streaming adapter and wire it to the alert monitor. + + On any failure the server falls back to REST polling — the + adapter is simply set to ``None`` so the tick loop keeps working. + """ + if self._streaming_adapter is None: + return + try: + await self._streaming_adapter.connect() + except Exception: + logger.warning( + "Streaming adapter failed to connect — falling back to REST polling", + exc_info=True, + ) + self._streaming_adapter = None + return + + self._streaming_adapter.on_price(self._on_stream_price) + + # Subscribe to every ticker that currently has an active alert. + tickers = sorted({a.ticker for a in self._alert_monitor.store.get_active()}) + if tickers: + try: + await self._streaming_adapter.subscribe(tickers) + except Exception: + logger.warning("Streaming subscribe failed for %s", tickers, exc_info=True) + logger.info("Streaming adapter wired (subscribed=%d)", len(tickers)) + + async def _stop_streaming(self) -> None: + """Disconnect the streaming adapter on shutdown.""" + if self._streaming_adapter is None: + return + try: + await self._streaming_adapter.disconnect() + except Exception: + logger.debug("Streaming adapter disconnect failed", exc_info=True) + + async def _on_stream_price(self, ticker: str, price: float) -> None: + """Evaluate alerts for *ticker* immediately on a real-time price.""" + try: + triggered = self._alert_monitor.evaluate_price(ticker, price) + except Exception: + logger.debug("Streaming alert evaluation failed", exc_info=True) + return + for result in triggered: + logger.info("Alert triggered (stream): %s", result.message) + await self._notify( + NotificationCategory.PRICE_ALERT, + result.message, + result.message, + ) + async def _tick(self) -> None: """Single heartbeat — check alerts and tasks.""" if self._alert_monitor.should_check(): diff --git a/tests/data/test_finnhub_ws.py b/tests/data/test_finnhub_ws.py new file mode 100644 index 0000000..318d74a --- /dev/null +++ b/tests/data/test_finnhub_ws.py @@ -0,0 +1,334 @@ +"""Tests for FinnhubWebSocketAdapter.""" + +from __future__ import annotations + +import asyncio +import json +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from qracer.data.providers import NewsArticle + + +class _FakeWebSocket: + """Minimal async-iterable WebSocket double for unit tests. + + ``feed()`` queues outgoing messages that the adapter's receive loop + will see; ``close()`` terminates the iteration. ``send()`` records + every outgoing subscribe/unsubscribe frame. + """ + + def __init__(self) -> None: + self.sent: list[str] = [] + self._queue: asyncio.Queue[str | None] = asyncio.Queue() + self.closed = False + + async def send(self, message: str) -> None: + self.sent.append(message) + + async def close(self) -> None: + self.closed = True + await self._queue.put(None) + + def __aiter__(self) -> _FakeWebSocket: + return self + + async def __anext__(self) -> str: + item = await self._queue.get() + if item is None: + raise StopAsyncIteration + return item + + async def feed(self, message: str) -> None: + await self._queue.put(message) + + +class TestFinnhubWebSocketAdapterInit: + """Constructor validation.""" + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", False) + def test_missing_websockets_raises(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + with pytest.raises(ImportError, match="websockets is not installed"): + FinnhubWebSocketAdapter(api_key="test") + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + def test_missing_api_key_raises(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + with pytest.raises(ValueError, match="FINNHUB_API_KEY is required"): + FinnhubWebSocketAdapter(api_key=None) + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + def test_empty_api_key_raises(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + with pytest.raises(ValueError, match="FINNHUB_API_KEY is required"): + FinnhubWebSocketAdapter(api_key="") + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + def test_valid_init(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + adapter = FinnhubWebSocketAdapter(api_key="key") + assert adapter._api_key == "key" + assert adapter._ws is None + assert adapter._subscribed == set() + + +class TestConnectAndSubscribe: + """Connect / subscribe / receive loop integration.""" + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_connect_starts_receive_loop(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + await adapter.connect() + try: + assert adapter._ws is fake_ws + assert adapter._receive_task is not None + mock_ws.connect.assert_awaited_once() + finally: + await adapter.disconnect() + assert fake_ws.closed + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_connect_handshake_failure_raises(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(side_effect=OSError("boom")) + adapter = FinnhubWebSocketAdapter(api_key="k") + with pytest.raises(ConnectionError, match="handshake failed"): + await adapter.connect() + assert adapter._ws is None + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_subscribe_before_connect_is_deferred(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + await adapter.subscribe(["aapl", "tsla"]) + assert adapter._subscribed == {"AAPL", "TSLA"} + assert fake_ws.sent == [] + + await adapter.connect() + try: + # Subscriptions queued before connect should now be sent. + sent = [json.loads(m) for m in fake_ws.sent] + symbols = {m["symbol"] for m in sent if m["type"] == "subscribe"} + assert symbols == {"AAPL", "TSLA"} + assert adapter._subscribed == {"AAPL", "TSLA"} + finally: + await adapter.disconnect() + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_subscribe_after_connect_dedupes(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + await adapter.connect() + try: + await adapter.subscribe(["AAPL"]) + await adapter.subscribe(["aapl", "MSFT"]) + subs = [json.loads(m)["symbol"] for m in fake_ws.sent if '"subscribe"' in m] + assert subs == ["AAPL", "MSFT"] + finally: + await adapter.disconnect() + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_unsubscribe_sends_frame(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + await adapter.connect() + try: + await adapter.subscribe(["AAPL"]) + fake_ws.sent.clear() + await adapter.unsubscribe(["AAPL", "GOOG"]) + assert len(fake_ws.sent) == 1 + frame = json.loads(fake_ws.sent[0]) + assert frame == {"type": "unsubscribe", "symbol": "AAPL"} + assert adapter._subscribed == set() + finally: + await adapter.disconnect() + + +class TestMessageDispatch: + """Price and news message parsing + callback dispatch.""" + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_trade_message_invokes_price_callback(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + received: list[tuple[str, float]] = [] + + async def on_price(ticker: str, price: float) -> None: + received.append((ticker, price)) + + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + adapter.on_price(on_price) + await adapter.connect() + try: + await fake_ws.feed( + json.dumps( + { + "type": "trade", + "data": [ + {"s": "AAPL", "p": 182.5, "t": 1, "v": 100}, + {"s": "TSLA", "p": 250.0, "t": 2, "v": 200}, + ], + } + ) + ) + await asyncio.sleep(0) # let the receive loop run + await asyncio.sleep(0) + finally: + await adapter.disconnect() + + assert ("AAPL", 182.5) in received + assert ("TSLA", 250.0) in received + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_news_message_invokes_news_callback(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + received: list[NewsArticle] = [] + + async def on_news(article: NewsArticle) -> None: + received.append(article) + + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + adapter.on_news(on_news) + await adapter.connect() + try: + await fake_ws.feed( + json.dumps( + { + "type": "news", + "data": [ + { + "headline": "AAPL beats earnings", + "source": "Reuters", + "datetime": 1_700_000_000, + "url": "https://example.com/a", + "summary": "Apple crushed expectations.", + } + ], + } + ) + ) + await asyncio.sleep(0) + await asyncio.sleep(0) + finally: + await adapter.disconnect() + + assert len(received) == 1 + assert received[0].title == "AAPL beats earnings" + assert received[0].source == "Reuters" + assert received[0].url == "https://example.com/a" + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_callback_error_does_not_stop_loop(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + good = MagicMock() + + async def bad_callback(ticker: str, price: float) -> None: + raise RuntimeError("boom") + + async def good_callback(ticker: str, price: float) -> None: + good(ticker, price) + + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + adapter.on_price(bad_callback) + adapter.on_price(good_callback) + await adapter.connect() + try: + await fake_ws.feed(json.dumps({"type": "trade", "data": [{"s": "X", "p": 1.0}]})) + await asyncio.sleep(0) + await asyncio.sleep(0) + finally: + await adapter.disconnect() + + good.assert_called_once_with("X", 1.0) + + @patch("qracer.data.finnhub_ws._HAS_WEBSOCKETS", True) + async def test_non_json_message_is_ignored(self) -> None: + from qracer.data.finnhub_ws import FinnhubWebSocketAdapter + + fake_ws = _FakeWebSocket() + + calls: list[Any] = [] + + async def on_price(ticker: str, price: float) -> None: + calls.append((ticker, price)) + + with patch("qracer.data.finnhub_ws.websockets", create=True) as mock_ws: + mock_ws.connect = AsyncMock(return_value=fake_ws) + adapter = FinnhubWebSocketAdapter(api_key="k") + adapter.on_price(on_price) + await adapter.connect() + try: + await fake_ws.feed("not json") + await fake_ws.feed(json.dumps({"type": "ping"})) + await fake_ws.feed(json.dumps({"type": "error", "msg": "limit"})) + await asyncio.sleep(0) + await asyncio.sleep(0) + finally: + await adapter.disconnect() + + assert calls == [] + + +class TestParseNews: + """Unit tests for the raw news-payload parser.""" + + def test_parse_news_basic(self) -> None: + from qracer.data.finnhub_ws import _parse_news + + article = _parse_news( + { + "headline": "TSLA hits new high", + "source": "Bloomberg", + "datetime": 1_700_000_000, + "url": "https://example.com/n", + "summary": "Shares surged 5%.", + } + ) + assert article is not None + assert article.title == "TSLA hits new high" + assert article.source == "Bloomberg" + assert article.url == "https://example.com/n" + assert article.summary == "Shares surged 5%." + + def test_parse_news_missing_headline_returns_none(self) -> None: + from qracer.data.finnhub_ws import _parse_news + + assert _parse_news({"source": "Reuters", "url": "x"}) is None + assert _parse_news({"headline": ""}) is None diff --git a/tests/test_alert_monitor.py b/tests/test_alert_monitor.py index 3b1ba61..7888767 100644 --- a/tests/test_alert_monitor.py +++ b/tests/test_alert_monitor.py @@ -132,3 +132,46 @@ async def test_multiple_alerts_same_ticker(self, store) -> None: # Only the 180 alert should trigger assert len(results) == 1 assert len(store.get_active()) == 1 + + +class TestEvaluatePrice: + """Synchronous push-based alert evaluation used by the streaming adapter.""" + + def test_triggers_matching_alert(self, store) -> None: + store.create("AAPL", AlertCondition.ABOVE, 200.0) + monitor = AlertMonitor(store, DataRegistry(), check_interval=0) + + results = monitor.evaluate_price("AAPL", 210.0) + + assert len(results) == 1 + assert results[0].triggered_price == 210.0 + # Triggered alert becomes inactive. + assert len(store.get_active()) == 0 + + def test_does_not_trigger_unmatched_ticker(self, store) -> None: + store.create("AAPL", AlertCondition.ABOVE, 200.0) + monitor = AlertMonitor(store, DataRegistry(), check_interval=0) + + results = monitor.evaluate_price("TSLA", 500.0) + + assert results == [] + assert len(store.get_active()) == 1 + + def test_threshold_not_met(self, store) -> None: + store.create("AAPL", AlertCondition.ABOVE, 200.0) + monitor = AlertMonitor(store, DataRegistry(), check_interval=0) + + results = monitor.evaluate_price("AAPL", 199.0) + + assert results == [] + assert len(store.get_active()) == 1 + + def test_inactive_alert_is_skipped(self, store) -> None: + alert = store.create("AAPL", AlertCondition.ABOVE, 200.0) + store.mark_triggered(alert.id, 205.0) + monitor = AlertMonitor(store, DataRegistry(), check_interval=0) + + # Even with a matching price, an already-triggered alert stays + # inactive and is not re-emitted. + results = monitor.evaluate_price("AAPL", 210.0) + assert results == [] diff --git a/tests/test_server.py b/tests/test_server.py index ded6641..0a26ae0 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -133,6 +133,92 @@ async def test_tick_handles_task_error_gracefully(self) -> None: await server._tick() # Should not raise +class TestServerStreamingAdapter: + """Lifecycle + price-callback wiring for the streaming adapter.""" + + def _streaming_adapter(self) -> MagicMock: + adapter = MagicMock() + adapter.connect = AsyncMock() + adapter.disconnect = AsyncMock() + adapter.subscribe = AsyncMock() + adapter.on_price = MagicMock() + adapter.on_news = MagicMock() + return adapter + + async def test_streaming_connect_and_subscribe_active_tickers(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [ + _alert("a1", "AAPL", 200.0), + _alert("a2", "AAPL", 180.0), + _alert("a3", "TSLA", 250.0), + ] + executor = _make_executor() + adapter = self._streaming_adapter() + + server = Server(monitor, executor, streaming_adapter=adapter) + await server._start_streaming() + + adapter.connect.assert_awaited_once() + adapter.on_price.assert_called_once() + subscribed = adapter.subscribe.call_args[0][0] + assert sorted(subscribed) == ["AAPL", "TSLA"] + + async def test_streaming_connect_failure_falls_back_to_polling(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [] + executor = _make_executor() + adapter = self._streaming_adapter() + adapter.connect = AsyncMock(side_effect=ConnectionError("down")) + + server = Server(monitor, executor, streaming_adapter=adapter) + await server._start_streaming() + + # Failed connect clears the adapter — the tick loop keeps running. + assert server._streaming_adapter is None + adapter.subscribe.assert_not_awaited() + + async def test_stop_streaming_disconnects(self) -> None: + monitor = _make_monitor() + monitor.store.get_active.return_value = [] + executor = _make_executor() + adapter = self._streaming_adapter() + + server = Server(monitor, executor, streaming_adapter=adapter) + await server._start_streaming() + await server._stop_streaming() + + adapter.disconnect.assert_awaited_once() + + async def test_stream_price_evaluates_and_notifies(self) -> None: + result = MagicMock() + result.message = "AAPL above 200" + monitor = _make_monitor() + monitor.evaluate_price = MagicMock(return_value=[result]) + executor = _make_executor() + notifications = MagicMock() + notifications.channels = ["telegram"] + notifications.notify = AsyncMock(return_value={"telegram": True}) + + server = Server(monitor, executor, notifications) + await server._on_stream_price("AAPL", 210.0) + + monitor.evaluate_price.assert_called_once_with("AAPL", 210.0) + notifications.notify.assert_awaited_once() + + async def test_stream_price_swallows_evaluation_errors(self) -> None: + monitor = _make_monitor() + monitor.evaluate_price = MagicMock(side_effect=RuntimeError("boom")) + executor = _make_executor() + notifications = MagicMock() + notifications.channels = ["telegram"] + notifications.notify = AsyncMock() + + server = Server(monitor, executor, notifications) + await server._on_stream_price("AAPL", 210.0) # Must not raise + + notifications.notify.assert_not_awaited() + + # --------------------------------------------------------------------------- # Telegram bot command integration # --------------------------------------------------------------------------- diff --git a/uv.lock b/uv.lock index 06cad85..876cdb5 100644 --- a/uv.lock +++ b/uv.lock @@ -1406,6 +1406,9 @@ openai = [ pdf = [ { name = "fpdf2" }, ] +streaming = [ + { name = "websockets" }, +] web = [ { name = "fastapi" }, { name = "uvicorn" }, @@ -1439,9 +1442,10 @@ requires-dist = [ { name = "textual", specifier = ">=0.70.0" }, { name = "tomli", marker = "python_full_version < '3.11'", specifier = ">=2.0.0" }, { name = "uvicorn", marker = "extra == 'web'", specifier = ">=0.24.0" }, + { name = "websockets", marker = "extra == 'streaming'", specifier = ">=12.0" }, { name = "yfinance", specifier = ">=0.2.0" }, ] -provides-extras = ["openai", "gemini", "finnhub", "fred", "all-llm", "web", "pdf"] +provides-extras = ["openai", "gemini", "finnhub", "fred", "all-llm", "web", "pdf", "streaming"] [package.metadata.requires-dev] dev = [