Skip to content

Commit f397d31

Browse files
authored
Index without legacy scanning information (#67)
The previous indexing process relied on information saved during the scanning/transformation process. While I could still do it that way, I'd like to make these steps independent so they can be better distributed in the future. Previously only postgres was indexed, but there's really no harm in also indexing duckdb. I've also realized that the ldlite system-generated columns like __id and __o are almost never used in queries, so they do not need to be indexed. At some point in the past the unit tests for indexing were accidentally removed; they're restored in this PR.
1 parent 493b5fc commit f397d31

4 files changed

Lines changed: 148 additions & 68 deletions

File tree

src/ldlite/__init__.py

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,6 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
337337
records,
338338
desc="downloading",
339339
total=total_records,
340-
leave=False,
341340
mininterval=5,
342341
disable=self._quiet,
343342
unit=table.split(".")[-1],
@@ -354,7 +353,11 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
354353
newtables = self._database.expand_prefix(table, json_depth, keep_raw)
355354
if keep_raw:
356355
newtables = [table, *newtables]
357-
indexable_attrs = []
356+
transform_elapsed = datetime.now(timezone.utc) - transform_started
357+
358+
with tqdm(desc="indexing", disable=self._quiet) as progress:
359+
index_started = datetime.now(timezone.utc)
360+
self._database.index_prefix(table, progress)
358361

359362
else:
360363
try:
@@ -392,40 +395,46 @@ def query( # noqa: C901, PLR0912, PLR0913, PLR0915
392395
finally:
393396
autocommit(self.db, self.dbtype, True)
394397

395-
transform_elapsed = datetime.now(timezone.utc) - transform_started
396-
# Create indexes on id columns (for postgres)
397-
index_started = datetime.now(timezone.utc)
398-
if self.dbtype == DBType.POSTGRES:
399-
400-
class PbarNoop:
401-
def update(self, _: int) -> None: ...
402-
def close(self) -> None: ...
403-
404-
pbar: tqdm | PbarNoop = PbarNoop() # type:ignore[type-arg]
405-
406-
index_total = len(indexable_attrs)
407-
if not self._quiet:
408-
pbar = tqdm(
409-
desc="indexing",
410-
total=index_total,
411-
leave=False,
412-
mininterval=3,
413-
smoothing=0,
414-
colour="#A9A9A9",
415-
bar_format="{desc} {bar}{postfix}",
416-
)
417-
for t, attr in indexable_attrs:
418-
cur = self.db.cursor()
419-
try:
420-
cur.execute(
421-
"CREATE INDEX ON " + sqlid(t) + " (" + sqlid(attr.name) + ")",
398+
transform_elapsed = datetime.now(timezone.utc) - transform_started
399+
400+
# Create indexes on id columns (for postgres)
401+
index_started = datetime.now(timezone.utc)
402+
if self.dbtype == DBType.POSTGRES:
403+
404+
class PbarNoop:
405+
def update(self, _: int) -> None: ...
406+
def close(self) -> None: ...
407+
408+
pbar: tqdm | PbarNoop = PbarNoop() # type:ignore[type-arg]
409+
410+
index_total = len(indexable_attrs)
411+
if not self._quiet:
412+
pbar = tqdm(
413+
desc="indexing",
414+
total=index_total,
415+
leave=False,
416+
mininterval=3,
417+
smoothing=0,
418+
colour="#A9A9A9",
419+
bar_format="{desc} {bar}{postfix}",
422420
)
423-
except (RuntimeError, psycopg.Error):
424-
pass
425-
finally:
426-
cur.close()
427-
pbar.update(1)
428-
pbar.close()
421+
for t, attr in indexable_attrs:
422+
cur = self.db.cursor()
423+
try:
424+
cur.execute(
425+
"CREATE INDEX ON "
426+
+ sqlid(t)
427+
+ " ("
428+
+ sqlid(attr.name)
429+
+ ")",
430+
)
431+
except (RuntimeError, psycopg.Error):
432+
pass
433+
finally:
434+
cur.close()
435+
pbar.update(1)
436+
pbar.close()
437+
429438
index_elapsed = datetime.now(timezone.utc) - index_started
430439
self._database.record_history(
431440
LoadHistory(

src/ldlite/database/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,16 @@
11
"""A module for implementing ldlite database targets."""
22

3-
import datetime
3+
from __future__ import annotations
4+
45
from abc import ABC, abstractmethod
5-
from collections.abc import Iterator
66
from dataclasses import dataclass
7+
from typing import TYPE_CHECKING, NoReturn
8+
9+
if TYPE_CHECKING:
10+
import datetime
11+
from collections.abc import Iterator
12+
13+
from tqdm import tqdm
714

815

916
@dataclass(frozen=True)
@@ -50,6 +57,10 @@ def ingest_records(self, prefix: str, records: Iterator[bytes]) -> int:
5057
def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[str]:
5158
"""Unnests and explodes the raw data at the given prefix."""
5259

60+
@abstractmethod
61+
def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> None:
62+
"""Finds and indexes all tables at the given prefix."""
63+
5364
@abstractmethod
5465
def record_history(self, history: LoadHistory) -> None:
5566
"""Records the statistics and history of a single ldlite operation."""

src/ldlite/database/_typed_database.py

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# pyright: reportArgumentType=false
2+
from __future__ import annotations
3+
24
from abc import abstractmethod
3-
from collections.abc import Callable, Sequence
45
from contextlib import closing
56
from datetime import timezone
6-
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
7+
from typing import TYPE_CHECKING, Any, Generic, NoReturn, TypeVar, cast
8+
from uuid import uuid4
79

810
import psycopg
911
from psycopg import sql
@@ -13,7 +15,10 @@
1315
from ._prefix import Prefix
1416

1517
if TYPE_CHECKING:
18+
from collections.abc import Callable, Sequence
19+
1620
import duckdb
21+
from tqdm import tqdm
1722

1823

1924
DB = TypeVar("DB", bound="duckdb.DuckDBPyConnection | psycopg.Connection")
@@ -247,6 +252,63 @@ def expand_prefix(self, prefix: str, json_depth: int, keep_raw: bool) -> list[st
247252

248253
return created_tables
249254

255+
def index_prefix(self, prefix: str, progress: tqdm[NoReturn] | None = None) -> None:
256+
pfx = Prefix(prefix)
257+
with closing(self._conn_factory()) as conn:
258+
with closing(conn.cursor()) as cur:
259+
cur.execute(
260+
"""
261+
SELECT table_name FROM information_schema.tables
262+
WHERE table_schema = $1 and table_name = $2;""",
263+
(
264+
pfx.schema or self._default_schema,
265+
pfx.catalog_table.name,
266+
),
267+
)
268+
if len(cur.fetchall()) < 1:
269+
return
270+
271+
with closing(conn.cursor()) as cur:
272+
cur.execute(
273+
sql.SQL(
274+
r"""
275+
SELECT TABLE_NAME, COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS
276+
WHERE
277+
TABLE_SCHEMA = $1 AND
278+
TABLE_NAME IN (SELECT TABLE_NAME FROM {catalog}) AND
279+
(
280+
DATA_TYPE IN ('UUID', 'uuid') OR
281+
COLUMN_NAME = 'id' OR
282+
(COLUMN_NAME LIKE '%\_id' AND COLUMN_NAME <> '__id')
283+
);
284+
""",
285+
)
286+
.format(catalog=pfx.catalog_table.id)
287+
.as_string(),
288+
(pfx.schema or self._default_schema,),
289+
)
290+
indexes = cur.fetchall()
291+
292+
if progress is not None:
293+
progress.total = len(indexes)
294+
progress.refresh()
295+
296+
for index in indexes:
297+
with closing(conn.cursor()) as cur:
298+
cur.execute(
299+
sql.SQL("CREATE INDEX {name} ON {table} ({column});")
300+
.format(
301+
name=sql.Identifier(str(uuid4()).split("-")[0]),
302+
table=sql.Identifier(*index[0].split(".")),
303+
column=sql.Identifier(index[1]),
304+
)
305+
.as_string(),
306+
)
307+
if progress is not None:
308+
progress.update(1)
309+
310+
conn.commit()
311+
250312
def record_history(self, history: LoadHistory) -> None:
251313
with closing(self._conn_factory()) as conn, conn.cursor() as cur:
252314
cur.execute(

tests/test_query.py

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ def case_one_table(json_depth: int) -> QueryTC:
5151
"prefix__tcatalog": (["table_name"], [("prefix__t",)]),
5252
},
5353
expected_indexes=[
54-
("prefix", "__id"),
55-
("prefix__t", "__id"),
5654
("prefix__t", "id"),
5755
],
5856
)
@@ -115,12 +113,8 @@ def case_two_tables(json_depth: int) -> QueryTC:
115113
),
116114
},
117115
expected_indexes=[
118-
("prefix", "__id"),
119-
("prefix__t", "__id"),
120116
("prefix__t", "id"),
121-
("prefix__t__sub_objects", "__id"),
122117
("prefix__t__sub_objects", "id"),
123-
("prefix__t__sub_objects", "sub_objects__o"),
124118
("prefix__t__sub_objects", "sub_objects__id"),
125119
],
126120
)
@@ -267,21 +261,11 @@ def case_three_tables(json_depth: int) -> QueryTC:
267261
),
268262
},
269263
expected_indexes=[
270-
("prefix", "__id"),
271-
("prefix__t", "__id"),
272264
("prefix__t", "id"),
273-
("prefix__t__sub_objects", "__id"),
274265
("prefix__t__sub_objects", "id"),
275-
("prefix__t__sub_objects", "sub_objects__o"),
276266
("prefix__t__sub_objects", "sub_objects__id"),
277-
("prefix__t__sub_objects__sub_sub_objects", "__id"),
278267
("prefix__t__sub_objects__sub_sub_objects", "id"),
279-
("prefix__t__sub_objects__sub_sub_objects", "sub_objects__o"),
280268
("prefix__t__sub_objects__sub_sub_objects", "sub_objects__id"),
281-
(
282-
"prefix__t__sub_objects__sub_sub_objects",
283-
"sub_objects__sub_sub_objects__o",
284-
),
285269
(
286270
"prefix__t__sub_objects__sub_sub_objects",
287271
"sub_objects__sub_sub_objects__id",
@@ -327,8 +311,6 @@ def case_nested_object() -> QueryTC:
327311
),
328312
},
329313
expected_indexes=[
330-
("prefix", "__id"),
331-
("prefix__t", "__id"),
332314
("prefix__t", "id"),
333315
("prefix__t", "sub_object__id"),
334316
],
@@ -383,8 +365,6 @@ def case_doubly_nested_object() -> QueryTC:
383365
),
384366
},
385367
expected_indexes=[
386-
("prefix", "__id"),
387-
("prefix__t", "__id"),
388368
("prefix__t", "id"),
389369
("prefix__t", "sub_object__id"),
390370
("prefix__t", "sub_object__sub_sub_object__id"),
@@ -632,8 +612,6 @@ def case_indexing_id_like() -> QueryTC:
632612
],
633613
expected_values={},
634614
expected_indexes=[
635-
("prefix", "__id"),
636-
("prefix__t", "__id"),
637615
("prefix__t", "id"),
638616
("prefix__t", "other_id"),
639617
("prefix__t", "an_id_but_with_a_different_ending"),
@@ -666,7 +644,6 @@ def case_drop_raw(json_depth: int) -> QueryTC:
666644
"prefix__tcatalog": (["table_name"], [("prefix__t",)]),
667645
},
668646
expected_indexes=[
669-
("prefix__t", "__id"),
670647
("prefix__t", "id"),
671648
],
672649
)
@@ -692,8 +669,6 @@ def case_null_records() -> QueryTC:
692669
expected_tables=["prefix", "prefix__t", "prefix__tcatalog"],
693670
expected_values={},
694671
expected_indexes=[
695-
("prefix", "__id"),
696-
("prefix__t", "__id"),
697672
("prefix__t", "id"),
698673
],
699674
)
@@ -727,8 +702,6 @@ def case_erm_keys() -> QueryTC:
727702
"prefix__tcatalog": (["table_name"], [("prefix__t",)]),
728703
},
729704
expected_indexes=[
730-
("prefix", "__id"),
731-
("prefix__t", "__id"),
732705
("prefix__t", "id"),
733706
],
734707
)
@@ -790,7 +763,7 @@ def _assert(
790763
.as_string(),
791764
)
792765
for v in values:
793-
assert cur.fetchone() == v
766+
assert cur.fetchone() == v, str(v)
794767

795768
assert cur.fetchone() is None
796769

@@ -804,7 +777,32 @@ def _assert(
804777
""",
805778
exp,
806779
)
807-
assert cur.fetchone() == (0,)
780+
assert cur.fetchone() == (0,), str(exp)
781+
782+
if tc.expected_indexes is not None:
783+
for exp in tc.expected_indexes:
784+
cur.execute(
785+
"""
786+
SELECT * FROM pg_indexes
787+
WHERE tablename = $1 and indexdef LIKE $2;
788+
""",
789+
(exp[0], "%" + exp[1] + "%"),
790+
)
791+
assert cur.fetchone() is not None, str(exp)
792+
793+
indexed_tables = {exp[0] for exp in tc.expected_indexes}
794+
where = ",".join([f"${n}" for n in range(1, len(indexed_tables) + 1)])
795+
cur.execute(
796+
f"""
797+
SELECT tablename, indexdef FROM pg_indexes
798+
WHERE tablename IN ({where})
799+
ORDER BY tablename;
800+
""",
801+
list(indexed_tables),
802+
)
803+
actual_indexes = cur.fetchall()
804+
expected_indexes = tc.expected_indexes
805+
assert len(actual_indexes) == len(expected_indexes)
808806

809807

810808
@mock.patch("httpx_folio.auth.httpx.post")

0 commit comments

Comments
 (0)