2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -25,7 +25,7 @@ jobs:
cache: pip

- name: Install dependencies
-   run: pip install -e ".[llm,dev,observability]"
+   run: pip install -e ".[llm,dev,observability,lineage]"

# Gating: the test suite must pass before any Phase 0+ refactor lands.
- name: Tests
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -72,6 +72,32 @@ product surface; all optional dependencies degrade gracefully).
- New optional dependency extra: `export` (`openpyxl`). Frontend adds `recharts` and
`react-grid-layout`.

### Added (Phase 3 - Discovery, catalog & trust)
- **Certification & semantic versioning** (migration `007`) — metrics, glossary terms, sample
queries, and saved queries gain a governed lifecycle (`draft → in_review → certified →
deprecated`), an integer `version`, and certification stamps (`certified_by_id`/`certified_at`).
Editors submit for review / revert; admins certify / deprecate. Certifying validates the
entity's SQL (read-only blocklist + a sqlglot parse).
- **Version history & changelog** — every content edit and status transition appends a
`SemanticVersion` snapshot, exposed at `.../{entity}/{id}/versions` with a field-level diff
helper; surfaced in the UI as a per-entity history timeline.
- **Lifecycle logic** centralized in `versioning_service.py` so all four entity types behave
identically; status transitions go through a single governed endpoint
(`POST .../{entity}/{id}/status`).
- **Data catalog** (`catalog_service.py`, `GET /connections/{id}/catalog/search` + `/facets`) — a
unified hybrid search across tables, columns, metrics, glossary, sample/saved queries, and
knowledge, reusing the existing pgvector embeddings + keyword scorer (no new full-text infra).
Certified items are boosted in ranking; facets by type, status, schema, and owner. New
`frontend/src/pages/CatalogPage.tsx` with search, facet sidebar, and a detail/lineage drawer.
- **Lightweight lineage** (migration `008`, `lineage_service.py`) — saved-query and metric SQL is
parsed with sqlglot into `artifact_dependencies` edges on create/update (best-effort; degrades
to a no-op if sqlglot is absent). Powers the per-artifact "what this touches" view
(`.../{entity}/{id}/lineage`) and the impact view "what depends on this table"
(`GET .../catalog/lineage?table=`).
- New optional dependency extra: `lineage` (`sqlglot`); installed in the backend image and in CI
so the lineage tests run (they use `pytest.importorskip`, so they are skipped rather than failed
when `sqlglot` is absent).
- **Deferred to a later milestone:** column profiling (null rate / distinct counts / sample values).
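
The governed lifecycle and role split described above can be pictured as two small lookup tables. The sketch below is illustrative only: the exact transition map, the `viewer` role, the role ordering, and the `can_transition` helper are assumptions made for this example, not the contents of `versioning_service.py`.

```python
from enum import Enum


class Status(str, Enum):
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    CERTIFIED = "certified"
    DEPRECATED = "deprecated"


# Assumed transition map: submit, revert, certify, deprecate.
# The real service may allow more (or fewer) moves.
ALLOWED_TRANSITIONS = {
    Status.DRAFT: {Status.IN_REVIEW},
    Status.IN_REVIEW: {Status.DRAFT, Status.CERTIFIED},
    Status.CERTIFIED: {Status.DEPRECATED},
    Status.DEPRECATED: set(),
}

# Minimum role needed to move an entity *into* each target status.
ROLE_FOR_TARGET = {
    Status.IN_REVIEW: "editor",
    Status.DRAFT: "editor",
    Status.CERTIFIED: "admin",
    Status.DEPRECATED: "admin",
}

# Assumed ordering: an admin can do everything an editor can.
ROLE_RANK = {"viewer": 0, "editor": 1, "admin": 2}


def can_transition(current: Status, target: Status, role: str) -> bool:
    """Return True if `role` may move an entity from `current` to `target`."""
    if target not in ALLOWED_TRANSITIONS[current]:
        return False
    required = ROLE_FOR_TARGET[target]
    return ROLE_RANK.get(role, -1) >= ROLE_RANK[required]
```

Keeping both maps as plain data means a single status endpoint can validate any of the four entity types with the same check.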

## [1.0.0] - 2026-06-04

First stable release: natural-language-to-SQL with a semantic metadata layer.
15 changes: 13 additions & 2 deletions CLAUDE.md
@@ -44,7 +44,7 @@ For manual seeding (if auto-setup disabled): `python backend/scripts/seed_ifrs9_
Run from `backend/`:

```bash
-pip install -e ".[llm,dev,bigquery,databricks]" # Install all deps
+pip install -e ".[llm,dev,bigquery,databricks,lineage]" # Install all deps (add export,observability,jobs as needed)
alembic upgrade head # Run migrations
uvicorn app.main:app --reload # Dev server on :8000
pytest # Run tests
@@ -233,7 +233,7 @@ dependencies degrade gracefully — the app boots without `structlog` /
- **Jobs** (`app/jobs/`): `JobQueue` ABC with `InProcessJobQueue` (asyncio, default) and `ArqJobQueue` (Redis). Jobs are registered by name in `registry.py`; `launch_background_embeddings` submits `"generate_embeddings"` through `get_job_queue()`. For arq, run a worker: `JOB_BACKEND=arq arq app.jobs.worker.WorkerSettings` (embedding progress then lives in the worker process).
- **Health** (`app/api/v1/endpoints/health.py`): `GET /health/live` (process) and `GET /health/ready` (DB + job queue + LLM provider, 503 on failure) for K8s probes.
- **LLM endpoints:** Azure OpenAI provider (`azure_openai`) added so the pipeline can run inside a customer VPC; registered in `provider_registry`.
-- **Tests/CI:** unit tests in `backend/tests/` (no DB/LLM needed); `.github/workflows/ci.yml` runs pytest (gating) + ruff/mypy/frontend build (advisory until pre-existing lint debt is cleared). Optional deps: `pip install -e ".[observability,jobs]"`.
+- **Tests/CI:** unit tests in `backend/tests/` (no DB/LLM needed); `.github/workflows/ci.yml` installs `.[llm,dev,observability,lineage]` and runs pytest (gating) + ruff/mypy/frontend build (advisory until pre-existing lint debt is cleared). The lineage tests need `sqlglot` (the `[lineage]` extra); without it they are skipped via `pytest.importorskip`. Optional deps: `pip install -e ".[observability,jobs]"`.

## Identity & auth (Phase 1)

@@ -258,3 +258,14 @@ One-shot answers become saved, owned, re-runnable, shareable objects. Two milest
- **Export:** client-side CSV/JSON in the frontend; backend CSV/JSON/XLSX for saved queries (XLSX needs the optional `export` extra → `openpyxl`).
- **Endpoints:** `/connections/{id}/saved-queries` (+ `/run`, `/clone`, `/export`, `/charts`), `/dashboards` (+ `/tiles`, `/layout`, `/tiles/{id}/run`).
- **Frontend:** Recharts (`components/charts/ChartView.tsx`) for viz; `react-grid-layout` for the dashboard grid; shared typed `components/common/ParamInputs.tsx` for params/filters. Charts are managed inside the saved-query view (no separate Charts page). Note: the frontend container's anonymous `node_modules` volume means new deps (recharts, react-grid-layout) need `docker compose exec frontend npm install` or an image rebuild.

## Discovery, catalog & trust (Phase 3)

Makes the semantic layer discoverable and trustworthy. Two milestones; migrations `007` (certification + versioning) and `008` (catalog lineage). **Column profiling is deferred** to a later milestone.

- **Certification lifecycle** (`app/services/versioning_service.py`): metrics, glossary terms, sample queries, and saved queries carry `status` (`draft|in_review|certified|deprecated`), an integer `version`, and `certified_by_id`/`certified_at`. Transitions go through one governed endpoint per entity (`POST /connections/{id}/{entity}/{eid}/status`); the state machine (`_ALLOWED_TRANSITIONS`) and role gate (`_ROLE_FOR_TARGET`) live in the service — **editor** submits-for-review/reverts, **admin** certifies/deprecates. Certifying runs a lightweight SQL check (`check_sql_safety` + a sqlglot parse). One service handles all four entity types via `_SNAPSHOT_FIELDS` / `_SQL_FIELD` maps.
- **Versioning & changelog** (`SemanticVersion` model): every content edit (PUT → `record_edit`, bumps version) and status transition appends an append-only snapshot. Exposed at `GET .../{entity}/{eid}/versions` (+ `/{version}`); `versioning_service.diff` gives a field-level diff. UI: shared `frontend/src/components/common/{CertificationBadge,StatusActions,VersionHistory}.tsx`, wired into the Metrics/Glossary/SavedQueries pages.
- **Catalog search** (`app/services/catalog_service.py`, `app/api/v1/endpoints/catalog.py`): `GET /connections/{id}/catalog/search` runs a hybrid search across tables, columns, metrics, glossary, sample/saved queries, and knowledge — **reusing the existing pgvector embeddings + the keyword scorer** (`semantic/relevance_scorer.py`), no tsvector. Hits merge into a uniform `CatalogHit`; certified items are boosted (`rank_hits`). `GET .../catalog/facets` returns schemas/owners/tags/type+status counts. Connection-scoped via `require_connection_read`. Frontend: `pages/CatalogPage.tsx` (search + facet sidebar + detail/lineage drawer).
- **Lineage** (`app/services/lineage_service.py`, `ArtifactDependency` model): saved-query `pinned_sql` and metric `sql_expression` are parsed with **sqlglot** (optional `[lineage]` extra; lazy import, degrades to a no-op if absent) into table/column edges, recomputed on create/update (best-effort, never blocks the write). Per-artifact "what this touches" at `GET .../{saved-queries|metrics}/{id}/lineage`; impact view "what depends on this table" at `GET .../catalog/lineage?table=&column=`. Connector type → sqlglot dialect via `dialect_for`.
- **Endpoints:** `/connections/{id}/catalog/{search,facets,lineage}`, plus `/status`, `/versions`, `/versions/{v}`, and `/lineage` sub-resources on the metric/glossary/sample-query/saved-query routers.
- **Heads-up:** existing rows migrate to `status='draft'`, `version=1`. The saved-query PUT routes any `status` change through the governed lifecycle (no raw status writes). sqlglot is a new optional dep — install the `[lineage]` extra (or rebuild the backend image) for lineage to populate.
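
The "lazy import, degrades to a no-op" behaviour of the lineage parser can be sketched as follows. This is a simplified illustration, not the code in `lineage_service.py`: the function name `extract_table_refs` and its return shape are assumptions, and it only collects table references (no column edges, no resolution against the schema cache). Note that with this naive approach CTE names also show up as table references.

```python
from __future__ import annotations


def extract_table_refs(sql: str, dialect: str | None = None) -> list[tuple[str | None, str]]:
    """Best-effort list of (schema, table) pairs referenced by `sql`.

    Returns [] when sqlglot is not installed or the SQL cannot be parsed,
    so callers never fail a write because lineage is unavailable.
    """
    try:
        # Lazy import: sqlglot is an optional dependency.
        import sqlglot
        from sqlglot import exp
    except ImportError:
        return []

    try:
        tree = sqlglot.parse_one(sql, read=dialect)
    except Exception:
        # Best-effort: unparseable SQL simply yields no lineage edges.
        return []

    refs = {(t.db or None, t.name) for t in tree.find_all(exp.Table) if t.name}
    return sorted(refs, key=lambda r: (r[0] or "", r[1]))


if __name__ == "__main__":
    print(extract_table_refs(
        "SELECT o.id FROM sales.orders o JOIN customers c ON c.id = o.customer_id"
    ))
```

Swallowing parse errors is a deliberate trade-off here: lineage may be incomplete, but creating or updating a saved query or metric is never blocked.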
7 changes: 5 additions & 2 deletions README.md
@@ -50,6 +50,9 @@ A full-stack application that translates natural language questions into SQL que
- **Saved queries** — name and pin a question + SQL with typed parameters (`{{region}}`); re-run, version, clone, and export (CSV/JSON/XLSX)
- **Charts & result caching** — visualize a saved query (line/bar/area/pie/scatter via Recharts); results are snapshotted to a Postgres cache so re-runs don't re-hit the warehouse
- **Dashboards** — compose saved queries into a shareable, draggable tile grid with dashboard-level filters that flow into every tile's SQL
- **Certification & versioning** — govern metrics, glossary terms, sample queries, and saved queries through a `draft → in_review → certified → deprecated` lifecycle (editors submit, admins certify) with a per-entity version history and changelog
- **Data catalog** — hybrid search (embeddings + keyword) across tables, columns, metrics, glossary, sample/saved queries, and knowledge, with facets and certified-first ranking
- **Lineage** — sqlglot parses saved-query/metric SQL to show what each touches and what depends on a given table (impact view)
- **Production hardening** — rate limiting, async job queue, OpenTelemetry tracing, structured logging, health probes
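
As a rough illustration of the "certified-first ranking" mentioned for the data catalog, the sketch below multiplies the relevance score of certified hits by a fixed factor before sorting. The `Hit` dataclass, the `CERTIFIED_BOOST` value, and this `rank_hits` body are assumptions for the example; the project's actual `rank_hits` may weight things differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Hit:
    name: str
    score: float               # relevance from the hybrid (embedding + keyword) search
    status: str | None = None  # tables/columns have no certification status


# Hypothetical boost factor; chosen only for illustration.
CERTIFIED_BOOST = 1.25


def rank_hits(hits: list[Hit]) -> list[Hit]:
    """Sort hits by relevance, nudging certified items upward."""

    def boosted(hit: Hit) -> float:
        return hit.score * (CERTIFIED_BOOST if hit.status == "certified" else 1.0)

    return sorted(hits, key=boosted, reverse=True)


if __name__ == "__main__":
    hits = [
        Hit("monthly_revenue_draft", 0.80, "draft"),
        Hit("monthly_revenue", 0.70, "certified"),
        Hit("orders", 0.50),
    ]
    for hit in rank_hits(hits):
        print(hit.name)
```

A multiplicative boost keeps a weakly matching certified item from outranking a strongly matching draft, which an unconditional "certified first" sort would not.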


@@ -354,8 +357,8 @@ cd backend
python3.12 -m venv .venv
source .venv/bin/activate

-# Install dependencies
-pip install -e ".[llm,dev]"
+# Install dependencies (add `lineage` for sqlglot-based catalog lineage)
+pip install -e ".[llm,dev,lineage]"

# Start PostgreSQL with pgvector (must be running on localhost:5432)
# Run migrations
2 changes: 1 addition & 1 deletion backend/Dockerfile
@@ -7,7 +7,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
rm -rf /var/lib/apt/lists/*

COPY . .
-RUN pip install --no-cache-dir -e ".[llm,dev,bigquery,databricks,observability,jobs]"
+RUN pip install --no-cache-dir -e ".[llm,dev,bigquery,databricks,observability,jobs,lineage]"


EXPOSE 8000
122 changes: 122 additions & 0 deletions backend/alembic/versions/007_certification_and_versioning.py
@@ -0,0 +1,122 @@
"""Certification + semantic versioning (Phase 3 — Milestone 1)

Revision ID: 007
Revises: 006
Create Date: 2026-06-08

Adds a trust/lifecycle layer to the semantic objects. Metrics, glossary terms,
sample queries, and saved queries gain a ``status``
(draft|in_review|certified|deprecated), an integer ``version``, and certification
stamps (``certified_by_id`` / ``certified_at``). The new ``semantic_versions``
table is an append-only changelog: a snapshot of an entity at each version with
the reviewer and reason, written on every content edit and status transition.

Existing rows default to status='draft', version=1 (saved_queries already carry
status + version from migration 005, so only the certification stamps are added
there).
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "007"
down_revision: str = "006"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

# Tables getting the full lifecycle column set (status + version + cert stamps).
_LIFECYCLE_TABLES = ("metric_definitions", "glossary_terms", "sample_queries")


def upgrade() -> None:
    for table in _LIFECYCLE_TABLES:
        op.add_column(
            table,
            sa.Column("status", sa.String(20), nullable=False, server_default=sa.text("'draft'")),
        )
        op.add_column(
            table,
            sa.Column("version", sa.Integer, nullable=False, server_default=sa.text("1")),
        )
        op.add_column(
            table,
            sa.Column(
                "certified_by_id",
                UUID(as_uuid=True),
                sa.ForeignKey("users.id", ondelete="SET NULL"),
                nullable=True,
            ),
        )
        op.add_column(
            table,
            sa.Column("certified_at", sa.DateTime(timezone=True), nullable=True),
        )

    # saved_queries already has status + version (migration 005); add cert stamps.
    op.add_column(
        "saved_queries",
        sa.Column(
            "certified_by_id",
            UUID(as_uuid=True),
            sa.ForeignKey("users.id", ondelete="SET NULL"),
            nullable=True,
        ),
    )
    op.add_column(
        "saved_queries",
        sa.Column("certified_at", sa.DateTime(timezone=True), nullable=True),
    )

    op.create_table(
        "semantic_versions",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column(
            "organization_id",
            UUID(as_uuid=True),
            sa.ForeignKey("organizations.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column(
            "connection_id",
            UUID(as_uuid=True),
            sa.ForeignKey("database_connections.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("entity_type", sa.String(20), nullable=False),
        sa.Column("entity_id", UUID(as_uuid=True), nullable=False),
        sa.Column("version", sa.Integer, nullable=False),
        sa.Column("status", sa.String(20), nullable=False),
        sa.Column("snapshot", JSONB, nullable=False),
        sa.Column("change_reason", sa.Text, nullable=True),
        sa.Column(
            "changed_by_id",
            UUID(as_uuid=True),
            sa.ForeignKey("users.id", ondelete="SET NULL"),
            nullable=True,
        ),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index(
        "ix_semantic_versions_entity",
        "semantic_versions",
        ["entity_type", "entity_id", "version"],
    )


def downgrade() -> None:
    op.drop_index("ix_semantic_versions_entity", table_name="semantic_versions")
    op.drop_table("semantic_versions")

    op.drop_column("saved_queries", "certified_at")
    op.drop_column("saved_queries", "certified_by_id")

    for table in _LIFECYCLE_TABLES:
        op.drop_column(table, "certified_at")
        op.drop_column(table, "certified_by_id")
        op.drop_column(table, "version")
        op.drop_column(table, "status")
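
Because each `semantic_versions` row stores a full JSONB `snapshot`, a field-level diff between two versions reduces to comparing two dictionaries. The helper below is a minimal sketch of that idea; the name `diff_snapshots`, the `{"old": ..., "new": ...}` output shape, and the sample field values are assumptions, and the project's `versioning_service.diff` may differ.

```python
from typing import Any


def diff_snapshots(old: dict[str, Any], new: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Return {field: {"old": ..., "new": ...}} for every field that changed.

    Fields present in only one snapshot are reported with None on the other side.
    """
    changes: dict[str, dict[str, Any]] = {}
    for key in sorted(old.keys() | new.keys()):
        before, after = old.get(key), new.get(key)
        if before != after:
            changes[key] = {"old": before, "new": after}
    return changes


if __name__ == "__main__":
    v1 = {"name": "Revenue", "sql_expression": "SUM(amount)"}
    v2 = {
        "name": "Revenue",
        "sql_expression": "SUM(net_amount)",
        "description": "Net revenue",
    }
    print(diff_snapshots(v1, v2))
```

One limitation of this shape: a field explicitly set to `None` is indistinguishable from a missing field, which is usually acceptable for a changelog view.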
79 changes: 79 additions & 0 deletions backend/alembic/versions/008_catalog_lineage.py
@@ -0,0 +1,79 @@
"""Catalog lineage (Phase 3 — Milestone 2)

Revision ID: 008
Revises: 007
Create Date: 2026-06-08

Adds ``artifact_dependencies`` — lineage edges recording which tables/columns a
saved query or metric references, parsed from its SQL via sqlglot. Powers the
catalog impact view ("what depends on this table") and the per-artifact
"what this touches" view. Names are stored denormalized; table_id/column_id are
resolved best-effort against the schema cache.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "008"
down_revision: str = "007"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
    op.create_table(
        "artifact_dependencies",
        sa.Column("id", UUID(as_uuid=True), primary_key=True),
        sa.Column(
            "organization_id",
            UUID(as_uuid=True),
            sa.ForeignKey("organizations.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column(
            "connection_id",
            UUID(as_uuid=True),
            sa.ForeignKey("database_connections.id", ondelete="CASCADE"),
            nullable=False,
        ),
        sa.Column("artifact_type", sa.String(20), nullable=False),
        sa.Column("artifact_id", UUID(as_uuid=True), nullable=False),
        sa.Column("ref_kind", sa.String(10), nullable=False),
        sa.Column("schema_name", sa.String(255), nullable=True),
        sa.Column("table_name", sa.String(255), nullable=False),
        sa.Column("column_name", sa.String(255), nullable=True),
        sa.Column(
            "table_id",
            UUID(as_uuid=True),
            sa.ForeignKey("cached_tables.id", ondelete="SET NULL"),
            nullable=True,
        ),
        sa.Column(
            "column_id",
            UUID(as_uuid=True),
            sa.ForeignKey("cached_columns.id", ondelete="SET NULL"),
            nullable=True,
        ),
        sa.Column("created_at", sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index(
        "ix_artifact_dependencies_artifact",
        "artifact_dependencies",
        ["artifact_type", "artifact_id"],
    )
    op.create_index(
        "ix_artifact_dependencies_table",
        "artifact_dependencies",
        ["connection_id", "table_name"],
    )


def downgrade() -> None:
    op.drop_index("ix_artifact_dependencies_table", table_name="artifact_dependencies")
    op.drop_index("ix_artifact_dependencies_artifact", table_name="artifact_dependencies")
    op.drop_table("artifact_dependencies")
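
The `(connection_id, table_name)` index exists to serve the impact view ("what depends on this table"). The sketch below shows the shape of that lookup using an in-memory SQLite table so it can run anywhere. It is a simplified, synchronous stand-in: the real `lineage_service.dependents_of` is async and runs against PostgreSQL, and the `ref_kind` values (`"table"` / `"column"`), the string ids, and the sample rows are assumptions.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute(
    """
    CREATE TABLE artifact_dependencies (
        connection_id TEXT NOT NULL,
        artifact_type TEXT NOT NULL,
        artifact_id   TEXT NOT NULL,
        ref_kind      TEXT NOT NULL,
        table_name    TEXT NOT NULL,
        column_name   TEXT
    )
    """
)
demo.execute(
    "CREATE INDEX ix_artifact_dependencies_table "
    "ON artifact_dependencies (connection_id, table_name)"
)
demo.executemany(
    "INSERT INTO artifact_dependencies VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("conn-1", "saved_query", "sq-1", "table", "orders", None),
        ("conn-1", "saved_query", "sq-1", "column", "orders", "amount"),
        ("conn-1", "metric", "m-1", "column", "orders", "status"),
        ("conn-1", "metric", "m-2", "table", "customers", None),
        ("conn-2", "metric", "m-3", "table", "orders", None),
    ],
)


def dependents_of(conn, connection_id, table, column=None):
    """Artifacts on one connection that reference `table` (optionally one column)."""
    sql = (
        "SELECT DISTINCT artifact_type, artifact_id FROM artifact_dependencies "
        "WHERE connection_id = ? AND table_name = ?"
    )
    params = [connection_id, table]
    if column is not None:
        sql += " AND column_name = ?"
        params.append(column)
    sql += " ORDER BY artifact_type, artifact_id"
    return conn.execute(sql, params).fetchall()


if __name__ == "__main__":
    print(dependents_of(demo, "conn-1", "orders"))
    print(dependents_of(demo, "conn-1", "orders", "amount"))
```

Filtering on `connection_id` first keeps the impact view connection-scoped, so an identically named table on another connection never leaks into the result.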
71 changes: 71 additions & 0 deletions backend/app/api/v1/endpoints/catalog.py
@@ -0,0 +1,71 @@
import uuid

from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.v1.deps import require_connection_read
from app.api.v1.schemas.catalog import (
CatalogFacetsResponse,
CatalogHitResponse,
LineageRefResponse,
)
from app.core.auth import AuthContext
from app.db.session import get_db
from app.services import catalog_service, lineage_service

router = APIRouter(tags=["catalog"])


@router.get(
    "/connections/{connection_id}/catalog/search",
    response_model=list[CatalogHitResponse],
)
async def catalog_search(
    connection_id: uuid.UUID,
    q: str = Query("", description="Search text"),
    types: str | None = Query(None, description="Comma-separated hit types to include"),
    status: str | None = Query(None, description="Filter by certification status"),
    owner: str | None = Query(None, description="Filter by owner/creator id"),
    schema: str | None = Query(None, description="Filter tables/columns by schema"),
    limit: int = Query(50, ge=1, le=200),
    _ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    type_list = [t.strip() for t in types.split(",") if t.strip()] if types else None
    return await catalog_service.search(
        db,
        connection_id,
        q,
        types=type_list,
        status=status,
        owner=owner,
        schema=schema,
        limit=limit,
    )


@router.get(
    "/connections/{connection_id}/catalog/facets",
    response_model=CatalogFacetsResponse,
)
async def catalog_facets(
    connection_id: uuid.UUID,
    _ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    return await catalog_service.facets(db, connection_id)


@router.get(
    "/connections/{connection_id}/catalog/lineage",
    response_model=list[LineageRefResponse],
)
async def catalog_lineage_impact(
    connection_id: uuid.UUID,
    table: str = Query(..., description="Table name to find dependents of"),
    column: str | None = Query(None, description="Optional column name"),
    _ctx: AuthContext = Depends(require_connection_read),
    db: AsyncSession = Depends(get_db),
):
    """Impact view: which saved queries / metrics depend on a table (or column)."""
    return await lineage_service.dependents_of(db, connection_id, table, column)
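
From a client's point of view, the search endpoint takes `types` as a single comma-separated string and caps `limit` at 200. The sketch below builds such a request URL and mirrors the server-side splitting of `types`. The base URL (including the `/api/v1` prefix), the example type names `metric` and `table`, and both helper names are assumptions for illustration.

```python
from __future__ import annotations

from urllib.parse import urlencode


def catalog_search_url(
    base_url: str,
    connection_id: str,
    q: str = "",
    types: list[str] | None = None,
    status: str | None = None,
    limit: int = 50,
) -> str:
    """Build a GET URL for the catalog search endpoint."""
    if not 1 <= limit <= 200:
        # Same bounds the endpoint declares via Query(50, ge=1, le=200).
        raise ValueError("limit must be between 1 and 200")
    params: dict[str, str | int] = {"q": q, "limit": limit}
    if types:
        params["types"] = ",".join(types)
    if status:
        params["status"] = status
    path = f"/connections/{connection_id}/catalog/search"
    return f"{base_url.rstrip('/')}{path}?{urlencode(params)}"


def parse_types(types: str | None) -> list[str] | None:
    """Mirror of the endpoint's parsing: split on commas, drop blanks."""
    return [t.strip() for t in types.split(",") if t.strip()] if types else None


if __name__ == "__main__":
    print(
        catalog_search_url(
            "http://localhost:8000/api/v1",  # assumed base URL
            "1234",
            q="net revenue",
            types=["metric", "table"],
            status="certified",
            limit=20,
        )
    )
```

Because blanks are dropped during parsing, a value such as `types=metric,,table` behaves the same as `types=metric,table`.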