diff --git a/qracer/cli.py b/qracer/cli.py index ec47581..98b62fe 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -381,16 +381,46 @@ async def _repl_loop( alert_monitor: object | None = None, task_executor: object | None = None, data_registry: object | None = None, + sessions_dir: Path | None = None, + current_session: Path | None = None, ) -> None: """Run the interactive read-eval-print loop.""" from qracer.alert_monitor import AlertMonitor from qracer.config.loader import has_config_changed + from qracer.conversation.quickpath import generate_briefing + from qracer.data.registry import DataRegistry from qracer.task_executor import TaskExecutor + from qracer.watchlist import Watchlist - click.echo(BANNER) monitor: AlertMonitor | None = alert_monitor # type: ignore[assignment] executor: TaskExecutor | None = task_executor # type: ignore[assignment] + # One-time session-start briefing summarising activity since the last run. + if ( + sessions_dir is not None + and isinstance(data_registry, DataRegistry) + and isinstance(watchlist, Watchlist) + and monitor is not None + and executor is not None + ): + try: + briefing = await generate_briefing( + watchlist, + data_registry, + monitor.store, + executor.store, + sessions_dir, + current_session=current_session, + ) + except Exception: + logger.debug("Session briefing generation failed", exc_info=True) + briefing = None + if briefing: + click.echo(briefing) + click.echo() + + click.echo(BANNER) + while True: # Check alerts on each iteration if enough time has elapsed. if monitor and monitor.should_check(): @@ -926,6 +956,8 @@ def repl() -> None: alert_monitor=alert_monitor, task_executor=task_executor, data_registry=data_registry, + sessions_dir=sessions_dir, + current_session=session_logger.path, ) ) diff --git a/qracer/conversation/quickpath.py b/qracer/conversation/quickpath.py index 24ccd0d..806d768 100644 --- a/qracer/conversation/quickpath.py +++ b/qracer/conversation/quickpath.py @@ -6,11 +6,20 @@ from __future__ import annotations -from datetime import datetime +import logging +from datetime import datetime, timezone +from pathlib import Path +from qracer.alerts import Alert, AlertStore from qracer.conversation.intent import Intent, IntentType +from qracer.data.providers import PriceProvider +from qracer.data.registry import DataRegistry from qracer.models import ToolResult from qracer.risk.models import PortfolioSnapshot +from qracer.tasks import TaskStore +from qracer.watchlist import Watchlist + +logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # i18n template lookup tables @@ -191,3 +200,173 @@ def _format_generic(intent: Intent, results: list[ToolResult], *, language: str for r in successful: parts.append(f"[{r.tool}] {r.source}: {len(r.data)} fields") return "\n".join(parts) + + +# --------------------------------------------------------------------------- +# Session-start briefing +# --------------------------------------------------------------------------- + + +async def generate_briefing( + watchlist: Watchlist, + data_registry: DataRegistry, + alert_store: AlertStore, + task_store: TaskStore, + sessions_dir: Path, + current_session: Path | None = None, +) -> str | None: + """Generate a session-start briefing. + + Summarises activity since the previous session: current watchlist + prices, alerts that triggered while away, and any pending scheduled + tasks. Returns ``None`` when there is no previous session on disk + or when nothing noteworthy is available. + + Args: + watchlist: User's ticker watchlist. + data_registry: Provides ``PriceProvider`` for live price lookups. + alert_store: Source of triggered alerts. + task_store: Source of pending scheduled tasks. + sessions_dir: Directory containing session JSONL log files. + current_session: Path of the current session log; excluded from + "last session" detection so the briefing reflects activity + since the previous run, not the current one. + """ + last_session = _find_last_session(sessions_dir, current_session=current_session) + if last_session is None: + return None + + lines: list[str] = ["Session Briefing", "=" * 40, ""] + last_session_label = last_session.astimezone().strftime("%Y-%m-%d %H:%M") + lines.append(f"Since last session ({last_session_label}):") + lines.append("") + has_content = False + + # Watchlist prices + price_lines = await _briefing_price_lines(watchlist, data_registry) + if price_lines: + lines.append("Watchlist:") + lines.extend(price_lines) + lines.append("") + has_content = True + + # Alerts triggered since the last session + triggered_lines = _briefing_alert_lines(alert_store, since=last_session) + if triggered_lines: + lines.append(f"Triggered Alerts ({len(triggered_lines)}):") + lines.extend(triggered_lines) + lines.append("") + has_content = True + + # Pending tasks + task_lines, pending_count = _briefing_task_lines(task_store) + if task_lines: + lines.append(f"Pending Tasks ({pending_count}):") + lines.extend(task_lines) + lines.append("") + has_content = True + + if not has_content: + return None + + # Drop trailing blank lines. + while lines and lines[-1] == "": + lines.pop() + return "\n".join(lines) + + +def _find_last_session( + sessions_dir: Path, + current_session: Path | None = None, +) -> datetime | None: + """Return the mtime of the most recent session log file. + + Excludes ``current_session`` (when provided) so that callers can ask + "what is the previous session?" even after the current session has + started writing. + """ + if not sessions_dir.exists(): + return None + + try: + files = list(sessions_dir.glob("*.jsonl")) + except OSError: + return None + + current_resolved: Path | None = None + if current_session is not None: + try: + current_resolved = current_session.resolve() + except OSError: + current_resolved = current_session + + candidates: list[tuple[float, Path]] = [] + for path in files: + try: + resolved = path.resolve() + except OSError: + resolved = path + if current_resolved is not None and resolved == current_resolved: + continue + try: + mtime = path.stat().st_mtime + except OSError: + continue + candidates.append((mtime, path)) + + if not candidates: + return None + + candidates.sort(key=lambda item: item[0], reverse=True) + return datetime.fromtimestamp(candidates[0][0], tz=timezone.utc) + + +async def _briefing_price_lines( + watchlist: Watchlist, + data_registry: DataRegistry, +) -> list[str]: + """Fetch current prices for watchlist tickers, skipping failures.""" + lines: list[str] = [] + for ticker in watchlist.tickers: + try: + price = await data_registry.async_get_with_fallback(PriceProvider, "get_price", ticker) + except Exception: + logger.debug("Briefing price fetch failed for %s", ticker, exc_info=True) + continue + if not isinstance(price, (int, float)): + continue + lines.append(f" {ticker}: ${price:,.2f}") + return lines + + +def _briefing_alert_lines(alert_store: AlertStore, since: datetime) -> list[str]: + """Return formatted lines for alerts triggered after ``since``.""" + triggered: list[tuple[datetime, Alert]] = [] + for alert in alert_store.alerts: + if alert.active or not alert.triggered_at: + continue + try: + dt = datetime.fromisoformat(alert.triggered_at) + except ValueError: + continue + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + if dt > since: + triggered.append((dt, alert)) + + triggered.sort(key=lambda item: item[0], reverse=True) + return [ + f" - {alert.describe()} (at {dt.astimezone().strftime('%Y-%m-%d %H:%M')})" + for dt, alert in triggered + ] + + +def _briefing_task_lines(task_store: TaskStore) -> tuple[list[str], int]: + """Return formatted lines for the first few pending tasks plus the total count.""" + pending = task_store.get_active() + if not pending: + return [], 0 + lines = [f" [{t.id}] {t.describe()}" for t in pending[:5]] + if len(pending) > 5: + lines.append(f" ... and {len(pending) - 5} more") + return lines, len(pending) diff --git a/tests/conversation/test_quickpath.py b/tests/conversation/test_quickpath.py index 96d8dec..1438b4c 100644 --- a/tests/conversation/test_quickpath.py +++ b/tests/conversation/test_quickpath.py @@ -2,9 +2,19 @@ from __future__ import annotations +import os +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from qracer.alerts import AlertCondition, AlertStore from qracer.conversation.intent import Intent, IntentType -from qracer.conversation.quickpath import format_quickpath +from qracer.conversation.quickpath import format_quickpath, generate_briefing +from qracer.data.providers import PriceProvider +from qracer.data.registry import DataRegistry from qracer.models import ToolResult +from qracer.tasks import TaskActionType, TaskStore +from qracer.watchlist import Watchlist def _price_result(ticker: str = "AAPL", price: float = 178.52) -> ToolResult: @@ -194,3 +204,173 @@ def test_quick_news_keywords(self) -> None: intent = asyncio.get_event_loop().run_until_complete(parser.parse("Any news on TSLA?")) assert intent.intent_type == IntentType.QUICK_NEWS + + +# --------------------------------------------------------------------------- +# Briefing helpers +# --------------------------------------------------------------------------- + + +class _FakePriceProvider: + """Async price provider returning canned values.""" + + def __init__(self, prices: dict[str, float]) -> None: + self._prices = prices + + async def get_price(self, ticker: str) -> float: + if ticker not in self._prices: + raise KeyError(f"No price for {ticker}") + return self._prices[ticker] + + async def get_ohlcv(self, ticker, start, end): # pragma: no cover - unused + return [] + + +def _make_price_registry(prices: dict[str, float]) -> DataRegistry: + registry = DataRegistry() + registry.register("fake", _FakePriceProvider(prices), [PriceProvider]) + return registry + + +def _make_previous_session(sessions_dir: Path, *, name: str = "prev.jsonl") -> Path: + """Create a previous session file with an mtime well before "now".""" + sessions_dir.mkdir(parents=True, exist_ok=True) + prev = sessions_dir / name + prev.write_text("{}\n", encoding="utf-8") + # Backdate so any "since last session" datetime comparisons are unambiguous. + past = time.time() - 3600 + os.utime(prev, (past, past)) + return prev + + +class TestGenerateBriefing: + async def test_returns_none_without_previous_session(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is None + + async def test_excludes_current_session_from_history(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + current = sessions_dir / "current.jsonl" + current.write_text("", encoding="utf-8") + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + # Only the current session exists -> nothing to compare against. + result = await generate_briefing( + watchlist, + registry, + alert_store, + task_store, + sessions_dir, + current_session=current, + ) + assert result is None + + async def test_returns_none_when_nothing_to_report(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") # empty + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") # empty + task_store = TaskStore(tmp_path / "tasks.json") # empty + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is None + + async def test_includes_watchlist_prices(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + watchlist.add("AAPL") + watchlist.add("MSFT") + registry = _make_price_registry({"AAPL": 178.52, "MSFT": 412.10}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Session Briefing" in result + assert "Watchlist:" in result + assert "AAPL" in result and "$178.52" in result + assert "MSFT" in result and "$412.10" in result + + async def test_skips_unavailable_prices(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + watchlist.add("AAPL") + watchlist.add("UNKNOWN") + registry = _make_price_registry({"AAPL": 100.0}) # UNKNOWN raises + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "AAPL" in result + assert "UNKNOWN" not in result + + async def test_includes_recent_triggered_alerts(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + # Two alerts: one triggered after the previous session, one before. + recent = alert_store.create("AAPL", AlertCondition.ABOVE, 200.0) + stale = alert_store.create("MSFT", AlertCondition.BELOW, 300.0) + alert_store.mark_triggered(recent.id, 210.0) + # Backdate the stale alert's triggered_at so it falls before the + # previous session timestamp. + for alert in alert_store.alerts: + if alert.id == stale.id: + alert.active = False + alert.triggered_at = (datetime.now(timezone.utc) - timedelta(days=2)).isoformat() + alert_store._save() # type: ignore[attr-defined] + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Triggered Alerts (1)" in result + assert "AAPL goes above 200.0" in result + assert "MSFT" not in result + + async def test_includes_pending_tasks(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + task_store.create(TaskActionType.NEWS_SCAN, {"ticker": "AAPL"}, "every 1h") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Pending Tasks (1)" in result + assert "news scan" in result + assert "AAPL" in result + + async def test_truncates_pending_tasks_over_five(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + for i in range(7): + task_store.create(TaskActionType.NEWS_SCAN, {"ticker": f"T{i}"}, "every 1h") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Pending Tasks (7)" in result + assert "and 2 more" in result