Skip to content

Add low-pass filter functionality (CDO-like lowpass) #62

@nicrie

Description

@nicrie

Is your feature request related to a problem? Please describe.

In several applications (e.g. Climate Pulse), we smooth daily climatologies by removing sub-monthly variability using a low-pass filter. AFAIK, we currently rely on CDO for this step:

cdo lowpass,12 -del29feb daily_clim_366days.nc daily_clim_365days_lowpass.nc

This introduces an extra dependency and requires switching between tools within the preprocessing pipeline.

Describe the solution you'd like

Add a CDO-compatible ideal low-pass FFT filter (cutoff in cycles per year) that handles datetime and numeric “climatology axes” (e.g., dayofyear). An example API:

from earthkit.transforms import filter as ek_filter

daily_clim_smooth = ek_filter.lowpass(
    clim, fmax=12, time_dim="dayofyear", calendar="365_day", samples_per_year=365
)

Below is a working reference implementation and reproducible examples.

Reproducible examples

Current approach with CDO
import numpy as np
import xarray as xr
from earthkit.transforms import aggregate as ek_aggregate

# Toy data
ds = xr.tutorial.open_dataset("air_temperature")
clim = ek_aggregate.climatology.daily_mean(ds)  # dims: dayofyear, lat, lon

# --- Hand-off to CDO (documented reference, not executed in Python) ---
# Prepare for CDO: rename to time and give an explicit daily datetime index (leap year)
clim.rename({"dayofyear": "time"}).assign_coords(
    time=xr.date_range("2001-01-01", "2001-12-31", freq="D")  # has 365 days actually
).to_netcdf("daily_clim_366days.nc")

# Run externally in shell:
# cdo lowpass,12 -del29feb daily_clim_366days.nc daily_clim_365days_lowpass.nc

# Load CDO result back (now 365 days)
daily_clim_smooth_cdo = xr.load_dataarray("daily_clim_365days_lowpass.nc").assign_coords(
    time=np.arange(1, 366, dtype="int32")
).rename({"time": "dayofyear"})
Python-only approach
def _is_datetime64(coord):
    return np.issubdtype(coord.dtype, np.datetime64)


def _uniform_step_numeric(coord):
    vals = coord.values.astype(np.float64)
    diffs = np.diff(vals)
    if not np.allclose(diffs, diffs[0], rtol=0, atol=1e-12):
        raise ValueError(
            "Non-uniform numeric time coordinate; resample or reindex to uniform spacing."
        )
    return diffs[0]


def _dt_years_from_coord(coord, calendar="365_day", samples_per_year=None):
    """
    Compute sampling interval (dT) in YEARS.
    - datetime64: infer from timestamps; year length set by `calendar` (365 or 360 days).
    - numeric/integer: require `samples_per_year` (e.g., 365 for dayofyear), or infer for common climatology cases.
    """
    if _is_datetime64(coord):
        t = coord.values.astype("datetime64[ns]").astype("int64")  # ns
        dt_days = np.diff(t).astype(np.float64) / (24 * 60 * 60 * 1e9)
        if not np.allclose(dt_days, dt_days[0], rtol=0, atol=1e-9):
            raise ValueError("Non-uniform datetime spacing; resample to uniform step.")
        year_days = (
            365.0
            if calendar == "365_day"
            else (360.0 if calendar == "360_day" else 365.0)
        )
        return dt_days[0] / year_days
    else:
        # Numeric/integer axis: treat each index step as 1 / samples_per_year of a year.
        if samples_per_year is None:
            # Heuristic for climatology-like axes
            N = coord.size
            if N in (360, 365, 366, 12):
                samples_per_year = N
            else:
                raise ValueError(
                    "Numeric time axis detected. Please provide samples_per_year "
                    "(e.g., 365 for dayofyear, 12 for month)."
                )
        # Verify uniform spacing (value spacing doesn't affect dt_years, but we check for sanity)
        _ = _uniform_step_numeric(coord)
        return 1.0 / float(samples_per_year)


def _lowpass_fft_core(data, dt_years, fmax, time_axis, round_up_to_bin=True):
    """
    Apply ideal low-pass in frequency domain along `time_axis`.
    Keeps |f| <= fmax (cycles/year). Uses rfft/irfft.
    """
    N = data.shape[time_axis]
    is_dask = isinstance(data, da.Array)

    rfft = da.fft.rfft if is_dask else np.fft.rfft
    irfft = da.fft.irfft if is_dask else np.fft.irfft

    spec = rfft(data, axis=time_axis)
    # Small vector; always fine to use NumPy for freqs
    freqs = np.fft.rfftfreq(N, d=dt_years)  # cycles/year

    if round_up_to_bin:
        idx = np.searchsorted(freqs, fmax, side="left")
        if idx >= len(freqs):
            fmax_eff = freqs[-1]
        else:
            fmax_eff = freqs[idx]
    else:
        fmax_eff = fmax

    mask_1d = freqs <= fmax_eff
    # Broadcast mask along the rFFT axis
    shape = [1] * spec.ndim
    shape[time_axis] = spec.shape[time_axis]
    mask = mask_1d.reshape(shape)

    spec_filt = spec * mask
    out = irfft(spec_filt, n=N, axis=time_axis)
    return out


def lowpass_xr(
    obj: xr.Dataset | xr.DataArray,
    fmax: float,
    time_dim: str = "time",
    calendar: str = "365_day",
    samples_per_year: int | float | None = None,
    round_up_to_bin: bool = True,
):
    """
    CDO-like low-pass filter with cutoff `fmax` (cycles/year).


    Parameters
    ----------
    obj : xr.Dataset or xr.DataArray
        Must include `time_dim` with uniform spacing. No missing values.
    fmax : float
        Cutoff frequency in cycles per year.
    time_dim : str
        Name of time dimension.
    calendar : {'365_day','360_day'}
        Used only for datetime coordinates to interpret 'per year'.
    samples_per_year : int|float|None
        For numeric/integer time coordinates, number of samples per year
        (e.g., 365 or 366 for dayofyear, 12 for monthly climatology).
        If None, tries to infer for common climatology sizes (360/365/366/12).
    round_up_to_bin : bool
        Mimic CDO behavior: fmax rounded up to the next available FFT bin.

    Returns
    -------
    xr.Dataset or xr.DataArray
        Same shape and coordinates as input, filtered.
    """
    if time_dim not in obj.dims:
        raise ValueError(f"time dimension '{time_dim}' not found.")

    # Ensure one chunk along time for FFT; keep spatial dims chunked
    if hasattr(obj, "chunks") and obj.chunks is not None:
        obj = obj.chunk({d: (-1 if d == time_dim else "auto") for d in obj.dims})

    # Determine dT in years
    dt_years = _dt_years_from_coord(
        obj[time_dim], calendar=calendar, samples_per_year=samples_per_year
    )

    def _filter_da(da: xr.DataArray):
        data = da.data
        time_axis = da.get_axis_num(time_dim)

        filt = _lowpass_fft_core(
            data,
            dt_years=dt_years,
            fmax=fmax,
            time_axis=time_axis,
            round_up_to_bin=round_up_to_bin,
        )
        return xr.DataArray(
            filt, dims=da.dims, coords=da.coords, name=da.name, attrs=da.attrs
        ).astype(da.dtype)

    if isinstance(obj, xr.DataArray):
        return _filter_da(obj)
    elif isinstance(obj, xr.Dataset):
        numeric_vars = {
            k: v
            for k, v in obj.data_vars.items()
            if np.issubdtype(v.dtype, np.floating)
        }
        others = obj.drop_vars(list(numeric_vars.keys()))
        filtered_arrays = []
        for name, var in numeric_vars.items():
            fa = _filter_da(var)
            # ensure the resulting DataArray has the original variable name
            fa = fa.rename(name)
            filtered_arrays.append(fa)

        filtered_ds = xr.merge(filtered_arrays).assign_coords(**obj.coords)
        return xr.merge([filtered_ds, others])
    else:
        raise TypeError("obj must be an xarray DataArray or Dataset.")
import xarray as xr
from earthkit.transforms import aggregate as ek_aggregate

ds = xr.tutorial.open_dataset("air_temperature")

clim = ek_aggregate.climatology.daily_mean(ds)
daily_clim_smooth = lowpass_xr(
    clim, fmax=12, time_dim="dayofyear", samples_per_year=365
)

Limitations

Handling of leap years (Feb 29) is still tricky. With 366-day climatologies, slight differences occur.

Describe alternatives you've considered

No response

Additional context

Verification

Comparison with CDO confirms numerical agreement:

xr.testing.assert_allclose(daily_clim_smooth.air.compute(), daily_clim_smooth_cdo)  # True
Image

Organisation

ECMWF

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions