diff --git a/README.md b/README.md index 2cf2986..cd80bfa 100644 --- a/README.md +++ b/README.md @@ -68,8 +68,11 @@ alice = await db.insert(User(id=1, name="Alice", email="a@x"), async for user in db.iter_rows(User, batch_size=500): process(user) -# For huge tables, iter_rows_keyset stays O(N) by emitting -# `WHERE > ` per page. `by` defaults to __pk__[0]. +# For huge tables, iter_rows_keyset stays O(N) by emitting a +# `(by, *pk_tail) > last_seen` cursor per page. PK tail breaks +# ties and lets the cursor paginate through NULL values. Order +# is `ORDER BY by NULLS FIRST` -- deterministic across backends. +# `by` defaults to __pk__[0]. SQLite 3.30+ required. async for user in db.iter_rows_keyset(User, batch_size=500): process(user) diff --git a/src/etchdb/db.py b/src/etchdb/db.py index 02a957a..f7c21f7 100644 --- a/src/etchdb/db.py +++ b/src/etchdb/db.py @@ -248,21 +248,18 @@ async def iter_rows_keyset( ) -> AsyncIterator[Row]: """Stream every matching row via keyset pagination. - Unlike `iter_rows`, the per-page cost stays constant -- each - page reads `batch_size` rows via `WHERE > ` + Unlike `iter_rows`, per-page cost stays constant: each page + reads `batch_size` rows via `WHERE (by, *pk_tail) > (...)` rather than `OFFSET N`, so total cost is O(N) instead of - O(N^2). The trade-off is the constraint on `by`: - - - It must be a single column (composite-PK keyset uses - `(a, b) > (last_a, last_b)` and isn't supported here). - - It must be NOT NULL and unique enough that no two rows - tie. Primary keys usually qualify; created_at columns can - if the resolution is high enough. A NULL at a page - boundary stalls the cursor (WHERE by > NULL is false), - so we raise rather than loop forever. - - Defaults to `model.__pk__[0]`. Filters are AND'd with the - cursor. Ordering is ascending; descending is not supported. + O(N^2). `by` must be a single DB-orderable column. 
PK columns + break ties on `by` and let the cursor paginate through NULL + values; the cursor is `[by] + [p for p in __pk__ if p != by]`. + + `by` defaults to `model.__pk__[0]` (composite PK requires + explicit `by=`). Filters are AND'd with the cursor. Ordering + is ascending with `NULLS FIRST` -- NULL rows iterate first + on every backend. Descending is not supported. SQLite 3.30+ + is required (for the `NULLS FIRST` keyword). """ if batch_size < 1: raise ValueError(f"batch_size must be >= 1, got {batch_size}") @@ -276,7 +273,8 @@ async def iter_rows_keyset( ) by = model.__pk__[0] - last_seen: Any = None + order_cols = [by] + [p for p in model.__pk__ if p != by] + last_seen: list[Any] | None = None while True: q = sql.select_keyset( model, @@ -289,17 +287,11 @@ async def iter_rows_keyset( rows = await self._adapter.fetch(q.sql, *q.params) if not rows: return - if len(rows) == batch_size and rows[-1][by] is None: - raise ValueError( - f"iter_rows_keyset: {by!r} is NULL at the page boundary; " - f"the cursor cannot advance. Pick a non-nullable column " - f"for `by`, or filter NULLs out via raw SQL." - ) for row in rows: yield model(**row) if len(rows) < batch_size: return - last_seen = rows[-1][by] + last_seen = [rows[-1][c] for c in order_cols] async def insert(self, row: Row, *, on_conflict: sql._OnConflict = None) -> Row: """Insert `row` and return the DB's view (RETURNING *). diff --git a/src/etchdb/sql/__init__.py b/src/etchdb/sql/__init__.py index 7184d69..2fef6b6 100644 --- a/src/etchdb/sql/__init__.py +++ b/src/etchdb/sql/__init__.py @@ -146,40 +146,105 @@ def select_keyset( placeholder: Callable[[int], str], by: str, batch_size: int, - last_seen: Any = None, + last_seen: Sequence[Any] | None = None, **filters: Any, ) -> SqlQuery: """Build a single page of a keyset-paginated SELECT. - Emits `SELECT cols FROM table [WHERE [AND by > last_seen]] - ORDER BY by LIMIT batch_size`. 
The first page passes - `last_seen=None`, which drops the cursor predicate; subsequent - pages bind the previous page's max value for `by`. + Cursor columns are `[by] + [p for p in __pk__ if p != by]`, so the + cursor is unique (PK tail breaks ties). `last_seen` is a sequence + aligned with those columns; `None` drops the cursor predicate + (first page). `ORDER BY ... NULLS FIRST` is explicit so NULL rows + iterate first on every backend (Postgres default would be NULLS + LAST otherwise; SQLite already defaults NULLS FIRST). SQLite 3.30+ + is required. + + Three WHERE shapes, branched on `last_seen` state: + first page -- no cursor predicate + NULL region -- (by IS NULL AND (*pk_tail) > (*last_pk_tail)) OR by IS NOT NULL + past NULL -- (by, *pk_tail) > (last_by, *last_pk_tail) + + Single-column cursor (1 PK col and by == that col) degenerates to + the scalar form `by > $k` so we never emit one-element row-value + syntax. The OR-group is fully parenthesized inside the AND chain + so filters and the cursor compose correctly. """ - if by not in _db_fields(row_class): - raise ValueError(f"keyset column {by!r} is not a DB column on {row_class.__name__}.") if by in filters: raise ValueError(f"keyset column {by!r} cannot also appear in filters.") + db_fields = _db_fields(row_class) + order_cols = [by] + [p for p in row_class.__pk__ if p != by] + for col in order_cols: + if col not in db_fields: + raise ValueError( + f"keyset column {col!r} is not a DB column on {row_class.__name__}." + ) + table = _table_name(row_class) - columns = ", ".join(_db_fields(row_class)) + columns = ", ".join(db_fields) where_sql, params = _where_clauses(filters, placeholder=placeholder) where_parts = [where_sql] if where_sql else [] if last_seen is not None: - where_parts.append(f"{by} > {placeholder(len(params))}") - params.append(last_seen) + if len(last_seen) != len(order_cols): + raise ValueError( + f"last_seen has {len(last_seen)} values but cursor has " + f"{len(order_cols)} columns ({order_cols})." 
+ ) + cursor_sql, cursor_values = _keyset_cursor_sql( + order_cols, last_seen, placeholder, start=len(params) + ) + where_parts.append(cursor_sql) + params.extend(cursor_values) sql = f"SELECT {columns} FROM {table}" if where_parts: sql += f" WHERE {' AND '.join(where_parts)}" - sql += f" ORDER BY {by}" + order_by = ", ".join([f"{order_cols[0]} NULLS FIRST", *order_cols[1:]]) + sql += f" ORDER BY {order_by}" params.append(batch_size) sql += f" LIMIT {placeholder(len(params) - 1)}" return SqlQuery(sql=sql, params=params) +def _keyset_cursor_sql( + order_cols: list[str], + last_seen: Sequence[Any], + placeholder: Callable[[int], str], + *, + start: int, +) -> tuple[str, list[Any]]: + """Emit the WHERE cursor predicate. Returns `(sql, bound_values)`. + + `start` is the offset for placeholder numbering -- `start + i` is + the param index for value `i`, so the caller controls placement in + its own params list. + """ + by = order_cols[0] + + def _gt(cols: list[str], values: Sequence[Any], offset: int) -> str: + lhs = ", ".join(cols) + rhs = ", ".join(placeholder(offset + i) for i in range(len(values))) + if len(cols) == 1: + return f"{lhs} > {rhs}" + return f"({lhs}) > ({rhs})" + + if last_seen[0] is None: + pk_tail = order_cols[1:] + if not pk_tail: + raise ValueError( + f"keyset cursor has NULL in {by!r} but no PK tie-breaker " + f"is available (single-PK case where by IS pk)." 
+ ) + values = list(last_seen[1:]) + pk_cursor = _gt(pk_tail, values, start) + return f"(({by} IS NULL AND {pk_cursor}) OR {by} IS NOT NULL)", values + + values = list(last_seen) + return _gt(order_cols, values, start), values + + def update( row: Row, *, diff --git a/tests/integration/test_iter_rows_keyset.py b/tests/integration/test_iter_rows_keyset.py index 6901430..729dfc2 100644 --- a/tests/integration/test_iter_rows_keyset.py +++ b/tests/integration/test_iter_rows_keyset.py @@ -93,18 +93,92 @@ async def test_iter_rows_keyset_rejects_non_db_column(db: DB): pass -async def test_iter_rows_keyset_raises_on_null_page_boundary(db: DB): - """NULL at a full-page boundary stalls the cursor (WHERE by > - NULL is false), so the loop would re-fetch the same page forever. - Raise instead. Three NULL emails + one non-NULL + batch_size=2 - reproduces on both SQLite (NULLs first) and Postgres (NULLs - last), since either way a NULL row lands at the end of a full - non-final page.""" +async def test_iter_rows_keyset_paginates_through_all_nulls(db: DB): + """Every row has `email=NULL`. Cursor advances via the PK tail + (id) inside the NULL region and terminates correctly.""" + for i in range(1, 5): + await db.insert(User(id=i, name=f"u{i}", email=None)) + + rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)] + + assert [u.id for u in rows] == [1, 2, 3, 4] + assert all(u.email is None for u in rows) + + +async def test_iter_rows_keyset_paginates_through_mixed_nulls(db: DB): + """Mixed NULL + non-NULL with NULL at a full-page boundary -- + the failure mode from issue #1. 
With explicit NULLS FIRST and a + compound cursor, NULL rows come first and we paginate through + cleanly.""" + await db.insert(User(id=1, name="u1", email=None)) + await db.insert(User(id=2, name="u2", email=None)) + await db.insert(User(id=3, name="u3", email="a@x")) + await db.insert(User(id=4, name="u4", email="b@x")) + + rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)] + + assert [u.id for u in rows] == [1, 2, 3, 4] + assert [u.email for u in rows] == [None, None, "a@x", "b@x"] + + +async def test_iter_rows_keyset_paginates_through_tied_values(db: DB): + """Same `email` on every row. The PK tail breaks ties so all + rows yield in id order; with the pre-fix `WHERE by > last_seen` + cursor, only the first page would be yielded.""" + for i in range(1, 5): + await db.insert(User(id=i, name=f"u{i}", email="same@x")) + + rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)] + + assert [u.id for u in rows] == [1, 2, 3, 4] + + +async def test_iter_rows_keyset_paginates_through_nulls_and_ties(db: DB): + """Combination case: some NULLs, two distinct non-NULL values + each repeated. Order must be NULLs first then non-NULL groups, + tie-broken by id within each group.""" await db.insert(User(id=1, name="u1", email=None)) await db.insert(User(id=2, name="u2", email=None)) - await db.insert(User(id=3, name="u3", email=None)) + await db.insert(User(id=3, name="u3", email="a@x")) await db.insert(User(id=4, name="u4", email="a@x")) + await db.insert(User(id=5, name="u5", email="b@x")) + await db.insert(User(id=6, name="u6", email="b@x")) - with pytest.raises(ValueError, match="NULL at the page boundary"): - async for _ in db.iter_rows_keyset(User, by="email", batch_size=2): - pass + rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)] + + assert [u.id for u in rows] == [1, 2, 3, 4, 5, 6] + + +async def test_iter_rows_keyset_composite_pk_tie_break(db: DB): + """Non-PK `by` on a composite-PK model. 
Ties in `by` resolve via + both PK columns chained, so the cursor advances correctly when + a non-PK column duplicates across PK combinations.""" + from tests._models import UserRole + + await db.insert(UserRole(user_id=1, role_id=10, note="x")) + await db.insert(UserRole(user_id=1, role_id=20, note="x")) + await db.insert(UserRole(user_id=2, role_id=10, note="x")) + await db.insert(UserRole(user_id=2, role_id=20, note="y")) + + rows = [u async for u in db.iter_rows_keyset(UserRole, by="note", batch_size=2)] + + assert [(r.user_id, r.role_id, r.note) for r in rows] == [ + (1, 10, "x"), + (1, 20, "x"), + (2, 10, "x"), + (2, 20, "y"), + ] + + +async def test_iter_rows_keyset_filter_with_nulls_and_ties(db: DB): + """Filter AND'd with the cursor predicate -- exercises the + parenthesization of the OR-group inside the WHERE chain.""" + await db.insert(User(id=1, name="alice", email=None)) + await db.insert(User(id=2, name="bob", email=None)) + await db.insert(User(id=3, name="alice", email="a@x")) + await db.insert(User(id=4, name="alice", email="a@x")) + await db.insert(User(id=5, name="bob", email="a@x")) + + rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2, name="alice")] + + assert [u.id for u in rows] == [1, 3, 4] diff --git a/tests/unit/test_sql_emitter.py b/tests/unit/test_sql_emitter.py index bd81bf8..83e5228 100644 --- a/tests/unit/test_sql_emitter.py +++ b/tests/unit/test_sql_emitter.py @@ -748,35 +748,98 @@ class Article(Row): # --- select_keyset: single page of cursor-paginated SELECT ------------ -def test_select_keyset_first_page_omits_cursor_pg(): - """First page (last_seen=None) drops the `by > ...` predicate and - relies on ORDER BY + LIMIT alone.""" +def test_select_keyset_first_page_emits_nulls_first_pg(): + """First page drops the cursor predicate and emits explicit + NULLS FIRST so iteration order is deterministic across backends.""" q = sql.select_keyset(User, placeholder=pg, by="id", batch_size=3) - assert q.sql == 
"SELECT id, name, email FROM users ORDER BY id LIMIT $1" + assert q.sql == "SELECT id, name, email FROM users ORDER BY id NULLS FIRST LIMIT $1" assert q.params == [3] -def test_select_keyset_subsequent_page_emits_cursor_pg(): - """Once last_seen is set, the page's WHERE binds the previous max.""" - q = sql.select_keyset(User, placeholder=pg, by="id", batch_size=3, last_seen=10) - assert q.sql == "SELECT id, name, email FROM users WHERE id > $1 ORDER BY id LIMIT $2" +def test_select_keyset_by_pk_emits_scalar_cursor_pg(): + """Single-column cursor (by IS pk) degenerates to the scalar form + `id > $1`, not one-element row-value syntax.""" + q = sql.select_keyset(User, placeholder=pg, by="id", batch_size=3, last_seen=[10]) + assert q.sql == ( + "SELECT id, name, email FROM users WHERE id > $1 ORDER BY id NULLS FIRST LIMIT $2" + ) assert q.params == [10, 3] -def test_select_keyset_with_filter_pg(): - """Filter predicates land before the cursor in the WHERE chain; - placeholder numbering threads through filter binds first.""" - q = sql.select_keyset(User, placeholder=pg, by="id", batch_size=2, last_seen=5, name="alice") +def test_select_keyset_by_non_pk_emits_compound_cursor_pg(): + """`by` differs from pk: cursor is `(by, pk_tail)` row-value, and + ORDER BY chains the PK tail after `by`.""" + q = sql.select_keyset(User, placeholder=pg, by="email", batch_size=3, last_seen=["a@x", 10]) assert q.sql == ( - "SELECT id, name, email FROM users WHERE name = $1 AND id > $2 ORDER BY id LIMIT $3" + "SELECT id, name, email FROM users WHERE (email, id) > ($1, $2) " + "ORDER BY email NULLS FIRST, id LIMIT $3" + ) + assert q.params == ["a@x", 10, 3] + + +def test_select_keyset_null_region_cursor_pg(): + """`last_seen[0] IS NULL`: emit OR-group that either advances through + more NULL-by rows via PK tail or unlocks the non-NULL tail.""" + q = sql.select_keyset(User, placeholder=pg, by="email", batch_size=3, last_seen=[None, 7]) + assert q.sql == ( + "SELECT id, name, email FROM users 
" + "WHERE ((email IS NULL AND id > $1) OR email IS NOT NULL) " + "ORDER BY email NULLS FIRST, id LIMIT $2" + ) + assert q.params == [7, 3] + + +def test_select_keyset_composite_pk_emits_three_col_cursor_pg(): + """Composite PK with non-PK `by`: cursor extends through both PK + columns so ties resolve through the full unique key.""" + q = sql.select_keyset( + UserRole, placeholder=pg, by="note", batch_size=2, last_seen=["x", 1, 10] + ) + assert q.sql == ( + "SELECT user_id, role_id, note FROM user_roles " + "WHERE (note, user_id, role_id) > ($1, $2, $3) " + "ORDER BY note NULLS FIRST, user_id, role_id LIMIT $4" + ) + assert q.params == ["x", 1, 10, 2] + + +def test_select_keyset_composite_pk_null_region_pg(): + """Composite PK + null region: OR-group's PK side is itself a + row-value `(user_id, role_id) > ($1, $2)`.""" + q = sql.select_keyset( + UserRole, placeholder=pg, by="note", batch_size=2, last_seen=[None, 1, 10] + ) + assert q.sql == ( + "SELECT user_id, role_id, note FROM user_roles " + "WHERE ((note IS NULL AND (user_id, role_id) > ($1, $2)) OR note IS NOT NULL) " + "ORDER BY note NULLS FIRST, user_id, role_id LIMIT $3" + ) + assert q.params == [1, 10, 2] + + +def test_select_keyset_filter_with_cursor_parenthesizes_or_pg(): + """Filters AND'd with the cursor: the OR-group must be parenthesized + so AND/OR precedence does not break the predicate. Placeholders + thread through filter binds first.""" + q = sql.select_keyset( + User, placeholder=pg, by="email", batch_size=2, last_seen=[None, 5], name="alice" + ) + assert q.sql == ( + "SELECT id, name, email FROM users " + "WHERE name = $1 AND ((email IS NULL AND id > $2) OR email IS NOT NULL) " + "ORDER BY email NULLS FIRST, id LIMIT $3" ) assert q.params == ["alice", 5, 2] -def test_select_keyset_sqlite(): - q = sql.select_keyset(User, placeholder=lite, by="id", batch_size=2, last_seen=5) - assert q.sql == "SELECT id, name, email FROM users WHERE id > ? ORDER BY id LIMIT ?" 
- assert q.params == [5, 2] +def test_select_keyset_sqlite_placeholders(): + """SQLite uses `?` placeholders; emitter is otherwise identical.""" + q = sql.select_keyset(User, placeholder=lite, by="email", batch_size=2, last_seen=["a@x", 5]) + assert q.sql == ( + "SELECT id, name, email FROM users WHERE (email, id) > (?, ?) " + "ORDER BY email NULLS FIRST, id LIMIT ?" + ) + assert q.params == ["a@x", 5, 2] def test_select_keyset_rejects_non_db_column(): @@ -789,6 +852,13 @@ def test_select_keyset_rejects_by_in_filters(): sql.select_keyset(User, placeholder=pg, by="id", batch_size=1, id=5) +def test_select_keyset_rejects_last_seen_length_mismatch(): + """`last_seen` must have one value per cursor column; a mismatch is + a caller bug, caught at emit time rather than at SQL execute time.""" + with pytest.raises(ValueError, match="last_seen has"): + sql.select_keyset(User, placeholder=pg, by="email", batch_size=1, last_seen=["only-one"]) + + # --- update_where: bulk update scoped by where= -------------------------