Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion qracer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,46 @@ async def _repl_loop(
alert_monitor: object | None = None,
task_executor: object | None = None,
data_registry: object | None = None,
sessions_dir: Path | None = None,
current_session: Path | None = None,
) -> None:
"""Run the interactive read-eval-print loop."""
from qracer.alert_monitor import AlertMonitor
from qracer.config.loader import has_config_changed
from qracer.conversation.quickpath import generate_briefing
from qracer.data.registry import DataRegistry
from qracer.task_executor import TaskExecutor
from qracer.watchlist import Watchlist

click.echo(BANNER)
monitor: AlertMonitor | None = alert_monitor # type: ignore[assignment]
executor: TaskExecutor | None = task_executor # type: ignore[assignment]

# One-time session-start briefing summarising activity since the last run.
if (
sessions_dir is not None
and isinstance(data_registry, DataRegistry)
and isinstance(watchlist, Watchlist)
and monitor is not None
and executor is not None
):
try:
briefing = await generate_briefing(
watchlist,
data_registry,
monitor.store,
executor.store,
sessions_dir,
current_session=current_session,
)
except Exception:
logger.debug("Session briefing generation failed", exc_info=True)
briefing = None
if briefing:
click.echo(briefing)
click.echo()

click.echo(BANNER)

while True:
# Check alerts on each iteration if enough time has elapsed.
if monitor and monitor.should_check():
Expand Down Expand Up @@ -926,6 +956,8 @@ def repl() -> None:
alert_monitor=alert_monitor,
task_executor=task_executor,
data_registry=data_registry,
sessions_dir=sessions_dir,
current_session=session_logger.path,
)
)

Expand Down
181 changes: 180 additions & 1 deletion qracer/conversation/quickpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@

from __future__ import annotations

from datetime import datetime
import logging
from datetime import datetime, timezone
from pathlib import Path

from qracer.alerts import Alert, AlertStore
from qracer.conversation.intent import Intent, IntentType
from qracer.data.providers import PriceProvider
from qracer.data.registry import DataRegistry
from qracer.models import ToolResult
from qracer.risk.models import PortfolioSnapshot
from qracer.tasks import TaskStore
from qracer.watchlist import Watchlist

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# i18n template lookup tables
Expand Down Expand Up @@ -191,3 +200,173 @@ def _format_generic(intent: Intent, results: list[ToolResult], *, language: str
for r in successful:
parts.append(f"[{r.tool}] {r.source}: {len(r.data)} fields")
return "\n".join(parts)


# ---------------------------------------------------------------------------
# Session-start briefing
# ---------------------------------------------------------------------------


async def generate_briefing(
watchlist: Watchlist,
data_registry: DataRegistry,
alert_store: AlertStore,
task_store: TaskStore,
sessions_dir: Path,
current_session: Path | None = None,
) -> str | None:
"""Generate a session-start briefing.

Summarises activity since the previous session: current watchlist
prices, alerts that triggered while away, and any pending scheduled
tasks. Returns ``None`` when there is no previous session on disk
or when nothing noteworthy is available.

Args:
watchlist: User's ticker watchlist.
data_registry: Provides ``PriceProvider`` for live price lookups.
alert_store: Source of triggered alerts.
task_store: Source of pending scheduled tasks.
sessions_dir: Directory containing session JSONL log files.
current_session: Path of the current session log; excluded from
"last session" detection so the briefing reflects activity
since the previous run, not the current one.
"""
last_session = _find_last_session(sessions_dir, current_session=current_session)
if last_session is None:
return None

lines: list[str] = ["Session Briefing", "=" * 40, ""]
last_session_label = last_session.astimezone().strftime("%Y-%m-%d %H:%M")
lines.append(f"Since last session ({last_session_label}):")
lines.append("")
has_content = False

# Watchlist prices
price_lines = await _briefing_price_lines(watchlist, data_registry)
if price_lines:
lines.append("Watchlist:")
lines.extend(price_lines)
lines.append("")
has_content = True

# Alerts triggered since the last session
triggered_lines = _briefing_alert_lines(alert_store, since=last_session)
if triggered_lines:
lines.append(f"Triggered Alerts ({len(triggered_lines)}):")
lines.extend(triggered_lines)
lines.append("")
has_content = True

# Pending tasks
task_lines, pending_count = _briefing_task_lines(task_store)
if task_lines:
lines.append(f"Pending Tasks ({pending_count}):")
lines.extend(task_lines)
lines.append("")
has_content = True

if not has_content:
return None

# Drop trailing blank lines.
while lines and lines[-1] == "":
lines.pop()
return "\n".join(lines)


def _find_last_session(
sessions_dir: Path,
current_session: Path | None = None,
) -> datetime | None:
"""Return the mtime of the most recent session log file.

Excludes ``current_session`` (when provided) so that callers can ask
"what is the previous session?" even after the current session has
started writing.
"""
if not sessions_dir.exists():
return None

try:
files = list(sessions_dir.glob("*.jsonl"))
except OSError:
return None

current_resolved: Path | None = None
if current_session is not None:
try:
current_resolved = current_session.resolve()
except OSError:
current_resolved = current_session

candidates: list[tuple[float, Path]] = []
for path in files:
try:
resolved = path.resolve()
except OSError:
resolved = path
if current_resolved is not None and resolved == current_resolved:
continue
try:
mtime = path.stat().st_mtime
except OSError:
continue
candidates.append((mtime, path))

if not candidates:
return None

candidates.sort(key=lambda item: item[0], reverse=True)
return datetime.fromtimestamp(candidates[0][0], tz=timezone.utc)


async def _briefing_price_lines(
watchlist: Watchlist,
data_registry: DataRegistry,
) -> list[str]:
"""Fetch current prices for watchlist tickers, skipping failures."""
lines: list[str] = []
for ticker in watchlist.tickers:
try:
price = await data_registry.async_get_with_fallback(PriceProvider, "get_price", ticker)
except Exception:
logger.debug("Briefing price fetch failed for %s", ticker, exc_info=True)
continue
if not isinstance(price, (int, float)):
continue
lines.append(f" {ticker}: ${price:,.2f}")
return lines


def _briefing_alert_lines(alert_store: AlertStore, since: datetime) -> list[str]:
"""Return formatted lines for alerts triggered after ``since``."""
triggered: list[tuple[datetime, Alert]] = []
for alert in alert_store.alerts:
if alert.active or not alert.triggered_at:
continue
try:
dt = datetime.fromisoformat(alert.triggered_at)
except ValueError:
continue
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
if dt > since:
triggered.append((dt, alert))

triggered.sort(key=lambda item: item[0], reverse=True)
return [
f" - {alert.describe()} (at {dt.astimezone().strftime('%Y-%m-%d %H:%M')})"
for dt, alert in triggered
]


def _briefing_task_lines(task_store: TaskStore) -> tuple[list[str], int]:
"""Return formatted lines for the first few pending tasks plus the total count."""
pending = task_store.get_active()
if not pending:
return [], 0
lines = [f" [{t.id}] {t.describe()}" for t in pending[:5]]
if len(pending) > 5:
lines.append(f" ... and {len(pending) - 5} more")
return lines, len(pending)
Loading
Loading