diff --git a/README.md b/README.md index ac15a28..e738e37 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Connects to the Comdirect REST API (read-only), normalizes your financial data i - **CSV/JSON export** — Accounts, transactions, depot positions, depot transactions, financial overview - **Finance API** — REST API for normalized financial data (transactions, categorization, aggregates) - **Normalization pipeline** — Ingests raw Comdirect data into a canonical schema in Postgres +- **Gmail mail evidence** — Read-only Gmail import for sanitized invoice/order evidence and transaction matching - **AI categorization** — LLM agents (pydantic-ai + Claude) categorize transactions, detect anomalies, generate monthly summaries - **MCP server** — Exposes the Finance API as MCP tools for agent use; read-only by default, with a tiny opt-in write allowlist for trusted local sessions - **Self-hostable** — `docker compose up` for a single laptop or a Helm chart for K3s/K8s; two-microservice split keeps bank credentials off the public-facing API diff --git a/alembic/versions/0026_portfolio_plan.py b/alembic/versions/0026_portfolio_plan.py index 11b86a6..939135a 100644 --- a/alembic/versions/0026_portfolio_plan.py +++ b/alembic/versions/0026_portfolio_plan.py @@ -9,6 +9,7 @@ import sqlalchemy as sa from alembic import op +from sqlalchemy.dialects import postgresql revision = "0026_portfolio_plan" down_revision = "0025_app_settings_own_ibans" @@ -16,16 +17,18 @@ depends_on = None -savings_plan_interval = sa.Enum( +savings_plan_interval = postgresql.ENUM( "monthly", "quarterly", "yearly", name="savings_plan_interval", + create_type=False, ) -portfolio_target_type = sa.Enum( +portfolio_target_type = postgresql.ENUM( "isin", "bucket", name="portfolio_target_type", + create_type=False, ) diff --git a/alembic/versions/0027_mail_evidence_context.py b/alembic/versions/0027_mail_evidence_context.py new file mode 100644 index 0000000..672a535 --- /dev/null +++ b/alembic/versions/0027_mail_evidence_context.py @@ -0,0 +1,167 @@ +"""mail evidence and analysis context semantics + +Revision ID: 0027_mail_evidence_context +Revises: 0026_transaction_links, 0026_portfolio_plan +Create Date: 2026-06-09 + +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + + +revision: str = "0027_mail_evidence_context" +down_revision: str | Sequence[str] | None = ( + "0026_transaction_links", + "0026_portfolio_plan", +) +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "categories", + sa.Column("kind", sa.String(length=32), server_default="expense", nullable=False), + ) + op.add_column( + "categories", + sa.Column("budgetable", sa.Boolean(), server_default="true", nullable=False), + ) + op.add_column("categories", sa.Column("analysis_group", sa.String(length=64), nullable=True)) + op.add_column("categories", sa.Column("description", sa.String(length=500), nullable=True)) + op.add_column("categories", sa.Column("examples", sa.JSON(), nullable=True)) + op.add_column("categories", sa.Column("anti_examples", sa.JSON(), nullable=True)) + op.add_column("categories", sa.Column("llm_hints", sa.String(length=1000), nullable=True)) + + op.add_column( + "budgets", + sa.Column("is_active", sa.Boolean(), server_default="true", nullable=False), + ) + op.add_column( + "budgets", + sa.Column("priority", sa.Integer(), server_default="0", nullable=False), + ) + op.add_column( + "budgets", + sa.Column( + "warning_threshold", + sa.Numeric(4, 2), + server_default="0.80", + nullable=False, + ), + ) + op.add_column( + "budgets", + sa.Column( + "critical_threshold", + sa.Numeric(4, 2), + server_default="1.00", + nullable=False, + ), + ) + op.add_column("budgets", sa.Column("context_note", sa.String(length=500), nullable=True)) + + op.create_table( + "mail_evidence", + sa.Column("id", sa.String(length=64), nullable=False), + sa.Column("source", sa.String(length=32), nullable=False), + sa.Column("evidence_type", sa.String(length=32), nullable=False), + sa.Column("merchant_name", sa.String(length=200), nullable=True), + sa.Column("merchant_key", sa.String(length=200), nullable=True), + sa.Column("document_date", sa.Date(), nullable=True), + sa.Column("total_amount", sa.Numeric(14, 2), nullable=True), + sa.Column("currency", sa.String(length=3), nullable=False), + sa.Column("payment_method", sa.String(length=32), nullable=True), + sa.Column("payment_hint", sa.String(length=200), nullable=True), + sa.Column("order_ref_hash", sa.String(length=64), nullable=True), + sa.Column("subject_hint", sa.String(length=200), nullable=True), + sa.Column("redacted_snippet", sa.String(length=500), nullable=True), + sa.Column("line_items", sa.JSON(), nullable=True), + sa.Column("confidence", sa.Numeric(4, 3), server_default="0.000", nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.Column("updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "source", + "order_ref_hash", + "evidence_type", + name="uq_mail_evidence_source_order_type", + ), + ) + op.create_index("ix_mail_evidence_document_date", "mail_evidence", ["document_date"]) + op.create_index("ix_mail_evidence_merchant_key", "mail_evidence", ["merchant_key"]) + op.create_index("ix_mail_evidence_order_ref_hash", "mail_evidence", ["order_ref_hash"]) + + op.create_table( + "transaction_evidence_links", + sa.Column("id", sa.String(length=64), nullable=False), + sa.Column("transaction_id", sa.String(length=64), nullable=False), + sa.Column("evidence_id", sa.String(length=64), nullable=False), + sa.Column("match_type", sa.String(length=32), nullable=False), + sa.Column("confidence", sa.Numeric(4, 3), nullable=False), + sa.Column("match_reason", sa.String(length=500), nullable=False), + sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False), + sa.ForeignKeyConstraint( + ["evidence_id"], + ["mail_evidence.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["transaction_id"], + ["normalized_transactions.id"], + ondelete="CASCADE", + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "transaction_id", + "evidence_id", + "match_type", + name="uq_transaction_evidence_link", + ), + ) + op.create_index( + "ix_transaction_evidence_links_evidence", + "transaction_evidence_links", + ["evidence_id"], + ) + op.create_index( + "ix_transaction_evidence_links_transaction", + "transaction_evidence_links", + ["transaction_id"], + ) + + +def downgrade() -> None: + op.drop_index( + "ix_transaction_evidence_links_transaction", + table_name="transaction_evidence_links", + ) + op.drop_index( + "ix_transaction_evidence_links_evidence", + table_name="transaction_evidence_links", + ) + op.drop_table("transaction_evidence_links") + + op.drop_index("ix_mail_evidence_order_ref_hash", table_name="mail_evidence") + op.drop_index("ix_mail_evidence_merchant_key", table_name="mail_evidence") + op.drop_index("ix_mail_evidence_document_date", table_name="mail_evidence") + op.drop_table("mail_evidence") + + op.drop_column("budgets", "context_note") + op.drop_column("budgets", "critical_threshold") + op.drop_column("budgets", "warning_threshold") + op.drop_column("budgets", "priority") + op.drop_column("budgets", "is_active") + + op.drop_column("categories", "llm_hints") + op.drop_column("categories", "anti_examples") + op.drop_column("categories", "examples") + op.drop_column("categories", "description") + op.drop_column("categories", "analysis_group") + op.drop_column("categories", "budgetable") + op.drop_column("categories", "kind") diff --git a/docs/integrations/gmail-mail-evidence.md b/docs/integrations/gmail-mail-evidence.md new file mode 100644 index 0000000..1a48e1d --- /dev/null +++ b/docs/integrations/gmail-mail-evidence.md @@ -0,0 +1,68 @@ +# Gmail Mail Evidence Import + +k-fin can import Gmail order, invoice, receipt, payment, and refund mails as +sanitized mail evidence for transaction matching. + +## Safety Boundary + +- Gmail access is read-only from k-fin's perspective. +- The import script uses `gws` to read Gmail messages. +- k-fin stores only extracted/sanitized evidence rows. +- Raw mail bodies, email addresses, IBANs, and order references are not persisted. +- The script does not archive, label, delete, send, or modify Gmail messages. + +## One-Off Import + +Run a dry-run first: + +```bash +uv run python scripts/import_gmail_evidence.py \ + --dry-run \ + --max-results 25 +``` + +Import into a running local API: + +```bash +FINANCE_API_TOKEN=... \ +uv run python scripts/import_gmail_evidence.py \ + --api-url http://127.0.0.1:8000 \ + --max-results 25 +``` + +Import into a deployed API: + +```bash +FINANCE_API_URL=https://k-fin.example.com \ +FINANCE_API_TOKEN=... \ +uv run python scripts/import_gmail_evidence.py \ + --max-results 25 +``` + +## Query Tuning + +The default Gmail query searches recent finance-like mails: + +```text +newer_than:45d (rechnung OR invoice OR quittung OR receipt OR bestellung OR order OR zahlung OR payment OR refund OR erstattung) +``` + +For a focused run, pass `--query`, for example: + +```bash +uv run python scripts/import_gmail_evidence.py \ + --dry-run \ + --query 'newer_than:45d (from:decathlon.de OR from:unzer.com OR from:paypal.de OR rechnung OR invoice)' +``` + +The script also applies a local quality gate before writing to k-fin: + +- empty bodies are skipped +- messages without an extracted amount are skipped +- messages below `--min-confidence` are skipped + +## API + +`POST /api/v1/mail-evidence/import` accepts the same `MailMessageImport` payload +as the older `/mock-import` endpoint. The service extracts the evidence, upserts +it idempotently, and matches it against transactions in a small date window. diff --git a/scripts/import_gmail_evidence.py b/scripts/import_gmail_evidence.py new file mode 100755 index 0000000..84b9bf8 --- /dev/null +++ b/scripts/import_gmail_evidence.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +"""Import read-only Gmail messages into k-fin mail evidence. + +The script shells out to ``gws`` for Gmail reads, converts each Gmail payload to +the mail-evidence import shape, and posts it to k-fin. It never writes to Gmail. +""" + +from __future__ import annotations + +import argparse +import json +import os +import subprocess +import sys +from pathlib import Path +from typing import Any +from urllib import error, request + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +from src.external.gmail_import import gmail_message_to_mail_import # noqa: E402 +from src.services.mail_evidence import extract_evidence_from_message, redact_free_text # noqa: E402 + +DEFAULT_QUERY = ( + "newer_than:45d " + "(rechnung OR invoice OR quittung OR receipt OR bestellung OR order OR zahlung OR payment OR refund OR erstattung)" +) + + +def _run_gws(args: list[str]) -> dict[str, Any]: + completed = subprocess.run( + ["gws", *args], + check=True, + capture_output=True, + text=True, + ) + try: + return json.loads(completed.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gws returned non-JSON output: {completed.stdout[:200]}") from exc + + +def _list_message_ids(query: str, max_results: int) -> list[str]: + payload = _run_gws( + [ + "gmail", + "users", + "messages", + "list", + "--params", + json.dumps({"userId": "me", "q": query, "maxResults": max_results}), + ] + ) + messages = payload.get("messages") or [] + return [str(message["id"]) for message in messages if message.get("id")] + + +def _fetch_message(message_id: str) -> dict[str, Any]: + return _run_gws( + [ + "gmail", + "users", + "messages", + "get", + "--params", + json.dumps({"userId": "me", "id": message_id, "format": "full"}), + ] + ) + + +def _post_import(api_url: str, token: str, payload: dict[str, Any]) -> dict[str, Any]: + body = json.dumps(payload, default=str).encode("utf-8") + req = request.Request( + f"{api_url.rstrip('/')}/api/v1/mail-evidence/import", + data=body, + headers={ + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + }, + method="POST", + ) + with request.urlopen(req, timeout=30) as response: + return json.loads(response.read().decode("utf-8")) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--api-url", default=os.getenv("FINANCE_API_URL", "http://127.0.0.1:8000")) + parser.add_argument("--token", default=os.getenv("FINANCE_API_TOKEN")) + parser.add_argument("--query", default=os.getenv("GMAIL_QUERY", DEFAULT_QUERY)) + parser.add_argument("--max-results", type=int, default=int(os.getenv("GMAIL_MAX_RESULTS", "25"))) + parser.add_argument("--min-confidence", default=os.getenv("GMAIL_MIN_CONFIDENCE", "0.50")) + parser.add_argument("--dry-run", action="store_true") + args = parser.parse_args() + + if not args.dry_run and not args.token: + parser.error("--token or FINANCE_API_TOKEN is required unless --dry-run is set") + + message_ids = _list_message_ids(args.query, args.max_results) + imported = 0 + matched = 0 + skipped = 0 + skipped_low_confidence = 0 + for message_id in message_ids: + gmail_payload = _fetch_message(message_id) + mail_import = gmail_message_to_mail_import(gmail_payload) + if not mail_import["body_text"]: + skipped += 1 + continue + draft = extract_evidence_from_message(mail_import) + if float(draft["confidence"]) < float(args.min_confidence) or draft["total_amount"] is None: + skipped_low_confidence += 1 + continue + if args.dry_run: + print( + json.dumps( + { + "source_message_id": mail_import["source_message_id"], + "received_at": mail_import["received_at"], + "sender": redact_free_text(mail_import["sender"], limit=120), + "subject_hint": redact_free_text(mail_import["subject"], limit=160), + "evidence_type": draft["evidence_type"], + "merchant_key": draft["merchant_key"], + "total_amount": draft["total_amount"], + "confidence": draft["confidence"], + "body_chars": len(mail_import["body_text"]), + }, + default=str, + ensure_ascii=False, + ) + ) + continue + try: + result = _post_import(args.api_url, args.token, mail_import) + except error.HTTPError as exc: + detail = exc.read().decode("utf-8", errors="replace") + raise RuntimeError(f"k-fin import failed for {message_id}: {exc.code} {detail}") from exc + imported += 1 + matched += len(result.get("links") or []) + + print( + json.dumps( + { + "query": args.query, + "seen": len(message_ids), + "imported": imported, + "matched_links": matched, + "skipped_empty_body": skipped, + "skipped_low_confidence": skipped_low_confidence, + "dry_run": args.dry_run, + }, + ensure_ascii=False, + ) + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/migrate.py b/scripts/migrate.py index c6f0fc7..c01b14c 100644 --- a/scripts/migrate.py +++ b/scripts/migrate.py @@ -26,7 +26,7 @@ def main(argv: list[str]) -> int: target = argv[1] if len(argv) > 1 else "upgrade" cfg = _config() if target == "upgrade": - command.upgrade(cfg, "head") + command.upgrade(cfg, "heads") elif target == "downgrade": command.downgrade(cfg, "base") elif target == "current": diff --git a/src/agents/anomaly.py b/src/agents/anomaly.py index d2c43d3..0bdc5e1 100644 --- a/src/agents/anomaly.py +++ b/src/agents/anomaly.py @@ -13,6 +13,7 @@ from src.agents._anthropic import make_anthropic_model from src.agents._runner import run_in_fresh_loop from src.agents._usage import AgentUsage, extract_usage +from src.agents.context import get_safe_analysis_context from src.agents.gather import ( get_new_counterparties, get_outlier_transactions, @@ -22,6 +23,7 @@ from src.agents.period import derive_period_label from src.agents.prompts.anomaly import ANOMALY_SYSTEM_PROMPT from src.agents.types import AnomalyResult +from src.services.llm_context import sanitize_context logger = logging.getLogger(__name__) @@ -78,6 +80,7 @@ def run_anomaly_detection( ) memory = get_recent_reports(engine, "anomaly", limit=2) + analysis_context = get_safe_analysis_context(engine, year=ref.year, month=ref.month) data = { "period": period, @@ -86,16 +89,19 @@ def run_anomaly_detection( "outlier_transactions": outliers, "new_counterparties": new_counterparties, "recent_transactions_sample": transactions[:30], + "analysis_context": analysis_context, } + safe_data = sanitize_context(data) + safe_memory = sanitize_context({"previous_reports": memory}) if memory else None prompt_parts = [ f"## Anomalie-Erkennung {period}\n", f"Analysezeitraum: {lookback_days} Tage\n", - f"### Daten\n\n```json\n{json.dumps(data, ensure_ascii=False, indent=2)}\n```\n", + f"### Daten\n\n```json\n{json.dumps(safe_data, ensure_ascii=False, indent=2)}\n```\n", ] - if memory: + if safe_memory: prompt_parts.append( f"### Vorherige Anomalie-Reports (Kontext)\n\n" - f"```json\n{json.dumps(memory, ensure_ascii=False, indent=2)}\n```\n" + f"```json\n{json.dumps(safe_memory, ensure_ascii=False, indent=2)}\n```\n" ) prompt_parts.append("Bewerte die Anomalien gemäß dem Schema.") diff --git a/src/agents/budget_analysis.py b/src/agents/budget_analysis.py new file mode 100644 index 0000000..2d85c2d --- /dev/null +++ b/src/agents/budget_analysis.py @@ -0,0 +1,58 @@ +"""Budget analysis agent — explains budget risks from deterministic context.""" + +from __future__ import annotations + +import json +import logging +from datetime import date + +from pydantic_ai import Agent +from pydantic_ai.settings import ModelSettings +from sqlalchemy import Engine + +from src.agents._anthropic import make_anthropic_model +from src.agents._runner import run_in_fresh_loop +from src.agents._usage import AgentUsage, extract_usage +from src.agents.context import get_safe_analysis_context +from src.agents.period import derive_period_label +from src.agents.prompts.budget_analysis import BUDGET_ANALYSIS_SYSTEM_PROMPT +from src.agents.types import AnalysisResult +from src.services.llm_context import sanitize_context + +logger = logging.getLogger(__name__) + +MODEL = "anthropic:claude-sonnet-4-6" +BUDGET_ANALYSIS_MAX_TOKENS = 8000 + +budget_analysis_agent = Agent( + make_anthropic_model(MODEL, prefer_prompted_output=True), + output_type=AnalysisResult, + system_prompt=BUDGET_ANALYSIS_SYSTEM_PROMPT, + retries=2, + model_settings=ModelSettings(max_tokens=BUDGET_ANALYSIS_MAX_TOKENS), +) + + +def run_budget_analysis( + engine: Engine, + reference_date: date | None = None, + *, + period_days: int | None = None, + usage: AgentUsage | None = None, +) -> AnalysisResult: + ref = reference_date or date.today() + period, first_day, _last_day = derive_period_label("monthly", period_days, ref) + + context = get_safe_analysis_context(engine, year=first_day.year, month=first_day.month) + + safe_context = sanitize_context(context) + prompt = ( + f"## Budget-Analyse {period}\n\n" + f"```json\n{json.dumps(safe_context, ensure_ascii=False, indent=2)}\n```\n\n" + "Erstelle die Budget-Analyse gemäß dem Schema." + ) + result = run_in_fresh_loop(budget_analysis_agent.run(prompt)) + if usage is not None: + in_t, out_t = extract_usage(result) + usage.add_call(MODEL, in_t, out_t) + return result.output diff --git a/src/agents/categorization.py b/src/agents/categorization.py index 5b9f0a0..d4b5461 100644 --- a/src/agents/categorization.py +++ b/src/agents/categorization.py @@ -41,6 +41,7 @@ from src.agents.types import CategorizationResult, CategorySuggestion from src.core.config import settings from src.core.db.models import NormalizedTransaction +from src.services.llm_context import sanitize_search_query logger = logging.getLogger(__name__) @@ -119,11 +120,14 @@ async def search_web(query: str) -> list[dict]: base = settings.searxng_url if not base: return [{"error": "search_disabled"}] + safe_query = sanitize_search_query(query) + if not safe_query: + return [{"error": "search_query_empty_after_sanitizing"}] try: async with httpx.AsyncClient(timeout=_SEARXNG_TIMEOUT_S) as client: resp = await client.get( f"{base.rstrip('/')}/search", - params={"q": query, "format": "json", "language": "de"}, + params={"q": safe_query, "format": "json", "language": "de"}, ) resp.raise_for_status() data = resp.json() diff --git a/src/agents/category_audit.py b/src/agents/category_audit.py new file mode 100644 index 0000000..6f36bbf --- /dev/null +++ b/src/agents/category_audit.py @@ -0,0 +1,58 @@ +"""Category audit agent — flags category/evidence inconsistencies.""" + +from __future__ import annotations + +import json +import logging +from datetime import date + +from pydantic_ai import Agent +from pydantic_ai.settings import ModelSettings +from sqlalchemy import Engine + +from src.agents._anthropic import make_anthropic_model +from src.agents._runner import run_in_fresh_loop +from src.agents._usage import AgentUsage, extract_usage +from src.agents.context import get_safe_analysis_context +from src.agents.period import derive_period_label +from src.agents.prompts.category_audit import CATEGORY_AUDIT_SYSTEM_PROMPT +from src.agents.types import AnalysisResult +from src.services.llm_context import sanitize_context + +logger = logging.getLogger(__name__) + +MODEL = "anthropic:claude-sonnet-4-6" +CATEGORY_AUDIT_MAX_TOKENS = 8000 + +category_audit_agent = Agent( + make_anthropic_model(MODEL, prefer_prompted_output=True), + output_type=AnalysisResult, + system_prompt=CATEGORY_AUDIT_SYSTEM_PROMPT, + retries=2, + model_settings=ModelSettings(max_tokens=CATEGORY_AUDIT_MAX_TOKENS), +) + + +def run_category_audit( + engine: Engine, + reference_date: date | None = None, + *, + period_days: int | None = None, + usage: AgentUsage | None = None, +) -> AnalysisResult: + ref = reference_date or date.today() + period, first_day, _last_day = derive_period_label("monthly", period_days, ref) + + context = get_safe_analysis_context(engine, year=first_day.year, month=first_day.month) + + safe_context = sanitize_context(context) + prompt = ( + f"## Kategorie-Audit {period}\n\n" + f"```json\n{json.dumps(safe_context, ensure_ascii=False, indent=2)}\n```\n\n" + "Erstelle das Kategorie-Audit gemäß dem Schema." + ) + result = run_in_fresh_loop(category_audit_agent.run(prompt)) + if usage is not None: + in_t, out_t = extract_usage(result) + usage.add_call(MODEL, in_t, out_t) + return result.output diff --git a/src/agents/context.py b/src/agents/context.py new file mode 100644 index 0000000..f0482cb --- /dev/null +++ b/src/agents/context.py @@ -0,0 +1,52 @@ +"""Shared deterministic context helpers for analysis agents.""" + +from __future__ import annotations + +import logging +from typing import Any + +from sqlalchemy.engine import Engine +from sqlalchemy.orm import Session + +from src.services import financial_aggregates + +logger = logging.getLogger(__name__) + + +def get_safe_analysis_context( + engine: Engine | None, + *, + year: int, + month: int, +) -> dict[str, Any]: + """Return analysis context when a real DB engine is available. + + Unit tests often exercise agents with ``engine=None`` or ``MagicMock`` to + verify prompt/date behavior without a DB. Production should get the full + deterministic context, while unavailable context is represented explicitly + instead of aborting the whole agent run. + """ + + if not isinstance(engine, Engine): + return { + "available": False, + "reason": "engine_unavailable", + "year": year, + "month": month, + } + + try: + with Session(engine) as session: + return financial_aggregates.analysis_context( + session, + year=year, + month=month, + ) + except Exception as exc: # noqa: BLE001 - analysis should degrade, not crash. + logger.warning("analysis_context unavailable for %04d-%02d: %s", year, month, exc) + return { + "available": False, + "reason": type(exc).__name__, + "year": year, + "month": month, + } diff --git a/src/agents/monthly_analysis.py b/src/agents/monthly_analysis.py index 7cb2762..afca546 100644 --- a/src/agents/monthly_analysis.py +++ b/src/agents/monthly_analysis.py @@ -13,6 +13,7 @@ from src.agents._anthropic import make_anthropic_model from src.agents._runner import run_in_fresh_loop from src.agents._usage import AgentUsage, extract_usage +from src.agents.context import get_safe_analysis_context from src.agents.gather import ( get_category_breakdown, get_monthly_summary, @@ -24,6 +25,7 @@ from src.agents.period import derive_period_label from src.agents.prompts.monthly_analysis import MONTHLY_ANALYSIS_SYSTEM_PROMPT from src.agents.types import AnalysisResult +from src.services.llm_context import sanitize_context logger = logging.getLogger(__name__) @@ -69,6 +71,9 @@ def run_monthly_analysis( recurring = get_recurring_patterns(engine) savings = get_savings_rate(engine, first_day, last_day) memory = get_recent_reports(engine, "monthly_analysis", limit=2) + analysis_context = get_safe_analysis_context( + engine, year=first_day.year, month=first_day.month + ) data = { "period": period, @@ -79,16 +84,19 @@ def run_monthly_analysis( "category_breakdown": categories, "recurring_patterns": recurring, "savings_rate": savings, + "analysis_context": analysis_context, } + safe_data = sanitize_context(data) + safe_memory = sanitize_context({"previous_reports": memory}) if memory else None prompt_parts = [ f"## Monatsanalyse {period}\n", f"Zeitraum: {data['date_range']}\n", - f"### Daten\n\n```json\n{json.dumps(data, ensure_ascii=False, indent=2)}\n```\n", + f"### Daten\n\n```json\n{json.dumps(safe_data, ensure_ascii=False, indent=2)}\n```\n", ] - if memory: + if safe_memory: prompt_parts.append( f"### Vorherige Monatsanalysen (Kontext)\n\n" - f"```json\n{json.dumps(memory, ensure_ascii=False, indent=2)}\n```\n" + f"```json\n{json.dumps(safe_memory, ensure_ascii=False, indent=2)}\n```\n" ) prompt_parts.append("Erstelle die Monatsanalyse gemäß dem Schema.") diff --git a/src/agents/orchestrator.py b/src/agents/orchestrator.py index ca91141..91a6f71 100644 --- a/src/agents/orchestrator.py +++ b/src/agents/orchestrator.py @@ -20,17 +20,21 @@ from src.agents import ( anomaly as anomaly_module, + budget_analysis as budget_analysis_module, categorization as categorization_module, + category_audit as category_audit_module, monthly_analysis as monthly_module, synthesizer as synthesizer_module, weekly_analysis as weekly_module, ) from src.agents._usage import AgentUsage from src.agents.anomaly import run_anomaly_detection +from src.agents.budget_analysis import run_budget_analysis from src.agents.categorization import ( DEFAULT_AUTO_APPLY_CONFIDENCE, run_categorization, ) +from src.agents.category_audit import run_category_audit from src.agents.monthly_analysis import run_monthly_analysis from src.agents.reaper import RunCancelled from src.agents.synthesizer import run_synthesizer @@ -47,6 +51,8 @@ # Map agent_type → model identifier for usage_detail serialisation. AGENT_MODELS: dict[str, str] = { "categorization": categorization_module.MODEL, + "category_audit": category_audit_module.MODEL, + "budget_analysis": budget_analysis_module.MODEL, "weekly_analysis": weekly_module.MODEL, "monthly_analysis": monthly_module.MODEL, "anomaly": anomaly_module.MODEL, @@ -170,6 +176,8 @@ def _parse_period(period: str | None, fallback: date) -> tuple[date, date]: VALID_AGENT_TYPES = frozenset( { "categorization", + "category_audit", + "budget_analysis", "weekly_analysis", "monthly_analysis", "anomaly", @@ -286,6 +294,8 @@ def run_single(self, agent_type: str) -> str: _PIPELINE_STEPS = [ ("categorization", "Kategorisierung"), + ("category_audit", "Kategorie-Audit"), + ("budget_analysis", "Budget-Analyse"), ("weekly_analysis", "Wochenanalyse"), ("monthly_analysis", "Monatsanalyse"), ("anomaly", "Anomalie-Erkennung"), @@ -336,6 +346,20 @@ def _run_all_agents( usage=agent_usage, ), ) + elif agent_type == "category_audit": + result = self._run_with_heartbeat( + run_id, + lambda: run_category_audit( + self.engine, period_days=period_days, usage=agent_usage + ), + ) + elif agent_type == "budget_analysis": + result = self._run_with_heartbeat( + run_id, + lambda: run_budget_analysis( + self.engine, period_days=period_days, usage=agent_usage + ), + ) elif agent_type == "weekly_analysis": result = self._run_with_heartbeat( run_id, @@ -389,6 +413,8 @@ def _run_all_agents( lambda: run_synthesizer( engine=self.engine, categorization=results.get("categorization"), + category_audit=results.get("category_audit"), + budget_analysis=results.get("budget_analysis"), weekly=results.get("weekly_analysis"), monthly=results.get("monthly_analysis"), anomaly=results.get("anomaly"), @@ -472,6 +498,26 @@ def _run_agent( ), usage, ) + elif agent_type == "category_audit": + return ( + self._run_with_heartbeat( + run_id, + lambda: run_category_audit( + self.engine, period_days=period_days, usage=usage + ), + ), + usage, + ) + elif agent_type == "budget_analysis": + return ( + self._run_with_heartbeat( + run_id, + lambda: run_budget_analysis( + self.engine, period_days=period_days, usage=usage + ), + ), + usage, + ) elif agent_type == "monthly_analysis": return ( self._run_with_heartbeat( diff --git a/src/agents/prompts/budget_analysis.py b/src/agents/prompts/budget_analysis.py new file mode 100644 index 0000000..5966fa0 --- /dev/null +++ b/src/agents/prompts/budget_analysis.py @@ -0,0 +1,22 @@ +"""System prompt for the budget analysis agent.""" + +BUDGET_ANALYSIS_SYSTEM_PROMPT = """\ +Du bist ein Budget-Analyse-Agent für persönliche Finanzen. + +Du erhältst einen deterministischen Analysis Context mit Monatsübersicht, +Budgetstatus, Kategorie-Semantik und sanitisierten Mail-Evidence-Hinweisen. + +Deine Aufgabe: +- Erkläre Budgetrisiken anhand von vorberechneten Zahlen. +- Identifiziere Top-Treiber, wiederkehrende Belastungen, Einmalkäufe, + Refund-Effekte und Kategorien ohne sinnvolles Budget. +- Nutze Mail-Evidence, um "was war das wirklich?" zu beantworten. +- Gib konkrete Review-/Handlungsoptionen aus, keine generischen Spartipps. + +Regeln: +- Rechne NICHT selbst. Budgetstatus, Remaining und Utilization sind Fakten. +- Unterscheide harte Fakten (`evidence_level="fact"`) von Hypothesen + (`evidence_level="inference"`) und Review-Bedarf. +- Setze `source_agent="budget_analysis"` für jede Observation. +- Schreibe auf Deutsch, knapp und handlungsorientiert. +""" diff --git a/src/agents/prompts/category_audit.py b/src/agents/prompts/category_audit.py new file mode 100644 index 0000000..eb36b0e --- /dev/null +++ b/src/agents/prompts/category_audit.py @@ -0,0 +1,22 @@ +"""System prompt for the category audit agent.""" + +CATEGORY_AUDIT_SYSTEM_PROMPT = """\ +Du bist ein Kategorie-Audit-Agent für persönliche Finanzdaten. + +Du erhältst einen deterministischen Analysis Context mit Kategorien, +Budgetstatus, Top-Transaktionen und sanitisierten Mail-Evidence-Hinweisen. + +Deine Aufgabe: +- Erkenne inkonsistente Kategorien, unklare Händler und falsche Budget-Wirkung. +- Nutze Mail-Evidence als Zusatzbeleg, nicht als alleinige Wahrheit. +- Markiere Fälle, bei denen eine Budgetüberschreitung vermutlich nur durch + falsche/zu grobe Kategorisierung entsteht. +- Schlage Review-Bedarf vor, aber ändere keine Kategorien. + +Regeln: +- Rechne NICHT selbst. Zahlen kommen aus dem Analysis Context. +- Unterscheide Fakten von Vermutungen über `evidence_level`. +- Setze `source_agent="category_audit"` für jede Observation. +- Nutze `confidence` nur, wenn der Evidence-Konflikt klar genug ist. +- Schreibe auf Deutsch, knapp und konkret. +""" diff --git a/src/agents/prompts/synthesizer.py b/src/agents/prompts/synthesizer.py index 60e6d99..1d6d2c1 100644 --- a/src/agents/prompts/synthesizer.py +++ b/src/agents/prompts/synthesizer.py @@ -3,7 +3,8 @@ SYNTHESIZER_SYSTEM_PROMPT = """\ Du bist ein Finanz-Synthesizer. Du erhältst die Ergebnisse mehrerer \ spezialisierter Analyse-Agents (Kategorisierung, Wochenanalyse, \ -Monatsanalyse, Anomalie-Erkennung) und fasst sie zu einem \ +Kategorie-Audit, Budget-Analyse, Monatsanalyse, Anomalie-Erkennung) \ +und fasst sie zu einem \ kohärenten Wochenbericht zusammen. Deine Aufgabe: @@ -18,6 +19,10 @@ Regeln: - Rechne NICHT selbst. Alle Zahlen kommen aus den Agent-Ergebnissen. - Widersprich den Agents nicht — du fasst zusammen und priorisierst. +- Budget- und Kategorie-Audit-Fakten haben Vorrang vor allgemeinen + Wochen-/Monatsformulierungen, wenn sie konkreter sind. +- Trenne harte Fakten, Hypothesen und Review-Bedarf sauber in Summary und + Action Items. - Halte die Zusammenfassung unter 500 Wörtern. - action_items sollen konkret und umsetzbar sein, keine Allgemeinplätze. diff --git a/src/agents/synthesizer.py b/src/agents/synthesizer.py index b89b147..29fdfaf 100644 --- a/src/agents/synthesizer.py +++ b/src/agents/synthesizer.py @@ -21,6 +21,7 @@ CategorizationResult, SynthesisResult, ) +from src.services.llm_context import sanitize_context logger = logging.getLogger(__name__) @@ -39,6 +40,8 @@ def run_synthesizer( engine: Engine, categorization: CategorizationResult | None = None, + category_audit: AnalysisResult | None = None, + budget_analysis: AnalysisResult | None = None, weekly: AnalysisResult | None = None, monthly: AnalysisResult | None = None, anomaly: AnomalyResult | None = None, @@ -61,6 +64,10 @@ def run_synthesizer( inputs: dict = {"period": period} if categorization: inputs["categorization"] = categorization.model_dump() + if category_audit: + inputs["category_audit"] = category_audit.model_dump() + if budget_analysis: + inputs["budget_analysis"] = budget_analysis.model_dump() if weekly: inputs["weekly_analysis"] = weekly.model_dump() if monthly: @@ -74,6 +81,8 @@ def run_synthesizer( if len(inputs) == 1: for key, report_type in ( ("categorization", "categorization"), + ("category_audit", "category_audit"), + ("budget_analysis", "budget_analysis"), ("weekly_analysis", "weekly_analysis"), ("monthly_analysis", "monthly_analysis"), ("anomaly_detection", "anomaly"), @@ -97,15 +106,17 @@ def run_synthesizer( ) memory = get_recent_reports(engine, "synthesis", limit=2) + safe_inputs = sanitize_context(inputs) + safe_memory = sanitize_context({"previous_syntheses": memory}) if memory else None prompt_parts = [ f"## Wochenbericht-Synthese {period}\n", - f"### Agent-Ergebnisse\n\n```json\n{json.dumps(inputs, ensure_ascii=False, indent=2)}\n```\n", + f"### Agent-Ergebnisse\n\n```json\n{json.dumps(safe_inputs, ensure_ascii=False, indent=2)}\n```\n", ] - if memory: + if safe_memory: prompt_parts.append( f"### Vorherige Synthesen (Kontext)\n\n" - f"```json\n{json.dumps(memory, ensure_ascii=False, indent=2)}\n```\n" + f"```json\n{json.dumps(safe_memory, ensure_ascii=False, indent=2)}\n```\n" ) prompt_parts.append("Erstelle den Wochenbericht gemäß dem Schema.") diff --git a/src/agents/types.py b/src/agents/types.py index dc32988..7b4b336 100644 --- a/src/agents/types.py +++ b/src/agents/types.py @@ -73,6 +73,11 @@ class Observation(AgentOutputModel): severity: str = Field(description="info, warning, or alert") transaction_ids: list[str] = Field(default_factory=list) metrics: list[ObservationMetric] = Field(default_factory=list) + source_agent: str | None = None + confidence: float | None = Field(default=None, ge=0.0, le=1.0) + observation_type: str | None = None + evidence_level: str | None = Field(default=None, description="fact, inference, or review_needed") + related_category_ids: list[str] = Field(default_factory=list) @field_validator("metrics", mode="before") @classmethod diff --git a/src/agents/weekly_analysis.py b/src/agents/weekly_analysis.py index c970043..1d22977 100644 --- a/src/agents/weekly_analysis.py +++ b/src/agents/weekly_analysis.py @@ -13,6 +13,7 @@ from src.agents._anthropic import make_anthropic_model from src.agents._runner import run_in_fresh_loop from src.agents._usage import AgentUsage, extract_usage +from src.agents.context import get_safe_analysis_context from src.agents.gather import ( get_category_breakdown, get_monthly_summary, @@ -24,6 +25,7 @@ from src.agents.period import derive_period_label from src.agents.prompts.weekly_analysis import WEEKLY_ANALYSIS_SYSTEM_PROMPT from src.agents.types import AnalysisResult +from src.services.llm_context import sanitize_context logger = logging.getLogger(__name__) @@ -68,6 +70,7 @@ def run_weekly_analysis( recurring = get_recurring_patterns(engine) savings = get_savings_rate(engine, monday, sunday) memory = get_recent_reports(engine, "weekly_analysis", limit=2) + analysis_context = get_safe_analysis_context(engine, year=monday.year, month=monday.month) data = { "period": iso_week, @@ -78,16 +81,19 @@ def run_weekly_analysis( "category_breakdown": categories, "recurring_patterns": recurring, "savings_rate": savings, + "analysis_context": analysis_context, } + safe_data = sanitize_context(data) + safe_memory = sanitize_context({"previous_reports": memory}) if memory else None prompt_parts = [ f"## Wochenanalyse {iso_week}\n", f"Zeitraum: {data['date_range']}\n", - f"### Daten\n\n```json\n{json.dumps(data, ensure_ascii=False, indent=2)}\n```\n", + f"### Daten\n\n```json\n{json.dumps(safe_data, ensure_ascii=False, indent=2)}\n```\n", ] - if memory: + if safe_memory: prompt_parts.append( f"### Vorherige Analysen (Kontext)\n\n" - f"```json\n{json.dumps(memory, ensure_ascii=False, indent=2)}\n```\n" + f"```json\n{json.dumps(safe_memory, ensure_ascii=False, indent=2)}\n```\n" ) prompt_parts.append("Erstelle die Wochenanalyse gemäß dem Schema.") diff --git a/src/api/app.py b/src/api/app.py index 1152709..3af71bc 100644 --- a/src/api/app.py +++ b/src/api/app.py @@ -19,6 +19,7 @@ depots, dev, import_csv, + mail_evidence, meta, portfolio, reports, @@ -125,6 +126,7 @@ def health(): app.include_router(tags.router, prefix="/api/v1") app.include_router(sync.router, prefix="/api/v1") app.include_router(import_csv.router, prefix="/api/v1") + app.include_router(mail_evidence.router, prefix="/api/v1") app.include_router(meta.router, prefix="/api/v1") app.include_router(settings_router.router, prefix="/api/v1") app.include_router(categorization.router, prefix="/api/v1") diff --git a/src/api/deps.py b/src/api/deps.py index 6e0a56a..df1397f 100644 --- a/src/api/deps.py +++ b/src/api/deps.py @@ -27,6 +27,18 @@ def _get_engine(): Db = Annotated[Session, Depends(get_db)] +def _require_credentials( + credentials: HTTPAuthorizationCredentials | None = Depends(_bearer), +) -> HTTPAuthorizationCredentials: + if not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or missing token", + headers={"WWW-Authenticate": "Bearer"}, + ) + return credentials + + # --------------------------------------------------------------------------- # Service principal — represents MCP / Scheduler using the static API_TOKEN # --------------------------------------------------------------------------- @@ -46,8 +58,8 @@ class ServicePrincipal: def _get_current_principal( request: Request, + credentials: Annotated[HTTPAuthorizationCredentials, Depends(_require_credentials)], db: Annotated[Session, Depends(get_db)], - credentials: HTTPAuthorizationCredentials | None = Depends(_bearer), ) -> Union[User, ServicePrincipal]: """Return the authenticated principal or raise 401. @@ -55,13 +67,6 @@ def _get_current_principal( 1. Valid JWT → returns User (browser sessions) 2. Static API_TOKEN match → returns ServicePrincipal (MCP, Scheduler) """ - if not credentials: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Not authenticated", - headers={"WWW-Authenticate": "Bearer"}, - ) - token = credentials.credentials # Try JWT first (browser sessions) @@ -108,17 +113,10 @@ def _get_current_user( def require_token( + credentials: Annotated[HTTPAuthorizationCredentials, Depends(_require_credentials)], db: Annotated[Session, Depends(get_db)], - credentials: HTTPAuthorizationCredentials | None = Depends(_bearer), ) -> None: """Accept either a valid JWT (browser) or the static API_TOKEN (services).""" - if not credentials: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail="Invalid or missing token", - headers={"WWW-Authenticate": "Bearer"}, - ) - token = credentials.credentials if settings.jwt_secret: diff --git a/src/api/routers/aggregates.py b/src/api/routers/aggregates.py index 4eeb9d1..13588f9 100644 --- a/src/api/routers/aggregates.py +++ b/src/api/routers/aggregates.py @@ -19,6 +19,7 @@ from src.api.deps import CurrentUser, get_db, require_token from src.api.schemas import ( + AnalysisContextOut, BudgetSpendingItem, BudgetSpendingOut, CashflowOverTimeOut, @@ -29,7 +30,8 @@ RefundAuditOut, RefundAutoApplyResult, ) -from src.core.db.models import Budget, Category, NormalizedTransaction +from src.core.db.models import NormalizedTransaction +from src.services import financial_aggregates from src.services.refund_audit import ( apply_refund_heuristic, list_audit_candidates, @@ -69,62 +71,14 @@ def monthly_summary( exclude_internal: bool = Query(True), ): """Return income, expenses, net, savings rate, and per-category breakdown for a month.""" - totals_stmt = select( - func.coalesce(func.sum(_INCOME_CASE), 0).label("income"), - func.coalesce(func.sum(_EXPENSE_CASE), 0).label("expenses"), - func.count().label("transaction_count"), - ).where( - extract("year", NormalizedTransaction.booking_date) == year, - extract("month", NormalizedTransaction.booking_date) == month, + data = financial_aggregates.monthly_summary( + db, year=year, month=month, exclude_internal=exclude_internal ) - if exclude_internal: - totals_stmt = totals_stmt.where(NormalizedTransaction.internal_transfer.is_(False)) - - row = db.execute(totals_stmt).one() - income = row.income - expenses = row.expenses - net = income + expenses - savings_rate = (net / income) if income else 0 - - # Per-category breakdown sums *all* amounts in the category (refund + - # original automatically net out, since both share the same category). - cat_stmt = ( - select( - Category.id, - Category.name, - func.sum(NormalizedTransaction.amount).label("total"), - func.count().label("count"), - ) - .join(Category, NormalizedTransaction.category_id == Category.id) - .where( - extract("year", NormalizedTransaction.booking_date) == year, - extract("month", NormalizedTransaction.booking_date) == month, - ) - .group_by(Category.id, Category.name) - .order_by(func.sum(NormalizedTransaction.amount).asc()) - ) - if exclude_internal: - cat_stmt = cat_stmt.where(NormalizedTransaction.internal_transfer.is_(False)) - - categories = [ - CategoryBreakdown( - category_id=r.id, - category_name=r.name, - total=r.total, - transaction_count=r.count, - ) - for r in db.execute(cat_stmt).all() - ] - return MonthlySummaryOut( - year=year, - month=month, - income=income, - expenses=expenses, - net=net, - savings_rate=round(savings_rate, 4), - transaction_count=row.transaction_count, - by_category=categories, + **{ + **data, + "by_category": [CategoryBreakdown(**item) for item in data["by_category"]], + } ) @@ -207,72 +161,29 @@ def budget_spending( Internal transfers are always excluded — they never belong to a budget. """ - # Refund sum is computed against `amount` (positive value); the "refund" - # bucket is conceptually a *reduction* of expenses, so we add it directly - # to spent_gross to derive spent_net. - refund_sum = func.coalesce( - func.sum( - case((NormalizedTransaction.is_refund.is_(True), NormalizedTransaction.amount)) - ), - 0, - ) - expense_sum = func.coalesce( - func.sum( - case( - ( - and_( - NormalizedTransaction.amount < 0, - NormalizedTransaction.is_refund.is_(False), - ), - NormalizedTransaction.amount, - ) - ) - ), - 0, + data = financial_aggregates.budget_spending(db, year=year, month=month) + return BudgetSpendingOut( + year=data["year"], + month=data["month"], + items=[BudgetSpendingItem(**item) for item in data["items"]], ) - stmt = ( - select( - Budget.category_id.label("category_id"), - Budget.monthly_limit.label("monthly_limit"), - Budget.currency.label("currency"), - Category.name.label("category_name"), - expense_sum.label("spent_gross"), - refund_sum.label("refunded"), - func.count(NormalizedTransaction.id).label("transaction_count"), - ) - .join(Category, Category.id == Budget.category_id) - .outerjoin( - NormalizedTransaction, - and_( - NormalizedTransaction.category_id == Budget.category_id, - NormalizedTransaction.internal_transfer.is_(False), - extract("year", NormalizedTransaction.booking_date) == year, - extract("month", NormalizedTransaction.booking_date) == month, - ), - ) - .group_by(Budget.category_id, Budget.monthly_limit, Budget.currency, Category.name) - .order_by(Category.name) - ) - items = [] - for r in db.execute(stmt).all(): - spent_net = r.spent_gross + r.refunded - items.append( - BudgetSpendingItem( - category_id=r.category_id, - category_name=r.category_name, - monthly_limit=r.monthly_limit, - currency=r.currency, - spent_gross=r.spent_gross, - refunded=r.refunded, - spent_net=spent_net, - remaining=r.monthly_limit + spent_net, - transaction_count=r.transaction_count, - ) - ) +@router.get("/analysis-context", response_model=AnalysisContextOut) +def analysis_context( + db: Session = Depends(get_db), + year: int = Query(..., ge=2000, le=2100), + month: int = Query(..., ge=1, le=12), +): + """Deterministic context bundle for finance agents. - return BudgetSpendingOut(year=year, month=month, items=items) + This is the read-only bridge between accounting facts and LLM + interpretation: money math, budgets, category semantics, and sanitized + mail evidence are prepared here before agents see anything. + """ + return AnalysisContextOut( + **financial_aggregates.analysis_context(db, year=year, month=month) + ) # --------------------------------------------------------------------------- diff --git a/src/api/routers/categories.py b/src/api/routers/categories.py index fae2a37..35a7c48 100644 --- a/src/api/routers/categories.py +++ b/src/api/routers/categories.py @@ -48,7 +48,18 @@ def create_category(body: CategoryCreate, db: Session = Depends(get_db)): detail=f"Category with {field} '{getattr(clash, field)}' already exists", ) - category = Category(id=category_id, name=body.name, type=body.type) + category = Category( + id=category_id, + name=body.name, + type=body.type, + kind=body.kind, + budgetable=body.budgetable, + analysis_group=body.analysis_group, + description=body.description, + examples=body.examples, + anti_examples=body.anti_examples, + llm_hints=body.llm_hints, + ) db.add(category) db.commit() db.refresh(category) @@ -97,6 +108,11 @@ def list_budgets(db: Session = Depends(get_db)): category_id=b.category_id, monthly_limit=b.monthly_limit, currency=b.currency, + is_active=b.is_active, + priority=b.priority, + warning_threshold=b.warning_threshold, + critical_threshold=b.critical_threshold, + context_note=b.context_note, category=CategoryOut.model_validate(cat) if cat else None, ) ) @@ -120,11 +136,21 @@ def upsert_budget( if budget: budget.monthly_limit = body.monthly_limit budget.currency = body.currency + budget.is_active = body.is_active + budget.priority = body.priority + budget.warning_threshold = body.warning_threshold + budget.critical_threshold = body.critical_threshold + budget.context_note = body.context_note else: budget = Budget( category_id=category_id, monthly_limit=body.monthly_limit, currency=body.currency, + is_active=body.is_active, + priority=body.priority, + warning_threshold=body.warning_threshold, + critical_threshold=body.critical_threshold, + context_note=body.context_note, ) db.add(budget) @@ -134,5 +160,10 @@ def upsert_budget( category_id=budget.category_id, monthly_limit=budget.monthly_limit, currency=budget.currency, + is_active=budget.is_active, + priority=budget.priority, + warning_threshold=budget.warning_threshold, + critical_threshold=budget.critical_threshold, + context_note=budget.context_note, category=CategoryOut.model_validate(cat), ) diff --git a/src/api/routers/mail_evidence.py b/src/api/routers/mail_evidence.py new file mode 100644 index 0000000..f4c86e2 --- /dev/null +++ b/src/api/routers/mail_evidence.py @@ -0,0 +1,77 @@ +"""Mail evidence endpoints. + +These routes accept mail-like input at the API edge. Gmail import tooling feeds +the same service layer; raw mail content is never persisted. +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, Query +from sqlalchemy import select +from sqlalchemy.orm import Session + +from src.api.deps import Auth, get_db +from src.api.schemas import ( + EvidenceLinkOut, + MailEvidenceImportOut, + MailEvidenceOut, + MailMessageImport, +) +from src.core.db.models import MailEvidence +from src.services.mail_evidence import import_mail_message, match_evidence_to_transactions + +router = APIRouter(prefix="/mail-evidence", tags=["mail-evidence"], dependencies=[Auth]) + + +@router.get("", response_model=list[MailEvidenceOut]) +def list_mail_evidence( + db: Session = Depends(get_db), + limit: int = Query(50, ge=1, le=500), + offset: int = Query(0, ge=0), + evidence_type: str | None = None, +) -> list[MailEvidenceOut]: + stmt = select(MailEvidence).order_by(MailEvidence.document_date.desc().nullslast()) + if evidence_type: + stmt = stmt.where(MailEvidence.evidence_type == evidence_type) + rows = db.execute(stmt.limit(limit).offset(offset)).scalars().all() + return [MailEvidenceOut.model_validate(row) for row in rows] + + +@router.post("/mock-import", response_model=MailEvidenceImportOut) +def import_mock_mail_evidence( + body: MailMessageImport, + db: Session = Depends(get_db), +) -> MailEvidenceImportOut: + return _import_mail_evidence(body, db) + + +@router.post("/import", response_model=MailEvidenceImportOut) +def import_mail_evidence( + body: MailMessageImport, + db: Session = Depends(get_db), +) -> MailEvidenceImportOut: + return _import_mail_evidence(body, db) + + +def _import_mail_evidence( + body: MailMessageImport, + db: Session, +) -> MailEvidenceImportOut: + evidence, links = import_mail_message(db, body.model_dump()) + return MailEvidenceImportOut( + evidence=MailEvidenceOut.model_validate(evidence), + links=[EvidenceLinkOut.model_validate(link) for link in links], + ) + + +@router.post("/{evidence_id}/match", response_model=list[EvidenceLinkOut]) +def rematch_mail_evidence( + evidence_id: str, + db: Session = Depends(get_db), +) -> list[EvidenceLinkOut]: + evidence = db.get(MailEvidence, evidence_id) + if evidence is None: + return [] + links = match_evidence_to_transactions(db, evidence) + db.commit() + return [EvidenceLinkOut.model_validate(link) for link in links] diff --git a/src/api/routers/runs.py b/src/api/routers/runs.py index 794328c..c765443 100644 --- a/src/api/routers/runs.py +++ b/src/api/routers/runs.py @@ -43,6 +43,8 @@ KNOWN_AGENTS = { "categorization", + "category_audit", + "budget_analysis", "weekly_analysis", "monthly_analysis", "anomaly", diff --git a/src/api/schemas.py b/src/api/schemas.py index fa539af..5ba69f4 100644 --- a/src/api/schemas.py +++ b/src/api/schemas.py @@ -12,12 +12,26 @@ class CategoryOut(BaseModel): id: str name: str type: str + kind: str = "expense" + budgetable: bool = True + analysis_group: str | None = None + description: str | None = None + examples: list[str] | None = None + anti_examples: list[str] | None = None + llm_hints: str | None = None class CategoryCreate(BaseModel): id: str | None = None name: str type: str + kind: str = "expense" + budgetable: bool = True + analysis_group: str | None = None + description: str | None = None + examples: list[str] | None = None + anti_examples: list[str] | None = None + llm_hints: str | None = None class BudgetOut(BaseModel): @@ -26,12 +40,22 @@ class BudgetOut(BaseModel): category_id: str monthly_limit: Decimal currency: str + is_active: bool = True + priority: int = 0 + warning_threshold: Decimal = Decimal("0.80") + critical_threshold: Decimal = Decimal("1.00") + context_note: str | None = None category: CategoryOut | None = None class BudgetUpdate(BaseModel): monthly_limit: Decimal currency: str = "EUR" + is_active: bool = True + priority: int = 0 + warning_threshold: Decimal = Decimal("0.80") + critical_threshold: Decimal = Decimal("1.00") + context_note: str | None = None class TagOut(BaseModel): @@ -93,6 +117,69 @@ class TransactionLinksOut(BaseModel): parents: list[TransactionLinkOut] +# ── Mail evidence layer ───────────────────────────────────────── + + +class MailEvidenceLineItem(BaseModel): + label: str + amount: Decimal | None = None + category_hint: str | None = None + + +class MailMessageImport(BaseModel): + """Raw-ish mail input accepted only at the API edge. + + The service extracts structured evidence and discards the body; no raw mail + content is persisted. + """ + + source: str = "gmail" + source_message_id: str + received_at: date | None = None + sender: str | None = None + subject: str + body_text: str + + +class MailEvidenceOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: str + source: str + evidence_type: str + merchant_name: str | None = None + merchant_key: str | None = None + document_date: date | None = None + total_amount: Decimal | None = None + currency: str = "EUR" + payment_method: str | None = None + payment_hint: str | None = None + order_ref_hash: str | None = None + subject_hint: str | None = None + redacted_snippet: str | None = None + line_items: list[dict] | None = None + confidence: Decimal + created_at: datetime + updated_at: datetime + + +class EvidenceLinkOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + + id: str + transaction_id: str + evidence_id: str + match_type: str + confidence: Decimal + match_reason: str + created_at: datetime + + +class MailEvidenceImportOut(BaseModel): + evidence: MailEvidenceOut + links: list[EvidenceLinkOut] = [] + + # ── Categorization rules ──────────────────────────────────────── @@ -271,6 +358,10 @@ class BudgetSpendingItem(BaseModel): spent_net: Decimal remaining: Decimal transaction_count: int + utilization: Decimal | None = None + status: str | None = None + priority: int | None = None + context_note: str | None = None class BudgetSpendingOut(BaseModel): @@ -279,6 +370,22 @@ class BudgetSpendingOut(BaseModel): items: list[BudgetSpendingItem] +class AnalysisContextOut(BaseModel): + year: int + month: int + period_start: str + period_end: str + generated_at: str + monthly_summary: dict + budget_spending: dict + budget_risks: list[dict] + uncategorized_count: int + category_semantics: list[dict] + mail_evidence: list[dict] + top_transactions: list[dict] + assumptions: list[str] + + class RefundAuditCandidate(BaseModel): """A positive-amount transaction sitting in `erstattungen` with `is_refund=False` — i.e. either a real income (Steuer, Cashback) or diff --git a/src/core/db/models.py b/src/core/db/models.py index 00b2f26..89f6d04 100644 --- a/src/core/db/models.py +++ b/src/core/db/models.py @@ -145,6 +145,19 @@ class Category(Base): SQLEnum(TypeEnum, values_callable=lambda e: [m.value for m in e]), nullable=False, ) + # Semantic hints for analysis agents. ``type`` stays as the legacy UI + # grouping; these fields describe accounting intent and prompt context. + kind: Mapped[str] = mapped_column( + String(32), nullable=False, default="expense", server_default="expense" + ) + budgetable: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=True, server_default="true" + ) + analysis_group: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) + description: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) + examples: Mapped[Optional[list]] = mapped_column(JSON, nullable=True) + anti_examples: Mapped[Optional[list]] = mapped_column(JSON, nullable=True) + llm_hints: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True) class Budget(Base): @@ -155,6 +168,17 @@ class Budget(Base): ) monthly_limit: Mapped[Decimal] = mapped_column(Numeric(14, 2), nullable=False) currency: Mapped[str] = mapped_column(String(3), nullable=False, default="EUR") + is_active: Mapped[bool] = mapped_column( + Boolean, nullable=False, default=True, server_default="true" + ) + priority: Mapped[int] = mapped_column(Integer, nullable=False, default=0, server_default="0") + warning_threshold: Mapped[Decimal] = mapped_column( + Numeric(4, 2), nullable=False, default=Decimal("0.80"), server_default="0.80" + ) + critical_threshold: Mapped[Decimal] = mapped_column( + Numeric(4, 2), nullable=False, default=Decimal("1.00"), server_default="1.00" + ) + context_note: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) @@ -347,6 +371,84 @@ class TransactionLink(Base): ) +class MailEvidence(Base): + """Redacted evidence extracted from email receipts, invoices, and orders. + + Raw email bodies are intentionally not stored here. The row is the + sanitized, structured fact layer used to explain/match transactions. + """ + + __tablename__ = "mail_evidence" + __table_args__ = ( + UniqueConstraint( + "source", + "order_ref_hash", + "evidence_type", + name="uq_mail_evidence_source_order_type", + ), + Index("ix_mail_evidence_merchant_key", "merchant_key"), + Index("ix_mail_evidence_document_date", "document_date"), + Index("ix_mail_evidence_order_ref_hash", "order_ref_hash"), + ) + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + source: Mapped[str] = mapped_column(String(32), nullable=False, default="gmail") + evidence_type: Mapped[str] = mapped_column(String(32), nullable=False) + merchant_name: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + merchant_key: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + document_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True) + total_amount: Mapped[Optional[Decimal]] = mapped_column(Numeric(14, 2), nullable=True) + currency: Mapped[str] = mapped_column(String(3), nullable=False, default="EUR") + payment_method: Mapped[Optional[str]] = mapped_column(String(32), nullable=True) + payment_hint: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + order_ref_hash: Mapped[Optional[str]] = mapped_column(String(64), nullable=True) + subject_hint: Mapped[Optional[str]] = mapped_column(String(200), nullable=True) + redacted_snippet: Mapped[Optional[str]] = mapped_column(String(500), nullable=True) + line_items: Mapped[Optional[list]] = mapped_column(JSON, nullable=True) + confidence: Mapped[Decimal] = mapped_column( + Numeric(4, 3), nullable=False, default=Decimal("0.000"), server_default="0.000" + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + onupdate=func.now(), + nullable=False, + ) + + +class TransactionEvidenceLink(Base): + """Confidence-scored link between a transaction and sanitized evidence.""" + + __tablename__ = "transaction_evidence_links" + __table_args__ = ( + UniqueConstraint( + "transaction_id", + "evidence_id", + "match_type", + name="uq_transaction_evidence_link", + ), + Index("ix_transaction_evidence_links_transaction", "transaction_id"), + Index("ix_transaction_evidence_links_evidence", "evidence_id"), + ) + + id: Mapped[str] = mapped_column(String(64), primary_key=True) + transaction_id: Mapped[str] = mapped_column( + ForeignKey("normalized_transactions.id", ondelete="CASCADE"), nullable=False + ) + evidence_id: Mapped[str] = mapped_column( + ForeignKey("mail_evidence.id", ondelete="CASCADE"), nullable=False + ) + match_type: Mapped[str] = mapped_column(String(32), nullable=False) + confidence: Mapped[Decimal] = mapped_column(Numeric(4, 3), nullable=False) + match_reason: Mapped[str] = mapped_column(String(500), nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + + class AgentRun(Base): """Tracks individual agent runs triggered via the Runs API (M6). diff --git a/src/external/gmail_import.py b/src/external/gmail_import.py new file mode 100644 index 0000000..f57a771 --- /dev/null +++ b/src/external/gmail_import.py @@ -0,0 +1,106 @@ +"""Read-only Gmail message conversion for mail evidence imports. + +This module accepts Gmail API ``users.messages.get(format=full)`` payloads and +turns them into the provider-neutral ``MailMessageImport`` shape. It does not +call Gmail itself and it does not persist raw mail content. +""" + +from __future__ import annotations + +import base64 +import html +import re +from datetime import UTC, date, datetime +from email.utils import parsedate_to_datetime +from typing import Any + + +_HTML_TAG_RE = re.compile(r"<[^>]+>") +_HTML_DROP_RE = re.compile(r"<(script|style)\b[^>]*>.*?", re.I | re.S) +_HTML_BREAK_RE = re.compile(r"<\s*(br|/p|/div|/tr|/li)\b[^>]*>", re.I) +_SPACE_RE = re.compile(r"\s+") + + +def _decode_body(data: str | None) -> str: + if not data: + return "" + padding = "=" * (-len(data) % 4) + return base64.urlsafe_b64decode(f"{data}{padding}").decode("utf-8", errors="replace") + + +def _headers(payload: dict[str, Any]) -> dict[str, str]: + values: dict[str, str] = {} + for header in payload.get("headers") or []: + name = str(header.get("name") or "").lower() + value = str(header.get("value") or "") + if name and value: + values[name] = value + return values + + +def _walk_parts(part: dict[str, Any]) -> list[dict[str, Any]]: + parts = [part] + for child in part.get("parts") or []: + parts.extend(_walk_parts(child)) + return parts + + +def _html_to_text(value: str) -> str: + value = _HTML_DROP_RE.sub(" ", value) + value = _HTML_BREAK_RE.sub("\n", value) + without_tags = _HTML_TAG_RE.sub(" ", value) + return _SPACE_RE.sub(" ", html.unescape(without_tags)).strip() + + +def _message_body(payload: dict[str, Any]) -> str: + plain_chunks: list[str] = [] + html_chunks: list[str] = [] + for part in _walk_parts(payload): + mime_type = str(part.get("mimeType") or "").lower() + decoded = _decode_body((part.get("body") or {}).get("data")) + if not decoded: + continue + if mime_type == "text/plain": + plain_chunks.append(decoded) + elif mime_type == "text/html": + html_chunks.append(_html_to_text(decoded)) + + if plain_chunks: + return "\n".join(chunk.strip() for chunk in plain_chunks if chunk.strip()) + return "\n".join(chunk for chunk in html_chunks if chunk) + + +def _received_at(message: dict[str, Any], headers: dict[str, str]) -> date | None: + internal_date = message.get("internalDate") + if internal_date: + try: + return datetime.fromtimestamp(int(internal_date) / 1000, tz=UTC).date() + except (TypeError, ValueError, OSError): + pass + + header_date = headers.get("date") + if header_date: + try: + parsed = parsedate_to_datetime(header_date) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + return parsed.date() + except (TypeError, ValueError, IndexError, OverflowError): + pass + return None + + +def gmail_message_to_mail_import(message: dict[str, Any]) -> dict[str, Any]: + """Convert a Gmail API message payload into ``MailMessageImport`` data.""" + + payload = message.get("payload") or {} + headers = _headers(payload) + body_text = _message_body(payload) + return { + "source": "gmail", + "source_message_id": str(message["id"]), + "received_at": _received_at(message, headers), + "sender": headers.get("from"), + "subject": headers.get("subject") or "(no subject)", + "body_text": body_text, + } diff --git a/src/services/financial_aggregates.py b/src/services/financial_aggregates.py new file mode 100644 index 0000000..41cedb3 --- /dev/null +++ b/src/services/financial_aggregates.py @@ -0,0 +1,328 @@ +"""Deterministic financial aggregates shared by API and agents.""" + +from __future__ import annotations + +import calendar +from datetime import date, datetime, timezone +from decimal import Decimal +from typing import Any + +from sqlalchemy import and_, case, extract, func, select +from sqlalchemy.orm import Session + +from src.core.db.models import ( + Budget, + Category, + MailEvidence, + NormalizedTransaction, + TransactionEvidenceLink, +) + +_ZERO = Decimal("0") + + +def month_bounds(year: int, month: int) -> tuple[date, date]: + return date(year, month, 1), date(year, month, calendar.monthrange(year, month)[1]) + + +def _income_case(): + return case( + ( + and_( + NormalizedTransaction.amount > 0, + NormalizedTransaction.is_refund.is_(False), + ), + NormalizedTransaction.amount, + ) + ) + + +def _expense_case(): + return case( + (NormalizedTransaction.amount < 0, NormalizedTransaction.amount), + (NormalizedTransaction.is_refund.is_(True), NormalizedTransaction.amount), + ) + + +def monthly_summary( + db: Session, + *, + year: int, + month: int, + exclude_internal: bool = True, +) -> dict[str, Any]: + totals_stmt = select( + func.coalesce(func.sum(_income_case()), 0).label("income"), + func.coalesce(func.sum(_expense_case()), 0).label("expenses"), + func.count().label("transaction_count"), + ).where( + extract("year", NormalizedTransaction.booking_date) == year, + extract("month", NormalizedTransaction.booking_date) == month, + ) + if exclude_internal: + totals_stmt = totals_stmt.where(NormalizedTransaction.internal_transfer.is_(False)) + + row = db.execute(totals_stmt).one() + income = row.income + expenses = row.expenses + net = income + expenses + savings_rate = (net / income) if income else _ZERO + + cat_stmt = ( + select( + Category.id, + Category.name, + func.sum(NormalizedTransaction.amount).label("total"), + func.count().label("count"), + ) + .join(Category, NormalizedTransaction.category_id == Category.id) + .where( + extract("year", NormalizedTransaction.booking_date) == year, + extract("month", NormalizedTransaction.booking_date) == month, + ) + .group_by(Category.id, Category.name) + .order_by(func.sum(NormalizedTransaction.amount).asc()) + ) + if exclude_internal: + cat_stmt = cat_stmt.where(NormalizedTransaction.internal_transfer.is_(False)) + + categories = [ + { + "category_id": r.id, + "category_name": r.name, + "total": r.total, + "transaction_count": r.count, + } + for r in db.execute(cat_stmt).all() + ] + return { + "year": year, + "month": month, + "income": income, + "expenses": expenses, + "net": net, + "savings_rate": round(savings_rate, 4), + "transaction_count": row.transaction_count, + "by_category": categories, + } + + +def budget_spending(db: Session, *, year: int, month: int) -> dict[str, Any]: + refund_sum = func.coalesce( + func.sum(case((NormalizedTransaction.is_refund.is_(True), NormalizedTransaction.amount))), + 0, + ) + expense_sum = func.coalesce( + func.sum( + case( + ( + and_( + NormalizedTransaction.amount < 0, + NormalizedTransaction.is_refund.is_(False), + ), + NormalizedTransaction.amount, + ) + ) + ), + 0, + ) + + stmt = ( + select( + Budget.category_id.label("category_id"), + Budget.monthly_limit.label("monthly_limit"), + Budget.currency.label("currency"), + Budget.is_active.label("is_active"), + Budget.priority.label("priority"), + Budget.warning_threshold.label("warning_threshold"), + Budget.critical_threshold.label("critical_threshold"), + Budget.context_note.label("context_note"), + Category.name.label("category_name"), + expense_sum.label("spent_gross"), + refund_sum.label("refunded"), + func.count(NormalizedTransaction.id).label("transaction_count"), + ) + .join(Category, Category.id == Budget.category_id) + .outerjoin( + NormalizedTransaction, + and_( + NormalizedTransaction.category_id == Budget.category_id, + NormalizedTransaction.internal_transfer.is_(False), + extract("year", NormalizedTransaction.booking_date) == year, + extract("month", NormalizedTransaction.booking_date) == month, + ), + ) + .where(Budget.is_active.is_(True)) + .group_by( + Budget.category_id, + Budget.monthly_limit, + Budget.currency, + Budget.is_active, + Budget.priority, + Budget.warning_threshold, + Budget.critical_threshold, + Budget.context_note, + Category.name, + ) + .order_by(Budget.priority.desc(), Category.name) + ) + + items = [] + for r in db.execute(stmt).all(): + spent_net = r.spent_gross + r.refunded + utilization = (abs(spent_net) / r.monthly_limit) if r.monthly_limit else _ZERO + if utilization >= r.critical_threshold: + status = "critical" + elif utilization >= r.warning_threshold: + status = "warning" + else: + status = "ok" + items.append( + { + "category_id": r.category_id, + "category_name": r.category_name, + "monthly_limit": r.monthly_limit, + "currency": r.currency, + "spent_gross": r.spent_gross, + "refunded": r.refunded, + "spent_net": spent_net, + "remaining": r.monthly_limit + spent_net, + "transaction_count": r.transaction_count, + "utilization": round(utilization, 4), + "status": status, + "priority": r.priority, + "context_note": r.context_note, + } + ) + + return {"year": year, "month": month, "items": items} + + +def category_semantics(db: Session) -> list[dict[str, Any]]: + rows = db.execute(select(Category).order_by(Category.name)).scalars().all() + return [ + { + "id": row.id, + "name": row.name, + "type": row.type.value if hasattr(row.type, "value") else str(row.type), + "kind": row.kind, + "budgetable": row.budgetable, + "analysis_group": row.analysis_group, + "description": row.description, + "examples": row.examples or [], + "anti_examples": row.anti_examples or [], + "llm_hints": row.llm_hints, + } + for row in rows + ] + + +def _evidence_for_period(db: Session, start: date, end: date) -> list[dict[str, Any]]: + rows = db.execute( + select(MailEvidence) + .where(MailEvidence.document_date >= start) + .where(MailEvidence.document_date <= end) + .order_by(MailEvidence.document_date.desc()) + .limit(100) + ).scalars() + return [ + { + "id": row.id, + "evidence_type": row.evidence_type, + "merchant_name": row.merchant_name, + "document_date": row.document_date.isoformat() if row.document_date else None, + "total_amount": str(row.total_amount) if row.total_amount is not None else None, + "currency": row.currency, + "payment_method": row.payment_method, + "line_items": row.line_items or [], + "confidence": str(row.confidence), + } + for row in rows + ] + + +def _top_transactions(db: Session, start: date, end: date, *, limit: int = 20) -> list[dict[str, Any]]: + rows = db.execute( + select( + NormalizedTransaction, + Category.name.label("category_name"), + MailEvidence.id.label("evidence_id"), + MailEvidence.evidence_type.label("evidence_type"), + MailEvidence.merchant_name.label("evidence_merchant"), + TransactionEvidenceLink.confidence.label("evidence_confidence"), + ) + .outerjoin(Category, NormalizedTransaction.category_id == Category.id) + .outerjoin( + TransactionEvidenceLink, + TransactionEvidenceLink.transaction_id == NormalizedTransaction.id, + ) + .outerjoin(MailEvidence, MailEvidence.id == TransactionEvidenceLink.evidence_id) + .where(NormalizedTransaction.booking_date >= start) + .where(NormalizedTransaction.booking_date <= end) + .where(NormalizedTransaction.internal_transfer.is_(False)) + .where(NormalizedTransaction.amount < 0) + .order_by(NormalizedTransaction.amount.asc()) + .limit(limit) + ).all() + return [ + { + "transaction_id": tx.id, + "booking_date": tx.booking_date.isoformat(), + "amount": str(tx.amount), + "source": tx.source.value if hasattr(tx.source, "value") else str(tx.source), + "category_id": tx.category_id, + "category_name": category_name, + "recipient": tx.recipient, + "description": tx.description, + "evidence": { + "id": evidence_id, + "type": evidence_type, + "merchant": evidence_merchant, + "confidence": str(evidence_confidence) if evidence_confidence is not None else None, + } + if evidence_id + else None, + } + for tx, category_name, evidence_id, evidence_type, evidence_merchant, evidence_confidence in rows + ] + + +def analysis_context(db: Session, *, year: int, month: int) -> dict[str, Any]: + start, end = month_bounds(year, month) + summary = monthly_summary(db, year=year, month=month) + budgets = budget_spending(db, year=year, month=month) + budget_items = budgets["items"] + budget_risks = [ + item + for item in budget_items + if item["status"] in {"warning", "critical"} or item["remaining"] < 0 + ] + + uncategorized_count = db.execute( + select(func.count()) + .select_from(NormalizedTransaction) + .where(NormalizedTransaction.booking_date >= start) + .where(NormalizedTransaction.booking_date <= end) + .where(NormalizedTransaction.category_id.is_(None)) + .where(NormalizedTransaction.internal_transfer.is_(False)) + ).scalar_one() + + return { + "year": year, + "month": month, + "period_start": start.isoformat(), + "period_end": end.isoformat(), + "generated_at": datetime.now(timezone.utc).isoformat(), + "monthly_summary": summary, + "budget_spending": budgets, + "budget_risks": budget_risks, + "uncategorized_count": uncategorized_count, + "category_semantics": category_semantics(db), + "mail_evidence": _evidence_for_period(db, start, end), + "top_transactions": _top_transactions(db, start, end), + "assumptions": [ + "internal transfers are excluded from spend", + "is_refund=true credits reduce original expense categories", + "mail evidence is sanitized and matched by amount/date/merchant/payment hints", + ], + } diff --git a/src/services/llm_context.py b/src/services/llm_context.py new file mode 100644 index 0000000..b9bbbba --- /dev/null +++ b/src/services/llm_context.py @@ -0,0 +1,109 @@ +"""Sanitize context before it is sent to LLM agents.""" + +from __future__ import annotations + +import json +import re +from collections.abc import Mapping, Sequence +from datetime import date, datetime +from decimal import Decimal +from typing import Any + +_EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.I) +_DE_IBAN_RE = re.compile(r"\bDE\d{20}\b") +_GENERIC_IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{10,30}\b") +_LONG_DIGITS_RE = re.compile(r"\b\d{9,}\b") +_BEARER_RE = re.compile(r"(?i)\bBearer\s+[A-Za-z0-9\-_\.~+/=]{8,}") +_REFERENCE_RE = re.compile( + r"(?i)\b(mandat|mandate|reference|referenz|endtoend|order|invoice)" + r"[\s:#-]*[A-Z0-9][A-Z0-9._/-]{3,}\b" +) +_BANNED_KEY_RE = re.compile( + r"(iban|raw|source_payload|account_id|depot_id|external_id|mandate|" + r"reference|order|invoice|message_id|token|password|webhook|database_url|email)", + re.I, +) + + +def sanitize_text(value: str, *, limit: int = 300) -> str: + out = _EMAIL_RE.sub("[email]", value) + out = _DE_IBAN_RE.sub("DE[iban]", out) + out = _GENERIC_IBAN_RE.sub("[iban]", out) + out = _BEARER_RE.sub("Bearer [token]", out) + out = _REFERENCE_RE.sub(lambda m: f"{m.group(1)} [ref]", out) + out = _LONG_DIGITS_RE.sub("[number]", out) + out = re.sub(r"\s+", " ", out).strip() + return out[:limit] + + +class LLMContextSanitizer: + def __init__(self) -> None: + self._tx_map: dict[str, str] = {} + self._evidence_map: dict[str, str] = {} + + def _pseudonym(self, value: Any, mapping: dict[str, str], prefix: str) -> str: + key = str(value) + if key not in mapping: + mapping[key] = f"{prefix}_{len(mapping) + 1:03d}" + return mapping[key] + + def sanitize(self, value: Any, *, key: str | None = None) -> Any: + if key and _BANNED_KEY_RE.search(key): + return None + if key == "transaction_id": + return self._pseudonym(value, self._tx_map, "tx") + if key == "transaction_ids" and isinstance(value, Sequence) and not isinstance(value, str): + return [self._pseudonym(item, self._tx_map, "tx") for item in value] + if key == "evidence_id": + return self._pseudonym(value, self._evidence_map, "ev") + if isinstance(value, str): + return sanitize_text(value) + if isinstance(value, Decimal): + return str(value) + if isinstance(value, datetime | date): + return value.isoformat() + if isinstance(value, Mapping): + return { + item_key: sanitized + for item_key, item_value in value.items() + for sanitized in [self.sanitize(item_value, key=str(item_key))] + if sanitized is not None + } + if isinstance(value, Sequence) and not isinstance(value, bytes | bytearray): + return [self.sanitize(item) for item in value] + return value + + @property + def tx_map(self) -> dict[str, str]: + return dict(self._tx_map) + + +def sanitize_context(context: Mapping[str, Any]) -> dict[str, Any]: + sanitizer = LLMContextSanitizer() + sanitized = sanitizer.sanitize(context) + assert_context_safe(sanitized) + return sanitized + + +def sanitize_search_query(query: str) -> str: + safe = sanitize_text(query, limit=120) + # Web search should get a merchant-ish phrase only, not a whole booking text. + safe = re.sub(r"\[(?:email|iban|number|token|ref)\]", " ", safe, flags=re.I) + safe = re.sub(r"[^A-Za-z0-9ÄÖÜäöüß .&+-]", " ", safe) + safe = re.sub(r"\s+", " ", safe).strip() + return safe[:80] + + +def assert_context_safe(context: Any) -> None: + serialized = json.dumps(context, ensure_ascii=False, default=str) + forbidden = [ + ("email", _EMAIL_RE), + ("de_iban", _DE_IBAN_RE), + ("generic_iban", _GENERIC_IBAN_RE), + ("bearer_token", _BEARER_RE), + ("long_digits", _LONG_DIGITS_RE), + ("reference", _REFERENCE_RE), + ] + for label, pattern in forbidden: + if pattern.search(serialized): + raise ValueError(f"LLM context contains forbidden pattern: {label}") diff --git a/src/services/mail_evidence.py b/src/services/mail_evidence.py new file mode 100644 index 0000000..677adc8 --- /dev/null +++ b/src/services/mail_evidence.py @@ -0,0 +1,382 @@ +"""Mail evidence extraction and transaction matching. + +This module is deliberately provider-neutral. Real Gmail access can feed the +same ``MailMessageImport`` shape later, but only sanitized evidence rows are +persisted. Raw mail bodies stay at the API edge. +""" + +from __future__ import annotations + +import hashlib +import re +from datetime import date, timedelta +from decimal import Decimal, InvalidOperation +from typing import Any + +from sqlalchemy import and_, select +from sqlalchemy.orm import Session + +from src.core.db.models import ( + MailEvidence, + NormalizedTransaction, + TransactionEvidenceLink, +) + +_EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.I) +_DE_IBAN_RE = re.compile(r"\bDE\d{20}\b") +_GENERIC_IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{10,30}\b") +_LONG_DIGITS_RE = re.compile(r"\b\d{7,}\b") +_ORDER_LIKE_RE = re.compile( + r"(?i)\b(order|bestell(?:ung|nummer)?|invoice|rechnung(?:snummer)?|ref)" + r"[\s:#-]*[A-Z0-9][A-Z0-9._/-]{3,}\b" +) +_SPACES_RE = re.compile(r"\s+") + + +def hash_reference(value: str) -> str: + return hashlib.sha256(value.strip().lower().encode("utf-8")).hexdigest() + + +def normalize_merchant(value: str | None) -> str | None: + if not value: + return None + cleaned = value.lower() + cleaned = cleaned.replace("paypal *", " ") + cleaned = re.sub(r"\b(gmbh|ag|inc|ltd|co\.?|kg|se|eu|payments?)\b", " ", cleaned) + cleaned = re.sub(r"[^a-z0-9äöüß]+", " ", cleaned) + cleaned = _SPACES_RE.sub(" ", cleaned).strip() + return cleaned or None + + +def redact_free_text(value: str | None, *, limit: int = 500) -> str | None: + if not value: + return None + out = _EMAIL_RE.sub("[email]", value) + out = _DE_IBAN_RE.sub("DE[iban]", out) + out = _GENERIC_IBAN_RE.sub("[iban]", out) + out = _ORDER_LIKE_RE.sub(lambda m: f"{m.group(1)} [ref]", out) + out = _LONG_DIGITS_RE.sub("[number]", out) + out = _SPACES_RE.sub(" ", out).strip() + return out[:limit] if out else None + + +def _first_match(patterns: tuple[str, ...], text: str) -> str | None: + for pattern in patterns: + match = re.search(pattern, text, re.I | re.M) + if match: + return match.group(1).strip() + return None + + +def _parse_decimal(value: str | None) -> Decimal | None: + if not value: + return None + cleaned = value.strip() + if "," in cleaned and "." in cleaned: + normalized = cleaned.replace(".", "").replace(",", ".") + elif "," in cleaned: + normalized = cleaned.replace(",", ".") + else: + normalized = cleaned + try: + return Decimal(normalized).quantize(Decimal("0.01")) + except InvalidOperation: + return None + + +def _parse_date(value: str | None) -> date | None: + if not value: + return None + for fmt in ("%Y-%m-%d", "%d.%m.%Y", "%d/%m/%Y"): + try: + from datetime import datetime + + return datetime.strptime(value, fmt).date() + except ValueError: + continue + return None + + +def _detect_evidence_type(text: str) -> str: + lowered = text.lower() + refund_patterns = ( + r"\brefund(?:ed)?\b", + r"\brückerstattung\b", + r"\berstattung\b", + r"\berstattet\b", + r"\bgutschrift\b", + ) + if any(re.search(pattern, lowered) for pattern in refund_patterns): + return "refund" + if any(term in lowered for term in ("subscription", "abo", "verlängerung")): + return "subscription" + if any(term in lowered for term in ("receipt", "quittung", "beleg", "kassenbon")): + return "receipt" + if any(term in lowered for term in ("invoice", "rechnung")): + return "invoice" + return "order" + + +def _detect_payment_method(text: str) -> str | None: + lowered = text.lower() + if "paypal" in lowered: + return "paypal" + if any(term in lowered for term in ("visa", "mastercard", "kreditkarte", "credit card")): + return "credit_card" + if any(term in lowered for term in ("sepa", "lastschrift", "direct debit")): + return "direct_debit" + if any(term in lowered for term in ("kauf auf rechnung", "zahlungsart: invoice", "payment method: invoice")): + return "invoice" + return None + + +def _extract_line_items(text: str) -> list[dict[str, str]]: + items: list[dict[str, str]] = [] + for raw_line in text.splitlines(): + line = raw_line.strip() + if not line.startswith(("-", "*")) or "|" not in line: + continue + parts = [p.strip() for p in line[1:].split("|")] + if not parts or not parts[0]: + continue + item: dict[str, str] = {"label": redact_free_text(parts[0], limit=120) or "Artikel"} + if len(parts) > 1: + amount = _parse_decimal(parts[1]) + if amount is not None: + item["amount"] = str(amount) + if len(parts) > 2 and parts[2]: + item["category_hint"] = redact_free_text(parts[2], limit=80) or "" + items.append(item) + return items[:30] + + +def extract_evidence_from_message(message: dict[str, Any]) -> dict[str, Any]: + """Extract a sanitized evidence draft from one mail-like message. + + The parser intentionally supports a simple deterministic mock format first: + ``Merchant:``, ``Date:``, ``Total:``, ``Order:``, ``Payment:`` and optional + item lines in ``- label | amount | category_hint`` form. Real provider + extraction can produce the same draft shape later. + """ + + subject = str(message.get("subject") or "") + body = str(message.get("body_text") or "") + text = f"{subject}\n{body}" + merchant = _first_match( + ( + r"^\s*(?:merchant|händler|shop|vendor)\s*:\s*(.+)$", + r"^\s*(?:from|von)\s*:\s*(.+)$", + ), + text, + ) + if merchant is None: + merchant = str(message.get("sender") or "").split("@", maxsplit=1)[0] + merchant = redact_free_text(merchant, limit=200) + + amount_raw = _first_match( + ( + r"^\s*(?:total|betrag|summe|amount)\s*:\s*(?:EUR|€)?\s*(-?\d+[\d.,]*)", + r"^\s*(?:total|betrag|summe|amount)\s*:\s*(-?\d+[\d.,]*)\s*(?:EUR|€)", + r"^\s*(?:gesamtbetrag|rechnungsbetrag|offener betrag)(?:\s+inkl\.\s+mwst\.)?\s*:\s*(-?\d+[\d.,]*)\s*(?:EUR|€)", + r"\b(?:betrag|summe|gesamtbetrag|rechnungsbetrag)\s+von\s*(-?\d+[\d.,]*)\s*(?:EUR|€)", + r"\b(?:offener betrag|gesamtbetrag|rechnungsbetrag)(?:\s+inkl\.\s+mwst\.)?\s*:\s*(-?\d+[\d.,]*)\s*(?:EUR|€)", + ), + text, + ) + currency = (_first_match((r"\b(EUR|USD|GBP|CHF)\b",), text) or "EUR").upper() + document_date = _parse_date( + _first_match((r"^\s*(?:date|datum|order date|bestelldatum)\s*:\s*([0-9./-]+)",), text) + ) + order_ref = _first_match( + ( + r"^\s*(?:order|bestellnummer|bestellung|invoice|rechnung)\s*:\s*(.+)$", + r"\b(?:order|bestellnummer|bestellung|invoice|rechnung)[\s:#-]+([A-Z0-9._/-]{4,})\b", + ), + text, + ) + payment_method = _detect_payment_method(text) + confidence = Decimal("0.10") + total_amount = _parse_decimal(amount_raw) + if merchant: + confidence += Decimal("0.20") + if total_amount is not None: + confidence += Decimal("0.25") + if document_date is not None: + confidence += Decimal("0.15") + if order_ref: + confidence += Decimal("0.15") + if payment_method: + confidence += Decimal("0.10") + line_items = _extract_line_items(body) + if line_items: + confidence += Decimal("0.05") + + source_message_id = str(message.get("source_message_id") or subject or body) + order_ref_hash = hash_reference(order_ref) if order_ref else None + fallback_hash = hash_reference(source_message_id) + identity = "|".join( + [ + str(message.get("source") or "gmail"), + order_ref_hash or fallback_hash, + _detect_evidence_type(text), + str(total_amount or ""), + str(document_date or ""), + ] + ) + + return { + "id": f"mail_{hash_reference(identity)[:24]}", + "source": str(message.get("source") or "gmail"), + "evidence_type": _detect_evidence_type(text), + "merchant_name": merchant, + "merchant_key": normalize_merchant(merchant), + "document_date": document_date or message.get("received_at"), + "total_amount": total_amount.copy_abs() if total_amount is not None else None, + "currency": currency, + "payment_method": payment_method, + "payment_hint": redact_free_text( + _first_match((r"^\s*(?:payment|zahlung|zahlungsart)\s*:\s*(.+)$",), text), + limit=200, + ), + "order_ref_hash": order_ref_hash, + "subject_hint": redact_free_text(subject, limit=200), + "redacted_snippet": redact_free_text(body, limit=500), + "line_items": line_items, + "confidence": min(confidence, Decimal("0.999")), + } + + +def upsert_mail_evidence(session: Session, draft: dict[str, Any]) -> MailEvidence: + evidence = session.get(MailEvidence, draft["id"]) + if evidence is None and draft.get("order_ref_hash"): + evidence = session.execute( + select(MailEvidence).where( + MailEvidence.source == draft["source"], + MailEvidence.order_ref_hash == draft["order_ref_hash"], + MailEvidence.evidence_type == draft["evidence_type"], + ) + ).scalar_one_or_none() + + if evidence is None: + evidence = MailEvidence(**draft) + session.add(evidence) + else: + for key, value in draft.items(): + if key != "id": + setattr(evidence, key, value) + session.flush() + return evidence + + +def _amount_cents(value: Decimal) -> int: + return int((value * 100).quantize(Decimal("1"))) + + +def _haystack(tx: NormalizedTransaction) -> str: + return normalize_merchant( + " ".join(part for part in (tx.sender, tx.recipient, tx.description) if part) + ) or "" + + +def _score_candidate(evidence: MailEvidence, tx: NormalizedTransaction) -> tuple[Decimal, str]: + if evidence.total_amount is None: + return Decimal("0"), "no evidence amount" + if _amount_cents(evidence.total_amount) != abs(_amount_cents(tx.amount)): + return Decimal("0"), "amount mismatch" + + expected_credit = evidence.evidence_type == "refund" + if expected_credit and tx.amount <= 0: + return Decimal("0"), "refund evidence expects a credit" + if not expected_credit and tx.amount >= 0: + return Decimal("0"), "purchase evidence expects a debit" + + score = Decimal("0.55") + reasons = ["amount"] + + if evidence.document_date: + delta = abs((tx.booking_date - evidence.document_date).days) + if delta <= 1: + score += Decimal("0.15") + reasons.append("date<=1d") + elif delta <= 5: + score += Decimal("0.08") + reasons.append("date<=5d") + + tx_text = _haystack(tx) + if evidence.merchant_key: + merchant_tokens = [t for t in evidence.merchant_key.split() if len(t) >= 4] + if merchant_tokens and any(token in tx_text for token in merchant_tokens): + score += Decimal("0.20") + reasons.append("merchant") + + if evidence.payment_method and evidence.payment_method in tx_text: + score += Decimal("0.10") + reasons.append("payment_method") + elif evidence.payment_method == "credit_card" and any( + token in tx_text for token in ("visa", "mastercard", "santander") + ): + score += Decimal("0.10") + reasons.append("payment_method") + + return min(score, Decimal("0.999")), "+".join(reasons) + + +def match_evidence_to_transactions( + session: Session, + evidence: MailEvidence, + *, + date_window_days: int = 7, + min_confidence: Decimal = Decimal("0.65"), +) -> list[TransactionEvidenceLink]: + if evidence.total_amount is None or evidence.document_date is None: + return [] + + start = evidence.document_date - timedelta(days=date_window_days) + end = evidence.document_date + timedelta(days=date_window_days) + candidates = session.execute( + select(NormalizedTransaction) + .where( + and_( + NormalizedTransaction.booking_date >= start, + NormalizedTransaction.booking_date <= end, + NormalizedTransaction.internal_transfer.is_(False), + ) + ) + .order_by(NormalizedTransaction.booking_date) + ).scalars() + + links: list[TransactionEvidenceLink] = [] + for tx in candidates: + score, reason = _score_candidate(evidence, tx) + if score < min_confidence: + continue + match_type = f"{evidence.evidence_type}_match" + link_id = f"evlink_{hash_reference(f'{tx.id}|{evidence.id}|{match_type}')[:24]}" + link = session.get(TransactionEvidenceLink, link_id) + if link is None: + link = TransactionEvidenceLink( + id=link_id, + transaction_id=tx.id, + evidence_id=evidence.id, + match_type=match_type, + confidence=score, + match_reason=reason, + ) + session.add(link) + else: + link.confidence = score + link.match_reason = reason + links.append(link) + session.flush() + return links + + +def import_mail_message(session: Session, message: dict[str, Any]) -> tuple[MailEvidence, list[TransactionEvidenceLink]]: + draft = extract_evidence_from_message(message) + evidence = upsert_mail_evidence(session, draft) + links = match_evidence_to_transactions(session, evidence) + session.commit() + session.refresh(evidence) + for link in links: + session.refresh(link) + return evidence, links diff --git a/tests/fixtures/schema_snapshot.json b/tests/fixtures/schema_snapshot.json index 7de05cd..5646595 100644 --- a/tests/fixtures/schema_snapshot.json +++ b/tests/fixtures/schema_snapshot.json @@ -11,6 +11,10 @@ "paypal", "santander_cc" ], + "portfolio_target_type": [ + "isin", + "bucket" + ], "report_status_enum": [ "pending", "ready", @@ -28,6 +32,11 @@ "scheduled", "webhook" ], + "savings_plan_interval": [ + "monthly", + "quarterly", + "yearly" + ], "sync_stage_enum": [ "raw_import", "normalize" @@ -239,21 +248,41 @@ "nullable": false, "type": "VARCHAR" }, + "context_note": { + "nullable": true, + "type": "VARCHAR(500)" + }, "created_at": { "nullable": false, "type": "TIMESTAMP" }, + "critical_threshold": { + "nullable": false, + "type": "NUMERIC(4, 2)" + }, "currency": { "nullable": false, "type": "VARCHAR(3)" }, + "is_active": { + "nullable": false, + "type": "BOOLEAN" + }, "monthly_limit": { "nullable": false, "type": "NUMERIC(14, 2)" }, + "priority": { + "nullable": false, + "type": "INTEGER" + }, "updated_at": { "nullable": false, "type": "TIMESTAMP" + }, + "warning_threshold": { + "nullable": false, + "type": "NUMERIC(4, 2)" } }, "foreign_keys": [ @@ -266,10 +295,38 @@ }, "categories": { "columns": { + "analysis_group": { + "nullable": true, + "type": "VARCHAR(64)" + }, + "anti_examples": { + "nullable": true, + "type": "JSON" + }, + "budgetable": { + "nullable": false, + "type": "BOOLEAN" + }, + "description": { + "nullable": true, + "type": "VARCHAR(500)" + }, + "examples": { + "nullable": true, + "type": "JSON" + }, "id": { "nullable": false, "type": "VARCHAR" }, + "kind": { + "nullable": false, + "type": "VARCHAR(32)" + }, + "llm_hints": { + "nullable": true, + "type": "VARCHAR(1000)" + }, "name": { "nullable": false, "type": "VARCHAR" @@ -511,6 +568,110 @@ "isin" ] }, + "mail_evidence": { + "columns": { + "confidence": { + "nullable": false, + "type": "NUMERIC(4, 3)" + }, + "created_at": { + "nullable": false, + "type": "TIMESTAMP" + }, + "currency": { + "nullable": false, + "type": "VARCHAR(3)" + }, + "document_date": { + "nullable": true, + "type": "DATE" + }, + "evidence_type": { + "nullable": false, + "type": "VARCHAR(32)" + }, + "id": { + "nullable": false, + "type": "VARCHAR(64)" + }, + "line_items": { + "nullable": true, + "type": "JSON" + }, + "merchant_key": { + "nullable": true, + "type": "VARCHAR(200)" + }, + "merchant_name": { + "nullable": true, + "type": "VARCHAR(200)" + }, + "order_ref_hash": { + "nullable": true, + "type": "VARCHAR(64)" + }, + "payment_hint": { + "nullable": true, + "type": "VARCHAR(200)" + }, + "payment_method": { + "nullable": true, + "type": "VARCHAR(32)" + }, + "redacted_snippet": { + "nullable": true, + "type": "VARCHAR(500)" + }, + "source": { + "nullable": false, + "type": "VARCHAR(32)" + }, + "subject_hint": { + "nullable": true, + "type": "VARCHAR(200)" + }, + "total_amount": { + "nullable": true, + "type": "NUMERIC(14, 2)" + }, + "updated_at": { + "nullable": false, + "type": "TIMESTAMP" + } + }, + "foreign_keys": [], + "indexes": { + "ix_mail_evidence_document_date": { + "columns": [ + "document_date" + ], + "unique": false + }, + "ix_mail_evidence_merchant_key": { + "columns": [ + "merchant_key" + ], + "unique": false + }, + "ix_mail_evidence_order_ref_hash": { + "columns": [ + "order_ref_hash" + ], + "unique": false + }, + "uq_mail_evidence_source_order_type": { + "columns": [ + "source", + "order_ref_hash", + "evidence_type" + ], + "unique": true + } + }, + "primary_key": [ + "id" + ] + }, "normalized_transactions": { "columns": { "amount": { @@ -709,6 +870,59 @@ "id" ] }, + "portfolio_targets": { + "columns": { + "active": { + "nullable": false, + "type": "BOOLEAN" + }, + "created_at": { + "nullable": false, + "type": "TIMESTAMP" + }, + "id": { + "nullable": false, + "type": "INTEGER" + }, + "max_weight_pct": { + "nullable": true, + "type": "NUMERIC(6, 2)" + }, + "note": { + "nullable": true, + "type": "VARCHAR(500)" + }, + "target_key": { + "nullable": false, + "type": "VARCHAR(64)" + }, + "target_type": { + "nullable": false, + "type": "VARCHAR(6)" + }, + "target_weight_pct": { + "nullable": false, + "type": "NUMERIC(6, 2)" + }, + "updated_at": { + "nullable": false, + "type": "TIMESTAMP" + } + }, + "foreign_keys": [], + "indexes": { + "uq_portfolio_targets_key": { + "columns": [ + "target_type", + "target_key" + ], + "unique": true + } + }, + "primary_key": [ + "id" + ] + }, "positions": { "columns": { "as_of": { @@ -1008,6 +1222,70 @@ "id" ] }, + "savings_plans": { + "columns": { + "active": { + "nullable": false, + "type": "BOOLEAN" + }, + "amount": { + "nullable": false, + "type": "NUMERIC(14, 2)" + }, + "created_at": { + "nullable": false, + "type": "TIMESTAMP" + }, + "currency": { + "nullable": false, + "type": "VARCHAR(3)" + }, + "id": { + "nullable": false, + "type": "INTEGER" + }, + "interval": { + "nullable": false, + "type": "VARCHAR(9)" + }, + "isin": { + "nullable": false, + "type": "VARCHAR(12)" + }, + "note": { + "nullable": true, + "type": "VARCHAR(500)" + }, + "start_date": { + "nullable": false, + "type": "DATE" + }, + "updated_at": { + "nullable": false, + "type": "TIMESTAMP" + } + }, + "foreign_keys": [ + "['isin']->instruments.['isin']" + ], + "indexes": { + "ix_savings_plans_isin": { + "columns": [ + "isin" + ], + "unique": false + }, + "uq_savings_plans_isin": { + "columns": [ + "isin" + ], + "unique": true + } + }, + "primary_key": [ + "id" + ] + }, "sync_runs": { "columns": { "data_source": { @@ -1073,6 +1351,67 @@ "id" ] }, + "transaction_evidence_links": { + "columns": { + "confidence": { + "nullable": false, + "type": "NUMERIC(4, 3)" + }, + "created_at": { + "nullable": false, + "type": "TIMESTAMP" + }, + "evidence_id": { + "nullable": false, + "type": "VARCHAR(64)" + }, + "id": { + "nullable": false, + "type": "VARCHAR(64)" + }, + "match_reason": { + "nullable": false, + "type": "VARCHAR(500)" + }, + "match_type": { + "nullable": false, + "type": "VARCHAR(32)" + }, + "transaction_id": { + "nullable": false, + "type": "VARCHAR(64)" + } + }, + "foreign_keys": [ + "['evidence_id']->mail_evidence.['id']", + "['transaction_id']->normalized_transactions.['id']" + ], + "indexes": { + "ix_transaction_evidence_links_evidence": { + "columns": [ + "evidence_id" + ], + "unique": false + }, + "ix_transaction_evidence_links_transaction": { + "columns": [ + "transaction_id" + ], + "unique": false + }, + "uq_transaction_evidence_link": { + "columns": [ + "transaction_id", + "evidence_id", + "match_type" + ], + "unique": true + } + }, + "primary_key": [ + "id" + ] + }, "transaction_links": { "columns": { "child_transaction_id": { diff --git a/tests/test_agent_orchestrator.py b/tests/test_agent_orchestrator.py index eb177fc..849d1ee 100644 --- a/tests/test_agent_orchestrator.py +++ b/tests/test_agent_orchestrator.py @@ -685,6 +685,44 @@ async def get(self, url, **kwargs): "snippet": "Fleischerei in Bamberg seit 1898", } + def test_sanitizes_query_before_external_search(self): + """SearXNG gets merchant-ish text only, never private references.""" + import asyncio + + from src.agents.categorization import search_web + from src.core.config import settings + + captured_queries: list[str] = [] + + class _FakeResp: + def raise_for_status(self): + return None + + def json(self): + return {"results": []} + + class _FakeClient: + def __init__(self, *args, **kwargs): + pass + + async def __aenter__(self): + return self + + async def __aexit__(self, *args): + return None + + async def get(self, _url, **kwargs): + captured_queries.append(kwargs["params"]["q"]) + return _FakeResp() + + with patch.object(settings, "searxng_url", "https://search.example.com"): + with patch("src.agents.categorization.httpx.AsyncClient", _FakeClient): + asyncio.run(search_web("ACME order ACME-123456789 max@example.invalid")) + + assert captured_queries == ["ACME order"] + assert "ACME-123456789" not in captured_queries[0] + assert "max@example.invalid" not in captured_queries[0] + def test_unreachable_searxng_returns_error_dict(self): """Connection error ⇒ structured error, not raised exception.""" import asyncio @@ -769,7 +807,9 @@ def test_run_full_creates_multiple_reports(self, db_engine, agent_seed): from pydantic_ai.models.test import TestModel from src.agents.anomaly import anomaly_agent + from src.agents.budget_analysis import budget_analysis_agent from src.agents.categorization import categorization_agent + from src.agents.category_audit import category_audit_agent from src.agents.monthly_analysis import monthly_analysis_agent from src.agents.orchestrator import AgentOrchestrator from src.agents.synthesizer import synthesizer_agent @@ -781,6 +821,8 @@ def test_run_full_creates_multiple_reports(self, db_engine, agent_seed): with ( categorization_agent.override(model=TestModel()), + category_audit_agent.override(model=TestModel()), + budget_analysis_agent.override(model=TestModel()), weekly_analysis_agent.override(model=TestModel()), monthly_analysis_agent.override(model=TestModel()), anomaly_agent.override(model=TestModel()), @@ -794,12 +836,14 @@ def test_run_full_creates_multiple_reports(self, db_engine, agent_seed): assert run.agent_name == "full" assert run.status == RunStatus.SUCCEEDED - # 5 new reports + 1 seed report + # 7 new reports + 1 seed report new_reports = s.query(Report).filter(Report.id != "prev-report-001").all() report_types = {r.report_type for r in new_reports} assert "categorization" in report_types + assert "category_audit" in report_types + assert "budget_analysis" in report_types assert "synthesis" in report_types - assert len(new_reports) == 5 + assert len(new_reports) == 7 def test_run_single_for_reuses_existing_run(self, db_engine, agent_seed): """run_single_for picks up an existing PENDING run.""" diff --git a/tests/test_mail_evidence_layer.py b/tests/test_mail_evidence_layer.py new file mode 100644 index 0000000..b4b45ce --- /dev/null +++ b/tests/test_mail_evidence_layer.py @@ -0,0 +1,447 @@ +from __future__ import annotations + +import base64 +import json +import os +from datetime import date +from decimal import Decimal +from unittest.mock import patch + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy import create_engine +from sqlalchemy.orm import Session +from sqlalchemy.pool import StaticPool + +from src.core.db.models import ( + Base, + Budget, + Category, + DataSource, + NormalizedTransaction, + RawTransaction, + TypeEnum, +) +from src.services import financial_aggregates +from src.services.llm_context import assert_context_safe, sanitize_context, sanitize_search_query +from src.services.mail_evidence import extract_evidence_from_message, import_mail_message +from src.external.gmail_import import gmail_message_to_mail_import + +AUTH = {"Authorization": "Bearer test-secret"} + + +@pytest.fixture +def sqlite_engine(): + engine = create_engine( + "sqlite+pysqlite:///:memory:", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + Base.metadata.create_all(engine) + try: + yield engine + finally: + engine.dispose() + + +@pytest.fixture +def sqlite_api_client(sqlite_engine): + from src.core.db import get_db + + def _override_get_db(): + with Session(sqlite_engine) as session: + yield session + + with patch.dict(os.environ, {"API_TOKEN": "test-secret", "DATABASE_URL": ""}): + from src.api.app import create_app + + app = create_app() + app.dependency_overrides[get_db] = _override_get_db + client = TestClient(app) + yield client + app.dependency_overrides.clear() + + +def _seed_purchase(session: Session) -> None: + session.add( + Category( + id="shopping", + name="Shopping", + type=TypeEnum.DISKRETIONAER, + kind="expense", + budgetable=True, + analysis_group="discretionary", + examples=["retail orders"], + anti_examples=["salary"], + llm_hints="Use receipt line items when available.", + ) + ) + session.add( + Budget( + category_id="shopping", + monthly_limit=Decimal("40.00"), + priority=2, + warning_threshold=Decimal("0.80"), + critical_threshold=Decimal("1.00"), + context_note="Keep discretionary orders tight.", + ) + ) + session.add( + RawTransaction( + content_hash="m" * 64, + source=DataSource.PAYPAL, + external_id="paypal-mock-001", + raw_data={"stub": True}, + ) + ) + session.flush() + session.add( + NormalizedTransaction( + id="txn-mail-match-001", + raw_content_hash="m" * 64, + source=DataSource.PAYPAL, + external_id="paypal-mock-001", + booking_date=date(2026, 6, 7), + valuation_date=date(2026, 6, 7), + amount=Decimal("-42.99"), + currency="EUR", + sender="Max Mustermann", + recipient="PayPal *ACME Shop", + description="ACME Shop Paypal checkout", + category_id="shopping", + is_recurring=False, + is_outlier=False, + internal_transfer=False, + ) + ) + session.commit() + + +def _mock_mail_payload(message_id: str = "gmail-msg-demo-001") -> dict: + return { + "source": "gmail", + "source_message_id": message_id, + "sender": "receipts@acme.example", + "subject": "Invoice for order ACME-123456789", + "body_text": "\n".join( + [ + "Merchant: ACME Shop GmbH", + "Date: 2026-06-07", + "Total: 42,99 EUR", + "Order: ACME-123456789", + "Payment: PayPal", + "Customer email: max@example.invalid", + "IBAN: DE89370400440532013000", + "- Outdoor Jacket | 39,99 | clothing", + "- Shipping | 3,00 | shipping", + ] + ), + } + + +def _gmail_body(value: str) -> str: + return base64.urlsafe_b64encode(value.encode("utf-8")).decode("ascii").rstrip("=") + + +def _gmail_payload( + *, + message_id: str = "gmail-realish-001", + mime_type: str = "text/plain", + body: str = "Merchant: ACME Shop\nDate: 2026-06-07\nTotal: 42,99 EUR", +) -> dict: + return { + "id": message_id, + "internalDate": "1780819200000", + "payload": { + "mimeType": "multipart/alternative", + "headers": [ + {"name": "From", "value": "ACME Receipts "}, + {"name": "Subject", "value": "Invoice for order ACME-123456789"}, + {"name": "Date", "value": "Sun, 07 Jun 2026 12:00:00 +0200"}, + ], + "parts": [ + { + "mimeType": mime_type, + "body": {"data": _gmail_body(body)}, + } + ], + }, + } + + +def test_mock_mail_evidence_is_redacted_and_matched(sqlite_engine): + with Session(sqlite_engine) as session: + _seed_purchase(session) + + evidence, links = import_mail_message(session, _mock_mail_payload()) + + assert evidence.total_amount == Decimal("42.99") + assert evidence.merchant_key == "acme shop" + assert evidence.order_ref_hash is not None + assert "ACME-123456789" not in (evidence.subject_hint or "") + assert "ACME-123456789" not in (evidence.redacted_snippet or "") + assert "max@example.invalid" not in (evidence.redacted_snippet or "") + assert "DE89370400440532013000" not in (evidence.redacted_snippet or "") + assert evidence.line_items == [ + { + "label": "Outdoor Jacket", + "amount": "39.99", + "category_hint": "clothing", + }, + {"label": "Shipping", "amount": "3.00", "category_hint": "shipping"}, + ] + + assert len(links) == 1 + assert links[0].transaction_id == "txn-mail-match-001" + assert links[0].match_type == "invoice_match" + assert links[0].confidence >= Decimal("0.90") + + +def test_analysis_context_includes_budget_and_mail_evidence(sqlite_engine): + with Session(sqlite_engine) as session: + _seed_purchase(session) + evidence, _links = import_mail_message( + session, + { + "source": "gmail", + "source_message_id": "gmail-msg-demo-002", + "sender": "receipts@acme.example", + "subject": "ACME receipt", + "body_text": "\n".join( + [ + "Merchant: ACME Shop", + "Date: 2026-06-07", + "Total: 42.99 EUR", + "Payment: PayPal", + ] + ), + }, + ) + + context = financial_aggregates.analysis_context(session, year=2026, month=6) + + assert context["budget_risks"][0]["category_id"] == "shopping" + assert context["budget_risks"][0]["status"] == "critical" + assert context["mail_evidence"][0]["id"] == evidence.id + assert context["top_transactions"][0]["evidence"]["id"] == evidence.id + + +def test_mail_evidence_api_import_lists_and_feeds_analysis_context( + sqlite_engine, + sqlite_api_client, +): + with Session(sqlite_engine) as session: + _seed_purchase(session) + + import_resp = sqlite_api_client.post( + "/api/v1/mail-evidence/mock-import", + headers=AUTH, + json=_mock_mail_payload(), + ) + assert import_resp.status_code == 200 + imported = import_resp.json() + assert imported["evidence"]["total_amount"] == "42.99" + assert imported["evidence"]["order_ref_hash"] + assert "ACME-123456789" not in imported["evidence"]["subject_hint"] + assert "max@example.invalid" not in imported["evidence"]["redacted_snippet"] + assert imported["links"][0]["transaction_id"] == "txn-mail-match-001" + + list_resp = sqlite_api_client.get("/api/v1/mail-evidence", headers=AUTH) + assert list_resp.status_code == 200 + assert list_resp.json()[0]["id"] == imported["evidence"]["id"] + + context_resp = sqlite_api_client.get( + "/api/v1/aggregates/analysis-context?year=2026&month=6", + headers=AUTH, + ) + assert context_resp.status_code == 200 + context = context_resp.json() + assert context["budget_risks"][0]["category_id"] == "shopping" + assert context["top_transactions"][0]["evidence"]["id"] == imported["evidence"]["id"] + + +def test_gmail_message_converter_builds_provider_neutral_import(): + converted = gmail_message_to_mail_import(_gmail_payload()) + + assert converted == { + "source": "gmail", + "source_message_id": "gmail-realish-001", + "received_at": date(2026, 6, 7), + "sender": "ACME Receipts ", + "subject": "Invoice for order ACME-123456789", + "body_text": "Merchant: ACME Shop\nDate: 2026-06-07\nTotal: 42,99 EUR", + } + + +def test_gmail_message_converter_uses_html_fallback(): + converted = gmail_message_to_mail_import( + _gmail_payload( + mime_type="text/html", + body="Merchant: ACME & Co
Total: 12,34 EUR", + ) + ) + + assert converted["body_text"] == "Merchant: ACME & Co Total: 12,34 EUR" + + +def test_mail_evidence_api_accepts_real_import_endpoint( + sqlite_engine, + sqlite_api_client, +): + with Session(sqlite_engine) as session: + _seed_purchase(session) + + converted = gmail_message_to_mail_import(_gmail_payload()) + import_resp = sqlite_api_client.post( + "/api/v1/mail-evidence/import", + headers=AUTH, + json={**converted, "received_at": str(converted["received_at"])}, + ) + + assert import_resp.status_code == 200 + imported = import_resp.json() + assert imported["evidence"]["merchant_key"] == "acme shop" + assert imported["evidence"]["total_amount"] == "42.99" + assert imported["links"][0]["transaction_id"] == "txn-mail-match-001" + + +def test_llm_context_sanitizer_redacts_and_pseudonymizes_sensitive_fields(): + sanitized = sanitize_context( + { + "transaction_id": "txn-real-001", + "transaction_ids": ["txn-real-001", "txn-real-002"], + "external_id": "provider-secret-id", + "source_payload": {"raw": "do not send"}, + "order_id": 123456789, + "invoice_number": "INV-123456789", + "description": ( + "Invoice ACME-123456789 for max@example.invalid " + "IBAN DE89370400440532013000" + ), + "nested": {"evidence_id": "mail_real_001"}, + } + ) + + assert sanitized["transaction_id"] == "tx_001" + assert sanitized["transaction_ids"] == ["tx_001", "tx_002"] + assert "external_id" not in sanitized + assert "source_payload" not in sanitized + assert "order_id" not in sanitized + assert "invoice_number" not in sanitized + assert "max@example.invalid" not in sanitized["description"] + assert "DE89370400440532013000" not in sanitized["description"] + assert "ACME-123456789" not in sanitized["description"] + assert sanitized["nested"]["evidence_id"] == "ev_001" + + +def test_llm_context_sanitizer_normalizes_json_scalar_types(): + sanitized = sanitize_context( + { + "amount": Decimal("42.99"), + "booking_date": date(2026, 6, 22), + } + ) + + assert sanitized == { + "amount": "42.99", + "booking_date": "2026-06-22", + } + json.dumps(sanitized) + + +def test_search_query_sanitizer_removes_private_reference_noise(): + query = sanitize_search_query( + "ACME Shop order ACME-123456789 max@example.invalid 2026-06-07" + ) + + assert "ACME Shop" in query + assert "ACME-123456789" not in query + assert "max@example.invalid" not in query + + +def test_context_safety_rejects_unsanitized_reference_patterns(): + with pytest.raises(ValueError, match="forbidden pattern"): + assert_context_safe({"description": "Order ACME-123456789"}) + + +@pytest.mark.parametrize( + ("subject", "expected_type"), + [ + ("Receipt from ACME Shop", "receipt"), + ("Invoice from ACME Shop", "invoice"), + ("Subscription renewal ACME Cloud", "subscription"), + ("Refund for ACME return", "refund"), + ("Order confirmation ACME Shop", "order"), + ], +) +def test_mail_evidence_type_detection_covers_planned_mail_objects( + subject: str, + expected_type: str, +): + draft = extract_evidence_from_message( + { + "source_message_id": f"msg-{expected_type}", + "sender": "receipts@acme.example", + "subject": subject, + "body_text": "Merchant: ACME Shop\nTotal: 1.00 EUR\nDate: 2026-06-07", + } + ) + + assert draft["evidence_type"] == expected_type + + +def test_mail_evidence_extraction_handles_unstructured_non_financial_mail(): + draft = extract_evidence_from_message( + { + "source_message_id": "newsletter-noise", + "sender": "newsletter@example.invalid", + "subject": "Some update without finance markers", + "body_text": "Just a regular update with no total and no currency.", + } + ) + + assert draft["currency"] == "EUR" + assert draft["total_amount"] is None + + +def test_mail_evidence_extraction_handles_german_invoice_amounts(): + draft = extract_evidence_from_message( + { + "source_message_id": "unzer-decathlon-demo", + "sender": "decathlon.de Unzer Rechnung ", + "subject": "Zahlungsinformationen zum Kauf auf Rechnung bei decathlon.de", + "received_at": date(2026, 6, 20), + "body_text": ( + "Vielen Dank für Deine Bestellung bei decathlon.de " + "(Bestellnummer ABCD-123456789). Bitte überweise den Betrag von " + "74,98 EUR bis spätestens zum 11.07.2026. " + "Zahlungsinformationen Empfänger: Unzer E-Com GmbH " + "Offener Betrag: 74,98 EUR IBAN: DE89370400440532013000" + ), + } + ) + + assert draft["evidence_type"] == "invoice" + assert draft["total_amount"] == Decimal("74.98") + assert draft["payment_method"] == "invoice" + assert draft["document_date"] == date(2026, 6, 20) + assert draft["order_ref_hash"] is not None + assert "ABCD-123456789" not in (draft["redacted_snippet"] or "") + + +def test_mail_evidence_type_ignores_shop_return_boilerplate(): + draft = extract_evidence_from_message( + { + "source_message_id": "decathlon-order-demo", + "sender": "Decathlon Service ", + "subject": "Vielen Dank für deinen Einkauf!", + "received_at": date(2026, 6, 20), + "body_text": ( + "Bestellnummer ABCD-123456789 Zahlungsart: invoice " + "Gesamtbetrag inkl. MwSt.: 74,98 € " + "Lieferung Retouren und Rückerstattungen 30-Tage Rückgaberecht" + ), + } + ) + + assert draft["evidence_type"] == "invoice"