Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from contextlib import asynccontextmanager
from pathlib import Path

Expand All @@ -20,7 +21,8 @@
@asynccontextmanager
async def lifespan(app: Litestar):
await open_pool()
await ensure_schema()
if os.getenv("OSMSG_SKIP_SCHEMA_ENSURE", "").lower() not in {"1", "true", "yes"}:
await ensure_schema()
try:
yield
finally:
Expand Down
3 changes: 3 additions & 0 deletions api/pg_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
geom GEOMETRY(POLYGON)
);
CREATE INDEX IF NOT EXISTS idx_changesets_created_at ON changesets(created_at);
CREATE INDEX IF NOT EXISTS idx_changesets_hashtags ON changesets USING GIN (hashtags);
CREATE INDEX IF NOT EXISTS idx_changesets_editor ON changesets(editor);
CREATE INDEX IF NOT EXISTS idx_changesets_geom ON changesets USING GIST (geom);
CREATE TABLE IF NOT EXISTS changeset_stats (
changeset_id BIGINT NOT NULL REFERENCES changesets(changeset_id),
Expand All @@ -33,6 +35,7 @@
PRIMARY KEY (seq_id, changeset_id)
);
CREATE INDEX IF NOT EXISTS idx_changeset_stats_uid ON changeset_stats(uid);
CREATE INDEX IF NOT EXISTS idx_changeset_stats_changeset_id ON changeset_stats(changeset_id);
CREATE TABLE IF NOT EXISTS state (
source_url TEXT PRIMARY KEY,
last_seq BIGINT NOT NULL,
Expand Down
185 changes: 185 additions & 0 deletions api/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

from .db import get_pool


def _map_changes_expr(alias: str = "st") -> str:
return f"""
{alias}.nodes_created + {alias}.nodes_modified + {alias}.nodes_deleted +
{alias}.ways_created + {alias}.ways_modified + {alias}.ways_deleted +
{alias}.rels_created + {alias}.rels_modified + {alias}.rels_deleted
"""


_TAG_CTES = """,
tag_agg AS (
SELECT
Expand Down Expand Up @@ -136,6 +145,119 @@ def _user_stats_sql(*, filter_dates: bool, filter_hashtags: bool, include_tags:
"""


def _changeset_filters_sql(*, filter_dates: bool, filter_hashtags: bool = False) -> tuple[str, int]:
n = 1
filters: list[str] = []
if filter_dates:
filters.append(f"cs.created_at >= ${n}")
n += 1
filters.append(f"cs.created_at < ${n}")
n += 1
if filter_hashtags:
filters.append(f"cs.hashtags && ${n}::TEXT[]")
n += 1
where_sql = f"WHERE {' AND '.join(filters)}" if filters else ""
return where_sql, n


def _hashtag_stats_sql(*, filter_dates: bool, filter_hashtags: bool) -> str:
    """Build the paginated hashtag leaderboard query.

    Every hashtag of every matching changeset becomes one row of
    ``hashtag_scope``; rows are then aggregated per hashtag and ranked by the
    total number of map changes (ties broken by hashtag name).

    Positional parameters expected by the returned SQL, in order:
    ``start`` and ``end`` (only with ``filter_dates``), the hashtag array
    (only with ``filter_hashtags``), then ``limit`` and ``offset``.

    Args:
        filter_dates: Restrict changesets to the ``[start, end)`` window.
        filter_hashtags: Restrict the leaderboard to the supplied hashtags.

    Returns:
        The SQL text, ready to be executed with the parameters listed above.
    """
    where_sql, n = _changeset_filters_sql(filter_dates=filter_dates, filter_hashtags=filter_hashtags)
    if filter_hashtags:
        # The overlap test only selects changesets; after UNNEST those changesets
        # also contribute their other hashtags. Re-use the hashtag array (the last
        # filter placeholder, i.e. n - 1) so only requested hashtags are ranked.
        where_sql += f" AND ht.hashtag = ANY(${n - 1}::TEXT[])"
    limit_param = f"${n}"
    offset_param = f"${n + 1}"
    map_changes = _map_changes_expr()
    return f"""
WITH hashtag_scope AS (
SELECT
ht.hashtag,
st.uid,
st.changeset_id,
({map_changes}) AS map_changes
FROM changesets cs
JOIN changeset_stats st ON st.changeset_id = cs.changeset_id
CROSS JOIN LATERAL UNNEST(cs.hashtags) AS ht(hashtag)
{where_sql}
),
hashtag_totals AS (
SELECT
hashtag,
COUNT(DISTINCT changeset_id) AS changesets,
COUNT(DISTINCT uid) AS users,
COALESCE(SUM(map_changes), 0) AS map_changes
FROM hashtag_scope
GROUP BY hashtag
)
SELECT
hashtag,
changesets,
users,
map_changes,
ROW_NUMBER() OVER (ORDER BY map_changes DESC, hashtag ASC) AS rank
FROM hashtag_totals
ORDER BY map_changes DESC, hashtag ASC
LIMIT {limit_param} OFFSET {offset_param}
"""


def _hashtag_trends_sql(*, filter_hashtags: bool) -> str:
    """Build the paginated per-period hashtag trend query.

    Changesets are always restricted to a ``[start, end)`` window, expanded to
    one row per hashtag, and aggregated per ``DATE_TRUNC`` bucket and hashtag.

    Positional parameters expected by the returned SQL, in order:
    ``start``, ``end``, the hashtag array (only with ``filter_hashtags``),
    the ``DATE_TRUNC`` field name, then ``limit`` and ``offset``.

    Args:
        filter_hashtags: Restrict the trend rows to the supplied hashtags.

    Returns:
        The SQL text, ready to be executed with the parameters listed above.
    """
    where_sql, n = _changeset_filters_sql(filter_dates=True, filter_hashtags=filter_hashtags)
    if filter_hashtags:
        # The overlap test only selects changesets; after UNNEST those changesets
        # also contribute their other hashtags. Re-use the hashtag array (the last
        # filter placeholder, i.e. n - 1) so only requested hashtags are reported.
        where_sql += f" AND ht.hashtag = ANY(${n - 1}::TEXT[])"
    interval_param = f"${n}"
    limit_param = f"${n + 1}"
    offset_param = f"${n + 2}"
    map_changes = _map_changes_expr()
    return f"""
SELECT
DATE_TRUNC({interval_param}, cs.created_at) AS period_start,
ht.hashtag,
COUNT(DISTINCT st.changeset_id) AS changesets,
COUNT(DISTINCT st.uid) AS users,
COALESCE(SUM({map_changes}), 0) AS map_changes
FROM changesets cs
JOIN changeset_stats st ON st.changeset_id = cs.changeset_id
CROSS JOIN LATERAL UNNEST(cs.hashtags) AS ht(hashtag)
{where_sql}
GROUP BY period_start, ht.hashtag
ORDER BY period_start ASC, map_changes DESC, ht.hashtag ASC
LIMIT {limit_param} OFFSET {offset_param}
"""


def _editor_stats_sql(*, filter_dates: bool) -> str:
    """Build the paginated editor leaderboard query.

    Changesets are grouped by editor (empty or NULL editors are reported as
    ``'unknown'``) and ranked by total map changes, ties broken by editor name.

    Positional parameters expected by the returned SQL, in order:
    ``start`` and ``end`` (only with ``filter_dates``), then ``limit`` and
    ``offset``.

    Args:
        filter_dates: Restrict changesets to the ``[start, end)`` window.

    Returns:
        The SQL text, ready to be executed with the parameters listed above.
    """
    conditions, first_free = _changeset_filters_sql(filter_dates=filter_dates)
    # Pagination placeholders come directly after the filter placeholders.
    paging = f"LIMIT ${first_free} OFFSET ${first_free + 1}"
    edits_total = _map_changes_expr()
    return f"""
WITH editor_scope AS (
SELECT
COALESCE(NULLIF(cs.editor, ''), 'unknown') AS editor,
st.uid,
st.changeset_id,
({edits_total}) AS map_changes
FROM changesets cs
JOIN changeset_stats st ON st.changeset_id = cs.changeset_id
{conditions}
),
editor_totals AS (
SELECT
editor,
COUNT(DISTINCT changeset_id) AS changesets,
COUNT(DISTINCT uid) AS users,
COALESCE(SUM(map_changes), 0) AS map_changes
FROM editor_scope
GROUP BY editor
)
SELECT
editor,
changesets,
users,
map_changes,
ROW_NUMBER() OVER (ORDER BY map_changes DESC, editor ASC) AS rank
FROM editor_totals
ORDER BY map_changes DESC, editor ASC
{paging}
"""


async def fetch_state() -> dict[str, Any] | None:
# last_ts/last_seq come from the worst-lagging source (slowest source bounds real freshness);
# updated_at is the most recent heartbeat across all sources (any tick proves the worker is alive).
Expand Down Expand Up @@ -175,3 +297,66 @@ async def fetch_user_stats(
async with get_pool().acquire() as conn:
rows = await conn.fetch(sql, *params)
return [dict(row) for row in rows]


async def fetch_hashtag_stats(
    *,
    start: datetime | None = None,
    end: datetime | None = None,
    hashtag: list[str] | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Run the hashtag leaderboard query and return its rows as plain dicts.

    Args:
        start: Lower bound of the window; used only when ``end`` is also set.
        end: Upper bound of the window; used only when ``start`` is also set.
        hashtag: Hashtags to filter on; ``None`` or an empty list disables the filter.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        One dict per leaderboard row.
    """
    windowed = start is not None and end is not None
    tagged = bool(hashtag)

    # Bind values must follow the placeholder order: window, hashtags, paging.
    bind_values: list[Any] = [limit, offset]
    if tagged:
        bind_values.insert(0, hashtag)
    if windowed:
        bind_values[:0] = [start, end]

    query = _hashtag_stats_sql(filter_dates=windowed, filter_hashtags=tagged)
    async with get_pool().acquire() as conn:
        records = await conn.fetch(query, *bind_values)
    return list(map(dict, records))


async def fetch_hashtag_trends(
    *,
    start: datetime,
    end: datetime,
    interval: str,
    hashtag: list[str] | None = None,
    limit: int = 1000,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Run the hashtag trend query and return its rows as plain dicts.

    Args:
        start: Lower bound of the window.
        end: Upper bound of the window.
        interval: Field name handed to ``DATE_TRUNC`` to form the period buckets.
        hashtag: Hashtags to filter on; ``None`` or an empty list disables the filter.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        One dict per (period, hashtag) row.
    """
    tagged = bool(hashtag)

    # Bind values must follow the placeholder order: window, hashtags, interval, paging.
    bind_values: list[Any] = [start, end, interval, limit, offset]
    if tagged:
        bind_values.insert(2, hashtag)

    query = _hashtag_trends_sql(filter_hashtags=tagged)
    async with get_pool().acquire() as conn:
        records = await conn.fetch(query, *bind_values)
    return list(map(dict, records))


async def fetch_editor_stats(
    *,
    start: datetime | None = None,
    end: datetime | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Run the editor leaderboard query and return its rows as plain dicts.

    Args:
        start: Lower bound of the window; used only when ``end`` is also set.
        end: Upper bound of the window; used only when ``start`` is also set.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        One dict per editor row.
    """
    windowed = start is not None and end is not None
    # Bind values must follow the placeholder order: window first, then paging.
    bind_values: list[Any] = [start, end, limit, offset] if windowed else [limit, offset]

    query = _editor_stats_sql(filter_dates=windowed)
    async with get_pool().acquire() as conn:
        records = await conn.fetch(query, *bind_values)
    return list(map(dict, records))
133 changes: 124 additions & 9 deletions api/routers/v1.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
from datetime import UTC, datetime
from datetime import UTC, datetime, timedelta
from typing import Annotated

from litestar import Controller, Router, get
from litestar.exceptions import HTTPException
from litestar.params import Parameter

from ..queries import fetch_user_stats
from ..schemas import UserStat, UserStatsResponse
from ..queries import fetch_editor_stats, fetch_hashtag_stats, fetch_hashtag_trends, fetch_user_stats
from ..schemas import (
EditorStat,
EditorStatsResponse,
HashtagStat,
HashtagStatsResponse,
HashtagTrend,
UserStat,
UserStatsResponse,
)

# Bucket sizes accepted for the `interval` query parameter of the hashtag stats endpoint.
TREND_INTERVALS = {"day", "week", "month"}


def normalize_hashtags(hashtag: list[str] | None) -> list[str] | None:
Expand All @@ -27,6 +37,22 @@ def normalize_hashtags(hashtag: list[str] | None) -> list[str] | None:
return normalized or None


def resolve_optional_window(start: datetime | None, end: datetime | None) -> tuple[datetime | None, datetime | None]:
start = start or (datetime.min.replace(tzinfo=UTC) if end else None)
end = end or (datetime.now(tz=UTC) if start else None)
if start and end and start >= end:
raise HTTPException(status_code=400, detail="start must be before end")
return start, end


def resolve_required_window(start: datetime | None, end: datetime | None) -> tuple[datetime, datetime]:
end = end or datetime.now(tz=UTC)
start = start or (end - timedelta(days=30))
if start >= end:
raise HTTPException(status_code=400, detail="start must be before end")
return start, end


class StatsController(Controller):
path = "/stats"

Expand All @@ -47,11 +73,7 @@ async def get_user_stats(
limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1–1000).")] = 100,
offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0,
) -> UserStatsResponse:
start = start or (datetime.min.replace(tzinfo=UTC) if end else None)
end = end or (datetime.now(tz=UTC) if start else None)
if start and end and start >= end:
raise HTTPException(status_code=400, detail="start must be before end")

start, end = resolve_optional_window(start, end)
normalized_hashtag = normalize_hashtags(hashtag)
rows = await fetch_user_stats(
start=start,
Expand All @@ -74,4 +96,97 @@ async def get_user_stats(
)


v1_router = Router(path="/api/v1", route_handlers=[StatsController])
class HashtagStatsController(Controller):
    """Expose the hashtag leaderboard together with per-period hashtag trends."""

    path = "/hashtag-stats"

    @get()
    async def get_hashtag_stats(
        self,
        start: Annotated[
            datetime | None,
            Parameter(description="Inclusive UTC lower bound (ISO 8601). Defaults to 30 days before end."),
        ] = None,
        end: Annotated[
            datetime | None,
            Parameter(description="Exclusive UTC upper bound (ISO 8601). Defaults to now."),
        ] = None,
        hashtag: Annotated[
            list[str] | None, Parameter(description="Optional hashtags to limit the leaderboard to. Repeatable.")
        ] = None,
        interval: Annotated[str, Parameter(description="Trend bucket: day, week, or month.")] = "day",
        limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1-1000).")] = 100,
        offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0,
    ) -> HashtagStatsResponse:
        # Reject unsupported buckets before touching the database.
        if interval not in TREND_INTERVALS:
            raise HTTPException(status_code=400, detail="interval must be one of: day, week, month")

        start, end = resolve_required_window(start, end)
        tags = normalize_hashtags(hashtag)

        # Both queries run over the same window, hashtag filter and page.
        # NOTE(review): the trend rows share the leaderboard's limit/offset, so a
        # page of trends is not necessarily about the hashtags on the same
        # leaderboard page - confirm this is intended.
        shared_query = {"start": start, "end": end, "hashtag": tags, "limit": limit, "offset": offset}
        leaderboard_rows = await fetch_hashtag_stats(**shared_query)
        trend_rows = await fetch_hashtag_trends(interval=interval, **shared_query)

        leaderboard = [HashtagStat(**row) for row in leaderboard_rows]
        trends = [HashtagTrend(**row) for row in trend_rows]
        return HashtagStatsResponse(
            count=len(leaderboard),
            start=start,
            end=end,
            hashtag=tags,
            interval=interval,
            limit=limit,
            offset=offset,
            hashtags=leaderboard,
            trends=trends,
        )


class EditorStatsController(Controller):
    """Expose the editor leaderboard."""

    path = "/editor-stats"

    @get()
    async def get_editor_stats(
        self,
        start: Annotated[
            datetime | None, Parameter(description="Inclusive UTC lower bound (ISO 8601). If omitted, no lower bound.")
        ] = None,
        end: Annotated[
            datetime | None,
            Parameter(description="Exclusive UTC upper bound (ISO 8601). Defaults to now if start is set."),
        ] = None,
        limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1-1000).")] = 100,
        offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0,
    ) -> EditorStatsResponse:
        # The window is optional here: with neither bound given it stays fully open.
        start, end = resolve_optional_window(start, end)
        page = {"limit": limit, "offset": offset}
        window = {"start": start, "end": end}

        records = await fetch_editor_stats(**window, **page)
        leaderboard = [EditorStat(**record) for record in records]
        return EditorStatsResponse(count=len(leaderboard), editors=leaderboard, **window, **page)


# Version 1 of the HTTP API: every stats controller is mounted under the /api/v1 prefix.
v1_router = Router(
path="/api/v1",
route_handlers=[StatsController, HashtagStatsController, EditorStatsController],
)
Loading
Loading