Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
cache: pip

- name: Install dependencies
run: pip install -e ".[llm,dev,observability,lineage]"
run: pip install -e ".[llm,dev,observability,lineage,scheduling]"

# Gating: the test suite must pass before any Phase 0+ refactor lands.
- name: Tests
Expand Down
74 changes: 74 additions & 0 deletions backend/alembic/versions/009_audit_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Audit events (Phase 4 — Milestone 1)

Revision ID: 009
Revises: 008
Create Date: 2026-06-08

Adds ``audit_events`` — an append-only log of security- and governance-relevant
actions (login, connection CRUD, credential rotation, introspection, query
generated/executed/blocked, metric certified, knowledge imported). Written
fire-and-forget so auditing never breaks the audited action. Org-scoped and
exportable. ``actor_id`` / ``workspace_id`` are nullable so system-driven and
pre-auth events can still be recorded.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "009"
down_revision: str = "008"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.create_table(
"audit_events",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"organization_id",
UUID(as_uuid=True),
sa.ForeignKey("organizations.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("event_type", sa.String(64), nullable=False),
sa.Column(
"actor_id",
UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"workspace_id",
UUID(as_uuid=True),
sa.ForeignKey("teams.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("payload", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.func.now(),
nullable=False,
),
)
op.create_index("ix_audit_events_event_type", "audit_events", ["event_type"])
op.create_index("ix_audit_events_created_at", "audit_events", ["created_at"])
# Primary access pattern: an org's events, newest first.
op.create_index(
"ix_audit_events_org_created",
"audit_events",
["organization_id", "created_at"],
)


def downgrade() -> None:
op.drop_index("ix_audit_events_org_created", table_name="audit_events")
op.drop_index("ix_audit_events_created_at", table_name="audit_events")
op.drop_index("ix_audit_events_event_type", table_name="audit_events")
op.drop_table("audit_events")
80 changes: 80 additions & 0 deletions backend/alembic/versions/010_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Scheduled reports (Phase 4 — Milestone 2)

Revision ID: 010
Revises: 009
Create Date: 2026-06-08

Adds ``schedules`` — recurring delivery of a saved query or dashboard on a cron
schedule over a notification channel (email/Slack/log), with optional
alert-on-threshold. Workspace-scoped like dashboards. The scheduler claims due
rows on ``next_run_at``.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "010"
down_revision: str = "009"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.create_table(
"schedules",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"organization_id",
UUID(as_uuid=True),
sa.ForeignKey("organizations.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"workspace_id",
UUID(as_uuid=True),
sa.ForeignKey("teams.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"owner_id",
UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("target_type", sa.String(20), nullable=False),
sa.Column("target_id", UUID(as_uuid=True), nullable=False),
sa.Column("cron", sa.String(120), nullable=False),
sa.Column("channel", sa.String(20), nullable=False, server_default="email"),
sa.Column("recipients", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column("params", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
sa.Column("threshold", JSONB, nullable=True),
sa.Column(
"only_on_threshold", sa.Boolean, nullable=False, server_default=sa.text("false")
),
sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.text("true")),
sa.Column("next_run_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_run_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("last_status", sa.String(20), nullable=True),
sa.Column("last_error", sa.Text, nullable=True),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
# The scheduler scans for enabled, due rows ordered by next_run_at.
op.create_index("ix_schedules_next_run_at", "schedules", ["next_run_at"])
op.create_index("ix_schedules_workspace_id", "schedules", ["workspace_id"])


def downgrade() -> None:
op.drop_index("ix_schedules_workspace_id", table_name="schedules")
op.drop_index("ix_schedules_next_run_at", table_name="schedules")
op.drop_table("schedules")
68 changes: 68 additions & 0 deletions backend/alembic/versions/011_data_policies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""Data policies (Phase 4 — Milestone 3)

Revision ID: 011
Revises: 010
Create Date: 2026-06-08

Adds ``data_policies`` — governance rules enforced before a query reaches the
connector: role-scoped row/runtime caps, allow/block table lists, blocked
columns, PII column masking, and row-level filters. Connection-scoped like the
semantic layer.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "011"
down_revision: str = "010"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.create_table(
"data_policies",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"organization_id",
UUID(as_uuid=True),
sa.ForeignKey("organizations.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"connection_id",
UUID(as_uuid=True),
sa.ForeignKey("database_connections.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("name", sa.String(255), nullable=False),
sa.Column("enabled", sa.Boolean, nullable=False, server_default=sa.text("true")),
sa.Column("priority", sa.Integer, nullable=False, server_default=sa.text("100")),
sa.Column(
"applies_to_roles", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")
),
sa.Column("max_rows", sa.Integer, nullable=True),
sa.Column("max_runtime_seconds", sa.Integer, nullable=True),
sa.Column("allowed_tables", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column("blocked_tables", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column("blocked_columns", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column("masked_columns", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column("row_filters", JSONB, nullable=False, server_default=sa.text("'{}'::jsonb")),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
sa.Column(
"updated_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_index("ix_data_policies_connection_id", "data_policies", ["connection_id"])


def downgrade() -> None:
op.drop_index("ix_data_policies_connection_id", table_name="data_policies")
op.drop_table("data_policies")
85 changes: 85 additions & 0 deletions backend/alembic/versions/012_cost_attribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Cost & usage attribution (Phase 4 — Milestone 4)

Revision ID: 012
Revises: 011
Create Date: 2026-06-08

Adds ``cost_attributions`` — per-execution usage + estimated cost, attributed to
a workspace/user/connection. Powers the usage analytics dashboards (slowest
queries, error rate, most-queried tables, cost per team). Populated best-effort
after each query execution.
"""

from collections.abc import Sequence

import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import JSONB, UUID

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "012"
down_revision: str = "011"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
op.create_table(
"cost_attributions",
sa.Column("id", UUID(as_uuid=True), primary_key=True),
sa.Column(
"organization_id",
UUID(as_uuid=True),
sa.ForeignKey("organizations.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"workspace_id",
UUID(as_uuid=True),
sa.ForeignKey("teams.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"connection_id",
UUID(as_uuid=True),
sa.ForeignKey("database_connections.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"user_id",
UUID(as_uuid=True),
sa.ForeignKey("users.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column(
"query_execution_id",
UUID(as_uuid=True),
sa.ForeignKey("query_executions.id", ondelete="SET NULL"),
nullable=True,
),
sa.Column("source_provider", sa.String(50), nullable=True),
sa.Column("status", sa.String(20), nullable=False, server_default="success"),
sa.Column("execution_time_ms", sa.Float, nullable=True),
sa.Column("row_count", sa.Integer, nullable=True),
sa.Column("scanned_bytes", sa.Integer, nullable=True),
sa.Column("slot_ms", sa.Integer, nullable=True),
sa.Column("dbu", sa.Float, nullable=True),
sa.Column("cost_usd", sa.Float, nullable=False, server_default="0"),
sa.Column("tables", JSONB, nullable=False, server_default=sa.text("'[]'::jsonb")),
sa.Column(
"created_at", sa.DateTime(timezone=True), server_default=sa.func.now(), nullable=False
),
)
op.create_index("ix_cost_attributions_created_at", "cost_attributions", ["created_at"])
op.create_index(
"ix_cost_attributions_org_created",
"cost_attributions",
["organization_id", "created_at"],
)


def downgrade() -> None:
op.drop_index("ix_cost_attributions_org_created", table_name="cost_attributions")
op.drop_index("ix_cost_attributions_created_at", table_name="cost_attributions")
op.drop_table("cost_attributions")
65 changes: 65 additions & 0 deletions backend/app/api/v1/endpoints/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Cost & usage analytics. Admin-only, org-scoped, windowed by ``days``."""

from datetime import UTC, datetime, timedelta

from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.v1.schemas.analytics import (
CostByEntry,
SlowestQuery,
TableUsage,
UsageSummary,
)
from app.core.auth import AuthContext, get_org_context
from app.db.session import get_db
from app.services import cost_service

router = APIRouter(prefix="/analytics", tags=["analytics"])


def _since(days: int) -> datetime:
return datetime.now(UTC) - timedelta(days=days)


@router.get("/usage", response_model=UsageSummary)
async def usage(
days: int = Query(30, ge=1, le=365),
ctx: AuthContext = Depends(get_org_context),
db: AsyncSession = Depends(get_db),
):
ctx.require_role("admin")
return await cost_service.usage_summary(db, ctx.organization_id, _since(days))


@router.get("/cost", response_model=list[CostByEntry])
async def cost_by(
by: str = Query("workspace", pattern="^(workspace|user|connection)$"),
days: int = Query(30, ge=1, le=365),
ctx: AuthContext = Depends(get_org_context),
db: AsyncSession = Depends(get_db),
):
ctx.require_role("admin")
return await cost_service.cost_by(db, ctx.organization_id, by, _since(days))


@router.get("/slowest", response_model=list[SlowestQuery])
async def slowest(
days: int = Query(30, ge=1, le=365),
limit: int = Query(10, ge=1, le=100),
ctx: AuthContext = Depends(get_org_context),
db: AsyncSession = Depends(get_db),
):
ctx.require_role("admin")
return await cost_service.slowest_queries(db, ctx.organization_id, _since(days), limit)


@router.get("/tables", response_model=list[TableUsage])
async def tables(
days: int = Query(30, ge=1, le=365),
limit: int = Query(10, ge=1, le=100),
ctx: AuthContext = Depends(get_org_context),
db: AsyncSession = Depends(get_db),
):
ctx.require_role("admin")
return await cost_service.most_queried_tables(db, ctx.organization_id, _since(days), limit)
Loading
Loading