Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,55 @@ def load_dataset(path: str | list[str], **kwargs) -> xr.Dataset:

return ds
```

## Intake Integration

`mlwp-data-loaders` ships an [Intake](https://intake.readthedocs.io/) driver that wraps any mlwp loader module, making datasets discoverable via Intake catalogs.

Install with Intake support:

```bash
uv pip install mlwp-data-loaders[intake]
```

### Using the Intake driver

Point an Intake catalog at any loader module (bundled or custom):

```yaml
# catalog.yaml
sources:
my_data:
driver: mlwp_loader
args:
dataset_path: /path/to/data.zarr
loader: mlwp_data_loaders.loaders.anemoi.anemoi_datasets
```

```python
import intake
cat = intake.open_catalog("catalog.yaml")
ds = cat["my_data"].read() # -> xr.Dataset
```

Both `read()` and `to_dask()` delegate to the same cached `load_dataset()` call, passing the `chunks` value from the catalog entry (`None` when omitted), so they return the same `xr.Dataset`. If you want dask-backed arrays from `to_dask()`, set `chunks: auto` (or an explicit chunk spec) in the catalog args.

### Test datasets catalog

The repository includes a catalog with the datasets used in the test suite,
structured by loader type:

```python
import intake
cat = intake.open_catalog("tests/catalog/test_datasets.yaml")
ds = cat["anemoi_datasets"]["cerra_sample"].read()
ds = cat["harp_obstable"]["observation_table"].read()
```

Available entries:

| Access path | Loader | Description |
|-------------|--------|-------------|
| `cat["anemoi_datasets"]["cerra_sample"]` | `anemoi_datasets` | CERRA sample Zarr on EWC object store |
| `cat["anemoi_inference"]["..."]` | `anemoi_inference` | (no test datasets yet) |
| `cat["harp_obstable"]["observation_table"]` | `harp.obstable` | HARP SQLite observation table |
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies = [
"zarr>2,<3",
"h5netcdf",
"mlwp-data-specs",
"dask>=2026.3.0",
]

[tool.isort]
Expand All @@ -26,6 +27,13 @@ test = [
"pytest>=8.0.0",
"certifi>=2021.10.8",
]
intake = [
"intake>=0.7",
"jinja2",
]

[project.entry-points."intake.drivers"]
mlwp_loader = "mlwp_data_loaders.intake_driver:MLWPLoaderDriver"

[project.scripts]
"mlwp.load_and_validate_dataset" = "mlwp_data_loaders.cli:main"
Expand Down
70 changes: 70 additions & 0 deletions src/mlwp_data_loaders/intake_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Intake driver that loads datasets via mlwp-data-loaders loader modules."""

from __future__ import annotations

from typing import Any

import xarray as xr
from intake.source.base import DataSource, Schema

from mlwp_data_loaders.core import get_loader_func


class MLWPLoaderDriver(DataSource):
    """Intake data source that delegates loading to an mlwp loader module.

    The loader callable is looked up from its string identifier with
    ``get_loader_func`` and invoked with the dataset path plus the configured
    keyword arguments. The returned dataset is cached on the instance until
    the source is closed.
    """

    container = "xarray"
    name = "mlwp_loader"
    version = "0.1.0"
    partition_access = False

    def __init__(
        self,
        dataset_path: str | list[str],
        loader: str,
        chunks: str | dict | None = None,
        variables: str | list[str] | None = None,
        storage_options: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ):
        """Record the loader configuration; no data is loaded here.

        Args:
            dataset_path: Path (or list of paths) handed to the loader.
            loader: Identifier resolved to a loader callable by
                ``get_loader_func``.
            chunks: Always forwarded to the loader as ``chunks``, even
                when it is ``None``.
            variables: Forwarded as ``variables`` only when not ``None``.
            storage_options: Forwarded as ``storage_options`` only when
                non-empty.
            metadata: Passed on to ``DataSource.__init__``.
            **kwargs: Extra keyword arguments forwarded to the loader.
        """
        # Nothing is loaded until ``_load`` is first called.
        self._ds: xr.Dataset | None = None
        self._kwargs = kwargs
        self._storage_options = storage_options if storage_options else {}
        self._variables = variables
        self._chunks = chunks
        self._loader_str = loader
        self._dataset_path = dataset_path
        super().__init__(metadata=metadata)

    def _load(self) -> xr.Dataset:
        """Return the dataset, invoking the loader only on first use."""
        if self._ds is not None:
            return self._ds

        load_fn = get_loader_func(self._loader_str)

        # ``chunks`` is passed unconditionally; the optional arguments are
        # added only when they carry a value.
        call_kwargs: dict[str, Any] = {**self._kwargs, "chunks": self._chunks}
        if self._variables is not None:
            call_kwargs["variables"] = self._variables
        if self._storage_options:
            call_kwargs["storage_options"] = self._storage_options

        self._ds = load_fn(self._dataset_path, **call_kwargs)
        return self._ds

    def _get_schema(self) -> Schema:
        """Build the Intake schema from the (loaded) dataset.

        Calls ``_load``, so the dataset is loaded if it is not cached yet.
        """
        dataset = self._load()

        dtype_by_variable = {}
        for var_name, data_array in dataset.data_vars.items():
            dtype_by_variable[var_name] = str(data_array.dtype)

        return Schema(
            datashape=dict(dataset.sizes),
            dtype=dtype_by_variable,
            shape=None,
            npartitions=1,
            metadata=dict(dataset.attrs),
            extra_metadata={},
        )

    def read(self, **kwargs: Any) -> xr.Dataset:
        """Return the cached dataset; ``kwargs`` are accepted but unused."""
        return self._load()

    def to_dask(self, **kwargs: Any) -> xr.Dataset:
        """Return the same dataset as ``read``; ``kwargs`` are unused."""
        return self._load()

    def _close(self) -> None:
        """Drop the cached dataset and schema references."""
        self._ds = None
        self._schema = None
29 changes: 29 additions & 0 deletions src/mlwp_data_loaders/loaders/harp/obstable.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

import sqlite3

import fsspec
import pandas as pd
import xarray as xr
from loguru import logger
from mlwp_data_specs.api import (
SPACE_TRAIT_ATTR,
TIME_TRAIT_ATTR,
Expand All @@ -21,6 +23,31 @@
}


def _resolve_path(path: str) -> str:
    """Return a local file path, resolving fsspec-protocol URIs if needed.

    ``sqlite3.connect()`` requires a local file path and cannot open
    HTTP URLs directly. We use fsspec to resolve protocols like
    ``simplecache::`` (which downloads remote files to a local cache)
    before handing the path to sqlite3.

    Args:
        path: Local path or fsspec URI (for example
            ``simplecache::https://...``).

    Returns:
        The ``name`` attribute of the file object opened by fsspec, or
        ``path`` itself when the file object has no ``name``.

    Raises:
        ValueError: If the path is, or resolves to, a plain ``http://`` or
            ``https://`` URL.
    """
    remote_prefixes = ("http://", "https://")

    resolved = path
    # A bare HTTP(S) URL can never be opened by sqlite3, so skip the fsspec
    # open for it: that avoids a pointless network request and guarantees
    # the helpful ValueError below instead of a backend or network error.
    if not path.startswith(remote_prefixes):
        with fsspec.open(path, "rb") as f:
            resolved = getattr(f, "name", path)

    if resolved.startswith(remote_prefixes):
        logger.error(
            "Cannot open remote path with sqlite3. "
            "Prefix the path with 'simplecache::' to cache it locally, "
            "or download the file manually."
        )
        raise ValueError(
            f"Cannot open remote path {path!r} with sqlite3. "
            f"Prefix with 'simplecache::' or download locally."
        )

    return resolved


def load_dataset(
paths: str | list[str], variables: list[str] | None = None, **kwargs
) -> xr.Dataset:
Expand Down Expand Up @@ -50,6 +77,8 @@ def load_dataset(
else:
path = paths

path = _resolve_path(path)

# Connect to the sqlite file
conn = sqlite3.connect(path)

Expand Down
15 changes: 15 additions & 0 deletions tests/catalog/anemoi_datasets.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
sources:
cerra_sample:
description: >
Small CERRA sample dataset stored on EWC (European Weather Cloud)
S3-compatible object store, used for testing anemoi-datasets loading.
driver: mlwp_loader
args:
dataset_path: s3://mlwp-sample-datasets/anemoi-datasets/cerra-rr-an-oper-0001-mars-5p5km-2017-2017-6h-v3-testing.zarr/
loader: mlwp_data_loaders.loaders.anemoi.anemoi_datasets
storage_options:
endpoint_url: https://object-store.os-api.cci2.ecmwf.int
anon: true
chunks: null
metadata:
url: https://object-store.os-api.cci2.ecmwf.int
1 change: 1 addition & 0 deletions tests/catalog/anemoi_inference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sources: {}
11 changes: 11 additions & 0 deletions tests/catalog/harp_obstable.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
sources:
observation_table:
description: >
HARP SQLite observation table test dataset from the harpData repository.
driver: mlwp_loader
args:
dataset_path: "simplecache::https://raw.githubusercontent.com/harphub/harpData/master/inst/OBSTABLE/OBSTABLE_2019.sqlite"
loader: mlwp_data_loaders.loaders.harp.obstable
metadata:
url: https://github.com/harphub/harpData
hash: bdab991c287a41871488456d1a9d697942aa3a612800a88264defa312a9d637b
18 changes: 18 additions & 0 deletions tests/catalog/test_datasets.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
sources:
anemoi_datasets:
description: Datasets loaded by the anemoi-datasets loader
driver: yaml_file_cat
args:
path: "{{ CATALOG_DIR }}/anemoi_datasets.yaml"

anemoi_inference:
description: Datasets loaded by the anemoi-inference loader
driver: yaml_file_cat
args:
path: "{{ CATALOG_DIR }}/anemoi_inference.yaml"

harp_obstable:
description: Datasets loaded by the harp.obstable loader
driver: yaml_file_cat
args:
path: "{{ CATALOG_DIR }}/harp_obstable.yaml"
40 changes: 13 additions & 27 deletions tests/test_anemoi_datasets_integration.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,30 @@
"""Integration tests for the built-in ``anemoi-datasets`` loader."""
"""Integration tests for the built-in ``anemoi-datasets`` loader, loaded via intake."""

from __future__ import annotations

from pathlib import Path

import intake
import xarray as xr
from mlwp_data_specs.api import (
SPACE_TRAIT_ATTR,
TIME_TRAIT_ATTR,
UNCERTAINTY_TRAIT_ATTR,
)

from mlwp_data_loaders.api import load_and_validate_dataset
from mlwp_data_loaders.mxalign_api import validate_dataset_with_mxalign

# Use small CERRA sample dataset stored on EWC (European Weather Cloud)
# S3-compatible object store for testing.
DATASET_PATH = (
"s3://mlwp-sample-datasets/anemoi-datasets/"
"cerra-rr-an-oper-0001-mars-5p5km-2017-2017-6h-v3-testing.zarr/"
)
ENDPOINT_URL = "https://object-store.os-api.cci2.ecmwf.int"
LOADER = "mlwp_data_loaders.loaders.anemoi.anemoi_datasets"
HERE = Path(__file__).parent
CATALOG = HERE / "catalog" / "test_datasets.yaml"


def test_load_dataset_opens_anemoi_store_from_ewc() -> None:
"""The anemoi-datasets loader can open and validate the sample Zarr store."""
storage_options: dict[str, object] = {
"endpoint_url": ENDPOINT_URL,
"anon": True,
}

ds, report_specs = load_and_validate_dataset( # type: ignore
DATASET_PATH,
loader=LOADER,
storage_options=storage_options,
chunks=None,
return_validation_report=True,
)
"""The anemoi-datasets loader can open and validate the sample Zarr store via intake."""
cat = intake.open_catalog(str(CATALOG))
source = cat["anemoi_datasets"]["cerra_sample"]

ds = source.read()
assert isinstance(ds, xr.Dataset)

# Note: mxalign validation is temporarily kept here during early development
# to ensure `mlwp-data-specs` behaves identically. It will eventually be removed.
Expand All @@ -47,7 +37,3 @@ def test_load_dataset_opens_anemoi_store_from_ewc() -> None:
if report_mxalign.has_fails():
report_mxalign.console_print()
assert not report_mxalign.has_fails()

if report_specs.has_fails():
report_specs.console_print()
assert not report_specs.has_fails()
39 changes: 13 additions & 26 deletions tests/test_harp_obstable_integration.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,30 @@
"""Integration tests for the built-in ``harp.obstable`` loader."""
"""Integration tests for the built-in ``harp.obstable`` loader, loaded via intake."""

from __future__ import annotations

import pooch
import pytest
from pathlib import Path

import intake
import xarray as xr
from mlwp_data_specs.api import (
SPACE_TRAIT_ATTR,
TIME_TRAIT_ATTR,
UNCERTAINTY_TRAIT_ATTR,
)

from mlwp_data_loaders.api import load_and_validate_dataset
from mlwp_data_loaders.mxalign_api import validate_dataset_with_mxalign

HARP_DATA_URL = "https://raw.githubusercontent.com/harphub/harpData/master/inst/OBSTABLE/OBSTABLE_2019.sqlite"
HARP_DATA_HASH = "bdab991c287a41871488456d1a9d697942aa3a612800a88264defa312a9d637b"
LOADER = "mlwp_data_loaders.loaders.harp.obstable"
HERE = Path(__file__).parent
CATALOG = HERE / "catalog" / "test_datasets.yaml"


@pytest.fixture(scope="module")
def obstable_path() -> str:
"""Download and cache the test SQLite dataset."""
return pooch.retrieve(
url=HARP_DATA_URL,
known_hash=HARP_DATA_HASH,
)
def test_load_dataset_opens_harp_obstable() -> None:
"""The harp.obstable loader can open and validate the sample SQLite file via intake."""
cat = intake.open_catalog(str(CATALOG))
source = cat["harp_obstable"]["observation_table"]


def test_load_dataset_opens_harp_obstable(obstable_path: str) -> None:
"""The harp.obstable loader can open and validate the sample SQLite file."""
ds, report_specs = load_and_validate_dataset( # type: ignore
obstable_path,
loader=LOADER,
return_validation_report=True,
)
ds = source.read()
assert isinstance(ds, xr.Dataset)

# Note: mxalign validation is temporarily kept here during early development
# to ensure `mlwp-data-specs` behaves identically. It will eventually be removed.
Expand All @@ -46,7 +37,3 @@ def test_load_dataset_opens_harp_obstable(obstable_path: str) -> None:
if report_mxalign.has_fails():
report_mxalign.console_print()
assert not report_mxalign.has_fails()

if report_specs.has_fails():
report_specs.console_print()
assert not report_specs.has_fails()
Loading
Loading