Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ alice = await db.insert(User(id=1, name="Alice", email="a@x"),
async for user in db.iter_rows(User, batch_size=500):
process(user)

# For huge tables, iter_rows_keyset stays O(N) by emitting
# `WHERE <by> > <last_seen>` per page. `by` defaults to __pk__[0].
# For huge tables, iter_rows_keyset stays O(N) by emitting a
# `(by, *pk_tail) > last_seen` cursor per page. PK tail breaks
# ties and lets the cursor paginate through NULL values. Order
# is `ORDER BY by NULLS FIRST` -- deterministic across backends.
# `by` defaults to __pk__[0]. SQLite 3.30+ required.
async for user in db.iter_rows_keyset(User, batch_size=500):
process(user)

Expand Down
36 changes: 14 additions & 22 deletions src/etchdb/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,21 +248,18 @@ async def iter_rows_keyset(
) -> AsyncIterator[Row]:
"""Stream every matching row via keyset pagination.

Unlike `iter_rows`, the per-page cost stays constant -- each
page reads `batch_size` rows via `WHERE <by> > <last_seen>`
Unlike `iter_rows`, per-page cost stays constant: each page
reads `batch_size` rows via `WHERE (by, *pk_tail) > (...)`
rather than `OFFSET N`, so total cost is O(N) instead of
O(N^2). The trade-off is the constraint on `by`:

- It must be a single column (composite-PK keyset uses
`(a, b) > (last_a, last_b)` and isn't supported here).
- It must be NOT NULL and unique enough that no two rows
tie. Primary keys usually qualify; created_at columns can
if the resolution is high enough. A NULL at a page
boundary stalls the cursor (WHERE by > NULL is false),
so we raise rather than loop forever.

Defaults to `model.__pk__[0]`. Filters are AND'd with the
cursor. Ordering is ascending; descending is not supported.
O(N^2). `by` must be a single DB-orderable column. PK columns
break ties on `by` and let the cursor paginate through NULL
values; the cursor is `[by] + [p for p in __pk__ if p != by]`.

`by` defaults to `model.__pk__[0]` (composite PK requires
explicit `by=`). Filters are AND'd with the cursor. Ordering
is ascending with `NULLS FIRST` -- NULL rows iterate first
on every backend. Descending is not supported. SQLite 3.30+
is required (for the `NULLS FIRST` keyword).
"""
if batch_size < 1:
raise ValueError(f"batch_size must be >= 1, got {batch_size}")
Expand All @@ -276,7 +273,8 @@ async def iter_rows_keyset(
)
by = model.__pk__[0]

last_seen: Any = None
order_cols = [by] + [p for p in model.__pk__ if p != by]
last_seen: list[Any] | None = None
while True:
q = sql.select_keyset(
model,
Expand All @@ -289,17 +287,11 @@ async def iter_rows_keyset(
rows = await self._adapter.fetch(q.sql, *q.params)
if not rows:
return
if len(rows) == batch_size and rows[-1][by] is None:
raise ValueError(
f"iter_rows_keyset: {by!r} is NULL at the page boundary; "
f"the cursor cannot advance. Pick a non-nullable column "
f"for `by`, or filter NULLs out via raw SQL."
)
for row in rows:
yield model(**row)
if len(rows) < batch_size:
return
last_seen = rows[-1][by]
last_seen = [rows[-1][c] for c in order_cols]

async def insert(self, row: Row, *, on_conflict: sql._OnConflict = None) -> Row:
"""Insert `row` and return the DB's view (RETURNING *).
Expand Down
87 changes: 76 additions & 11 deletions src/etchdb/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,40 +146,105 @@ def select_keyset(
placeholder: Callable[[int], str],
by: str,
batch_size: int,
last_seen: Any = None,
last_seen: Sequence[Any] | None = None,
**filters: Any,
) -> SqlQuery:
"""Build a single page of a keyset-paginated SELECT.

Emits `SELECT cols FROM table [WHERE <filters> [AND by > last_seen]]
ORDER BY by LIMIT batch_size`. The first page passes
`last_seen=None`, which drops the cursor predicate; subsequent
pages bind the previous page's max value for `by`.
Cursor columns are `[by] + [p for p in __pk__ if p != by]`, so the
cursor is unique (PK tail breaks ties). `last_seen` is a sequence
aligned with those columns; `None` drops the cursor predicate
(first page). `ORDER BY ... NULLS FIRST` is explicit so NULL rows
iterate first on every backend (Postgres default would be NULLS
LAST otherwise; SQLite already defaults NULLS FIRST). SQLite 3.30+
is required.

Three WHERE shapes, branched on `last_seen` state:
first page -- no cursor predicate
NULL region -- (by IS NULL AND <pk_cursor>) OR by IS NOT NULL
past NULL -- (by, *pk_tail) > (last_by, *last_pk_tail)

Single-column cursor (1 PK col and by == that col) degenerates to
the scalar form `by > $k` so we never emit one-element row-value
syntax. The OR-group is fully parenthesized inside the AND chain
so filters and the cursor compose correctly.
"""
if by not in _db_fields(row_class):
raise ValueError(f"keyset column {by!r} is not a DB column on {row_class.__name__}.")
if by in filters:
raise ValueError(f"keyset column {by!r} cannot also appear in filters.")

db_fields = _db_fields(row_class)
order_cols = [by] + [p for p in row_class.__pk__ if p != by]
for col in order_cols:
if col not in db_fields:
raise ValueError(
f"keyset column {col!r} is not a DB column on {row_class.__name__}."
)

table = _table_name(row_class)
columns = ", ".join(_db_fields(row_class))
columns = ", ".join(db_fields)
where_sql, params = _where_clauses(filters, placeholder=placeholder)
where_parts = [where_sql] if where_sql else []

if last_seen is not None:
where_parts.append(f"{by} > {placeholder(len(params))}")
params.append(last_seen)
if len(last_seen) != len(order_cols):
raise ValueError(
f"last_seen has {len(last_seen)} values but cursor has "
f"{len(order_cols)} columns ({order_cols})."
)
cursor_sql, cursor_values = _keyset_cursor_sql(
order_cols, last_seen, placeholder, start=len(params)
)
where_parts.append(cursor_sql)
params.extend(cursor_values)

sql = f"SELECT {columns} FROM {table}"
if where_parts:
sql += f" WHERE {' AND '.join(where_parts)}"
sql += f" ORDER BY {by}"
order_by = ", ".join([f"{order_cols[0]} NULLS FIRST", *order_cols[1:]])
sql += f" ORDER BY {order_by}"
params.append(batch_size)
sql += f" LIMIT {placeholder(len(params) - 1)}"

return SqlQuery(sql=sql, params=params)


def _keyset_cursor_sql(
order_cols: list[str],
last_seen: Sequence[Any],
placeholder: Callable[[int], str],
*,
start: int,
) -> tuple[str, list[Any]]:
"""Emit the WHERE cursor predicate. Returns `(sql, bound_values)`.

`start` is the offset for placeholder numbering -- `start + i` is
the param index for value `i`, so the caller controls placement in
its own params list.
"""
by = order_cols[0]

def _gt(cols: list[str], values: Sequence[Any], offset: int) -> str:
lhs = ", ".join(cols)
rhs = ", ".join(placeholder(offset + i) for i in range(len(values)))
if len(cols) == 1:
return f"{lhs} > {rhs}"
return f"({lhs}) > ({rhs})"

if last_seen[0] is None:
pk_tail = order_cols[1:]
if not pk_tail:
raise ValueError(
f"keyset cursor has NULL in {by!r} but no PK tie-breaker "
f"is available (single-PK case where by IS pk)."
)
values = list(last_seen[1:])
pk_cursor = _gt(pk_tail, values, start)
return f"(({by} IS NULL AND {pk_cursor}) OR {by} IS NOT NULL)", values

values = list(last_seen)
return _gt(order_cols, values, start), values


def update(
row: Row,
*,
Expand Down
96 changes: 85 additions & 11 deletions tests/integration/test_iter_rows_keyset.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,92 @@ async def test_iter_rows_keyset_rejects_non_db_column(db: DB):
pass


async def test_iter_rows_keyset_raises_on_null_page_boundary(db: DB):
"""NULL at a full-page boundary stalls the cursor (WHERE by >
NULL is false), so the loop would re-fetch the same page forever.
Raise instead. Three NULL emails + one non-NULL + batch_size=2
reproduces on both SQLite (NULLs first) and Postgres (NULLs
last), since either way a NULL row lands at the end of a full
non-final page."""
async def test_iter_rows_keyset_paginates_through_all_nulls(db: DB):
"""Every row has `email=NULL`. Cursor advances via the PK tail
(id) inside the NULL region and terminates correctly."""
for i in range(1, 5):
await db.insert(User(id=i, name=f"u{i}", email=None))

rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)]

assert [u.id for u in rows] == [1, 2, 3, 4]
assert all(u.email is None for u in rows)


async def test_iter_rows_keyset_paginates_through_mixed_nulls(db: DB):
"""Mixed NULL + non-NULL with NULL at a full-page boundary --
the failure mode from issue #1. With explicit NULLS FIRST and a
compound cursor, NULL rows come first and we paginate through
cleanly."""
await db.insert(User(id=1, name="u1", email=None))
await db.insert(User(id=2, name="u2", email=None))
await db.insert(User(id=3, name="u3", email="a@x"))
await db.insert(User(id=4, name="u4", email="b@x"))

rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)]

assert [u.id for u in rows] == [1, 2, 3, 4]
assert [u.email for u in rows] == [None, None, "a@x", "b@x"]


async def test_iter_rows_keyset_paginates_through_tied_values(db: DB):
"""Same `email` on every row. The PK tail breaks ties so all
rows yield in id order; with the pre-fix `WHERE by > last_seen`
cursor, only the first page would be yielded."""
for i in range(1, 5):
await db.insert(User(id=i, name=f"u{i}", email="same@x"))

rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)]

assert [u.id for u in rows] == [1, 2, 3, 4]


async def test_iter_rows_keyset_paginates_through_nulls_and_ties(db: DB):
"""Combination case: some NULLs, two distinct non-NULL values
each repeated. Order must be NULLs first then non-NULL groups,
tie-broken by id within each group."""
await db.insert(User(id=1, name="u1", email=None))
await db.insert(User(id=2, name="u2", email=None))
await db.insert(User(id=3, name="u3", email=None))
await db.insert(User(id=3, name="u3", email="a@x"))
await db.insert(User(id=4, name="u4", email="a@x"))
await db.insert(User(id=5, name="u5", email="b@x"))
await db.insert(User(id=6, name="u6", email="b@x"))

with pytest.raises(ValueError, match="NULL at the page boundary"):
async for _ in db.iter_rows_keyset(User, by="email", batch_size=2):
pass
rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2)]

assert [u.id for u in rows] == [1, 2, 3, 4, 5, 6]


async def test_iter_rows_keyset_composite_pk_tie_break(db: DB):
"""Non-PK `by` on a composite-PK model. Ties in `by` resolve via
both PK columns chained, so the cursor advances correctly when
a non-PK column duplicates across PK combinations."""
from tests._models import UserRole

await db.insert(UserRole(user_id=1, role_id=10, note="x"))
await db.insert(UserRole(user_id=1, role_id=20, note="x"))
await db.insert(UserRole(user_id=2, role_id=10, note="x"))
await db.insert(UserRole(user_id=2, role_id=20, note="y"))

rows = [u async for u in db.iter_rows_keyset(UserRole, by="note", batch_size=2)]

assert [(r.user_id, r.role_id, r.note) for r in rows] == [
(1, 10, "x"),
(1, 20, "x"),
(2, 10, "x"),
(2, 20, "y"),
]


async def test_iter_rows_keyset_filter_with_nulls_and_ties(db: DB):
"""Filter AND'd with the cursor predicate -- exercises the
parenthesization of the OR-group inside the WHERE chain."""
await db.insert(User(id=1, name="alice", email=None))
await db.insert(User(id=2, name="bob", email=None))
await db.insert(User(id=3, name="alice", email="a@x"))
await db.insert(User(id=4, name="alice", email="a@x"))
await db.insert(User(id=5, name="bob", email="a@x"))

rows = [u async for u in db.iter_rows_keyset(User, by="email", batch_size=2, name="alice")]

assert [u.id for u in rows] == [1, 3, 4]
Loading
Loading