Merged
13 changes: 13 additions & 0 deletions CLAUDE.md
@@ -31,6 +31,8 @@ python backend/scripts/seed_ifrs9_metadata.py

The sample-db contains an **IFRS 9 banking schema** with 6 tables: `counterparties`, `facilities`, `exposures`, `ecl_provisions`, `collateral`, `staging_history`. Connection string (from Docker): `postgresql://sample:sample_dev@sample-db:5432/sampledb`.

The same container hosts a second database, **`opsdb`** — a deliberately hostile operational-style schema (no FKs, `tenant_id` scoping, soft deletes, int status codes, lookup tables, business-logic views, a dead `customers_bak` table) used to exercise the semantic layer compiler. Connection string: `postgresql://sample:sample_dev@sample-db:5432/opsdb`. The container runs with `pg_stat_statements` preloaded; populate query logs with `python backend/scripts/run_ops_workload.py`. Fixtures: `backend/tests/fixtures/ops_seed.sql` (+ `ops_extensions.sql`). Init scripts only apply on a fresh volume (`docker compose down -v`).

**Auto-setup** (`AUTO_SETUP_SAMPLE_DB=true`, default): On first `docker compose up`, the backend automatically creates the connection, introspects the schema, seeds all metadata (10 glossary terms, 8 metrics, 43 dictionary entries across 12 columns, 1 knowledge document), and launches background embedding generation. Logic in `app/services/setup_service.py`, called from `main.py` lifespan hook. Idempotent — safe to restart.

**Startup sequence** (in `main.py` lifespan):
@@ -270,6 +272,17 @@ Makes the semantic layer discoverable and trustworthy. Two milestones; migration
- **Endpoints:** `/connections/{id}/catalog/{search,facets,lineage}`, plus `/status`, `/versions`, `/versions/{v}`, and `/lineage` sub-resources on the metric/glossary/sample-query/saved-query routers.
- **Heads-up:** existing rows migrate to `status='draft'`, `version=1`. The saved-query PUT routes any `status` change through the governed lifecycle (no raw status writes). sqlglot is a new optional dep — install the `[lineage]` extra (or rebuild the backend image) for lineage to populate.

## Semantic layer compiler (Slice 1)

Attacks the cold-start problem: point QueryWise at an operational DB with an empty semantic layer and get reviewable draft objects. Migration `013`.

- **Engine** (`app/semantic_compiler/`): self-contained package (dataclasses + pure functions, no FastAPI/ORM imports — standalone-CLI extractable). Collectors gather evidence (catalog via the connector, `pg_stats`/CHECK/enums/unique indexes, `pg_get_viewdef`, `pg_stat_statements`); `sqlmeta.py` (sqlglot, graceful degradation) extracts join pairs/aggregates/GROUP BY/WHERE; inference modules emit `Finding`s with evidence + confidence: **join inference without FKs** (naming + value-overlap probe + log co-occurrence; failed probe kills the candidate), dictionaries (enum/CHECK/lookup-table labels/most_common_vals — note pg_stats `n_distinct` is negative when it scales with rows), view→metric extraction, recurring log aggregates, dead tables, tenant scoping (call-weighted log confirmation required), PII (name + sampled value shape), fan-out warnings (1:N parent-measure double-count). The LLM pass (`app/llm/agents/semantic_annotator.py`) only names/describes — output merges onto naming fields, never structure; runs fine without a provider. Output is hard-capped per kind (`Thresholds`) — review fatigue kills draft tools.
- **Staging, not drafts** (`CompilationRun`/`CompilationFinding`, `app/services/compilation_service.py`): findings never touch semantic tables until accepted (draft metrics/glossary feed the context builder today). Accept dispatches per kind through existing creation paths (embed + lineage), landing as `status='draft'`; policies (`PII masking`, `dead tables`, row filters) are created **disabled**; fan-out guidance becomes a knowledge doc (so the prompt assembler picks it up via RAG). Runs as a background job (`semantic_compilation`) with progress (`compilation_progress.py`).
- **Rematerialization:** `introspect_and_cache` wipes cached tables (cascading to inferred relationships + dictionary entries), so accepted findings are **name-keyed** and `rematerialize_accepted` re-creates them after every introspect. `cached_relationships` gained `origin` (`fk|inferred`), `confidence`, `cardinality`, `evidence`.
- **Endpoints:** `/connections/{id}/compilation/runs` (+ `/runs/{rid}`), `/compilation/findings` (+ `/{fid}/accept`, `/{fid}/dismiss`, `/bulk`). Frontend: `pages/CompilerPage.tsx` (run button, progress, findings grouped by kind with evidence + confidence, bulk accept/dismiss).
- **Eval:** `python backend/scripts/eval_compiler_ifrs9.py` scores recovery of the IFRS 9 seed metadata with FKs hidden (`ignore_declared_fks`). Baseline: relationships 5/5 @ 100% precision, dictionary 79%/89%, glossary table-coverage 10/10; metrics need views/logs (sampledb has neither — expected 0).
- **Heads-up:** `pg_stats` is empty until ANALYZE; `pg_stat_statements` needs the extension + read rights (`pg_read_all_stats`). Every collector degrades to empty and the run records `sources_available` so the UI explains reduced confidence. Collectors are Postgres-only for now — other connectors compile catalog-only.
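
Two ideas from the Engine bullet above, the sign convention of `pg_stats.n_distinct` and the value-overlap probe used for join inference, can be illustrated with a minimal sketch. The function names (`estimated_distinct`, `value_overlap`) and the in-memory sampling are assumptions made for illustration and are not taken from `app/semantic_compiler/`; only the `n_distinct` convention itself (a negative value is the negated ratio of distinct values to rows) is PostgreSQL behavior.

```python
def estimated_distinct(n_distinct: float, row_count: float) -> float:
    """Turn a pg_stats n_distinct value into an absolute distinct-count estimate.

    A positive value is an absolute count. A negative value is the negated
    ratio distinct/rows, used when the count appears to scale with table size.
    """
    if n_distinct < 0:
        return -n_distinct * row_count
    return n_distinct


def value_overlap(child_values, parent_values) -> float:
    """Fraction of distinct non-null child values that also occur in the parent."""
    child = {v for v in child_values if v is not None}
    if not child:
        return 0.0
    parent = {v for v in parent_values if v is not None}
    return len(child & parent) / len(child)


# -0.5 on a 1,000-row table means roughly 500 distinct values.
print(estimated_distinct(-0.5, 1000))                    # 500.0
print(estimated_distinct(12, 1000))                      # 12
# Every sampled child value exists in the candidate parent key.
print(value_overlap([1, 2, 2, 3, None], [1, 2, 3, 4]))   # 1.0
# Half the child values are orphans, which would weaken the candidate.
print(value_overlap([1, 9], [1, 2, 3]))                  # 0.5
```

A low overlap is what the bullet calls a failed probe; the threshold at which the real compiler drops a candidate is not stated in this diff, so none is hard-coded here.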

## Packaging & deployability (parallel track)

Production deployment artifacts under `deploy/` (+ root prod compose), separate from the dev `docker-compose.yml` / `Dockerfile`s (which stay untouched for local work). The whole **Packaging & deployability** parallel track from `planfull.md` is complete: hardened images, prod compose, Helm chart, Terraform for AWS + GCP + Azure, CI/CD (build/push/deploy), and ops (backup/restore, DR runbook, config reference). The only deferred item is the **SaaS control plane** (provisioning/billing/fleet upgrades), which is additive and build-on-demand. Overview: `deploy/README.md`.
121 changes: 121 additions & 0 deletions backend/alembic/versions/013_semantic_compiler.py
@@ -0,0 +1,121 @@
"""Semantic layer compiler (Slice 1)

Revision ID: 013
Revises: 012
Create Date: 2026-06-10

Adds the compiler staging tables and inferred-relationship support:

* ``cached_relationships`` gains ``origin`` ('fk' | 'inferred'), ``confidence``,
  ``cardinality`` and ``evidence`` so join edges inferred by the compiler can
  coexist with FK-derived ones.
* ``compilation_runs``: one row per compiler execution against a connection.
* ``compilation_findings``: proposed semantic objects with name-keyed payloads,
  evidence, and confidence. Findings become real semantic objects only on
  explicit accept; accepted findings are the durable source for rematerializing
  inferred relationships and dictionary entries after re-introspection (which
  wipes the schema cache).
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "013"
down_revision: str = "012"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    op.add_column(
        "cached_relationships",
        sa.Column("origin", sa.String(20), nullable=False, server_default="fk"),
    )
    op.add_column("cached_relationships", sa.Column("confidence", sa.Float, nullable=True))
    op.add_column("cached_relationships", sa.Column("cardinality", sa.String(10), nullable=True))
    op.add_column("cached_relationships", sa.Column("evidence", JSONB, nullable=True))

    op.create_table(
        "compilation_runs",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column(
            "connection_id",
            UUID(as_uuid=True),
            sa.ForeignKey("database_connections.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("status", sa.String(20), nullable=False, server_default="queued"),
        sa.Column("options", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
        sa.Column("stats", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
        sa.Column("error", sa.Text, nullable=True),
        sa.Column(
            "triggered_by_id",
            UUID(as_uuid=True),
            sa.ForeignKey("users.id", ondelete="SET NULL"),
            nullable=True,
        ),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
        ),
    )
    op.create_index("ix_compilation_runs_connection_id", "compilation_runs", ["connection_id"])

    op.create_table(
        "compilation_findings",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column(
            "run_id",
            UUID(as_uuid=True),
            sa.ForeignKey("compilation_runs.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column(
            "connection_id",
            UUID(as_uuid=True),
            sa.ForeignKey("database_connections.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("kind", sa.String(40), nullable=False),
        sa.Column("title", sa.String(255), nullable=False),
        sa.Column("payload", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
        sa.Column("evidence", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
        sa.Column("confidence", sa.Float, nullable=False, server_default=sa.text("0")),
        sa.Column("status", sa.String(20), nullable=False, server_default="proposed"),
        sa.Column("created_entity_type", sa.String(40), nullable=True),
        sa.Column("created_entity_id", UUID(as_uuid=True), nullable=True),
        sa.Column(
            "reviewed_by_id",
            UUID(as_uuid=True),
            sa.ForeignKey("users.id", ondelete="SET NULL"),
            nullable=True,
        ),
        sa.Column("reviewed_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column(
            "created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
        ),
    )
    op.create_index(
        "ix_compilation_findings_conn_status_kind",
        "compilation_findings",
        ["connection_id", "status", "kind"],
    )
    op.create_index("ix_compilation_findings_run_id", "compilation_findings", ["run_id"])


def downgrade() -> None:
    op.drop_index("ix_compilation_findings_run_id", table_name="compilation_findings")
    op.drop_index("ix_compilation_findings_conn_status_kind", table_name="compilation_findings")
    op.drop_table("compilation_findings")
    op.drop_index("ix_compilation_runs_connection_id", table_name="compilation_runs")
    op.drop_table("compilation_runs")
    op.drop_column("cached_relationships", "evidence")
    op.drop_column("cached_relationships", "cardinality")
    op.drop_column("cached_relationships", "confidence")
    op.drop_column("cached_relationships", "origin")
140 changes: 140 additions & 0 deletions backend/app/api/v1/endpoints/compilation.py
@@ -0,0 +1,140 @@
"""Semantic layer compiler endpoints: runs + findings review."""

import uuid

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.v1.deps import require_connection_read, require_connection_write
from app.api.v1.schemas.compilation import (
    BulkReviewRequest,
    BulkReviewResponse,
    CompilationFindingResponse,
    CompilationProgressResponse,
    CompilationRunCreate,
    CompilationRunResponse,
)
from app.core.auth import AuthContext
from app.db.session import get_db
from app.services import compilation_progress, compilation_service

router = APIRouter(tags=["compilation"])


def _with_progress(run) -> CompilationRunResponse:
    response = CompilationRunResponse.model_validate(run)
    p = compilation_progress.get_progress(str(run.connection_id))
    if p is not None and run.status in ("queued", "running"):
        response.progress = CompilationProgressResponse(
            total=p.total,
            completed=p.completed,
            stage=p.stage,
            status=p.status,
            error=p.error,
        )
    return response


@router.post(
    "/connections/{connection_id}/compilation/runs",
    response_model=CompilationRunResponse,
    status_code=202,
)
async def start_compilation(
    connection_id: uuid.UUID,
    body: CompilationRunCreate,
    ctx: AuthContext = Depends(require_connection_write),
    db: AsyncSession = Depends(get_db),
):
    try:
        run = await compilation_service.start_run(db, connection_id, ctx, options=body.model_dump())
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc
    return _with_progress(run)


@router.get(
    "/connections/{connection_id}/compilation/runs",
    response_model=list[CompilationRunResponse],
)
async def list_compilation_runs(
    connection_id: uuid.UUID,
    ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    runs = await compilation_service.list_runs(db, connection_id, ctx)
    return [_with_progress(run) for run in runs]


@router.get(
    "/connections/{connection_id}/compilation/runs/{run_id}",
    response_model=CompilationRunResponse,
)
async def get_compilation_run(
    connection_id: uuid.UUID,
    run_id: uuid.UUID,
    ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    run = await compilation_service.get_run(db, run_id, ctx)
    return _with_progress(run)


@router.get(
    "/connections/{connection_id}/compilation/findings",
    response_model=list[CompilationFindingResponse],
)
async def list_compilation_findings(
    connection_id: uuid.UUID,
    status: str | None = None,
    kind: str | None = None,
    ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    return await compilation_service.list_findings(db, connection_id, ctx, status, kind)


@router.post(
    "/connections/{connection_id}/compilation/findings/{finding_id}/accept",
    response_model=CompilationFindingResponse,
)
async def accept_finding(
    connection_id: uuid.UUID,
    finding_id: uuid.UUID,
    ctx: AuthContext = Depends(require_connection_write),
    db: AsyncSession = Depends(get_db),
):
    try:
        return await compilation_service.accept_finding(db, finding_id, ctx)
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc


@router.post(
    "/connections/{connection_id}/compilation/findings/{finding_id}/dismiss",
    response_model=CompilationFindingResponse,
)
async def dismiss_finding(
    connection_id: uuid.UUID,
    finding_id: uuid.UUID,
    ctx: AuthContext = Depends(require_connection_write),
    db: AsyncSession = Depends(get_db),
):
    try:
        return await compilation_service.dismiss_finding(db, finding_id, ctx)
    except ValueError as exc:
        raise HTTPException(status_code=409, detail=str(exc)) from exc


@router.post(
    "/connections/{connection_id}/compilation/findings/bulk",
    response_model=BulkReviewResponse,
)
async def bulk_review_findings(
    connection_id: uuid.UUID,
    body: BulkReviewRequest,
    ctx: AuthContext = Depends(require_connection_write),
    db: AsyncSession = Depends(get_db),
):
    result = await compilation_service.bulk_review(db, body.finding_ids, body.action, ctx)
    return BulkReviewResponse(**result)
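
For reviewers who want to exercise these routes by hand, the following is a rough client-side sketch that only builds the requests and does not send them. The `/api/v1` prefix, the base URL, and both helper names are assumptions (the router mount point is not visible in this diff), and authentication headers are omitted because the auth scheme is not shown here either.

```python
import json
import urllib.request
import uuid

# Assumption: the API router is mounted under this prefix.
API_PREFIX = "/api/v1"


def start_run_request(
    base_url: str,
    connection_id: uuid.UUID,
    *,
    llm_enabled: bool = True,
    min_confidence: float = 0.5,
    ignore_declared_fks: bool = False,
) -> urllib.request.Request:
    """Build the POST that queues a compilation run (body mirrors CompilationRunCreate)."""
    body = {
        "llm_enabled": llm_enabled,
        "min_confidence": min_confidence,
        "ignore_declared_fks": ignore_declared_fks,
    }
    return urllib.request.Request(
        f"{base_url}{API_PREFIX}/connections/{connection_id}/compilation/runs",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def bulk_review_request(
    base_url: str,
    connection_id: uuid.UUID,
    finding_ids: list[uuid.UUID],
    action: str,
) -> urllib.request.Request:
    """Build the POST that accepts or dismisses several findings at once."""
    if action not in ("accept", "dismiss"):
        raise ValueError("action must be 'accept' or 'dismiss'")
    body = {"finding_ids": [str(fid) for fid in finding_ids], "action": action}
    return urllib.request.Request(
        f"{base_url}{API_PREFIX}/connections/{connection_id}/compilation/findings/bulk",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = start_run_request("http://localhost:8000", uuid.UUID(int=1), ignore_declared_fks=True)
    print(req.get_method(), req.full_url)
```

Per the handlers above, a successful start returns 202 with the run record, and a `ValueError` raised by the service surfaces as 409.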
2 changes: 2 additions & 0 deletions backend/app/api/v1/router.py
@@ -7,6 +7,7 @@
    audit,
    auth,
    catalog,
    compilation,
    connections,
    dashboards,
    dictionary,
@@ -43,6 +44,7 @@
api_router.include_router(query_history.router)
api_router.include_router(knowledge.router)
api_router.include_router(catalog.router)
api_router.include_router(compilation.router)
api_router.include_router(audit.router)
api_router.include_router(schedules.router)
api_router.include_router(policies.router)
62 changes: 62 additions & 0 deletions backend/app/api/v1/schemas/compilation.py
@@ -0,0 +1,62 @@
import uuid
from datetime import datetime

from pydantic import BaseModel, ConfigDict, Field


class CompilationRunCreate(BaseModel):
    llm_enabled: bool = True
    min_confidence: float = Field(default=0.5, ge=0.0, le=1.0)
    # Eval mode: pretend declared FKs don't exist so join inference is exercised.
    ignore_declared_fks: bool = False


class CompilationProgressResponse(BaseModel):
    total: int
    completed: int
    stage: str
    status: str
    error: str | None = None


class CompilationRunResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: uuid.UUID
    connection_id: uuid.UUID
    status: str
    options: dict
    stats: dict
    error: str | None
    started_at: datetime | None
    finished_at: datetime | None
    created_at: datetime
    progress: CompilationProgressResponse | None = None


class CompilationFindingResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    id: uuid.UUID
    run_id: uuid.UUID
    connection_id: uuid.UUID
    kind: str
    title: str
    payload: dict
    evidence: list
    confidence: float
    status: str
    created_entity_type: str | None
    created_entity_id: uuid.UUID | None
    reviewed_at: datetime | None
    created_at: datetime


class BulkReviewRequest(BaseModel):
    finding_ids: list[uuid.UUID] = Field(min_length=1, max_length=500)
    action: str = Field(pattern="^(accept|dismiss)$")


class BulkReviewResponse(BaseModel):
    succeeded: int
    failed: int
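
The constraints declared on `BulkReviewRequest` (between 1 and 500 ids, and an action restricted to `accept` or `dismiss`) can be restated without Pydantic to make the accepted inputs explicit. This is a dependency-free sketch for illustration only; the `BulkReview` dataclass is hypothetical and is not part of the PR, which relies on Pydantic's `Field` validation instead.

```python
import re
import uuid
from dataclasses import dataclass

# Same alternation as the Field pattern; fullmatch supplies the anchoring.
_ACTION = re.compile(r"(accept|dismiss)")


@dataclass(frozen=True)
class BulkReview:
    """Hypothetical stand-in that enforces the same limits as BulkReviewRequest."""

    finding_ids: tuple[uuid.UUID, ...]
    action: str

    def __post_init__(self) -> None:
        if not 1 <= len(self.finding_ids) <= 500:
            raise ValueError("finding_ids must contain between 1 and 500 ids")
        if _ACTION.fullmatch(self.action) is None:
            raise ValueError("action must be 'accept' or 'dismiss'")


ok = BulkReview(finding_ids=(uuid.uuid4(),), action="dismiss")
print(ok.action)  # dismiss

try:
    BulkReview(finding_ids=(), action="accept")
except ValueError as exc:
    print(exc)  # finding_ids must contain between 1 and 500 ids
```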
3 changes: 3 additions & 0 deletions backend/app/db/models/__init__.py
@@ -2,6 +2,7 @@
from app.db.models.artifact_dependency import ArtifactDependency
from app.db.models.audit_event import AuditEvent
from app.db.models.chart import Chart
from app.db.models.compilation import CompilationFinding, CompilationRun
from app.db.models.connection import DatabaseConnection
from app.db.models.cost_attribution import CostAttribution
from app.db.models.dashboard import Dashboard
@@ -51,4 +52,6 @@
    "Schedule",
    "DataPolicy",
    "CostAttribution",
    "CompilationRun",
    "CompilationFinding",
]