From 943fc12f928be1263987632d85e38d2cec37b73e Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 8 Apr 2026 06:25:39 +0000 Subject: [PATCH] feat: watchlist-based daily briefing on session start (closes #140) Add `generate_briefing()` to surface a one-time summary at REPL startup of activity since the previous session: current watchlist prices, alerts that triggered while away, and pending scheduled tasks. Detects the prior session via the most recent JSONL log mtime in the sessions directory, excluding the current session. The REPL loop calls the briefing once before printing the banner. The briefing is silently skipped on first-ever runs (no previous session) or when there is nothing noteworthy to report, and any failures are logged at debug level so they never block the REPL. https://claude.ai/code/session_01TDznA5KxCsxF5oeaBy3JH6 --- qracer/cli.py | 34 ++++- qracer/conversation/quickpath.py | 181 +++++++++++++++++++++++++- tests/conversation/test_quickpath.py | 182 ++++++++++++++++++++++++++- 3 files changed, 394 insertions(+), 3 deletions(-) diff --git a/qracer/cli.py b/qracer/cli.py index ec47581..98b62fe 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -381,16 +381,46 @@ async def _repl_loop( alert_monitor: object | None = None, task_executor: object | None = None, data_registry: object | None = None, + sessions_dir: Path | None = None, + current_session: Path | None = None, ) -> None: """Run the interactive read-eval-print loop.""" from qracer.alert_monitor import AlertMonitor from qracer.config.loader import has_config_changed + from qracer.conversation.quickpath import generate_briefing + from qracer.data.registry import DataRegistry from qracer.task_executor import TaskExecutor + from qracer.watchlist import Watchlist - click.echo(BANNER) monitor: AlertMonitor | None = alert_monitor # type: ignore[assignment] executor: TaskExecutor | None = task_executor # type: ignore[assignment] + # One-time session-start briefing summarising activity since the last run. + if ( + sessions_dir is not None + and isinstance(data_registry, DataRegistry) + and isinstance(watchlist, Watchlist) + and monitor is not None + and executor is not None + ): + try: + briefing = await generate_briefing( + watchlist, + data_registry, + monitor.store, + executor.store, + sessions_dir, + current_session=current_session, + ) + except Exception: + logger.debug("Session briefing generation failed", exc_info=True) + briefing = None + if briefing: + click.echo(briefing) + click.echo() + + click.echo(BANNER) + while True: # Check alerts on each iteration if enough time has elapsed. if monitor and monitor.should_check(): @@ -926,6 +956,8 @@ def repl() -> None: alert_monitor=alert_monitor, task_executor=task_executor, data_registry=data_registry, + sessions_dir=sessions_dir, + current_session=session_logger.path, ) ) diff --git a/qracer/conversation/quickpath.py b/qracer/conversation/quickpath.py index 24ccd0d..806d768 100644 --- a/qracer/conversation/quickpath.py +++ b/qracer/conversation/quickpath.py @@ -6,11 +6,20 @@ from __future__ import annotations -from datetime import datetime +import logging +from datetime import datetime, timezone +from pathlib import Path +from qracer.alerts import Alert, AlertStore from qracer.conversation.intent import Intent, IntentType +from qracer.data.providers import PriceProvider +from qracer.data.registry import DataRegistry from qracer.models import ToolResult from qracer.risk.models import PortfolioSnapshot +from qracer.tasks import TaskStore +from qracer.watchlist import Watchlist + +logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # i18n template lookup tables @@ -191,3 +200,173 @@ def _format_generic(intent: Intent, results: list[ToolResult], *, language: str for r in successful: parts.append(f"[{r.tool}] {r.source}: {len(r.data)} fields") return "\n".join(parts) + + +# --------------------------------------------------------------------------- +# Session-start briefing +# --------------------------------------------------------------------------- + + +async def generate_briefing( + watchlist: Watchlist, + data_registry: DataRegistry, + alert_store: AlertStore, + task_store: TaskStore, + sessions_dir: Path, + current_session: Path | None = None, +) -> str | None: + """Generate a session-start briefing. + + Summarises activity since the previous session: current watchlist + prices, alerts that triggered while away, and any pending scheduled + tasks. Returns ``None`` when there is no previous session on disk + or when nothing noteworthy is available. + + Args: + watchlist: User's ticker watchlist. + data_registry: Provides ``PriceProvider`` for live price lookups. + alert_store: Source of triggered alerts. + task_store: Source of pending scheduled tasks. + sessions_dir: Directory containing session JSONL log files. + current_session: Path of the current session log; excluded from + "last session" detection so the briefing reflects activity + since the previous run, not the current one. + """ + last_session = _find_last_session(sessions_dir, current_session=current_session) + if last_session is None: + return None + + lines: list[str] = ["Session Briefing", "=" * 40, ""] + last_session_label = last_session.astimezone().strftime("%Y-%m-%d %H:%M") + lines.append(f"Since last session ({last_session_label}):") + lines.append("") + has_content = False + + # Watchlist prices + price_lines = await _briefing_price_lines(watchlist, data_registry) + if price_lines: + lines.append("Watchlist:") + lines.extend(price_lines) + lines.append("") + has_content = True + + # Alerts triggered since the last session + triggered_lines = _briefing_alert_lines(alert_store, since=last_session) + if triggered_lines: + lines.append(f"Triggered Alerts ({len(triggered_lines)}):") + lines.extend(triggered_lines) + lines.append("") + has_content = True + + # Pending tasks + task_lines, pending_count = _briefing_task_lines(task_store) + if task_lines: + lines.append(f"Pending Tasks ({pending_count}):") + lines.extend(task_lines) + lines.append("") + has_content = True + + if not has_content: + return None + + # Drop trailing blank lines. + while lines and lines[-1] == "": + lines.pop() + return "\n".join(lines) + + +def _find_last_session( + sessions_dir: Path, + current_session: Path | None = None, +) -> datetime | None: + """Return the mtime of the most recent session log file. + + Excludes ``current_session`` (when provided) so that callers can ask + "what is the previous session?" even after the current session has + started writing. + """ + if not sessions_dir.exists(): + return None + + try: + files = list(sessions_dir.glob("*.jsonl")) + except OSError: + return None + + current_resolved: Path | None = None + if current_session is not None: + try: + current_resolved = current_session.resolve() + except OSError: + current_resolved = current_session + + candidates: list[tuple[float, Path]] = [] + for path in files: + try: + resolved = path.resolve() + except OSError: + resolved = path + if current_resolved is not None and resolved == current_resolved: + continue + try: + mtime = path.stat().st_mtime + except OSError: + continue + candidates.append((mtime, path)) + + if not candidates: + return None + + candidates.sort(key=lambda item: item[0], reverse=True) + return datetime.fromtimestamp(candidates[0][0], tz=timezone.utc) + + +async def _briefing_price_lines( + watchlist: Watchlist, + data_registry: DataRegistry, +) -> list[str]: + """Fetch current prices for watchlist tickers, skipping failures.""" + lines: list[str] = [] + for ticker in watchlist.tickers: + try: + price = await data_registry.async_get_with_fallback(PriceProvider, "get_price", ticker) + except Exception: + logger.debug("Briefing price fetch failed for %s", ticker, exc_info=True) + continue + if not isinstance(price, (int, float)): + continue + lines.append(f" {ticker}: ${price:,.2f}") + return lines + + +def _briefing_alert_lines(alert_store: AlertStore, since: datetime) -> list[str]: + """Return formatted lines for alerts triggered after ``since``.""" + triggered: list[tuple[datetime, Alert]] = [] + for alert in alert_store.alerts: + if alert.active or not alert.triggered_at: + continue + try: + dt = datetime.fromisoformat(alert.triggered_at) + except ValueError: + continue + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + if dt > since: + triggered.append((dt, alert)) + + triggered.sort(key=lambda item: item[0], reverse=True) + return [ + f" - {alert.describe()} (at {dt.astimezone().strftime('%Y-%m-%d %H:%M')})" + for dt, alert in triggered + ] + + +def _briefing_task_lines(task_store: TaskStore) -> tuple[list[str], int]: + """Return formatted lines for the first few pending tasks plus the total count.""" + pending = task_store.get_active() + if not pending: + return [], 0 + lines = [f" [{t.id}] {t.describe()}" for t in pending[:5]] + if len(pending) > 5: + lines.append(f" ... and {len(pending) - 5} more") + return lines, len(pending) diff --git a/tests/conversation/test_quickpath.py b/tests/conversation/test_quickpath.py index 96d8dec..1438b4c 100644 --- a/tests/conversation/test_quickpath.py +++ b/tests/conversation/test_quickpath.py @@ -2,9 +2,19 @@ from __future__ import annotations +import os +import time +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from qracer.alerts import AlertCondition, AlertStore from qracer.conversation.intent import Intent, IntentType -from qracer.conversation.quickpath import format_quickpath +from qracer.conversation.quickpath import format_quickpath, generate_briefing +from qracer.data.providers import PriceProvider +from qracer.data.registry import DataRegistry from qracer.models import ToolResult +from qracer.tasks import TaskActionType, TaskStore +from qracer.watchlist import Watchlist def _price_result(ticker: str = "AAPL", price: float = 178.52) -> ToolResult: @@ -194,3 +204,173 @@ def test_quick_news_keywords(self) -> None: intent = asyncio.get_event_loop().run_until_complete(parser.parse("Any news on TSLA?")) assert intent.intent_type == IntentType.QUICK_NEWS + + +# --------------------------------------------------------------------------- +# Briefing helpers +# --------------------------------------------------------------------------- + + +class _FakePriceProvider: + """Async price provider returning canned values.""" + + def __init__(self, prices: dict[str, float]) -> None: + self._prices = prices + + async def get_price(self, ticker: str) -> float: + if ticker not in self._prices: + raise KeyError(f"No price for {ticker}") + return self._prices[ticker] + + async def get_ohlcv(self, ticker, start, end): # pragma: no cover - unused + return [] + + +def _make_price_registry(prices: dict[str, float]) -> DataRegistry: + registry = DataRegistry() + registry.register("fake", _FakePriceProvider(prices), [PriceProvider]) + return registry + + +def _make_previous_session(sessions_dir: Path, *, name: str = "prev.jsonl") -> Path: + """Create a previous session file with an mtime well before "now".""" + sessions_dir.mkdir(parents=True, exist_ok=True) + prev = sessions_dir / name + prev.write_text("{}\n", encoding="utf-8") + # Backdate so any "since last session" datetime comparisons are unambiguous. + past = time.time() - 3600 + os.utime(prev, (past, past)) + return prev + + +class TestGenerateBriefing: + async def test_returns_none_without_previous_session(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is None + + async def test_excludes_current_session_from_history(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + current = sessions_dir / "current.jsonl" + current.write_text("", encoding="utf-8") + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + # Only the current session exists -> nothing to compare against. + result = await generate_briefing( + watchlist, + registry, + alert_store, + task_store, + sessions_dir, + current_session=current, + ) + assert result is None + + async def test_returns_none_when_nothing_to_report(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") # empty + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") # empty + task_store = TaskStore(tmp_path / "tasks.json") # empty + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is None + + async def test_includes_watchlist_prices(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + watchlist.add("AAPL") + watchlist.add("MSFT") + registry = _make_price_registry({"AAPL": 178.52, "MSFT": 412.10}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Session Briefing" in result + assert "Watchlist:" in result + assert "AAPL" in result and "$178.52" in result + assert "MSFT" in result and "$412.10" in result + + async def test_skips_unavailable_prices(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + watchlist.add("AAPL") + watchlist.add("UNKNOWN") + registry = _make_price_registry({"AAPL": 100.0}) # UNKNOWN raises + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "AAPL" in result + assert "UNKNOWN" not in result + + async def test_includes_recent_triggered_alerts(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + + # Two alerts: one triggered after the previous session, one before. + recent = alert_store.create("AAPL", AlertCondition.ABOVE, 200.0) + stale = alert_store.create("MSFT", AlertCondition.BELOW, 300.0) + alert_store.mark_triggered(recent.id, 210.0) + # Backdate the stale alert's triggered_at so it falls before the + # previous session timestamp. + for alert in alert_store.alerts: + if alert.id == stale.id: + alert.active = False + alert.triggered_at = (datetime.now(timezone.utc) - timedelta(days=2)).isoformat() + alert_store._save() # type: ignore[attr-defined] + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Triggered Alerts (1)" in result + assert "AAPL goes above 200.0" in result + assert "MSFT" not in result + + async def test_includes_pending_tasks(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + task_store.create(TaskActionType.NEWS_SCAN, {"ticker": "AAPL"}, "every 1h") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Pending Tasks (1)" in result + assert "news scan" in result + assert "AAPL" in result + + async def test_truncates_pending_tasks_over_five(self, tmp_path: Path) -> None: + sessions_dir = tmp_path / "sessions" + _make_previous_session(sessions_dir) + watchlist = Watchlist(tmp_path / "watchlist.json") + registry = _make_price_registry({}) + alert_store = AlertStore(tmp_path / "alerts.json") + task_store = TaskStore(tmp_path / "tasks.json") + for i in range(7): + task_store.create(TaskActionType.NEWS_SCAN, {"ticker": f"T{i}"}, "every 1h") + + result = await generate_briefing(watchlist, registry, alert_store, task_store, sessions_dir) + assert result is not None + assert "Pending Tasks (7)" in result + assert "and 2 more" in result