Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci_job.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ jobs:
pip
cfgrib
xarray
xarray-prism
cffi
py-oidc-auth-fastapi
condarc: |
Expand Down
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ session required pam_sss.so\n\
redis-py \
requests \
watchfiles \
xarray-prism \
zarr; \
elif [ "${CMD}" = "freva-rest-server" ];then\
PKGNAME=freva-rest && \
Expand Down Expand Up @@ -82,6 +83,7 @@ session required pam_sss.so\n\
uvicorn \
tomli \
xarray \
xarray-prism \
py-oidc-auth-fastapi \
zarr; \
else \
Expand Down
2 changes: 1 addition & 1 deletion freva-client/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Source = "https://github.com/freva-org/freva-nextgen/"

[project.optional-dependencies]
dev = ["tox"]
tests = ["namegenerator"]
tests = []

[tool.flit.sdist]
include = ["assets/*"]
Expand Down
3 changes: 2 additions & 1 deletion freva-data-portal-worker/data-loader
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ jq \
mamba \
netcdf4 \
rasterio \
zarr"
zarr \
xarray-prism"


# Generate a checksum of the package list
Expand Down
1 change: 1 addition & 0 deletions freva-data-portal-worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies = [
"bokeh",
"cloudpickle",
"dask[distributed,diagnostics]",
"xarray-prism",
"h5netcdf",
"jupyter-server-proxy",
"packaging",
Expand Down
47 changes: 47 additions & 0 deletions freva-data-portal-worker/src/data_portal_worker/_cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Periodic cache cleanup for xarray-prism downloaded files."""

from __future__ import annotations

import os
import time

from .utils import data_logger

# How often to run eviction (seconds). Default: every 6 hours.
_CLEANUP_INTERVAL = float(os.environ.get("API_CACHE_CLEANUP_INTERVAL", 6 * 3600))


def run_cache_cleanup(force: bool = False) -> None:
    """Evict stale/oversized xarray-prism cache files safely.

    Delegates the actual eviction to ``xarray_prism.clear_cache`` and logs
    a summary of what remains afterwards.  Any exception — including a
    missing ``xarray_prism`` package — is caught and logged as a warning so
    that periodic housekeeping can never break the data-loading daemon.

    Parameters
    ----------
    force:
        Accepted for API compatibility.  # NOTE(review): currently unused
        by this implementation — confirm intended semantics.
    """
    try:
        import xarray_prism

        xarray_prism.clear_cache()
        stats = xarray_prism.cache_info()
        remaining_mb = stats["size_bytes"] / 1024**2
        data_logger.info(
            "Cache cleanup done: %d file(s), %.1f MB remaining at %s",
            stats["files"],
            remaining_mb,
            stats["path"],
        )
    except Exception as error:
        # Housekeeping must never take down the daemon: failures on the
        # periodic cleanup attempts are reported as warnings only.
        data_logger.warning("Cache cleanup failed (non-fatal): %s", error)


class CacheScheduler:
    """Tracks elapsed time and fires cleanup when the interval is due."""

    def __init__(self) -> None:
        # Start "infinitely long ago" so the very first tick() always
        # fires.  The previous 0.0 sentinel only fired immediately when
        # time.monotonic() happened to exceed _CLEANUP_INTERVAL already —
        # monotonic's reference point is unspecified, so on a freshly
        # booted host the first cleanup could be delayed a full interval.
        self._last_run: float = float("-inf")

    def tick(self) -> None:
        """Call on every iteration of the daemon loop.

        Runs ``run_cache_cleanup()`` when at least ``_CLEANUP_INTERVAL``
        seconds have elapsed since the previous run (always on the first
        call), then resets the timer.
        """
        now = time.monotonic()
        if now - self._last_run >= _CLEANUP_INTERVAL:
            self._last_run = now
            run_cache_cleanup()
Original file line number Diff line number Diff line change
@@ -1,70 +1,24 @@
"""Load data."""

from pathlib import Path
from typing import Any, Optional, Union
from typing import Any, Union
from urllib.parse import urlparse

import h5netcdf
import netCDF4
import rasterio
import xarray as xr
import zarr

from data_portal_worker.utils import data_logger

try:
import cfgrib
except (ImportError, RuntimeError): # pragma: no cover
data_logger.warning("Could not import cfgrib, loading GRB files is disabled")


def get_xr_engine(file_path: str) -> Optional[str]:
    """Get the engine, to open the xarray dataset.

    Probes the file with each backend in a fixed order and returns the
    name of the first one that can open it, or ``None`` if none can.
    """

    def _probe_netcdf4(path: str) -> None:
        with netCDF4.Dataset(path, mode="r"):
            pass

    def _probe_cfgrib(path: str) -> None:
        # cfgrib may be undefined (guarded import); the NameError is
        # swallowed by the probe loop just like any open failure.
        with cfgrib.open_file(path):
            pass

    def _probe_zarr(path: str) -> None:
        zarr.open(path, mode="r")

    def _probe_rasterio(path: str) -> None:
        with rasterio.open(path, mode="r"):
            pass

    def _probe_h5netcdf(path: str) -> None:
        with h5netcdf.File(path, mode="r"):
            pass

    # Probe order matters: more specific/likely formats come first.
    probes = (
        ("netcdf4", _probe_netcdf4),
        ("cfgrib", _probe_cfgrib),  # pragma: no cover
        ("zarr", _probe_zarr),
        ("rasterio", _probe_rasterio),
        ("h5netcdf", _probe_h5netcdf),
    )
    for engine_name, probe in probes:
        try:
            probe(file_path)
            return engine_name
        except Exception:
            continue
    return None


def posix_and_cloud(
    inp_file: Union[str, Path], chunk_size: float = 16.0, **kwargs: Any
) -> xr.Dataset:
    """Open a dataset with xarray via the fixed "prism" backend.

    Parameters
    ----------
    inp_file:
        Path or URL of the dataset.  Plain paths and ``file://`` URLs are
        handed to xarray as a :class:`pathlib.Path`; any other scheme
        (e.g. ``http``, ``s3``) is passed through as a string.
    chunk_size:
        Kept for backward compatibility; not used here — chunking is
        delegated to xarray via ``chunks="auto"``.
    **kwargs:
        Extra keyword arguments forwarded to :func:`xarray.open_dataset`.
        ``chunks``, ``engine`` and the decode flags below are overridden.

    Returns
    -------
    xr.Dataset
        The opened dataset with unified chunks.
    """
    # Engine auto-detection was removed: all inputs go through the
    # xarray-prism backend.
    engine = "prism"
    inp_str = str(inp_file)
    parsed = urlparse(inp_str)
    target: Union[str, Path]
    target = Path(inp_str) if parsed.scheme in ("", "file") else inp_str
    # Discard any caller-supplied chunking and defer decoding; chunk
    # selection is left to xarray/dask.
    _ = kwargs.pop("chunks", None)
    for key in ("decode_cf", "cache", "decode_coords"):
        kwargs[key] = False
    kwargs["chunks"] = "auto"
    kwargs["engine"] = engine
    return xr.open_dataset(target, **kwargs).unify_chunks()
3 changes: 3 additions & 0 deletions freva-data-portal-worker/src/data_portal_worker/load_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from redis.retry import Retry
from xarray.backends.zarr import encode_zarr_variable

from ._cache_manager import CacheScheduler
from .aggregator import DatasetAggregator, write_grouped_zarr
from .backends import load_data
from .rechunker import ChunkOptimizer
Expand Down Expand Up @@ -391,12 +392,14 @@ def run_for_ever(self, channel: str) -> None:
"""Start the listener daemon."""
data_logger.info("Starting data-loading daemon")
pubsub: Optional[PubSub] = None
cache_scheduler = CacheScheduler()
data_logger.info("Broker will listen for messages now")
while True:
try:
if pubsub is None:
pubsub = self.cache.pubsub()
pubsub.subscribe(channel)
cache_scheduler.tick()
message = pubsub.get_message()
if message and message["type"] == "message":
self.redis_callback(message["data"])
Expand Down
5 changes: 2 additions & 3 deletions tests/client/test_databrowser_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from pathlib import Path
from tempfile import NamedTemporaryFile

import namegenerator
from py_oidc_auth_client import Token
from pytest import LogCaptureFixture
from pytest_mock import MockerFixture
Expand Down Expand Up @@ -664,8 +663,8 @@ def test_flavour_full_lifecycle(
mocker.patch("freva_client.utils.choose_token_strategy").return_value = (
"use_token"
)

flavour_name = namegenerator.gen()
from freva_rest.utils.namegenerator import generate_names
flavour_name = generate_names()
res = cli_runner.invoke(
app,
[
Expand Down
4 changes: 2 additions & 2 deletions tests/client/test_databrowser_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from pathlib import Path
from tempfile import TemporaryDirectory

import namegenerator
import pandas as pd
import pytest
from py_oidc_auth_client import Token
Expand Down Expand Up @@ -520,7 +519,8 @@ def test_flavour_lifecycle(
) -> None:
"""Test the full lifecycle: list, add, update, rename, delete."""
# list existing flavours
flavour_name = namegenerator.gen()
from freva_rest.utils.namegenerator import generate_names
flavour_name = generate_names()
mocker.patch("freva_client.utils.choose_token_strategy").return_value = (
"use_token"
)
Expand Down
84 changes: 84 additions & 0 deletions tests/data_portal/test_cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""tests for _cache_manager."""

from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest
import data_portal_worker._cache_manager as cm


def test_run_cache_cleanup_happy_path() -> None:
    """Successful cleanup logs info and does not raise."""
    fake_prism = MagicMock()
    fake_prism.cache_info.return_value = {
        "files": 3,
        "size_bytes": 8 * 1024 ** 2,
        "path": "/tmp/prism_cache",
    }
    # Injecting the fake into sys.modules makes the function's local
    # `import xarray_prism` pick it up.
    with patch.dict("sys.modules", {"xarray_prism": fake_prism}):
        cm.run_cache_cleanup()

    fake_prism.clear_cache.assert_called_once()
    fake_prism.cache_info.assert_called_once()


def test_run_cache_cleanup_logs_warning_on_error() -> None:
    """Any exception inside cleanup is caught and logged as a warning."""
    broken_prism = MagicMock()
    broken_prism.clear_cache.side_effect = RuntimeError("disk full")

    with patch.dict("sys.modules", {"xarray_prism": broken_prism}), \
            patch.object(cm.data_logger, "warning") as warn_mock:
        cm.run_cache_cleanup()

    warn_mock.assert_called_once()
    first_positional = warn_mock.call_args[0][0]
    assert "Cache cleanup failed" in first_positional



def test_scheduler_fires_immediately_on_first_tick(monkeypatch: pytest.MonkeyPatch) -> None:
    """tick() should call run_cache_cleanup on the very first call."""
    calls: list[int] = []
    monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1))
    # A zero interval makes the first tick due immediately without poking
    # the scheduler's private _last_run state — the original test forced
    # _last_run = -inf and so never exercised a freshly-built scheduler.
    monkeypatch.setattr(cm, "_CLEANUP_INTERVAL", 0.0)
    scheduler = cm.CacheScheduler()
    scheduler.tick()
    assert calls == [1]


def test_scheduler_does_not_fire_twice_within_interval(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A second tick() shortly after the first should not fire again."""
    fired: list[int] = []
    monkeypatch.setattr(cm, "run_cache_cleanup", lambda: fired.append(1))

    scheduler = cm.CacheScheduler()
    # Force the first tick to be due.
    scheduler._last_run = float("-inf")
    scheduler.tick()
    # The immediate follow-up tick is well within the interval.
    scheduler.tick()

    assert len(fired) == 1


def test_scheduler_fires_again_after_interval(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """tick() fires again once the interval has elapsed."""
    calls: list[int] = []
    monkeypatch.setattr(cm, "run_cache_cleanup", lambda: calls.append(1))

    scheduler = cm.CacheScheduler()
    scheduler._last_run = float("-inf")
    scheduler.tick()
    # Simulate exactly one full interval having elapsed since the last
    # run (the original test just re-forced -inf, which never exercised
    # the >= interval boundary arithmetic).
    scheduler._last_run -= cm._CLEANUP_INTERVAL
    scheduler.tick()
    assert len(calls) == 2


def test_cleanup_interval_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
    """_CLEANUP_INTERVAL should reflect API_CACHE_CLEANUP_INTERVAL."""
    import importlib

    monkeypatch.setenv("API_CACHE_CLEANUP_INTERVAL", "42")
    try:
        importlib.reload(cm)
        assert cm._CLEANUP_INTERVAL == 42.0
    finally:
        # The original test leaked _CLEANUP_INTERVAL == 42 into later
        # tests: monkeypatch only restores the env var, not the reloaded
        # module state.  Undo the env change now and reload once more so
        # the module returns to its original configuration.
        monkeypatch.undo()
        importlib.reload(cm)
12 changes: 0 additions & 12 deletions tests/data_portal/test_load_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import rasterio
import zarr
from unittest.mock import patch
from data_portal_worker.backends.posix_and_cloud import get_xr_engine
from data_portal_worker.utils import str_to_int as str_to_int2
from freva_rest.utils.base_utils import get_userinfo, str_to_int

Expand Down Expand Up @@ -64,14 +63,3 @@ def test_get_auth_userinfo() -> None:
assert out["last_name"] == "Doe"
assert out["first_name"] == "Jane"


def test_get_xr_posix_engine() -> None:
"""Test the right xarray engine."""
with TemporaryDirectory() as temp_dir:
assert get_xr_engine(create_netcdf4_file(temp_dir)) == "netcdf4"
assert get_xr_engine(create_rasterio_file(temp_dir)) == "rasterio"
assert get_xr_engine(create_zarr_file(temp_dir)) == "zarr"
with patch("h5netcdf.File"):
assert get_xr_engine("http://example.com/data.nc") == "h5netcdf"
with TemporaryDirectory() as temp_dir:
assert get_xr_engine(temp_dir) is None
Loading