diff --git a/.github/workflows/ci_job.yml b/.github/workflows/ci_job.yml index 4eebc7a3..2959c1cc 100644 --- a/.github/workflows/ci_job.yml +++ b/.github/workflows/ci_job.yml @@ -87,6 +87,7 @@ jobs: pip cfgrib xarray + xarray-prism cffi py-oidc-auth-fastapi condarc: | diff --git a/Dockerfile b/Dockerfile index 2fa5b072..ffab348b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -53,6 +53,7 @@ session required pam_sss.so\n\ redis-py \ requests \ watchfiles \ + xarray-prism \ zarr; \ elif [ "${CMD}" = "freva-rest-server" ];then\ PKGNAME=freva-rest && \ @@ -82,6 +83,7 @@ session required pam_sss.so\n\ uvicorn \ tomli \ xarray \ + xarray-prism \ py-oidc-auth-fastapi \ zarr; \ else \ diff --git a/freva-client/pyproject.toml b/freva-client/pyproject.toml index 8f1624bb..438959e0 100644 --- a/freva-client/pyproject.toml +++ b/freva-client/pyproject.toml @@ -45,7 +45,7 @@ Source = "https://github.com/freva-org/freva-nextgen/" [project.optional-dependencies] dev = ["tox"] -tests = ["namegenerator"] +tests = [] [tool.flit.sdist] include = ["assets/*"] diff --git a/freva-data-portal-worker/data-loader b/freva-data-portal-worker/data-loader index 6fc35150..dc5c05cd 100644 --- a/freva-data-portal-worker/data-loader +++ b/freva-data-portal-worker/data-loader @@ -26,7 +26,8 @@ jq \ mamba \ netcdf4 \ rasterio \ -zarr" +zarr \ +xarray-prism" # Generate a checksum of the package list diff --git a/freva-data-portal-worker/pyproject.toml b/freva-data-portal-worker/pyproject.toml index 05f2fee6..9178f679 100644 --- a/freva-data-portal-worker/pyproject.toml +++ b/freva-data-portal-worker/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "bokeh", "cloudpickle", "dask[distributed,diagnostics]", +"xarray-prism", "h5netcdf", "jupyter-server-proxy", "packaging", diff --git a/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py b/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py new file mode 100644 index 00000000..b1788781 --- /dev/null +++ b/freva-data-portal-worker/src/data_portal_worker/_cache_manager.py @@ -0,0 +1,47 @@ +"""Periodic cache cleanup for xarray-prism downloaded files.""" + +from __future__ import annotations + +import os +import time + +from .utils import data_logger + +# How often to run eviction (seconds). Default: every 6 hours. +_CLEANUP_INTERVAL = float(os.environ.get("API_CACHE_CLEANUP_INTERVAL", 6 * 3600)) + + +def run_cache_cleanup(force: bool = False) -> None: + """Evict stale/oversized xarray-prism cache files safely. + """ + try: + import xarray_prism + + xarray_prism.clear_cache() + info = xarray_prism.cache_info() + data_logger.info( + "Cache cleanup done: %d file(s), %.1f MB remaining at %s", + info["files"], + info["size_bytes"] / 1024**2, + info["path"], + ) + except Exception as error: + # housekeeping never break the data-loading daemon + # on 6-hourly cleanup attempts, only warns on failure + data_logger.warning("Cache cleanup failed (non-fatal): %s", error) + + +class CacheScheduler: + """Tracks elapsed time and fires cleanup when interval is due.""" + + def __init__(self) -> None: + # run immediately on first tick, + # then every _CLEANUP_INTERVAL + # seconds thereafter + self._last_run: float = 0.0 + + def tick(self) -> None: + """Call on every iteration of the daemon loop.""" + if time.monotonic() - self._last_run >= _CLEANUP_INTERVAL: + self._last_run = time.monotonic() + run_cache_cleanup() diff --git a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py index 9ab7233e..29bbd958 100644 --- a/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py +++ b/freva-data-portal-worker/src/data_portal_worker/backends/posix_and_cloud.py @@ -1,70 +1,24 @@ """Load data.""" from pathlib import Path -from typing import Any, Optional, Union +from typing import Any, Union from urllib.parse import urlparse -import h5netcdf -import netCDF4 -import rasterio import xarray as xr -import zarr - -from data_portal_worker.utils import data_logger - -try: - import cfgrib -except (ImportError, RuntimeError): # pragma: no cover - data_logger.warning("Could not import cfgrib, loading GRB files is disabled") - - -def get_xr_engine(file_path: str) -> Optional[str]: - """Get the engine, to open the xarray dataset.""" - try: - with netCDF4.Dataset(file_path, mode="r"): - return "netcdf4" - except Exception: - pass - - try: - with cfgrib.open_file(file_path): - return "cfgrib" # pragma: no cover - except Exception: - pass - try: - _ = zarr.open(file_path, mode="r") - return "zarr" - except Exception: - pass - - try: - with rasterio.open(file_path, mode="r"): - return "rasterio" - except Exception: - pass - - try: - with h5netcdf.File(file_path, mode="r"): - return "h5netcdf" - except Exception: - pass - - return None def posix_and_cloud( inp_file: Union[str, Path], chunk_size: float = 16.0, **kwargs: Any ) -> xr.Dataset: """Open a dataset with xarray.""" + engine = "prism" inp_str = str(inp_file) parsed = urlparse(inp_str) target: Union[str, Path] target = Path(inp_str) if parsed.scheme in ("", "file") else inp_str - engine = get_xr_engine(str(target)) _ = kwargs.pop("chunks", None) - for key in ("decode_cf", "use_cftime", "cache", "decode_coords"): + for key in ("decode_cf", "cache", "decode_coords"): kwargs[key] = False + kwargs["chunks"] = "auto" kwargs["engine"] = engine - if engine != "h5netcdf": - kwargs["chunks"] = "auto" return xr.open_dataset(target, **kwargs).unify_chunks() diff --git a/freva-data-portal-worker/src/data_portal_worker/load_data.py b/freva-data-portal-worker/src/data_portal_worker/load_data.py index 02970e17..942990ee 100644 --- a/freva-data-portal-worker/src/data_portal_worker/load_data.py +++ b/freva-data-portal-worker/src/data_portal_worker/load_data.py @@ -29,6 +29,7 @@ from redis.retry import Retry from xarray.backends.zarr import encode_zarr_variable +from ._cache_manager import CacheScheduler from .aggregator import DatasetAggregator, write_grouped_zarr from .backends import load_data from .rechunker import ChunkOptimizer @@ -391,12 +392,14 @@ def run_for_ever(self, channel: str) -> None: """Start the listener daemon.""" data_logger.info("Starting data-loading daemon") pubsub: Optional[PubSub] = None + cache_scheduler = CacheScheduler() data_logger.info("Broker will listen for messages now") while True: try: if pubsub is None: pubsub = self.cache.pubsub() pubsub.subscribe(channel) + cache_scheduler.tick() message = pubsub.get_message() if message and message["type"] == "message": self.redis_callback(message["data"]) diff --git a/tests/client/test_databrowser_cli.py b/tests/client/test_databrowser_cli.py index 4fcd4b06..8fbc1336 100644 --- a/tests/client/test_databrowser_cli.py +++ b/tests/client/test_databrowser_cli.py @@ -15,7 +15,6 @@ from pathlib import Path from tempfile import NamedTemporaryFile -import namegenerator from py_oidc_auth_client import Token from pytest import LogCaptureFixture from pytest_mock import MockerFixture @@ -664,8 +663,8 @@ def test_flavour_full_lifecycle( mocker.patch("freva_client.utils.choose_token_strategy").return_value = ( "use_token" ) - - flavour_name = namegenerator.gen() + from freva_rest.utils.namegenerator import generate_names + flavour_name = generate_names() res = cli_runner.invoke( app, [ diff --git a/tests/client/test_databrowser_py.py b/tests/client/test_databrowser_py.py index 580ab81b..e0580f90 100644 --- a/tests/client/test_databrowser_py.py +++ b/tests/client/test_databrowser_py.py @@ -12,7 +12,6 @@ from pathlib import Path from tempfile import TemporaryDirectory -import namegenerator import pandas as pd import pytest from py_oidc_auth_client import Token @@ -520,7 +519,8 @@ def test_flavour_lifecycle( ) -> None: """Test the full lifecycle: list, add, update, rename, delete.""" # list existing flavours - flavour_name = namegenerator.gen() + from freva_rest.utils.namegenerator import generate_names + flavour_name = generate_names() mocker.patch("freva_client.utils.choose_token_strategy").return_value = ( "use_token" ) diff --git a/tests/data_portal/test_cache_manager.py b/tests/data_portal/test_cache_manager.py new file mode 100644 index 00000000..c3a45e2d --- /dev/null +++ b/tests/data_portal/test_cache_manager.py @@ -0,0 +1,84 @@ +"""tests for _cache_manager.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pytest +import data_portal_worker._cache_manager as cm + + +def test_run_cache_cleanup_happy_path() -> None: + """Successful cleanup logs info and does not raise.""" + mock_prism = MagicMock() + mock_prism.cache_info.return_value = { + "files": 3, + "size_bytes": 1024 ** 2 * 8, + "path": "/tmp/prism_cache", + } + with patch.dict("sys.modules", {"xarray_prism": mock_prism}): + cm.run_cache_cleanup() + + mock_prism.clear_cache.assert_called_once() + mock_prism.cache_info.assert_called_once() + + +def test_run_cache_cleanup_logs_warning_on_error() -> None: + """Any exception inside cleanup is caught and logged as a warning.""" + mock_prism = MagicMock() + mock_prism.clear_cache.side_effect = RuntimeError("disk full") + + with patch.dict("sys.modules", {"xarray_prism": mock_prism}): + with patch.object(cm.data_logger, "warning") as mock_warn: + cm.run_cache_cleanup() + + mock_warn.assert_called_once() + assert "Cache cleanup failed" in mock_warn.call_args[0][0] + + + +def test_scheduler_fires_immediately_on_first_tick(monkeypatch: pytest.MonkeyPatch) -> None: + """tick() should call run_cache_cleanup on the very first call.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) + scheduler = cm.CacheScheduler() + scheduler._last_run = float("-inf") + scheduler.tick() + assert calls == [1] + + +def test_scheduler_does_not_fire_twice_within_interval( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A second tick() shortly after the first should not fire again.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) + scheduler = cm.CacheScheduler() + scheduler._last_run = float("-inf") + scheduler.tick() + # too soon + scheduler.tick() + assert len(calls) == 1 + + +def test_scheduler_fires_again_after_interval( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """tick() fires again once the interval has elapsed.""" + calls: list[int] = [] + monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1)) + + scheduler = cm.CacheScheduler() + scheduler._last_run = float("-inf") + scheduler.tick() + scheduler._last_run = float("-inf") + scheduler.tick() + assert len(calls) == 2 + + +def test_cleanup_interval_env_var(monkeypatch: pytest.MonkeyPatch) -> None: + """_CLEANUP_INTERVAL should reflect API_CACHE_CLEANUP_INTERVAL.""" + monkeypatch.setenv("API_CACHE_CLEANUP_INTERVAL", "42") + import importlib + importlib.reload(cm) + assert cm._CLEANUP_INTERVAL == 42.0 diff --git a/tests/data_portal/test_load_utils.py b/tests/data_portal/test_load_utils.py index 6d858358..42d3d1fe 100644 --- a/tests/data_portal/test_load_utils.py +++ b/tests/data_portal/test_load_utils.py @@ -8,7 +8,6 @@ import rasterio import zarr from unittest.mock import patch -from data_portal_worker.backends.posix_and_cloud import get_xr_engine from data_portal_worker.utils import str_to_int as str_to_int2 from freva_rest.utils.base_utils import get_userinfo, str_to_int @@ -64,14 +63,3 @@ def test_get_auth_userinfo() -> None: assert out["last_name"] == "Doe" assert out["first_name"] == "Jane" - -def test_get_xr_posix_engine() -> None: - """Test the right xarray engine.""" - with TemporaryDirectory() as temp_dir: - assert get_xr_engine(create_netcdf4_file(temp_dir)) == "netcdf4" - assert get_xr_engine(create_rasterio_file(temp_dir)) == "rasterio" - assert get_xr_engine(create_zarr_file(temp_dir)) == "zarr" - with patch("h5netcdf.File"): - assert get_xr_engine("http://example.com/data.nc") == "h5netcdf" - with TemporaryDirectory() as temp_dir: - assert get_xr_engine(temp_dir) is None