From f354b11410d22c4185ba11646a0249d5c4887269 Mon Sep 17 00:00:00 2001 From: "niko.sarcevic" Date: Thu, 14 May 2026 10:53:47 -0400 Subject: [PATCH] Add LF magnitude-window integration utilities --- src/lfkit/photometry/lf_integrals.py | 291 ++++++++++++--------- src/lfkit/utils/evaluators.py | 58 ++++- src/lfkit/utils/integrators.py | 172 ++++++++++++ tests/test_photometry_lf_integrals.py | 123 +++++++-- tests/test_utils_evaluators.py | 277 ++++++++++++++++++++ tests/test_utils_integrators.py | 362 ++++++++++++++++++++++++++ 6 files changed, 1140 insertions(+), 143 deletions(-) create mode 100644 src/lfkit/utils/integrators.py create mode 100644 tests/test_utils_integrators.py diff --git a/src/lfkit/photometry/lf_integrals.py b/src/lfkit/photometry/lf_integrals.py index 81567f1c..aaefe3f1 100644 --- a/src/lfkit/photometry/lf_integrals.py +++ b/src/lfkit/photometry/lf_integrals.py @@ -23,6 +23,13 @@ import numpy as np from lfkit.utils.types import FloatArray, FloatInput, LuminosityFunction +from lfkit.utils.integrators import integrate_between_variable_bounds, safe_divide +from lfkit.utils.evaluators import ( + evaluate_optional_redshift_callable, + evaluate_positive_redshift_callable, + evaluate_lf_on_grid, + evaluate_weight_on_grid, +) from lfkit.utils.validators import validate_array @@ -33,6 +40,7 @@ "integrated_luminosity_density", "mean_luminosity", "cumulative_number_density", + "magnitude_window_number_density", ] @@ -264,7 +272,7 @@ def mean_luminosity( n_m=n_m, ) - return _safe_divide(luminosity_density, number_density) + return safe_divide(luminosity_density, number_density) def cumulative_number_density( @@ -336,165 +344,194 @@ def cumulative_number_density( ) -def _integrated_lf_between_bounds( +def magnitude_window_number_density( z: FloatInput, lf: LuminosityFunction, *, - m_lower: FloatInput, - m_upper: FloatInput, - n_m: int, - weight_fn: Callable[[FloatArray, FloatArray], FloatArray] | None = None, + m_bright: FloatInput | None = None, + m_faint: FloatInput | None = None, + apparent_m_bright: FloatInput | None = None, + apparent_m_faint: FloatInput | None = None, + luminosity_distance_mpc_fn: Callable[[FloatArray], FloatArray] | None = None, + k_correction_fn: Callable[[FloatArray], FloatArray] | None = None, + e_correction_fn: Callable[[FloatArray], FloatArray] | None = None, + n_m: int = 512, ) -> FloatArray: - r"""Return finite-range LF integral between magnitude bounds.""" - z_arr = validate_array(z, name="z") - m_lower_arr = validate_array(m_lower, name="m_lower") - m_upper_arr = validate_array(m_upper, name="m_upper") - - _validate_integration_inputs( - z=z_arr, - m_lower=m_lower_arr, - m_upper=m_upper_arr, - n_m=n_m, - ) - - z_b, m_lower_b, m_upper_b = np.broadcast_arrays( - z_arr, - m_lower_arr, - m_upper_arr, - ) - - integral = np.zeros_like(z_b, dtype=float) - valid = m_upper_b > m_lower_b + r"""Return LF number density inside a magnitude-selection window. - if not np.any(valid): - return np.asarray(integral, dtype=float) + This integrates a luminosity function over a finite absolute-magnitude + range. The bright and faint limits may be supplied directly as absolute + magnitudes, converted from apparent magnitudes, or supplied as a mixture of + both. - m_grid = _magnitude_grid( - m_lower=m_lower_b[valid], - m_upper=m_upper_b[valid], - n_m=n_m, - ) - z_grid = np.broadcast_to(z_b[valid][None, :], m_grid.shape) - - phi = _evaluate_lf_on_grid(lf, m_grid=m_grid, z_grid=z_grid) + Magnitudes are ordered so that more negative values are brighter. Apparent + magnitude limits are converted to absolute-magnitude limits at each + redshift before integration. - if weight_fn is not None: - weight = _evaluate_weight_on_grid( - weight_fn, - m_grid=m_grid, - z_grid=z_grid, - ) - phi = phi * weight + This helper is science-use-case agnostic. It only defines the LF integral + over a magnitude window; interpretation of that selected population belongs + to the calling analysis. - integral[valid] = np.trapezoid(phi, x=m_grid, axis=0) + Args: + z: Redshift value or array-like of redshift values. + lf: Luminosity-function callable with signature ``lf(M, z)``. + m_bright: Bright absolute-magnitude bound. + m_faint: Faint absolute-magnitude bound. + apparent_m_bright: Bright apparent-magnitude bound. + apparent_m_faint: Faint apparent-magnitude bound. + luminosity_distance_mpc_fn: Callable returning luminosity distance in + Mpc. Required when either apparent-magnitude bound is supplied. + k_correction_fn: Optional K-correction callable evaluated at ``z``. + e_correction_fn: Optional E-correction callable evaluated at ``z``. + n_m: Number of magnitude-grid points used for the integral. - return np.asarray(integral, dtype=float) + Returns: + NumPy array of number densities evaluated at ``z``. + Raises: + ValueError: If a bright or faint bound is missing, if both absolute and + apparent values are supplied for the same bound, or if an apparent + bound is supplied without a luminosity-distance callable. + """ + z_arr = validate_array(z, name="z") -def _magnitude_grid( - *, - m_lower: FloatArray, - m_upper: FloatArray, - n_m: int, -) -> FloatArray: - r"""Return a magnitude grid for column-wise finite-range integration.""" - t = np.linspace(0.0, 1.0, int(n_m), dtype=float) + m_bright_resolved = _resolve_magnitude_window_bound( + z_arr, + absolute_mag=m_bright, + apparent_mag=apparent_m_bright, + luminosity_distance_mpc_fn=luminosity_distance_mpc_fn, + k_correction_fn=k_correction_fn, + e_correction_fn=e_correction_fn, + bound_name="bright", + ) + m_faint_resolved = _resolve_magnitude_window_bound( + z_arr, + absolute_mag=m_faint, + apparent_mag=apparent_m_faint, + luminosity_distance_mpc_fn=luminosity_distance_mpc_fn, + k_correction_fn=k_correction_fn, + e_correction_fn=e_correction_fn, + bound_name="faint", + ) - return np.asarray( - m_lower[None, :] + t[:, None] * (m_upper[None, :] - m_lower[None, :]), - dtype=float, + return integrated_number_density( + z_arr, + lf, + m_bright=m_bright_resolved, + m_faint=m_faint_resolved, + n_m=n_m, ) -def _evaluate_lf_on_grid( - lf: LuminosityFunction, +def _resolve_magnitude_window_bound( + z: FloatArray, *, - m_grid: FloatArray, - z_grid: FloatArray, + absolute_mag: FloatInput | None, + apparent_mag: FloatInput | None, + luminosity_distance_mpc_fn: Callable[[FloatArray], FloatArray] | None, + k_correction_fn: Callable[[FloatArray], FloatArray] | None, + e_correction_fn: Callable[[FloatArray], FloatArray] | None, + bound_name: str, ) -> FloatArray: - r"""Return LF values evaluated on a magnitude-redshift grid.""" - phi = np.asarray(lf(m_grid, z_grid), dtype=float) - - if phi.shape != m_grid.shape: - try: - phi = np.broadcast_to(phi, m_grid.shape) - except ValueError as exc: - raise ValueError( - "lf(M, z) must return values broadcastable to the shape " - "of the magnitude-redshift integration grid." - ) from exc + r"""Return an absolute-magnitude bound for a magnitude window.""" + if absolute_mag is None and apparent_mag is None: + raise ValueError( + f"Must provide either m_{bound_name} or apparent_m_{bound_name}." + ) - if np.any(~np.isfinite(phi)): - raise ValueError("lf(M, z) returned non-finite values.") + if absolute_mag is not None and apparent_mag is not None: + raise ValueError( + f"Provide only one of m_{bound_name} or apparent_m_{bound_name}." + ) - if np.any(phi < 0.0): - raise ValueError("lf(M, z) must be non-negative.") + if absolute_mag is not None: + return validate_array(absolute_mag, name=f"m_{bound_name}") - return np.asarray(phi, dtype=float) + if luminosity_distance_mpc_fn is None: + raise ValueError( + "luminosity_distance_mpc_fn is required when apparent magnitude " + f"bounds are supplied. Missing conversion for apparent_m_{bound_name}." + ) + apparent_mag_arr = validate_array( + apparent_mag, + name=f"apparent_m_{bound_name}", + ) + luminosity_distance_mpc = evaluate_positive_redshift_callable( + luminosity_distance_mpc_fn, + z, + name="luminosity_distance_mpc_fn", + ) -def _evaluate_weight_on_grid( - weight_fn: Callable[[FloatArray, FloatArray], FloatArray], - *, - m_grid: FloatArray, - z_grid: FloatArray, -) -> FloatArray: - r"""Return finite non-negative weight values on a magnitude-redshift grid.""" - weight = np.asarray(weight_fn(m_grid, z_grid), dtype=float) + k_correction = evaluate_optional_redshift_callable( + k_correction_fn, + z, + name="k_correction_fn", + ) + e_correction = evaluate_optional_redshift_callable( + e_correction_fn, + z, + name="e_correction_fn", + ) - if weight.shape != m_grid.shape: - try: - weight = np.broadcast_to(weight, m_grid.shape) - except ValueError as exc: - raise ValueError( - "weight_fn(M, z) must return values broadcastable to the shape " - "of the magnitude-redshift integration grid." - ) from exc + if k_correction is None: + k_correction = np.zeros_like(z, dtype=float) - if np.any(~np.isfinite(weight)): - raise ValueError("weight_fn(M, z) returned non-finite values.") + if e_correction is None: + e_correction = np.zeros_like(z, dtype=float) - if np.any(weight < 0.0): - raise ValueError("weight_fn(M, z) must be non-negative.") + distance_modulus = 5.0 * np.log10(luminosity_distance_mpc) + 25.0 - return np.asarray(weight, dtype=float) + return np.asarray( + apparent_mag_arr - distance_modulus - k_correction + e_correction, + dtype=float, + ) -def _validate_integration_inputs( +def _integrated_lf_between_bounds( + z: FloatInput, + lf: LuminosityFunction, *, - z: FloatArray, - m_lower: FloatArray, - m_upper: FloatArray, + m_lower: FloatInput, + m_upper: FloatInput, n_m: int, -) -> None: - r"""Validate inputs for finite-range LF integration.""" - if np.any(z < 0.0): - raise ValueError("Redshift z must be >= 0.") - - if np.any(~np.isfinite(m_lower)): - raise ValueError("m_lower must contain only finite values.") + weight_fn: Callable[[FloatArray, FloatArray], FloatArray] | None = None, +) -> FloatArray: + r"""Return finite-range LF integral between magnitude bounds.""" + z_arr = validate_array(z, name="z") - if np.any(~np.isfinite(m_upper)): - raise ValueError("m_upper must contain only finite values.") + if np.any(z_arr < 0.0): + raise ValueError("Redshift z must be >= 0.") - if n_m < 2: - raise ValueError("n_m must be at least 2.") + def integrand( + absolute_mag: FloatArray, + redshift: FloatArray, + ) -> FloatArray: + phi = evaluate_lf_on_grid( + lf, + m_grid=absolute_mag, + z_grid=redshift, + ) + if weight_fn is None: + return phi -def _safe_divide( - numerator: FloatArray, - denominator: FloatArray, -) -> FloatArray: - r"""Return numerator / denominator with zero output for zero denominator.""" - numerator_arr = np.asarray(numerator, dtype=float) - denominator_arr = np.asarray(denominator, dtype=float) - - with np.errstate(divide="ignore", invalid="ignore"): - result = np.divide( - numerator_arr, - denominator_arr, - out=np.zeros_like(numerator_arr, dtype=float), - where=denominator_arr > 0.0, + weight = evaluate_weight_on_grid( + weight_fn, + m_grid=absolute_mag, + z_grid=redshift, ) - return np.asarray(result, dtype=float) + return np.asarray(phi * weight, dtype=float) + + return integrate_between_variable_bounds( + z_arr, + lower=m_lower, + upper=m_upper, + integrand_fn=integrand, + n_grid=n_m, + y_name="z", + lower_name="m_lower", + upper_name="m_upper", + n_grid_name="n_m", + ) diff --git a/src/lfkit/utils/evaluators.py b/src/lfkit/utils/evaluators.py index 72b3f3ea..208bc694 100644 --- a/src/lfkit/utils/evaluators.py +++ b/src/lfkit/utils/evaluators.py @@ -6,13 +6,15 @@ import numpy as np -from lfkit.utils.types import FloatArray +from lfkit.utils.types import FloatArray, LuminosityFunction __all__ = [ "evaluate_non_negative_redshift_callable", "evaluate_optional_redshift_callable", "evaluate_positive_redshift_callable", + "evaluate_weight_on_grid", + "evaluate_lf_on_grid", ] @@ -86,6 +88,60 @@ def evaluate_non_negative_redshift_callable( return values +def evaluate_weight_on_grid( + weight_fn: Callable[[FloatArray, FloatArray], FloatArray], + *, + m_grid: FloatArray, + z_grid: FloatArray, +) -> FloatArray: + r"""Return finite non-negative weight values on a magnitude-redshift grid.""" + weight = np.asarray(weight_fn(m_grid, z_grid), dtype=float) + + if weight.shape != m_grid.shape: + try: + weight = np.broadcast_to(weight, m_grid.shape) + except ValueError as exc: + raise ValueError( + "weight_fn(M, z) must return values broadcastable to the shape " + "of the magnitude-redshift integration grid." + ) from exc + + if np.any(~np.isfinite(weight)): + raise ValueError("weight_fn(M, z) returned non-finite values.") + + if np.any(weight < 0.0): + raise ValueError("weight_fn(M, z) must be non-negative.") + + return np.asarray(weight, dtype=float) + + +def evaluate_lf_on_grid( + lf: LuminosityFunction, + *, + m_grid: FloatArray, + z_grid: FloatArray, +) -> FloatArray: + r"""Return LF values evaluated on a magnitude-redshift grid.""" + phi = np.asarray(lf(m_grid, z_grid), dtype=float) + + if phi.shape != m_grid.shape: + try: + phi = np.broadcast_to(phi, m_grid.shape) + except ValueError as exc: + raise ValueError( + "lf(M, z) must return values broadcastable to the shape " + "of the magnitude-redshift integration grid." + ) from exc + + if np.any(~np.isfinite(phi)): + raise ValueError("lf(M, z) returned non-finite values.") + + if np.any(phi < 0.0): + raise ValueError("lf(M, z) must be non-negative.") + + return np.asarray(phi, dtype=float) + + def _evaluate_redshift_callable( fn: Callable[[FloatArray], FloatArray], z: FloatArray, diff --git a/src/lfkit/utils/integrators.py b/src/lfkit/utils/integrators.py new file mode 100644 index 00000000..5af7362c --- /dev/null +++ b/src/lfkit/utils/integrators.py @@ -0,0 +1,172 @@ +"""Numerical integration utilities. + +This module provides small reusable helpers for integrating tabulated values +over fixed or variable finite bounds. These helpers do not encode any +luminosity-function, photometry, or cosmology assumptions. +""" + +from __future__ import annotations + +from collections.abc import Callable + +import numpy as np + +from lfkit.utils.types import FloatArray, FloatInput +from lfkit.utils.validators import validate_array + + +__all__ = [ + "integrate_between_variable_bounds", + "safe_divide", +] + + +def integrate_between_variable_bounds( + y: FloatInput, + *, + lower: FloatInput, + upper: FloatInput, + integrand_fn: Callable[[FloatArray, FloatArray], FloatArray], + n_grid: int = 512, + y_name: str = "y", + lower_name: str = "lower", + upper_name: str = "upper", + n_grid_name: str = "n_grid", +) -> FloatArray: + """Integrate a callable between finite bounds that may vary with ``y``. + + Args: + y: Coordinate values at which to evaluate the integral. + lower: Lower integration bound. May be scalar or array-like. + upper: Upper integration bound. May be scalar or array-like. + integrand_fn: Callable evaluated as ``integrand_fn(x_grid, y_grid)``. + n_grid: Number of integration points. + y_name: Name used for ``y`` in error messages. + lower_name: Name used for ``lower`` in error messages. + upper_name: Name used for ``upper`` in error messages. + n_grid_name: Name used for ``n_grid`` in error messages. + + Returns: + Integral values with the broadcast shape of ``y``, ``lower``, and + ``upper``. + """ + if n_grid < 2: + raise ValueError(f"{n_grid_name} must be at least 2") + + y_arr = validate_array(y, name=y_name) + + lower_arr = np.asarray(lower, dtype=float) + upper_arr = np.asarray(upper, dtype=float) + + if np.any(~np.isfinite(lower_arr)): + raise ValueError(_bound_finite_error_message(lower_name)) + + if np.any(~np.isfinite(upper_arr)): + raise ValueError(_bound_finite_error_message(upper_name)) + + lower_arr = validate_array(lower_arr, name=lower_name) + upper_arr = validate_array(upper_arr, name=upper_name) + + y_arr, lower_arr, upper_arr = np.broadcast_arrays(y_arr, lower_arr, upper_arr) + + width = upper_arr - lower_arr + empty = width <= 0.0 + + t_grid = np.linspace(0.0, 1.0, n_grid, dtype=float) + grid_shape = (n_grid,) + (1,) * lower_arr.ndim + + x_grid = lower_arr[None, ...] + t_grid.reshape(grid_shape) * width[None, ...] + y_grid = np.broadcast_to(y_arr[None, ...], x_grid.shape) + + values = np.asarray(integrand_fn(x_grid, y_grid), dtype=float) + + try: + values = np.broadcast_to(values, x_grid.shape) + except ValueError as exc: + raise ValueError( + "integrand_fn(x, y) must return values broadcastable " + "to the integration grid shape" + ) from exc + + if np.any(~np.isfinite(values)): + raise ValueError("integrand_fn(x, y) returned non-finite values") + + result = np.trapezoid(values, x=x_grid, axis=0) + return np.where(empty, 0.0, result) + + +def safe_divide( + numerator: FloatInput, + denominator: FloatInput, + *, + fill_value: float = 0.0, +) -> FloatArray: + """Divide arrays safely, replacing invalid or zero-denominator results. + + Args: + numerator: Values in the numerator. + denominator: Values in the denominator. + fill_value: Value returned where the denominator is zero or where + the division would produce a non-finite result. + + Returns: + Broadcasted division result with invalid entries replaced by + ``fill_value``. + """ + numerator_arr, denominator_arr = np.broadcast_arrays( + np.asarray(numerator, dtype=float), + np.asarray(denominator, dtype=float), + ) + + result = np.full(numerator_arr.shape, fill_value, dtype=float) + + valid = denominator_arr > 0.0 + np.divide( + numerator_arr, + denominator_arr, + out=result, + where=valid, + ) + + result = np.where(np.isfinite(result), result, fill_value) + return result + + +def _variable_bounds_grid( + *, + lower: FloatArray, + upper: FloatArray, + n_grid: int, +) -> FloatArray: + """Return a column-wise grid between variable lower and upper bounds.""" + t = np.linspace(0.0, 1.0, int(n_grid), dtype=float) + + return np.asarray( + lower[None, :] + t[:, None] * (upper[None, :] - lower[None, :]), + dtype=float, + ) + + +def _validate_variable_bound_inputs( + *, + lower: FloatArray, + upper: FloatArray, + n_grid: int, +) -> None: + """Validate finite variable-bound integration inputs.""" + if np.any(~np.isfinite(lower)): + raise ValueError("lower must contain only finite values.") + + if np.any(~np.isfinite(upper)): + raise ValueError("upper must contain only finite values.") + + if n_grid < 2: + raise ValueError("n_grid must be at least 2.") + + +def _bound_finite_error_message(name: str) -> str: + """Return the finite-bound validation message for a named bound.""" + if name in {"lower", "upper"}: + return f"{name} must contain only finite values." + + return f"{name} contains NaN or infinite values." diff --git a/tests/test_photometry_lf_integrals.py b/tests/test_photometry_lf_integrals.py index acbe7315..48cf312e 100644 --- a/tests/test_photometry_lf_integrals.py +++ b/tests/test_photometry_lf_integrals.py @@ -490,24 +490,117 @@ def test_cumulative_number_density_rejects_negative_redshift() -> None: ) -def test_private_magnitude_grid_has_expected_shape_and_edges() -> None: - """Tests that the private magnitude-grid helper builds column-wise grids.""" - result = li._magnitude_grid( - m_lower=np.array([-24.0, -23.0]), - m_upper=np.array([-18.0, -20.0]), - n_m=4, +def test_magnitude_window_number_density_uses_absolute_bounds() -> None: + """Tests magnitude-window density with direct absolute-magnitude bounds.""" + result = li.magnitude_window_number_density( + [0.1, 0.2], + constant_lf, + m_bright=-24.0, + m_faint=-18.0, + n_m=64, + ) + + np.testing.assert_allclose(result, np.array([6.0, 6.0])) + + +def test_magnitude_window_density_supports_mixed_bounds() -> None: + """Tests magnitude-window density with one apparent and one absolute bound.""" + + def luminosity_distance_mpc_fn(z: np.ndarray) -> np.ndarray: + """Return a constant luminosity distance.""" + return 10.0 * np.ones_like(z, dtype=float) + + result = li.magnitude_window_number_density( + [0.1, 0.2], + constant_lf, + m_bright=-24.0, + apparent_m_faint=12.0, + luminosity_distance_mpc_fn=luminosity_distance_mpc_fn, + n_m=64, ) - assert result.shape == (4, 2) - np.testing.assert_allclose(result[0], np.array([-24.0, -23.0])) - np.testing.assert_allclose(result[-1], np.array([-18.0, -20.0])) + np.testing.assert_allclose(result, np.array([6.0, 6.0])) + + +def test_magnitude_window_number_density_rejects_missing_bright_bound() -> None: + """Tests that magnitude-window density requires a bright bound.""" + with pytest.raises( + ValueError, + match="Must provide either m_bright or apparent_m_bright", + ): + li.magnitude_window_number_density( + 0.1, + constant_lf, + m_faint=-18.0, + ) + + +def test_magnitude_window_number_density_rejects_missing_faint_bound() -> None: + """Tests that magnitude-window density requires a faint bound.""" + with pytest.raises( + ValueError, + match="Must provide either m_faint or apparent_m_faint", + ): + li.magnitude_window_number_density( + 0.1, + constant_lf, + m_bright=-24.0, + ) + + +def test_magnitude_window_number_density_rejects_duplicate_bright_bounds() -> None: + """Tests that absolute and apparent bright bounds cannot both be supplied.""" + with pytest.raises( + ValueError, + match="Provide only one of m_bright or apparent_m_bright", + ): + li.magnitude_window_number_density( + 0.1, + constant_lf, + m_bright=-24.0, + apparent_m_bright=18.0, + m_faint=-18.0, + ) -def test_private_safe_divide_returns_zero_for_zero_denominator() -> None: - """Tests that the private safe-division helper handles zero denominators.""" - result = li._safe_divide( - np.array([1.0, 2.0, 3.0]), - np.array([1.0, 0.0, 2.0]), +def test_magnitude_window_density_requires_distance_for_apparent_bounds() -> None: + """Tests that apparent-magnitude bounds require a distance callable.""" + with pytest.raises( + ValueError, + match="luminosity_distance_mpc_fn is required", + ): + li.magnitude_window_number_density( + 0.1, + constant_lf, + apparent_m_bright=18.0, + m_faint=-18.0, + ) + + +def test_magnitude_window_number_density_applies_k_and_e_corrections() -> None: + """Tests apparent-magnitude conversion with K- and E-corrections.""" + + def luminosity_distance_mpc_fn(z: np.ndarray) -> np.ndarray: + """Return a constant luminosity distance.""" + return 10.0 * np.ones_like(z, dtype=float) + + def k_correction_fn(z: np.ndarray) -> np.ndarray: + """Return a constant K-correction.""" + return 1.0 * np.ones_like(z, dtype=float) + + def e_correction_fn(z: np.ndarray) -> np.ndarray: + """Return a constant E-correction.""" + return 0.5 * np.ones_like(z, dtype=float) + + result = li.magnitude_window_number_density( + [0.1, 0.2], + constant_lf, + m_bright=-24.0, + apparent_m_faint=12.0, + luminosity_distance_mpc_fn=luminosity_distance_mpc_fn, + k_correction_fn=k_correction_fn, + e_correction_fn=e_correction_fn, + n_m=64, ) - np.testing.assert_allclose(result, np.array([1.0, 0.0, 1.5])) + np.testing.assert_allclose(result, np.array([5.5, 5.5])) diff --git a/tests/test_utils_evaluators.py b/tests/test_utils_evaluators.py index 069f35be..f0f9ae83 100644 --- a/tests/test_utils_evaluators.py +++ b/tests/test_utils_evaluators.py @@ -6,9 +6,11 @@ import pytest from lfkit.utils.evaluators import ( + evaluate_lf_on_grid, evaluate_non_negative_redshift_callable, evaluate_optional_redshift_callable, evaluate_positive_redshift_callable, + evaluate_weight_on_grid, ) @@ -395,3 +397,278 @@ def fn(z_arr: np.ndarray) -> np.ndarray: assert result.shape == z.shape assert result.dtype == np.float64 np.testing.assert_allclose(result, expected) + + +def test_evaluate_lf_on_grid_accepts_matching_shape() -> None: + """Tests that LF grid evaluation accepts values with matching shape.""" + m_grid = np.array( + [ + [-24.0, -23.0], + [-22.0, -21.0], + ], + dtype=float, + ) + z_grid = np.array( + [ + [0.1, 0.2], + [0.1, 0.2], + ], + dtype=float, + ) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return LF values with the same shape as the input grid.""" + return np.ones_like(np.broadcast_arrays(m_abs, z)[0], dtype=float) + + result = evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + np.testing.assert_allclose(result, np.ones_like(m_grid)) + + +def test_evaluate_lf_on_grid_broadcasts_scalar_output() -> None: + """Tests that scalar LF output is broadcast to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> float: + """Return a scalar LF value.""" + _ = m_abs + _ = z + return 2.0 + + result = evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + np.testing.assert_allclose(result, 2.0 * np.ones_like(m_grid)) + + +def test_evaluate_lf_on_grid_broadcasts_column_output() -> None: + """Tests that broadcastable LF output is expanded to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return one LF value per redshift column.""" + _ = m_abs + _ = z + return np.array([[1.0, 2.0]], dtype=float) + + result = evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + expected = np.array( + [ + [1.0, 2.0], + [1.0, 2.0], + [1.0, 2.0], + ], + dtype=float, + ) + np.testing.assert_allclose(result, expected) + + +def test_evaluate_lf_on_grid_rejects_unbroadcastable_output() -> None: + """Tests that LF grid output must be broadcastable to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return LF values with an invalid shape.""" + _ = m_abs + _ = z + return np.ones((4, 4), dtype=float) + + with pytest.raises( + ValueError, + match="lf\\(M, z\\) must return values broadcastable", + ): + evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + +def test_evaluate_lf_on_grid_rejects_nonfinite_values() -> None: + """Tests that LF grid values must be finite.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return LF values containing NaN.""" + _ = z + return np.full_like(m_abs, np.nan, dtype=float) + + with pytest.raises(ValueError, match="lf\\(M, z\\) returned non-finite values"): + evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + +def test_evaluate_lf_on_grid_rejects_negative_values() -> None: + """Tests that LF grid values must be non-negative.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def lf(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return negative LF values.""" + _ = z + return -np.ones_like(m_abs, dtype=float) + + with pytest.raises(ValueError, match="lf\\(M, z\\) must be non-negative"): + evaluate_lf_on_grid( + lf, + m_grid=m_grid, + z_grid=z_grid, + ) + + +def test_evaluate_weight_on_grid_accepts_matching_shape() -> None: + """Tests that weight grid evaluation accepts values with matching shape.""" + m_grid = np.array( + [ + [-24.0, -23.0], + [-22.0, -21.0], + ], + dtype=float, + ) + z_grid = np.array( + [ + [0.1, 0.2], + [0.1, 0.2], + ], + dtype=float, + ) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return weights with the same shape as the input grid.""" + return np.ones_like(np.broadcast_arrays(m_abs, z)[0], dtype=float) + + result = evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) + + np.testing.assert_allclose(result, np.ones_like(m_grid)) + + +def test_evaluate_weight_on_grid_broadcasts_scalar_output() -> None: + """Tests that scalar weight output is broadcast to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> float: + """Return a scalar weight value.""" + _ = m_abs + _ = z + return 0.5 + + result = evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) + + np.testing.assert_allclose(result, 0.5 * np.ones_like(m_grid)) + + +def test_evaluate_weight_on_grid_broadcasts_column_output() -> None: + """Tests that broadcastable weight output is expanded to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return one weight value per redshift column.""" + _ = m_abs + _ = z + return np.array([[0.25, 0.75]], dtype=float) + + result = evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) + + expected = np.array( + [ + [0.25, 0.75], + [0.25, 0.75], + [0.25, 0.75], + ], + dtype=float, + ) + np.testing.assert_allclose(result, expected) + + +def test_evaluate_weight_on_grid_rejects_unbroadcastable_output() -> None: + """Tests that weight output must be broadcastable to the grid shape.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return weights with an invalid shape.""" + _ = m_abs + _ = z + return np.ones((4, 4), dtype=float) + + with pytest.raises( + ValueError, + match="weight_fn\\(M, z\\) must return values broadcastable", + ): + evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) + + +def test_evaluate_weight_on_grid_rejects_nonfinite_values() -> None: + """Tests that weight grid values must be finite.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return weights containing infinity.""" + _ = z + return np.full_like(m_abs, np.inf, dtype=float) + + with pytest.raises( + ValueError, + match="weight_fn\\(M, z\\) returned non-finite values", + ): + evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) + + +def test_evaluate_weight_on_grid_rejects_negative_values() -> None: + """Tests that weight grid values must be non-negative.""" + m_grid = np.ones((3, 2), dtype=float) + z_grid = np.ones((3, 2), dtype=float) + + def weight_fn(m_abs: np.ndarray, z: np.ndarray) -> np.ndarray: + """Return negative weights.""" + _ = z + return -np.ones_like(m_abs, dtype=float) + + with pytest.raises(ValueError, match="weight_fn\\(M, z\\) must be non-negative"): + evaluate_weight_on_grid( + weight_fn, + m_grid=m_grid, + z_grid=z_grid, + ) diff --git a/tests/test_utils_integrators.py b/tests/test_utils_integrators.py new file mode 100644 index 00000000..9897b5da --- /dev/null +++ b/tests/test_utils_integrators.py @@ -0,0 +1,362 @@ +"""Unit tests for ``lfkit.utils.integrators.py``.""" + +from __future__ import annotations + +import numpy as np +import pytest + +from lfkit.utils.integrators import ( + integrate_between_variable_bounds, + safe_divide, +) + + +def constant_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return a constant integrand.""" + return np.ones_like(np.broadcast_arrays(x, y)[0], dtype=float) + + +def double_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return a constant integrand with amplitude two.""" + return 2.0 * np.ones_like(np.broadcast_arrays(x, y)[0], dtype=float) + + +def linear_x_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return an integrand that varies linearly with the integration coordinate.""" + _ = y + return np.asarray(x, dtype=float) + + +def linear_y_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return an integrand that varies linearly with the external coordinate.""" + _ = x + return np.asarray(y, dtype=float) + + +def test_integrate_between_variable_bounds_integrates_constant_function() -> None: + """Tests that variable-bound integration returns the expected interval width.""" + result = integrate_between_variable_bounds( + [0.1, 0.2], + lower=-24.0, + upper=-18.0, + integrand_fn=constant_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([6.0, 6.0])) + + +def test_integrate_between_variable_bounds_preserves_integrand_amplitude() -> None: + """Tests that variable-bound integration preserves integrand amplitude.""" + result = integrate_between_variable_bounds( + [0.1, 0.2], + lower=-24.0, + upper=-18.0, + integrand_fn=double_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([12.0, 12.0])) + + +def test_integrate_between_variable_bounds_supports_array_bounds() -> None: + """Tests that variable-bound integration supports coordinate-dependent bounds.""" + result = integrate_between_variable_bounds( + [0.1, 0.2, 0.3], + lower=np.array([-24.0, -23.0, -22.0]), + upper=np.array([-18.0, -18.0, -18.0]), + integrand_fn=constant_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([6.0, 5.0, 4.0])) + + +def test_integrate_between_variable_bounds_supports_scalar_y() -> None: + """Tests that variable-bound integration supports scalar coordinate input.""" + result = integrate_between_variable_bounds( + 0.1, + lower=-24.0, + upper=-18.0, + integrand_fn=constant_integrand, + n_grid=64, + ) + + assert result.shape == () + assert result == pytest.approx(6.0) + + +def test_integrate_between_variable_bounds_returns_zero_for_empty_ranges() -> None: + """Tests that integration returns zero where the upper bound is not larger.""" + result = integrate_between_variable_bounds( + [0.1, 0.2, 0.3], + lower=np.array([-18.0, -20.0, -19.0]), + upper=np.array([-20.0, -20.0, -21.0]), + integrand_fn=constant_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([0.0, 0.0, 0.0])) + + +def test_integrate_between_variable_bounds_handles_mixed_valid_and_empty_ranges() -> None: + """Tests that only valid intervals contribute to the integral.""" + result = integrate_between_variable_bounds( + [0.1, 0.2, 0.3], + lower=np.array([-24.0, -18.0, -22.0]), + upper=np.array([-18.0, -20.0, -18.0]), + integrand_fn=constant_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([6.0, 0.0, 4.0])) + + +def test_integrate_between_variable_bounds_integrates_linear_x_function() -> None: + """Tests integration of a linear function over the integration coordinate.""" + result = integrate_between_variable_bounds( + 0.1, + lower=0.0, + upper=2.0, + integrand_fn=linear_x_integrand, + n_grid=128, + ) + + assert result == pytest.approx(2.0) + + +def test_integrate_between_variable_bounds_integrates_y_dependent_function() -> None: + """Tests integration of an external-coordinate-dependent function.""" + result = integrate_between_variable_bounds( + [1.0, 2.0, 3.0], + lower=0.0, + upper=2.0, + integrand_fn=linear_y_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([2.0, 4.0, 6.0])) + + +def test_integrate_between_variable_bounds_accepts_scalar_integrand_output() -> None: + """Tests that scalar integrand output is broadcast to the integration grid.""" + + def scalar_integrand(x: np.ndarray, y: np.ndarray) -> float: + """Return a scalar integrand value.""" + _ = x + _ = y + return 3.0 + + result = integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=2.0, + integrand_fn=scalar_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([6.0, 6.0])) + + +def test_integrate_between_variable_bounds_accepts_broadcastable_integrand_output() -> None: + """Tests that broadcastable integrand output is expanded to the grid shape.""" + + def broadcastable_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return one integrand value per coordinate value.""" + _ = x + _ = y + return np.array([[1.0, 2.0]], dtype=float) + + result = integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=2.0, + integrand_fn=broadcastable_integrand, + n_grid=64, + ) + + np.testing.assert_allclose(result, np.array([2.0, 4.0])) + + +def test_integrate_between_variable_bounds_preserves_broadcasted_shape() -> None: + """Tests that broadcasted coordinate and bound inputs define the output shape.""" + result = integrate_between_variable_bounds( + np.array([[0.1], [0.2]], dtype=float), + lower=np.array([0.0, 1.0, 2.0], dtype=float), + upper=np.array([1.0, 3.0, 5.0], dtype=float), + integrand_fn=constant_integrand, + n_grid=64, + ) + + expected = np.array( + [ + [1.0, 2.0, 3.0], + [1.0, 2.0, 3.0], + ], + dtype=float, + ) + assert result.shape == expected.shape + np.testing.assert_allclose(result, expected) + + +def test_integrate_between_variable_bounds_rejects_small_grid() -> None: + """Tests that variable-bound integration requires at least two grid points.""" + with pytest.raises(ValueError, match="n_grid must be at least 2"): + integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=1.0, + integrand_fn=constant_integrand, + n_grid=1, + ) + + +def test_integrate_between_variable_bounds_rejects_nonfinite_lower_bound() -> None: + """Tests that lower integration bounds must be finite.""" + with pytest.raises(ValueError, match="lower must contain only finite values"): + integrate_between_variable_bounds( + [0.1, 0.2], + lower=np.nan, + upper=1.0, + integrand_fn=constant_integrand, + n_grid=64, + ) + + +def test_integrate_between_variable_bounds_rejects_nonfinite_upper_bound() -> None: + """Tests that upper integration bounds must be finite.""" + with pytest.raises(ValueError, match="upper must contain only finite values"): + integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=np.inf, + integrand_fn=constant_integrand, + n_grid=64, + ) + + +def test_integrate_between_variable_bounds_rejects_unbroadcastable_bounds() -> None: + """Tests that coordinate and bound inputs must be broadcastable together.""" + with pytest.raises(ValueError): + integrate_between_variable_bounds( + np.ones((2, 3), dtype=float), + lower=np.ones((4,), dtype=float), + upper=2.0, + integrand_fn=constant_integrand, + n_grid=64, + ) + + +def test_integrate_between_variable_bounds_rejects_unbroadcastable_integrand() -> None: + """Tests that integrand output must be broadcastable to the integration grid.""" + + def bad_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return integrand values with an invalid shape.""" + _ = x + _ = y + return np.ones((4, 4), dtype=float) + + with pytest.raises( + ValueError, + match="integrand_fn\\(x, y\\) must return values broadcastable", + ): + integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=1.0, + integrand_fn=bad_integrand, + n_grid=64, + ) + + +def test_integrate_between_variable_bounds_rejects_nonfinite_integrand_values() -> None: + """Tests that integrand values must be finite.""" + + def bad_integrand(x: np.ndarray, y: np.ndarray) -> np.ndarray: + """Return non-finite integrand values.""" + _ = y + return np.full_like(x, np.nan, dtype=float) + + with pytest.raises( + ValueError, + match="integrand_fn\\(x, y\\) returned non-finite values", + ): + integrate_between_variable_bounds( + [0.1, 0.2], + lower=0.0, + upper=1.0, + integrand_fn=bad_integrand, + n_grid=64, + ) + + +def test_safe_divide_returns_ratio_for_positive_denominator() -> None: + """Tests that safe division returns the ordinary ratio for positive denominators.""" + result = safe_divide( + np.array([2.0, 6.0, 12.0]), + np.array([1.0, 2.0, 3.0]), + ) + + np.testing.assert_allclose(result, np.array([2.0, 3.0, 4.0])) + + +def test_safe_divide_returns_zero_for_zero_denominator() -> None: + """Tests that safe division returns zero for zero denominators.""" + result = safe_divide( + np.array([1.0, 2.0, 3.0]), + np.array([1.0, 0.0, 2.0]), + ) + + np.testing.assert_allclose(result, np.array([1.0, 0.0, 1.5])) + + +def test_safe_divide_returns_zero_for_negative_denominator() -> None: + """Tests that safe division returns zero for negative denominators.""" + result = safe_divide( + np.array([1.0, 2.0, 3.0]), + np.array([1.0, -1.0, 2.0]), + ) + + np.testing.assert_allclose(result, np.array([1.0, 0.0, 1.5])) + + +def test_safe_divide_supports_scalar_inputs() -> None: + """Tests that safe division supports scalar inputs.""" + result = safe_divide(4.0, 2.0) + + assert result.shape == () + assert result == pytest.approx(2.0) + + +def test_safe_divide_returns_zero_for_scalar_zero_denominator() -> None: + """Tests that safe division returns zero for scalar zero denominator.""" + result = safe_divide(4.0, 0.0) + + assert result.shape == () + assert result == pytest.approx(0.0) + + +def test_safe_divide_supports_broadcastable_inputs() -> None: + """Tests that safe division supports broadcastable numerator and denominator.""" + result = safe_divide( + np.array([[2.0], [4.0]], dtype=float), + np.array([1.0, 2.0, 0.0], dtype=float), + ) + + expected = np.array( + [ + [2.0, 1.0, 0.0], + [4.0, 2.0, 0.0], + ], + dtype=float, + ) + np.testing.assert_allclose(result, expected) + + +def test_safe_divide_rejects_unbroadcastable_inputs() -> None: + """Tests that safe division requires broadcastable inputs.""" + with pytest.raises(ValueError): + safe_divide( + np.ones((2, 3), dtype=float), + np.ones((4,), dtype=float), + )