-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuild_catalog.py
More file actions
145 lines (114 loc) · 4.2 KB
/
build_catalog.py
File metadata and controls
145 lines (114 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from banal import ensure_list
import httpx
import fnmatch
from typing import Annotated, Any, Generator
from anystore.io import smart_write
from anystore.logging import configure_logging
from ftmq.model import Catalog as BaseCatalog, Dataset as BaseDataset
from structlog import get_logger
import typer
class Dataset(BaseDataset):
    """Extended Dataset with aleph_url field."""

    # UI link to the corresponding Aleph collection (resolved via
    # get_aleph_url by foreign_id); None when no collection matches.
    aleph_url: str | None = None
configure_logging()
log = get_logger("datasets.build_catalog")

# Aleph collections endpoint used to resolve dataset foreign_ids to UI URLs.
ALEPH_API = "https://search.openaleph.org/api/2/collections"

# Global lookup for Aleph collections: {foreign_id: ui_url}
# Populated lazily by get_aleph_url() on first use; None until then.
_aleph_lookup: dict[str, str] | None = None
def _fetch_aleph_collections() -> dict[str, str]:
    """Fetch all non-casefile collections from Aleph and build a lookup.

    Pages through the collections API in batches of ``limit`` and returns
    a mapping of ``foreign_id`` -> UI URL. Best-effort: on any transport,
    HTTP status, or JSON decoding failure the error is logged and the
    (possibly partial) lookup collected so far is returned.

    Returns:
        dict mapping collection foreign_id to its Aleph UI URL.
    """
    lookup: dict[str, str] = {}
    offset = 0
    limit = 100
    log.info("Fetching Aleph collections...")
    while True:
        params = {
            "exclude:category": "casefile",
            "limit": limit,
            "offset": offset,
        }
        try:
            res = httpx.get(ALEPH_API, params=params, timeout=30)
            res.raise_for_status()
            data = res.json()
        except (httpx.HTTPError, ValueError) as e:
            # httpx.HTTPError covers transport and status errors;
            # ValueError covers JSON decode failures (JSONDecodeError).
            # Narrowed from a bare `except Exception` so programming
            # errors are no longer silently swallowed.
            log.error("Failed to fetch Aleph collections", error=str(e))
            break
        results = ensure_list(data.get("results", []))
        if not results:
            break
        for collection in results:
            foreign_id = collection.get("foreign_id")
            links = collection.get("links") or {}
            ui_url = links.get("ui")
            # Only index collections exposing both a foreign_id and a UI link.
            if foreign_id and ui_url:
                lookup[foreign_id] = ui_url
        total = data.get("total", 0)
        offset += limit
        if offset >= total:
            break
    log.info("Fetched Aleph collections", count=len(lookup))
    return lookup
def get_aleph_url(foreign_id: str) -> str | None:
    """Return the Aleph UI URL for *foreign_id*, or None if unknown.

    Lazily populates the module-level collection cache on first call;
    subsequent calls are pure dictionary lookups.
    """
    global _aleph_lookup
    cache = _aleph_lookup
    if cache is None:
        cache = _aleph_lookup = _fetch_aleph_collections()
    return cache.get(foreign_id)
class Catalog(BaseCatalog):
    """Catalog with glob-based dataset filtering and metadata patching."""

    # fnmatch glob patterns of dataset names to include; empty means all.
    include_datasets: list[str] = []
    # fnmatch glob patterns of dataset names to exclude.
    exclude_datasets: list[str] = []
    # Extra metadata merged into every dataset. Control keys read here:
    # "dataset_prefix" and "dataset_prefix_ignore" (see patch_dataset).
    patch_metadata: dict[str, Any] = {}

    def patch_dataset(self, ds: BaseDataset) -> Dataset:
        """Apply name prefixing, the aleph_url lookup and metadata patches.

        NOTE(review): this mutates ds.name in place before dumping, and
        spreads ALL of patch_metadata — including the dataset_prefix*
        control keys — into the Dataset constructor. Presumably the
        model tolerates or ignores those extra keys; confirm against
        ftmq's Dataset model config.
        """
        prefix = self.patch_metadata.get("dataset_prefix")
        # Prefix the dataset name unless it is explicitly exempted or
        # already carries the prefix.
        if prefix is not None and ds.name not in self.patch_metadata.get(
            "dataset_prefix_ignore", []
        ):
            if not ds.name.startswith(prefix):
                ds.name = f"{prefix}_{ds.name}"
        # patch_metadata values win over the dataset's own fields.
        return Dataset(
            **{
                **ds.model_dump(),
                "aleph_url": get_aleph_url(ds.name),
                **self.patch_metadata,
            }
        )

    def get_datasets(self) -> Generator[Dataset, None, None]:
        """Yield patched datasets passing the include/exclude filters."""
        for dataset in self.datasets:
            # When include patterns exist, the name must match at least one.
            if self.include_datasets and not any(
                (fnmatch.fnmatch(dataset.name, m) for m in self.include_datasets)
            ):
                continue
            # Any matching exclude pattern drops the dataset.
            if self.exclude_datasets and any(
                (fnmatch.fnmatch(dataset.name, m) for m in self.exclude_datasets)
            ):
                continue
            yield self.patch_dataset(dataset)
class MultiCatalog(Catalog):
    """Catalog that aggregates its own datasets plus nested catalogs."""

    # Additional catalogs whose (filtered, patched) datasets are appended.
    include_catalogs: list[Catalog] = []

    def get_datasets(self) -> Generator[Dataset, None, None]:
        """Yield own datasets first, then those of each included catalog."""
        yield from super().get_datasets()
        for sub_catalog in self.include_catalogs:
            yield from sub_catalog.get_datasets()

    def serialize(self) -> str:
        """Serialize the catalog to a JSON string, deduplicated by name.

        The first occurrence of each dataset name wins; insertion order
        is preserved.
        """
        import json

        unique: dict[str, Any] = {}
        for ds in self.get_datasets():
            if ds.name not in unique:
                unique[ds.name] = ds.model_dump(mode="json")
        return json.dumps({"name": self.name, "datasets": list(unique.values())})
def main(
    in_uri: Annotated[str, typer.Option("-i")] = "-",
    out_uri: Annotated[str, typer.Option("-o")] = "-",
):
    """
    Build a catalog from datasets metadata and write it to anywhere from stdout
    (default) to any uri `anystore` can handle.
    """
    # Load, serialize and write in one pass; smart_write expects bytes.
    serialized = MultiCatalog._from_uri(in_uri).serialize()
    smart_write(out_uri, serialized.encode())
if __name__ == "__main__":
typer.run(main)