Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 164 additions & 127 deletions src/lfkit/photometry/lf_integrals.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
import numpy as np

from lfkit.utils.types import FloatArray, FloatInput, LuminosityFunction
from lfkit.utils.integrators import integrate_between_variable_bounds, safe_divide
from lfkit.utils.evaluators import (
evaluate_optional_redshift_callable,
evaluate_positive_redshift_callable,
evaluate_lf_on_grid,
evaluate_weight_on_grid,
)
from lfkit.utils.validators import validate_array


Expand All @@ -33,6 +40,7 @@
"integrated_luminosity_density",
"mean_luminosity",
"cumulative_number_density",
"magnitude_window_number_density",
]


Expand Down Expand Up @@ -264,7 +272,7 @@ def mean_luminosity(
n_m=n_m,
)

return _safe_divide(luminosity_density, number_density)
return safe_divide(luminosity_density, number_density)


def cumulative_number_density(
Expand Down Expand Up @@ -336,165 +344,194 @@ def cumulative_number_density(
)


def _integrated_lf_between_bounds(
def magnitude_window_number_density(
z: FloatInput,
lf: LuminosityFunction,
*,
m_lower: FloatInput,
m_upper: FloatInput,
n_m: int,
weight_fn: Callable[[FloatArray, FloatArray], FloatArray] | None = None,
m_bright: FloatInput | None = None,
m_faint: FloatInput | None = None,
apparent_m_bright: FloatInput | None = None,
apparent_m_faint: FloatInput | None = None,
luminosity_distance_mpc_fn: Callable[[FloatArray], FloatArray] | None = None,
k_correction_fn: Callable[[FloatArray], FloatArray] | None = None,
e_correction_fn: Callable[[FloatArray], FloatArray] | None = None,
n_m: int = 512,
) -> FloatArray:
r"""Return finite-range LF integral between magnitude bounds."""
z_arr = validate_array(z, name="z")
m_lower_arr = validate_array(m_lower, name="m_lower")
m_upper_arr = validate_array(m_upper, name="m_upper")

_validate_integration_inputs(
z=z_arr,
m_lower=m_lower_arr,
m_upper=m_upper_arr,
n_m=n_m,
)

z_b, m_lower_b, m_upper_b = np.broadcast_arrays(
z_arr,
m_lower_arr,
m_upper_arr,
)

integral = np.zeros_like(z_b, dtype=float)
valid = m_upper_b > m_lower_b
r"""Return LF number density inside a magnitude-selection window.

if not np.any(valid):
return np.asarray(integral, dtype=float)
This integrates a luminosity function over a finite absolute-magnitude
range. The bright and faint limits may be supplied directly as absolute
magnitudes, converted from apparent magnitudes, or supplied as a mixture of
both.

m_grid = _magnitude_grid(
m_lower=m_lower_b[valid],
m_upper=m_upper_b[valid],
n_m=n_m,
)
z_grid = np.broadcast_to(z_b[valid][None, :], m_grid.shape)

phi = _evaluate_lf_on_grid(lf, m_grid=m_grid, z_grid=z_grid)
Magnitudes are ordered so that more negative values are brighter. Apparent
magnitude limits are converted to absolute-magnitude limits at each
redshift before integration.

if weight_fn is not None:
weight = _evaluate_weight_on_grid(
weight_fn,
m_grid=m_grid,
z_grid=z_grid,
)
phi = phi * weight
This helper is science-use-case agnostic. It only defines the LF integral
over a magnitude window; interpretation of that selected population belongs
to the calling analysis.

integral[valid] = np.trapezoid(phi, x=m_grid, axis=0)
Args:
z: Redshift value or array-like of redshift values.
lf: Luminosity-function callable with signature ``lf(M, z)``.
m_bright: Bright absolute-magnitude bound.
m_faint: Faint absolute-magnitude bound.
apparent_m_bright: Bright apparent-magnitude bound.
apparent_m_faint: Faint apparent-magnitude bound.
luminosity_distance_mpc_fn: Callable returning luminosity distance in
Mpc. Required when either apparent-magnitude bound is supplied.
k_correction_fn: Optional K-correction callable evaluated at ``z``.
e_correction_fn: Optional E-correction callable evaluated at ``z``.
n_m: Number of magnitude-grid points used for the integral.

return np.asarray(integral, dtype=float)
Returns:
NumPy array of number densities evaluated at ``z``.

Raises:
ValueError: If a bright or faint bound is missing, if both absolute and
apparent values are supplied for the same bound, or if an apparent
bound is supplied without a luminosity-distance callable.
"""
z_arr = validate_array(z, name="z")

def _magnitude_grid(
*,
m_lower: FloatArray,
m_upper: FloatArray,
n_m: int,
) -> FloatArray:
r"""Return a magnitude grid for column-wise finite-range integration."""
t = np.linspace(0.0, 1.0, int(n_m), dtype=float)
m_bright_resolved = _resolve_magnitude_window_bound(
z_arr,
absolute_mag=m_bright,
apparent_mag=apparent_m_bright,
luminosity_distance_mpc_fn=luminosity_distance_mpc_fn,
k_correction_fn=k_correction_fn,
e_correction_fn=e_correction_fn,
bound_name="bright",
)
m_faint_resolved = _resolve_magnitude_window_bound(
z_arr,
absolute_mag=m_faint,
apparent_mag=apparent_m_faint,
luminosity_distance_mpc_fn=luminosity_distance_mpc_fn,
k_correction_fn=k_correction_fn,
e_correction_fn=e_correction_fn,
bound_name="faint",
)

return np.asarray(
m_lower[None, :] + t[:, None] * (m_upper[None, :] - m_lower[None, :]),
dtype=float,
return integrated_number_density(
z_arr,
lf,
m_bright=m_bright_resolved,
m_faint=m_faint_resolved,
n_m=n_m,
)


def _evaluate_lf_on_grid(
lf: LuminosityFunction,
def _resolve_magnitude_window_bound(
z: FloatArray,
*,
m_grid: FloatArray,
z_grid: FloatArray,
absolute_mag: FloatInput | None,
apparent_mag: FloatInput | None,
luminosity_distance_mpc_fn: Callable[[FloatArray], FloatArray] | None,
k_correction_fn: Callable[[FloatArray], FloatArray] | None,
e_correction_fn: Callable[[FloatArray], FloatArray] | None,
bound_name: str,
) -> FloatArray:
r"""Return LF values evaluated on a magnitude-redshift grid."""
phi = np.asarray(lf(m_grid, z_grid), dtype=float)

if phi.shape != m_grid.shape:
try:
phi = np.broadcast_to(phi, m_grid.shape)
except ValueError as exc:
raise ValueError(
"lf(M, z) must return values broadcastable to the shape "
"of the magnitude-redshift integration grid."
) from exc
r"""Return an absolute-magnitude bound for a magnitude window."""
if absolute_mag is None and apparent_mag is None:
raise ValueError(
f"Must provide either m_{bound_name} or apparent_m_{bound_name}."
)

if np.any(~np.isfinite(phi)):
raise ValueError("lf(M, z) returned non-finite values.")
if absolute_mag is not None and apparent_mag is not None:
raise ValueError(
f"Provide only one of m_{bound_name} or apparent_m_{bound_name}."
)

if np.any(phi < 0.0):
raise ValueError("lf(M, z) must be non-negative.")
if absolute_mag is not None:
return validate_array(absolute_mag, name=f"m_{bound_name}")

return np.asarray(phi, dtype=float)
if luminosity_distance_mpc_fn is None:
raise ValueError(
"luminosity_distance_mpc_fn is required when apparent magnitude "
f"bounds are supplied. Missing conversion for apparent_m_{bound_name}."
)

apparent_mag_arr = validate_array(
apparent_mag,
name=f"apparent_m_{bound_name}",
)
luminosity_distance_mpc = evaluate_positive_redshift_callable(
luminosity_distance_mpc_fn,
z,
name="luminosity_distance_mpc_fn",
)

def _evaluate_weight_on_grid(
weight_fn: Callable[[FloatArray, FloatArray], FloatArray],
*,
m_grid: FloatArray,
z_grid: FloatArray,
) -> FloatArray:
r"""Return finite non-negative weight values on a magnitude-redshift grid."""
weight = np.asarray(weight_fn(m_grid, z_grid), dtype=float)
k_correction = evaluate_optional_redshift_callable(
k_correction_fn,
z,
name="k_correction_fn",
)
e_correction = evaluate_optional_redshift_callable(
e_correction_fn,
z,
name="e_correction_fn",
)

if weight.shape != m_grid.shape:
try:
weight = np.broadcast_to(weight, m_grid.shape)
except ValueError as exc:
raise ValueError(
"weight_fn(M, z) must return values broadcastable to the shape "
"of the magnitude-redshift integration grid."
) from exc
if k_correction is None:
k_correction = np.zeros_like(z, dtype=float)

if np.any(~np.isfinite(weight)):
raise ValueError("weight_fn(M, z) returned non-finite values.")
if e_correction is None:
e_correction = np.zeros_like(z, dtype=float)

if np.any(weight < 0.0):
raise ValueError("weight_fn(M, z) must be non-negative.")
distance_modulus = 5.0 * np.log10(luminosity_distance_mpc) + 25.0

return np.asarray(weight, dtype=float)
return np.asarray(
apparent_mag_arr - distance_modulus - k_correction + e_correction,
dtype=float,
)


def _validate_integration_inputs(
def _integrated_lf_between_bounds(
z: FloatInput,
lf: LuminosityFunction,
*,
z: FloatArray,
m_lower: FloatArray,
m_upper: FloatArray,
m_lower: FloatInput,
m_upper: FloatInput,
n_m: int,
) -> None:
r"""Validate inputs for finite-range LF integration."""
if np.any(z < 0.0):
raise ValueError("Redshift z must be >= 0.")

if np.any(~np.isfinite(m_lower)):
raise ValueError("m_lower must contain only finite values.")
weight_fn: Callable[[FloatArray, FloatArray], FloatArray] | None = None,
) -> FloatArray:
r"""Return finite-range LF integral between magnitude bounds."""
z_arr = validate_array(z, name="z")

if np.any(~np.isfinite(m_upper)):
raise ValueError("m_upper must contain only finite values.")
if np.any(z_arr < 0.0):
raise ValueError("Redshift z must be >= 0.")

if n_m < 2:
raise ValueError("n_m must be at least 2.")
def integrand(
absolute_mag: FloatArray,
redshift: FloatArray,
) -> FloatArray:
phi = evaluate_lf_on_grid(
lf,
m_grid=absolute_mag,
z_grid=redshift,
)

if weight_fn is None:
return phi

def _safe_divide(
numerator: FloatArray,
denominator: FloatArray,
) -> FloatArray:
r"""Return numerator / denominator with zero output for zero denominator."""
numerator_arr = np.asarray(numerator, dtype=float)
denominator_arr = np.asarray(denominator, dtype=float)

with np.errstate(divide="ignore", invalid="ignore"):
result = np.divide(
numerator_arr,
denominator_arr,
out=np.zeros_like(numerator_arr, dtype=float),
where=denominator_arr > 0.0,
weight = evaluate_weight_on_grid(
weight_fn,
m_grid=absolute_mag,
z_grid=redshift,
)

return np.asarray(result, dtype=float)
return np.asarray(phi * weight, dtype=float)

return integrate_between_variable_bounds(
z_arr,
lower=m_lower,
upper=m_upper,
integrand_fn=integrand,
n_grid=n_m,
y_name="z",
lower_name="m_lower",
upper_name="m_upper",
n_grid_name="n_m",
)
Loading
Loading