diff --git a/api/app.py b/api/app.py index 3940056..a37f38c 100644 --- a/api/app.py +++ b/api/app.py @@ -1,3 +1,4 @@ +import os from contextlib import asynccontextmanager from pathlib import Path @@ -20,7 +21,8 @@ @asynccontextmanager async def lifespan(app: Litestar): await open_pool() - await ensure_schema() + if os.getenv("OSMSG_SKIP_SCHEMA_ENSURE", "").lower() not in {"1", "true", "yes"}: + await ensure_schema() try: yield finally: diff --git a/api/pg_schema.py b/api/pg_schema.py index 178670a..1874322 100644 --- a/api/pg_schema.py +++ b/api/pg_schema.py @@ -13,6 +13,8 @@ geom GEOMETRY(POLYGON) ); CREATE INDEX IF NOT EXISTS idx_changesets_created_at ON changesets(created_at); +CREATE INDEX IF NOT EXISTS idx_changesets_hashtags ON changesets USING GIN (hashtags); +CREATE INDEX IF NOT EXISTS idx_changesets_editor ON changesets(editor); CREATE INDEX IF NOT EXISTS idx_changesets_geom ON changesets USING GIST (geom); CREATE TABLE IF NOT EXISTS changeset_stats ( changeset_id BIGINT NOT NULL REFERENCES changesets(changeset_id), @@ -33,6 +35,7 @@ PRIMARY KEY (seq_id, changeset_id) ); CREATE INDEX IF NOT EXISTS idx_changeset_stats_uid ON changeset_stats(uid); +CREATE INDEX IF NOT EXISTS idx_changeset_stats_changeset_id ON changeset_stats(changeset_id); CREATE TABLE IF NOT EXISTS state ( source_url TEXT PRIMARY KEY, last_seq BIGINT NOT NULL, diff --git a/api/queries.py b/api/queries.py index 4175c28..729e7b6 100644 --- a/api/queries.py +++ b/api/queries.py @@ -3,6 +3,15 @@ from .db import get_pool + +def _map_changes_expr(alias: str = "st") -> str: + return f""" + {alias}.nodes_created + {alias}.nodes_modified + {alias}.nodes_deleted + + {alias}.ways_created + {alias}.ways_modified + {alias}.ways_deleted + + {alias}.rels_created + {alias}.rels_modified + {alias}.rels_deleted + """ + + _TAG_CTES = """, tag_agg AS ( SELECT @@ -136,6 +145,119 @@ def _user_stats_sql(*, filter_dates: bool, filter_hashtags: bool, include_tags: """ +def _changeset_filters_sql(*, filter_dates: bool, filter_hashtags: bool = False) -> tuple[str, int]: + n = 1 + filters: list[str] = [] + if filter_dates: + filters.append(f"cs.created_at >= ${n}") + n += 1 + filters.append(f"cs.created_at < ${n}") + n += 1 + if filter_hashtags: + filters.append(f"cs.hashtags && ${n}::TEXT[]") + n += 1 + where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" + return where_sql, n + + +def _hashtag_stats_sql(*, filter_dates: bool, filter_hashtags: bool) -> str: + where_sql, n = _changeset_filters_sql(filter_dates=filter_dates, filter_hashtags=filter_hashtags) + limit_param = f"${n}" + offset_param = f"${n + 1}" + map_changes = _map_changes_expr() + return f""" + WITH hashtag_scope AS ( + SELECT + ht.hashtag, + st.uid, + st.changeset_id, + ({map_changes}) AS map_changes + FROM changesets cs + JOIN changeset_stats st ON st.changeset_id = cs.changeset_id + CROSS JOIN LATERAL UNNEST(cs.hashtags) AS ht(hashtag) + {where_sql} + ), + hashtag_totals AS ( + SELECT + hashtag, + COUNT(DISTINCT changeset_id) AS changesets, + COUNT(DISTINCT uid) AS users, + COALESCE(SUM(map_changes), 0) AS map_changes + FROM hashtag_scope + GROUP BY hashtag + ) + SELECT + hashtag, + changesets, + users, + map_changes, + ROW_NUMBER() OVER (ORDER BY map_changes DESC, hashtag ASC) AS rank + FROM hashtag_totals + ORDER BY map_changes DESC, hashtag ASC + LIMIT {limit_param} OFFSET {offset_param} + """ + + +def _hashtag_trends_sql(*, filter_hashtags: bool) -> str: + where_sql, n = _changeset_filters_sql(filter_dates=True, filter_hashtags=filter_hashtags) + interval_param = f"${n}" + limit_param = f"${n + 1}" + offset_param = f"${n + 2}" + map_changes = _map_changes_expr() + return f""" + SELECT + DATE_TRUNC({interval_param}, cs.created_at) AS period_start, + ht.hashtag, + COUNT(DISTINCT st.changeset_id) AS changesets, + COUNT(DISTINCT st.uid) AS users, + COALESCE(SUM({map_changes}), 0) AS map_changes + FROM changesets cs + JOIN changeset_stats st ON st.changeset_id = cs.changeset_id + CROSS JOIN LATERAL UNNEST(cs.hashtags) AS ht(hashtag) + {where_sql} + GROUP BY period_start, ht.hashtag + ORDER BY period_start ASC, map_changes DESC, ht.hashtag ASC + LIMIT {limit_param} OFFSET {offset_param} + """ + + +def _editor_stats_sql(*, filter_dates: bool) -> str: + where_sql, n = _changeset_filters_sql(filter_dates=filter_dates) + limit_param = f"${n}" + offset_param = f"${n + 1}" + map_changes = _map_changes_expr() + return f""" + WITH editor_scope AS ( + SELECT + COALESCE(NULLIF(cs.editor, ''), 'unknown') AS editor, + st.uid, + st.changeset_id, + ({map_changes}) AS map_changes + FROM changesets cs + JOIN changeset_stats st ON st.changeset_id = cs.changeset_id + {where_sql} + ), + editor_totals AS ( + SELECT + editor, + COUNT(DISTINCT changeset_id) AS changesets, + COUNT(DISTINCT uid) AS users, + COALESCE(SUM(map_changes), 0) AS map_changes + FROM editor_scope + GROUP BY editor + ) + SELECT + editor, + changesets, + users, + map_changes, + ROW_NUMBER() OVER (ORDER BY map_changes DESC, editor ASC) AS rank + FROM editor_totals + ORDER BY map_changes DESC, editor ASC + LIMIT {limit_param} OFFSET {offset_param} + """ + + async def fetch_state() -> dict[str, Any] | None: # last_ts/last_seq come from the worst-lagging source (slowest source bounds real freshness); # updated_at is the most recent heartbeat across all sources (any tick proves the worker is alive). @@ -175,3 +297,66 @@ async def fetch_user_stats( async with get_pool().acquire() as conn: rows = await conn.fetch(sql, *params) return [dict(row) for row in rows] + + +async def fetch_hashtag_stats( + *, + start: datetime | None = None, + end: datetime | None = None, + hashtag: list[str] | None = None, + limit: int = 100, + offset: int = 0, +) -> list[dict[str, Any]]: + filter_dates = start is not None and end is not None + filter_hashtags = bool(hashtag) + sql = _hashtag_stats_sql(filter_dates=filter_dates, filter_hashtags=filter_hashtags) + params: list[Any] = [] + if filter_dates: + params.extend([start, end]) + if filter_hashtags: + params.append(hashtag) + params.extend([limit, offset]) + + async with get_pool().acquire() as conn: + rows = await conn.fetch(sql, *params) + return [dict(row) for row in rows] + + +async def fetch_hashtag_trends( + *, + start: datetime, + end: datetime, + interval: str, + hashtag: list[str] | None = None, + limit: int = 1000, + offset: int = 0, +) -> list[dict[str, Any]]: + filter_hashtags = bool(hashtag) + sql = _hashtag_trends_sql(filter_hashtags=filter_hashtags) + params: list[Any] = [start, end] + if filter_hashtags: + params.append(hashtag) + params.extend([interval, limit, offset]) + + async with get_pool().acquire() as conn: + rows = await conn.fetch(sql, *params) + return [dict(row) for row in rows] + + +async def fetch_editor_stats( + *, + start: datetime | None = None, + end: datetime | None = None, + limit: int = 100, + offset: int = 0, +) -> list[dict[str, Any]]: + filter_dates = start is not None and end is not None + sql = _editor_stats_sql(filter_dates=filter_dates) + params: list[Any] = [] + if filter_dates: + params.extend([start, end]) + params.extend([limit, offset]) + + async with get_pool().acquire() as conn: + rows = await conn.fetch(sql, *params) + return [dict(row) for row in rows] diff --git a/api/routers/v1.py b/api/routers/v1.py index bf91ede..29f64fd 100644 --- a/api/routers/v1.py +++ b/api/routers/v1.py @@ -1,12 +1,22 @@ -from datetime import UTC, datetime +from datetime import UTC, datetime, timedelta from typing import Annotated from litestar import Controller, Router, get from litestar.exceptions import HTTPException from litestar.params import Parameter -from ..queries import fetch_user_stats -from ..schemas import UserStat, UserStatsResponse +from ..queries import fetch_editor_stats, fetch_hashtag_stats, fetch_hashtag_trends, fetch_user_stats +from ..schemas import ( + EditorStat, + EditorStatsResponse, + HashtagStat, + HashtagStatsResponse, + HashtagTrend, + UserStat, + UserStatsResponse, +) + +TREND_INTERVALS = {"day", "week", "month"} def normalize_hashtags(hashtag: list[str] | None) -> list[str] | None: @@ -27,6 +37,22 @@ def normalize_hashtags(hashtag: list[str] | None) -> list[str] | None: return normalized or None +def resolve_optional_window(start: datetime | None, end: datetime | None) -> tuple[datetime | None, datetime | None]: + start = start or (datetime.min.replace(tzinfo=UTC) if end else None) + end = end or (datetime.now(tz=UTC) if start else None) + if start and end and start >= end: + raise HTTPException(status_code=400, detail="start must be before end") + return start, end + + +def resolve_required_window(start: datetime | None, end: datetime | None) -> tuple[datetime, datetime]: + end = end or datetime.now(tz=UTC) + start = start or (end - timedelta(days=30)) + if start >= end: + raise HTTPException(status_code=400, detail="start must be before end") + return start, end + + class StatsController(Controller): path = "/stats" @@ -47,11 +73,7 @@ async def get_user_stats( limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1–1000).")] = 100, offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0, ) -> UserStatsResponse: - start = start or (datetime.min.replace(tzinfo=UTC) if end else None) - end = end or (datetime.now(tz=UTC) if start else None) - if start and end and start >= end: - raise HTTPException(status_code=400, detail="start must be before end") - + start, end = resolve_optional_window(start, end) normalized_hashtag = normalize_hashtags(hashtag) rows = await fetch_user_stats( start=start, @@ -74,4 +96,97 @@ async def get_user_stats( ) -v1_router = Router(path="/api/v1", route_handlers=[StatsController]) +class HashtagStatsController(Controller): + path = "/hashtag-stats" + + @get() + async def get_hashtag_stats( + self, + start: Annotated[ + datetime | None, + Parameter(description="Inclusive UTC lower bound (ISO 8601). Defaults to 30 days before end."), + ] = None, + end: Annotated[ + datetime | None, + Parameter(description="Exclusive UTC upper bound (ISO 8601). Defaults to now."), + ] = None, + hashtag: Annotated[ + list[str] | None, Parameter(description="Optional hashtags to limit the leaderboard to. Repeatable.") + ] = None, + interval: Annotated[str, Parameter(description="Trend bucket: day, week, or month.")] = "day", + limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1-1000).")] = 100, + offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0, + ) -> HashtagStatsResponse: + if interval not in TREND_INTERVALS: + raise HTTPException(status_code=400, detail="interval must be one of: day, week, month") + + start, end = resolve_required_window(start, end) + normalized_hashtag = normalize_hashtags(hashtag) + hashtag_rows = await fetch_hashtag_stats( + start=start, + end=end, + hashtag=normalized_hashtag, + limit=limit, + offset=offset, + ) + trend_rows = await fetch_hashtag_trends( + start=start, + end=end, + interval=interval, + hashtag=normalized_hashtag, + limit=limit, + offset=offset, + ) + hashtags = [HashtagStat(**row) for row in hashtag_rows] + trends = [HashtagTrend(**row) for row in trend_rows] + return HashtagStatsResponse( + count=len(hashtags), + start=start, + end=end, + hashtag=normalized_hashtag, + interval=interval, + limit=limit, + offset=offset, + hashtags=hashtags, + trends=trends, + ) + + +class EditorStatsController(Controller): + path = "/editor-stats" + + @get() + async def get_editor_stats( + self, + start: Annotated[ + datetime | None, Parameter(description="Inclusive UTC lower bound (ISO 8601). If omitted, no lower bound.") + ] = None, + end: Annotated[ + datetime | None, + Parameter(description="Exclusive UTC upper bound (ISO 8601). Defaults to now if start is set."), + ] = None, + limit: Annotated[int, Parameter(ge=1, le=1000, description="Page size (1-1000).")] = 100, + offset: Annotated[int, Parameter(ge=0, description="Page offset.")] = 0, + ) -> EditorStatsResponse: + start, end = resolve_optional_window(start, end) + rows = await fetch_editor_stats( + start=start, + end=end, + limit=limit, + offset=offset, + ) + editors = [EditorStat(**row) for row in rows] + return EditorStatsResponse( + count=len(editors), + start=start, + end=end, + limit=limit, + offset=offset, + editors=editors, + ) + + +v1_router = Router( + path="/api/v1", + route_handlers=[StatsController, HashtagStatsController, EditorStatsController], +) diff --git a/api/schemas.py b/api/schemas.py index 8ec604f..fe3cae7 100644 --- a/api/schemas.py +++ b/api/schemas.py @@ -41,6 +41,56 @@ class UserStatsResponse(BaseModel): users: list[UserStat] +class HashtagStat(BaseModel): + hashtag: str + changesets: int + users: int + map_changes: int + rank: int + + +class HashtagTrend(BaseModel): + period_start: datetime + hashtag: str + changesets: int + users: int + map_changes: int + + +class HashtagTrendsResponse(BaseModel): + interval: str + trends: list[HashtagTrend] + + +class HashtagStatsResponse(BaseModel): + count: int + start: datetime + end: datetime + hashtag: list[str] | None + interval: str + limit: int + offset: int + hashtags: list[HashtagStat] + trends: list[HashtagTrend] + + +class EditorStat(BaseModel): + editor: str + changesets: int + users: int + map_changes: int + rank: int + + +class EditorStatsResponse(BaseModel): + count: int + start: datetime | None + end: datetime | None + limit: int + offset: int + editors: list[EditorStat] + + class HealthResponse(BaseModel): status: str last_seq: int | None diff --git a/docs/api-performance.md b/docs/api-performance.md new file mode 100644 index 0000000..c62ea3f --- /dev/null +++ b/docs/api-performance.md @@ -0,0 +1,47 @@ +# API query performance checks + +Use this script to run `EXPLAIN ANALYZE` for the analytics queries against a real PostgreSQL database. + +```powershell +$env:DATABASE_URL="postgresql://..." +uv run --group api python scripts/check_api_query_performance.py --days 30 --limit 100 +``` + +If the database was created before the analytics indexes were added, include `--ensure-indexes` before measuring: + +```powershell +uv run --group api python scripts/check_api_query_performance.py --days 30 --limit 100 --ensure-indexes +``` + +Check a specific hashtag: + +```powershell +uv run --group api python scripts/check_api_query_performance.py --days 30 --hashtag maproulette --limit 100 --ensure-indexes +``` + +Check a fixed window: + +```powershell +uv run --group api python scripts/check_api_query_performance.py --start 2026-05-01T00:00:00Z --end 2026-05-08T00:00:00Z --limit 100 +``` + +Check with generated temporary data instead of the real tables: + +```powershell +uv run --group api python scripts/check_api_query_performance.py --synthetic-rows 1000000 --start 2026-05-01T00:00:00Z --end 2026-05-31T00:00:00Z --limit 100 +``` + +Synthetic mode creates temporary tables in the current PostgreSQL session, runs the same query plans, and drops the temporary data when the connection closes. + +The output includes table counts and PostgreSQL execution plans for: + +- hashtag stats +- hashtag trends +- editor stats + +When reviewing the output, pay attention to: + +- total execution time +- whether indexes are used for `changesets.created_at`, `changesets.hashtags`, and `changeset_stats.changeset_id` +- whether row counts become too large for broad date windows +- whether a smaller time window or cached aggregate table is needed for planet-scale data diff --git a/osmsg/pg_schema.py b/osmsg/pg_schema.py index 178670a..1874322 100644 --- a/osmsg/pg_schema.py +++ b/osmsg/pg_schema.py @@ -13,6 +13,8 @@ geom GEOMETRY(POLYGON) ); CREATE INDEX IF NOT EXISTS idx_changesets_created_at ON changesets(created_at); +CREATE INDEX IF NOT EXISTS idx_changesets_hashtags ON changesets USING GIN (hashtags); +CREATE INDEX IF NOT EXISTS idx_changesets_editor ON changesets(editor); CREATE INDEX IF NOT EXISTS idx_changesets_geom ON changesets USING GIST (geom); CREATE TABLE IF NOT EXISTS changeset_stats ( changeset_id BIGINT NOT NULL REFERENCES changesets(changeset_id), @@ -33,6 +35,7 @@ PRIMARY KEY (seq_id, changeset_id) ); CREATE INDEX IF NOT EXISTS idx_changeset_stats_uid ON changeset_stats(uid); +CREATE INDEX IF NOT EXISTS idx_changeset_stats_changeset_id ON changeset_stats(changeset_id); CREATE TABLE IF NOT EXISTS state ( source_url TEXT PRIMARY KEY, last_seq BIGINT NOT NULL, diff --git a/scripts/check_api_query_performance.py b/scripts/check_api_query_performance.py new file mode 100644 index 0000000..0c5ded3 --- /dev/null +++ b/scripts/check_api_query_performance.py @@ -0,0 +1,218 @@ +import argparse +import asyncio +import os +from datetime import UTC, datetime, timedelta +from typing import Any + +import asyncpg + +from api.queries import _editor_stats_sql, _hashtag_stats_sql, _hashtag_trends_sql + + +def parse_dt(value: str) -> datetime: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=UTC) + return parsed + + +def normalize_hashtags(values: list[str]) -> list[str]: + return ["#" + value.strip().lstrip("#") for value in values if value.strip()] + + +async def explain(conn: asyncpg.Connection, name: str, sql: str, params: list[Any]) -> None: + plan_rows = await conn.fetch(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) {sql}", *params) + print(f"\n=== {name} ===") + for row in plan_rows: + print(row["QUERY PLAN"]) + + +async def table_counts(conn: asyncpg.Connection) -> None: + print("=== table counts ===") + for table in ("users", "changesets", "changeset_stats"): + count = await conn.fetchval(f"SELECT COUNT(*) FROM {table}") + print(f"{table}: {count:,}") + row = await conn.fetchrow( + "SELECT MIN(created_at) AS min_created_at, MAX(created_at) AS max_created_at FROM changesets" + ) + print(f"changesets.created_at: {row['min_created_at']} -> {row['max_created_at']}") + + +async def create_synthetic_tables(conn: asyncpg.Connection, rows: int) -> None: + await conn.execute( + """ + CREATE TEMP TABLE users ( + uid BIGINT PRIMARY KEY, + username TEXT NOT NULL + ) ON COMMIT PRESERVE ROWS + """ + ) + await conn.execute( + """ + CREATE TEMP TABLE changesets ( + changeset_id BIGINT PRIMARY KEY, + uid BIGINT NOT NULL, + created_at TIMESTAMPTZ, + hashtags TEXT[], + editor TEXT + ) ON COMMIT PRESERVE ROWS + """ + ) + await conn.execute( + """ + CREATE TEMP TABLE changeset_stats ( + changeset_id BIGINT NOT NULL, + seq_id BIGINT NOT NULL, + uid BIGINT NOT NULL, + nodes_created INTEGER DEFAULT 0, + nodes_modified INTEGER DEFAULT 0, + nodes_deleted INTEGER DEFAULT 0, + ways_created INTEGER DEFAULT 0, + ways_modified INTEGER DEFAULT 0, + ways_deleted INTEGER DEFAULT 0, + rels_created INTEGER DEFAULT 0, + rels_modified INTEGER DEFAULT 0, + rels_deleted INTEGER DEFAULT 0, + poi_created INTEGER DEFAULT 0, + poi_modified INTEGER DEFAULT 0, + tag_stats JSONB, + PRIMARY KEY (seq_id, changeset_id) + ) ON COMMIT PRESERVE ROWS + """ + ) + await conn.execute( + """ + INSERT INTO users (uid, username) + SELECT uid, 'user_' || uid::text + FROM generate_series(1, LEAST($1::bigint, 100000::bigint)) AS uid + """, + rows, + ) + await conn.execute( + """ + INSERT INTO changesets (changeset_id, uid, created_at, hashtags, editor) + SELECT + id, + (id % LEAST($1::bigint, 100000::bigint)) + 1, + TIMESTAMPTZ '2026-05-01T00:00:00Z' + ((id % 43200) * INTERVAL '1 minute'), + CASE + WHEN id % 10 = 0 THEN ARRAY['#maproulette', '#tomtom'] + WHEN id % 7 = 0 THEN ARRAY['#hotosm'] + WHEN id % 5 = 0 THEN ARRAY['#osmnepal', '#buildings'] + ELSE ARRAY[]::TEXT[] + END, + CASE + WHEN id % 4 = 0 THEN 'iD 2.34.0' + WHEN id % 4 = 1 THEN 'JOSM' + WHEN id % 4 = 2 THEN 'StreetComplete' + ELSE NULL + END + FROM generate_series(1, $1::bigint) AS id + """, + rows, + ) + await conn.execute( + """ + INSERT INTO changeset_stats ( + changeset_id, seq_id, uid, nodes_created, nodes_modified, nodes_deleted, + ways_created, ways_modified, ways_deleted, rels_created, rels_modified, rels_deleted, + poi_created, poi_modified + ) + SELECT + id, + id, + (id % LEAST($1::bigint, 100000::bigint)) + 1, + (id % 97)::integer, + (id % 13)::integer, + (id % 3)::integer, + (id % 11)::integer, + (id % 5)::integer, + (id % 2)::integer, + (id % 7)::integer, + (id % 3)::integer, + (id % 2)::integer, + (id % 17)::integer, + (id % 5)::integer + FROM generate_series(1, $1::bigint) AS id + """, + rows, + ) + await ensure_indexes(conn) + await conn.execute("ANALYZE users") + await conn.execute("ANALYZE changesets") + await conn.execute("ANALYZE changeset_stats") + + +async def ensure_indexes(conn: asyncpg.Connection) -> None: + statements = [ + "CREATE INDEX IF NOT EXISTS idx_changesets_created_at ON changesets(created_at)", + "CREATE INDEX IF NOT EXISTS idx_changesets_hashtags ON changesets USING GIN (hashtags)", + "CREATE INDEX IF NOT EXISTS idx_changesets_editor ON changesets(editor)", + "CREATE INDEX IF NOT EXISTS idx_changeset_stats_changeset_id ON changeset_stats(changeset_id)", + ] + for statement in statements: + await conn.execute(statement) + + +async def main() -> None: + parser = argparse.ArgumentParser(description="Run EXPLAIN ANALYZE for API analytics queries.") + parser.add_argument("--database-url", default=os.environ.get("DATABASE_URL"), help="PostgreSQL connection URL.") + parser.add_argument("--start", type=parse_dt, help="Inclusive UTC lower bound, e.g. 2026-05-01T00:00:00Z.") + parser.add_argument("--end", type=parse_dt, help="Exclusive UTC upper bound, e.g. 2026-05-02T00:00:00Z.") + parser.add_argument("--days", type=int, default=30, help="Window size when start/end are omitted.") + parser.add_argument("--hashtag", action="append", default=[], help="Optional hashtag filter. Repeatable.") + parser.add_argument("--interval", choices=("day", "week", "month"), default="day", help="Trend bucket.") + parser.add_argument("--limit", type=int, default=100, help="Limit used by leaderboard queries.") + parser.add_argument("--offset", type=int, default=0, help="Offset used by leaderboard queries.") + parser.add_argument("--ensure-indexes", action="store_true", help="Create analytics indexes before measuring.") + parser.add_argument("--synthetic-rows", type=int, help="Use temporary generated tables with this many rows.") + args = parser.parse_args() + + if not args.database_url: + raise SystemExit("DATABASE_URL is required. Set it in the environment or pass --database-url.") + + end = args.end or datetime.now(tz=UTC) + start = args.start or (end - timedelta(days=args.days)) + if start >= end: + raise SystemExit("start must be before end") + + hashtags = normalize_hashtags(args.hashtag) + + conn = await asyncpg.connect(args.database_url) + try: + if args.synthetic_rows: + print(f"Creating temporary synthetic dataset with {args.synthetic_rows:,} rows...") + await create_synthetic_tables(conn, args.synthetic_rows) + if args.ensure_indexes: + await ensure_indexes(conn) + await table_counts(conn) + + hashtag_stats_params: list[Any] = [start, end] + if hashtags: + hashtag_stats_params.append(hashtags) + hashtag_stats_params.extend([args.limit, args.offset]) + await explain( + conn, + "hashtag stats", + _hashtag_stats_sql(filter_dates=True, filter_hashtags=bool(hashtags)), + hashtag_stats_params, + ) + + trend_params: list[Any] = [start, end] + if hashtags: + trend_params.append(hashtags) + trend_params.extend([args.interval, args.limit, args.offset]) + await explain( + conn, + "hashtag trends", + _hashtag_trends_sql(filter_hashtags=bool(hashtags)), + trend_params, + ) + + await explain(conn, "editor stats", _editor_stats_sql(filter_dates=True), [start, end, args.limit, args.offset]) + finally: + await conn.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_api.py b/tests/test_api.py index 893d660..779093f 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -28,6 +28,9 @@ def test_api_exposes_only_active_public_routes(): assert "/health" in paths assert "/api/v1/stats" in paths + assert "/api/v1/hashtag-stats" in paths + assert "/api/v1/editor-stats" in paths + assert "/api/v1/hashtag-trends" not in paths assert "/api/v1/stats/summary" not in paths assert "/api/v1/stats/timeseries" not in paths @@ -56,11 +59,16 @@ def test_normalize_hashtags_dedupes_case_insensitively(): assert normalize_hashtags(["maproulette", "#MapRoulette", "#roads"]) == ["#maproulette", "#roads"] -def _stats_app(monkeypatch, fake_fetch): - monkeypatch.setattr(v1_module, "fetch_user_stats", fake_fetch) +def _v1_app(monkeypatch, **fakes): + for name, fake in fakes.items(): + monkeypatch.setattr(v1_module, name, fake) return Litestar(route_handlers=[v1_router]) +def _stats_app(monkeypatch, fake_fetch): + return _v1_app(monkeypatch, fetch_user_stats=fake_fetch) + + def test_user_stats_endpoint_returns_expected_response(monkeypatch): async def fake_fetch_user_stats(*, start, end, hashtag, tags, limit, offset): assert start.isoformat() == "2026-05-01T00:00:00+00:00" @@ -187,6 +195,116 @@ async def fake_fetch_user_stats(*, tags, **_kwargs): assert body["users"][0]["tag_stats"] is None +def test_hashtag_stats_endpoint_returns_expected_response(monkeypatch): + async def fake_fetch_hashtag_stats(*, start, end, hashtag, limit, offset): + assert start.isoformat() == "2026-05-01T00:00:00+00:00" + assert end.isoformat() == "2026-05-02T00:00:00+00:00" + assert hashtag == ["#mapathon"] + assert limit == 2 + assert offset == 0 + return [ + {"hashtag": "#mapathon", "changesets": 4, "users": 3, "map_changes": 100, "rank": 1}, + {"hashtag": "#roads", "changesets": 2, "users": 2, "map_changes": 25, "rank": 2}, + ] + + async def fake_fetch_hashtag_trends(*, start, end, interval, hashtag, limit, offset): + assert start.isoformat() == "2026-05-01T00:00:00+00:00" + assert end.isoformat() == "2026-05-02T00:00:00+00:00" + assert interval == "day" + assert hashtag == ["#mapathon"] + assert limit == 2 + assert offset == 0 + return [ + { + "period_start": datetime(2026, 5, 1, tzinfo=UTC), + "hashtag": "#mapathon", + "changesets": 4, + "users": 3, + "map_changes": 100, + } + ] + + with TestClient( + _v1_app( + monkeypatch, + fetch_hashtag_stats=fake_fetch_hashtag_stats, + fetch_hashtag_trends=fake_fetch_hashtag_trends, + ) + ) as client: + response = client.get( + "/api/v1/hashtag-stats", + params={ + "start": "2026-05-01T00:00:00Z", + "end": "2026-05-02T00:00:00Z", + "hashtag": "mapathon", + "limit": "2", + }, + ) + + assert response.status_code == 200 + assert response.json() == { + "count": 2, + "start": "2026-05-01T00:00:00Z", + "end": "2026-05-02T00:00:00Z", + "hashtag": ["#mapathon"], + "interval": "day", + "limit": 2, + "offset": 0, + "hashtags": [ + {"hashtag": "#mapathon", "changesets": 4, "users": 3, "map_changes": 100, "rank": 1}, + {"hashtag": "#roads", "changesets": 2, "users": 2, "map_changes": 25, "rank": 2}, + ], + "trends": [ + { + "period_start": "2026-05-01T00:00:00Z", + "hashtag": "#mapathon", + "changesets": 4, + "users": 3, + "map_changes": 100, + }, + ], + } + + +def test_hashtag_stats_endpoint_rejects_invalid_interval(monkeypatch): + async def fake_fetch_hashtag_trends(**kwargs): + raise AssertionError("fetch_hashtag_trends should not be called") + + with TestClient(_v1_app(monkeypatch, fetch_hashtag_trends=fake_fetch_hashtag_trends)) as client: + response = client.get("/api/v1/hashtag-stats", params={"interval": "hour"}) + + assert response.status_code == 400 + assert response.json()["detail"] == "interval must be one of: day, week, month" + + +def test_editor_stats_endpoint_returns_expected_response(monkeypatch): + async def fake_fetch_editor_stats(*, start, end, limit, offset): + assert start is None + assert end is None + assert limit == 2 + assert offset == 0 + return [ + {"editor": "iD 2.34.0", "changesets": 10, "users": 5, "map_changes": 500, "rank": 1}, + {"editor": "JOSM", "changesets": 4, "users": 2, "map_changes": 120, "rank": 2}, + ] + + with TestClient(_v1_app(monkeypatch, fetch_editor_stats=fake_fetch_editor_stats)) as client: + response = client.get("/api/v1/editor-stats", params={"limit": "2"}) + + assert response.status_code == 200 + assert response.json() == { + "count": 2, + "start": None, + "end": None, + "limit": 2, + "offset": 0, + "editors": [ + {"editor": "iD 2.34.0", "changesets": 10, "users": 5, "map_changes": 500, "rank": 1}, + {"editor": "JOSM", "changesets": 4, "users": 2, "map_changes": 120, "rank": 2}, + ], + } + + def test_user_stats_sql_omits_tag_ctes_when_tags_false(): from api.queries import _user_stats_sql @@ -198,6 +316,33 @@ def test_user_stats_sql_omits_tag_ctes_when_tags_false(): assert "user_hashtags" in sql_without +def test_hashtag_stats_sql_uses_array_overlap_and_lateral_unnest(): + from api.queries import _hashtag_stats_sql + + sql = _hashtag_stats_sql(filter_dates=True, filter_hashtags=True) + assert "CROSS JOIN LATERAL UNNEST(cs.hashtags)" in sql + assert "cs.hashtags && $3::TEXT[]" in sql + assert "LIMIT $4 OFFSET $5" in sql + + +def test_hashtag_trends_sql_uses_bounded_date_range(): + from api.queries import _hashtag_trends_sql + + sql = _hashtag_trends_sql(filter_hashtags=False) + assert "DATE_TRUNC($3, cs.created_at)" in sql + assert "cs.created_at >= $1" in sql + assert "cs.created_at < $2" in sql + assert "LIMIT $4 OFFSET $5" in sql + + +def test_editor_stats_sql_groups_blank_editors_as_unknown(): + from api.queries import _editor_stats_sql + + sql = _editor_stats_sql(filter_dates=False) + assert "COALESCE(NULLIF(cs.editor, ''), 'unknown') AS editor" in sql + assert "GROUP BY editor" in sql + + def _seed_pg_via_to_psql(fresh_db, populated_db_factory, dsn): populated = populated_db_factory(fresh_db) populated.execute( diff --git a/tests/test_psql_export.py b/tests/test_psql_export.py index d65d36d..12f8f05 100644 --- a/tests/test_psql_export.py +++ b/tests/test_psql_export.py @@ -57,7 +57,7 @@ def test_pg_schema_statements_each_parse_with_postgres_extension(): conn.execute("LOAD spatial") for stmt in [s.strip() for s in duckdb_clone.split(";") if s.strip()]: upper = stmt.upper() - if upper.startswith("CREATE EXTENSION") or "USING GIST" in upper: + if upper.startswith("CREATE EXTENSION") or "USING GIST" in upper or "USING GIN" in upper: continue conn.execute(stmt) tables = {r[0] for r in conn.execute("SELECT table_name FROM information_schema.tables").fetchall()}