From fba61678c482b1d208559b959991a6c77aff447a Mon Sep 17 00:00:00 2001 From: Mo Date: Wed, 25 Feb 2026 15:56:39 +0100 Subject: [PATCH 1/8] opening datasets via xarray in data-loader is relying on prism --- Makefile | 6 +-- freva-data-portal-worker/data-loader | 3 +- freva-data-portal-worker/pyproject.toml | 1 + .../backends/posix_and_cloud.py | 52 ++----------------- tests/data_portal/test_load_utils.py | 12 ----- 5 files changed, 9 insertions(+), 65 deletions(-) diff --git a/Makefile b/Makefile index 6efc72d6..dcfc64af 100644 --- a/Makefile +++ b/Makefile @@ -2,12 +2,12 @@ all: install install: prepare - python3 -m pip install -e ./freva-rest[dev,tests] -e ./freva-client -e ./freva-data-portal-worker[full] + python -m pip install -e ./freva-rest[dev,tests] -e ./freva-client -e ./freva-data-portal-worker[full] prepare: - python3 -m pip install cryptography tox + python -m pip install cryptography tox mkdir -p dev-env/certs - python3 dev-env/config/dev-utils.py gen-certs + python dev-env/config/dev-utils.py gen-certs coverage: python -m pytest -vv \ diff --git a/freva-data-portal-worker/data-loader b/freva-data-portal-worker/data-loader index 6fc35150..dc5c05cd 100644 --- a/freva-data-portal-worker/data-loader +++ b/freva-data-portal-worker/data-loader @@ -26,7 +26,8 @@ jq \ mamba \ netcdf4 \ rasterio \ -zarr" +zarr \ +xarray-prism" # Generate a checksum of the package list diff --git a/freva-data-portal-worker/pyproject.toml b/freva-data-portal-worker/pyproject.toml index 05f2fee6..9178f679 100644 --- a/freva-data-portal-worker/pyproject.toml +++ b/freva-data-portal-worker/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "bokeh", "cloudpickle", "dask[distributed,diagnostics]", +"xarray-prism", "h5netcdf", "jupyter-server-proxy", "packaging", diff --git a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py index 9ab7233e..f50b0543 100644 --- 
a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py +++ b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py @@ -1,70 +1,24 @@ """Load data.""" from pathlib import Path -from typing import Any, Optional, Union +from typing import Any, Union from urllib.parse import urlparse -import h5netcdf -import netCDF4 -import rasterio import xarray as xr -import zarr - -from data_portal_worker.utils import data_logger - -try: - import cfgrib -except (ImportError, RuntimeError): # pragma: no cover - data_logger.warning("Could not import cfgrib, loading GRB files is disabled") - - -def get_xr_engine(file_path: str) -> Optional[str]: - """Get the engine, to open the xarray dataset.""" - try: - with netCDF4.Dataset(file_path, mode="r"): - return "netcdf4" - except Exception: - pass - - try: - with cfgrib.open_file(file_path): - return "cfgrib" # pragma: no cover - except Exception: - pass - try: - _ = zarr.open(file_path, mode="r") - return "zarr" - except Exception: - pass - - try: - with rasterio.open(file_path, mode="r"): - return "rasterio" - except Exception: - pass - - try: - with h5netcdf.File(file_path, mode="r"): - return "h5netcdf" - except Exception: - pass - - return None def posix_and_cloud( inp_file: Union[str, Path], chunk_size: float = 16.0, **kwargs: Any ) -> xr.Dataset: """Open a dataset with xarray.""" + engine = "prism" inp_str = str(inp_file) parsed = urlparse(inp_str) target: Union[str, Path] target = Path(inp_str) if parsed.scheme in ("", "file") else inp_str - engine = get_xr_engine(str(target)) _ = kwargs.pop("chunks", None) for key in ("decode_cf", "use_cftime", "cache", "decode_coords"): kwargs[key] = False + kwargs["chunks"] = "auto" kwargs["engine"] = engine - if engine != "h5netcdf": - kwargs["chunks"] = "auto" return xr.open_dataset(target, **kwargs).unify_chunks() diff --git a/tests/data_portal/test_load_utils.py b/tests/data_portal/test_load_utils.py index 6d858358..42d3d1fe 100644 --- 
a/tests/data_portal/test_load_utils.py +++ b/tests/data_portal/test_load_utils.py @@ -8,7 +8,6 @@ import rasterio import zarr from unittest.mock import patch -from data_portal_worker.backends.posix_and_cloud import get_xr_engine from data_portal_worker.utils import str_to_int as str_to_int2 from freva_rest.utils.base_utils import get_userinfo, str_to_int @@ -64,14 +63,3 @@ def test_get_auth_userinfo() -> None: assert out["last_name"] == "Doe" assert out["first_name"] == "Jane" - -def test_get_xr_posix_engine() -> None: - """Test the right xarray engine.""" - with TemporaryDirectory() as temp_dir: - assert get_xr_engine(create_netcdf4_file(temp_dir)) == "netcdf4" - assert get_xr_engine(create_rasterio_file(temp_dir)) == "rasterio" - assert get_xr_engine(create_zarr_file(temp_dir)) == "zarr" - with patch("h5netcdf.File"): - assert get_xr_engine("http://example.com/data.nc") == "h5netcdf" - with TemporaryDirectory() as temp_dir: - assert get_xr_engine(temp_dir) is None From d52f9bc42e3538c73a10b08ce0038043ee397cf1 Mon Sep 17 00:00:00 2001 From: Mo Date: Fri, 27 Feb 2026 11:35:27 +0100 Subject: [PATCH 2/8] add xarray-prism deps in the Dockerfile and CI test job --- .github/workflows/ci_job.yml | 1 + Dockerfile | 2 ++ Makefile | 6 +++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci_job.yml b/.github/workflows/ci_job.yml index a9a79a00..0141a45b 100644 --- a/.github/workflows/ci_job.yml +++ b/.github/workflows/ci_job.yml @@ -87,6 +87,7 @@ jobs: pip cfgrib xarray + xarray-prism cffi condarc: | channels: diff --git a/Dockerfile b/Dockerfile index 483b31ec..3979fa6b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,6 +53,7 @@ session required pam_sss.so\n\ redis-py \ requests \ watchfiles \ + xarray-prism \ zarr; \ elif [ "${CMD}" = "freva-rest-server" ];then\ PKGNAME=freva-rest && \ @@ -83,6 +84,7 @@ session required pam_sss.so\n\ uvicorn \ tomli \ xarray \ + xarray-prism \ zarr; \ else \ echo "Invalid CMD argument: $CMD" && exit 1; \ 
diff --git a/Makefile b/Makefile index dcfc64af..6efc72d6 100644 --- a/Makefile +++ b/Makefile @@ -2,12 +2,12 @@ all: install install: prepare - python -m pip install -e ./freva-rest[dev,tests] -e ./freva-client -e ./freva-data-portal-worker[full] + python3 -m pip install -e ./freva-rest[dev,tests] -e ./freva-client -e ./freva-data-portal-worker[full] prepare: - python -m pip install cryptography tox + python3 -m pip install cryptography tox mkdir -p dev-env/certs - python dev-env/config/dev-utils.py gen-certs + python3 dev-env/config/dev-utils.py gen-certs coverage: python -m pytest -vv \ From 12e855a48fb7c592bf87d05f0de6c0c5b1c68b36 Mon Sep 17 00:00:00 2001 From: Mo Date: Wed, 25 Mar 2026 21:47:45 +0100 Subject: [PATCH 3/8] add cache manager to take care of auto cache cleanup --- .../src/data_portal_worker/_cache_manager.py | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 freva-data-portal-worker/src/data_portal_worker/_cache_manager.py diff --git a/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py b/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py new file mode 100644 index 00000000..b1788781 --- /dev/null +++ b/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py @@ -0,0 +1,47 @@ +"""Periodic cache cleanup for xarray-prism downloaded files.""" + +from __future__ import annotations + +import os +import time + +from .utils import data_logger + +# How often to run eviction (seconds). Default: every 6 hours. +_CLEANUP_INTERVAL = float(os.environ.get("API_CACHE_CLEANUP_INTERVAL", 6 * 3600)) + + +def run_cache_cleanup(force: bool = False) -> None: + """Evict stale/oversized xarray-prism cache files safely. 
+ """ + try: + import xarray_prism + + xarray_prism.clear_cache() + info = xarray_prism.cache_info() + data_logger.info( + "Cache cleanup done: %d file(s), %.1f MB remaining at %s", + info["files"], + info["size_bytes"] / 1024**2, + info["path"], + ) + except Exception as error: + # housekeeping never break the data-loading daemon + # on 6-hourly cleanup attempts, only warns on failure + data_logger.warning("Cache cleanup failed (non-fatal): %s", error) + + +class CacheScheduler: + """Tracks elapsed time and fires cleanup when interval is due.""" + + def __init__(self) -> None: + # run immediately on first tick, + # then every _CLEANUP_INTERVAL + # seconds thereafter + self._last_run: float = 0.0 + + def tick(self) -> None: + """Call on every iteration of the daemon loop.""" + if time.monotonic() - self._last_run >= _CLEANUP_INTERVAL: + self._last_run = time.monotonic() + run_cache_cleanup() From 63babced20c20f38713ada9a6bfd173c86e5c1f0 Mon Sep 17 00:00:00 2001 From: Mo Date: Mon, 13 Apr 2026 12:02:23 +0200 Subject: [PATCH 4/8] fix ci --- freva-client/pyproject.toml | 2 +- .../src/data_portal_worker/backends/posix_and_cloud.py | 3 ++- tests/client/test_databrowser_cli.py | 5 ++--- tests/client/test_databrowser_py.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/freva-client/pyproject.toml b/freva-client/pyproject.toml index 8f1624bb..438959e0 100644 --- a/freva-client/pyproject.toml +++ b/freva-client/pyproject.toml @@ -45,7 +45,7 @@ Source = "https://github.com/freva-org/freva-nextgen/" [project.optional-dependencies] dev = ["tox"] -tests = ["namegenerator"] +tests = [] [tool.flit.sdist] include = ["assets/*"] diff --git a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py index f50b0543..08d35cbb 100644 --- a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py +++ 
b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py @@ -17,8 +17,9 @@ def posix_and_cloud( target: Union[str, Path] target = Path(inp_str) if parsed.scheme in ("", "file") else inp_str _ = kwargs.pop("chunks", None) - for key in ("decode_cf", "use_cftime", "cache", "decode_coords"): + for key in ("decode_cf", "cache", "decode_coords"): kwargs[key] = False + kwargs["decode_times"] = xr.coders.CFDatetimeCoder(use_cftime=False) kwargs["chunks"] = "auto" kwargs["engine"] = engine return xr.open_dataset(target, **kwargs).unify_chunks() diff --git a/tests/client/test_databrowser_cli.py b/tests/client/test_databrowser_cli.py index 4fcd4b06..8fbc1336 100644 --- a/tests/client/test_databrowser_cli.py +++ b/tests/client/test_databrowser_cli.py @@ -15,7 +15,6 @@ from pathlib import Path from tempfile import NamedTemporaryFile -import namegenerator from py_oidc_auth_client import Token from pytest import LogCaptureFixture from pytest_mock import MockerFixture @@ -664,8 +663,8 @@ def test_flavour_full_lifecycle( mocker.patch("freva_client.utils.choose_token_strategy").return_value = ( "use_token" ) - - flavour_name = namegenerator.gen() + from freva_rest.utils.namegenerator import generate_names + flavour_name = generate_names() res = cli_runner.invoke( app, [ diff --git a/tests/client/test_databrowser_py.py b/tests/client/test_databrowser_py.py index 580ab81b..e0580f90 100644 --- a/tests/client/test_databrowser_py.py +++ b/tests/client/test_databrowser_py.py @@ -12,7 +12,6 @@ from pathlib import Path from tempfile import TemporaryDirectory -import namegenerator import pandas as pd import pytest from py_oidc_auth_client import Token @@ -520,7 +519,8 @@ def test_flavour_lifecycle( ) -> None: """Test the full lifecycle: list, add, update, rename, delete.""" # list existing flavours - flavour_name = namegenerator.gen() + from freva_rest.utils.namegenerator import generate_names + flavour_name = generate_names() 
mocker.patch("freva_client.utils.choose_token_strategy").return_value = ( "use_token" ) From f41c59777d22a67f01e874308389582b412d5513 Mon Sep 17 00:00:00 2001 From: Mo Date: Mon, 13 Apr 2026 12:34:48 +0200 Subject: [PATCH 5/8] add test for the cache_manager --- tests/data_portal/test_cache_manager.py | 84 +++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/data_portal/test_cache_manager.py diff --git a/tests/data_portal/test_cache_manager.py b/tests/data_portal/test_cache_manager.py new file mode 100644 index 00000000..fec7135d --- /dev/null +++ b/tests/data_portal/test_cache_manager.py @@ -0,0 +1,84 @@ +"""tests for _cache_manager.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +import data_portal_worker._cache_manager as cm + + +def test_run_cache_cleanup_happy_path() -> None: + """Successful cleanup logs info and does not raise.""" + mock_prism = MagicMock() + mock_prism.cache_info.return_value = { + "files": 3, + "size_bytes": 1024 ** 2 * 8, + "path": "/tmp/prism_cache", + } + with patch.dict("sys.modules", {"xarray_prism": mock_prism}): + cm.run_cache_cleanup() + + mock_prism.clear_cache.assert_called_once() + mock_prism.cache_info.assert_called_once() + + +def test_run_cache_cleanup_logs_warning_on_error() -> None: + """Any exception inside cleanup is caught and logged as a warning.""" + mock_prism = MagicMock() + mock_prism.clear_cache.side_effect = RuntimeError("disk full") + + with patch.dict("sys.modules", {"xarray_prism": mock_prism}): + with patch.object(cm.data_logger, "warning") as mock_warn: + cm.run_cache_cleanup() + + mock_warn.assert_called_once() + assert "Cache cleanup failed" in mock_warn.call_args[0][0] + + + +def test_scheduler_fires_immediately_on_first_tick(monkeypatch: pytest.MonkeyPatch) -> None: + """tick() should call run_cache_cleanup on the very first call.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: 
calls.append(1)) + scheduler = cm.CacheScheduler() + scheduler.tick() + assert calls == [1] + + +def test_scheduler_does_not_fire_twice_within_interval( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A second tick() shortly after the first should not fire again.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) + scheduler = cm.CacheScheduler() + scheduler.tick() # fires + scheduler.tick() # too soon — must not fire + assert len(calls) == 1 + + +def test_scheduler_fires_again_after_interval( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """tick() fires again once the interval has elapsed.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) + + scheduler = cm.CacheScheduler() + # fires immediately (_last_run was 0.0) + scheduler.tick() + # rewind _last_run to simulate elapsed interval without sleep + # and without touching the global _CLEANUP_INTERVAL + scheduler._last_run = 0.0 + # fires again + scheduler.tick() + assert len(calls) == 2 + + +def test_cleanup_interval_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + """_CLEANUP_INTERVAL should reflect API_CACHE_CLEANUP_INTERVAL.""" + monkeypatch.setenv("API_CACHE_CLEANUP_INTERVAL", "42") + import importlib + importlib.reload(cm) + assert cm._CLEANUP_INTERVAL == 42.0 From 0b2f3ac64cf1401f6d8b29ba3bb96a2347331d71 Mon Sep 17 00:00:00 2001 From: Mo Date: Mon, 13 Apr 2026 13:02:07 +0200 Subject: [PATCH 6/8] change the last_run to inf since on CI a fresh docker starts to take the test and uptime is definitely less than 6 hours --- tests/data_portal/test_cache_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/data_portal/test_cache_manager.py b/tests/data_portal/test_cache_manager.py index fec7135d..1286018a 100644 --- a/tests/data_portal/test_cache_manager.py +++ b/tests/data_portal/test_cache_manager.py @@ -70,7 +70,7 @@ def test_scheduler_fires_again_after_interval( scheduler.tick() 
# rewind _last_run to simulate elapsed interval without sleep # and without touching the global _CLEANUP_INTERVAL - scheduler._last_run = 0.0 + scheduler._last_run = float("-inf") # fires again scheduler.tick() assert len(calls) == 2 From 96a166bfb1434a2702f82919ceded0de0ff79e53 Mon Sep 17 00:00:00 2001 From: Mo Date: Mon, 13 Apr 2026 15:08:11 +0200 Subject: [PATCH 7/8] test: always set the last_run as inf to be able to make the test work on a fresh docker instance --- tests/data_portal/test_cache_manager.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/data_portal/test_cache_manager.py b/tests/data_portal/test_cache_manager.py index 1286018a..c3a45e2d 100644 --- a/tests/data_portal/test_cache_manager.py +++ b/tests/data_portal/test_cache_manager.py @@ -42,6 +42,7 @@ def test_scheduler_fires_immediately_on_first_tick(monkeypatch: pytest.MonkeyPat calls: list[int] = [] monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) scheduler = cm.CacheScheduler() + scheduler._last_run = float("-inf") scheduler.tick() assert calls == [1] @@ -53,8 +54,10 @@ def test_scheduler_does_not_fire_twice_within_interval( calls: list[int] = [] monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) scheduler = cm.CacheScheduler() - scheduler.tick() # fires - scheduler.tick() # too soon — must not fire + scheduler._last_run = float("-inf") + scheduler.tick() + # too soon + scheduler.tick() assert len(calls) == 1 @@ -66,12 +69,9 @@ def test_scheduler_fires_again_after_interval( monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) scheduler = cm.CacheScheduler() - # fires immediately (_last_run was 0.0) + scheduler._last_run = float("-inf") scheduler.tick() - # rewind _last_run to simulate elapsed interval without sleep - # and without touching the global _CLEANUP_INTERVAL scheduler._last_run = float("-inf") - # fires again scheduler.tick() assert len(calls) == 2 From 
19d2079ee1f651718cdbd91c9ad09d8484412269 Mon Sep 17 00:00:00 2001 From: Mo Date: Mon, 13 Apr 2026 16:15:11 +0200 Subject: [PATCH 8/8] never deal with time and never decode it, since you are going to deal with so many different datasets, with different time scales --- .../src/data_portal_worker/backends/posix_and_cloud.py | 1 - 1 file changed, 1 deletion(-) diff --git a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py index 08d35cbb..29bbd958 100644 --- a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py +++ b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py @@ -19,7 +19,6 @@ def posix_and_cloud( _ = kwargs.pop("chunks", None) for key in ("decode_cf", "cache", "decode_coords"): kwargs[key] = False - kwargs["decode_times"] = xr.coders.CFDatetimeCoder(use_cftime=False) kwargs["chunks"] = "auto" kwargs["engine"] = engine return xr.open_dataset(target, **kwargs).unify_chunks()