"""Validation scripts."""
from __future__ import annotations
from typing import Any
import numpy as np
from numpy.typing import NDArray
__all__ = [
"as_1d_float_array",
"as_2d_float_array",
"is_non_negative_integer",
"normalize_axis",
"occupied_redshift_range_from_nz",
"redshift_window_mask",
"validate_1d_pair",
"validate_finite_scalar",
"validate_forecast_vector_and_covariance",
"validate_integration_axis",
"validate_integration_params",
"validate_joint_covariance_blocks",
"validate_nonnegative_1d_array",
"validate_nonnegative_scalar",
"validate_parameter_names",
"validate_positive_1d_array",
"validate_positive_scalar",
"validate_positive_strictly_increasing_1d_array",
"validate_power_spectrum_inputs",
"validate_redshift_distribution",
"validate_redshift_edges",
"validate_redshift_distribution_support",
"validate_redshift_pair",
"validate_scale_factor",
"validate_strictly_increasing",
]
[docs]
def validate_finite_scalar(value: float, name: str) -> None:
"""Validate that a scalar is finite.
Args:
value: Scalar value to validate.
name: Name used in error messages.
Raises:
ValueError: If ``value`` is not finite.
"""
if not np.isfinite(value):
raise ValueError(f"{name} must be finite.")
[docs]
def validate_positive_scalar(value: float, name: str) -> None:
"""Validate that a scalar is finite and positive.
Args:
value: Scalar value to validate.
name: Name used in error messages.
Raises:
ValueError: If ``value`` is not finite or is not positive.
"""
validate_finite_scalar(value, name)
if value <= 0.0:
raise ValueError(f"{name} must be positive.")
[docs]
def validate_nonnegative_scalar(value: float, name: str) -> None:
"""Validate that a scalar is finite and non-negative.
Args:
value: Scalar value to validate.
name: Name used in error messages.
Raises:
ValueError: If ``value`` is not finite or is negative.
"""
validate_finite_scalar(value, name)
if value < 0.0:
raise ValueError(f"{name} must be non-negative.")
[docs]
def validate_scale_factor(a: float) -> None:
"""Validate a cosmological scale factor.
Args:
a: Scale factor.
Raises:
ValueError: If ``a`` is not in the interval ``0 < a <= 1``.
"""
validate_finite_scalar(a, "a")
if a <= 0.0 or a > 1.0:
raise ValueError("a must satisfy 0 < a <= 1.")
[docs]
def validate_redshift_pair(z_lens: float, z_source: float) -> None:
"""Validate a lens/source redshift pair.
Args:
z_lens: Lens redshift.
z_source: Source redshift.
Raises:
ValueError: If redshifts are invalid or the source is not behind the lens.
"""
validate_nonnegative_scalar(z_lens, "z_lens")
validate_finite_scalar(z_source, "z_source")
if z_source <= z_lens:
raise ValueError("z_source must be greater than z_lens.")
[docs]
def as_1d_float_array(
values: Any,
name: str,
*,
min_size: int = 1,
) -> NDArray[np.float64]:
"""Return values as a finite one-dimensional float array.
Args:
values: Input values.
name: Name used in error messages.
min_size: Minimum allowed number of values.
Returns:
Finite one-dimensional float array.
Raises:
ValueError: If the input is not one-dimensional, too short, or non-finite.
"""
array = np.asarray(values, dtype=float)
if array.ndim != 1:
raise ValueError(f"{name} must be one-dimensional. Got shape {array.shape}.")
if array.size < min_size:
raise ValueError(f"{name} must contain at least {min_size} values.")
if np.any(~np.isfinite(array)):
raise ValueError(f"{name} must contain only finite values.")
return array
[docs]
def as_2d_float_array(
values: Any,
name: str,
) -> NDArray[np.float64]:
"""Return values as a finite two-dimensional float array.
Args:
values: Input values.
name: Name used in error messages.
Returns:
Finite two-dimensional float array.
Raises:
ValueError: If the input is not two-dimensional or contains non-finite
values.
"""
array = np.asarray(values, dtype=float)
if array.ndim != 2:
raise ValueError(f"{name} must be two-dimensional. Got shape {array.shape}.")
if np.any(~np.isfinite(array)):
raise ValueError(f"{name} must contain only finite values.")
return array
[docs]
def validate_positive_1d_array(
values: Any,
name: str,
*,
min_size: int = 1,
) -> NDArray[np.float64]:
"""Return values as a positive one-dimensional float array.
Args:
values: Input values.
name: Name used in error messages.
min_size: Minimum allowed number of values.
Returns:
Positive one-dimensional float array.
Raises:
ValueError: If the input is invalid or contains non-positive values.
"""
array = as_1d_float_array(values, name, min_size=min_size)
if np.any(array <= 0.0):
raise ValueError(f"{name} must contain only positive values.")
return array
[docs]
def validate_nonnegative_1d_array(
values: Any,
name: str,
*,
min_size: int = 1,
) -> NDArray[np.float64]:
"""Return values as a non-negative one-dimensional float array.
Args:
values: Input values.
name: Name used in error messages.
min_size: Minimum allowed number of values.
Returns:
Non-negative one-dimensional float array.
Raises:
ValueError: If the input is invalid or contains negative values.
"""
array = as_1d_float_array(values, name, min_size=min_size)
if np.any(array < 0.0):
raise ValueError(f"{name} must be non-negative.")
return array
[docs]
def validate_strictly_increasing(
values: Any,
name: str,
*,
min_size: int = 2,
) -> NDArray[np.float64]:
"""Return values as a strictly increasing one-dimensional float array.
Args:
values: Input values.
name: Name used in error messages.
min_size: Minimum allowed number of values.
Returns:
Strictly increasing one-dimensional float array.
Raises:
ValueError: If the input is invalid or is not strictly increasing.
"""
array = as_1d_float_array(values, name, min_size=min_size)
if np.any(np.diff(array) <= 0.0):
raise ValueError(f"{name} must be strictly increasing.")
return array
[docs]
def validate_positive_strictly_increasing_1d_array(
values: Any,
name: str,
*,
min_size: int = 2,
) -> NDArray[np.float64]:
"""Return values as a positive strictly increasing one-dimensional array.
This is useful for physical coordinate grids such as wavenumber, radius, or
frequency grids where values must be finite, positive, and ordered.
Args:
values: Input coordinate values.
name: Name used in error messages.
min_size: Minimum allowed number of values.
Returns:
Positive strictly increasing one-dimensional float array.
Raises:
ValueError: If the input is invalid, non-positive, or not strictly
increasing.
"""
array = validate_positive_1d_array(values, name, min_size=min_size)
if np.any(np.diff(array) <= 0.0):
raise ValueError(f"{name} must be strictly increasing.")
return array
[docs]
def validate_1d_pair(
x: Any,
y: Any,
*,
x_name: str = "x",
y_name: str = "y",
) -> None:
"""Validate two matching one-dimensional arrays.
Args:
x: First array. This array must be strictly increasing.
y: Second array.
x_name: Name of the first array for error messages.
y_name: Name of the second array for error messages.
Raises:
ValueError: If arrays are not finite, one-dimensional, matching, or if
``x`` is not strictly increasing.
"""
x_arr = validate_strictly_increasing(x, x_name, min_size=1)
y_arr = as_1d_float_array(y, y_name, min_size=1)
if x_arr.shape != y_arr.shape:
raise ValueError(
f"{x_name} and {y_name} must have matching shapes. Got {x_arr.shape} and {y_arr.shape}."
)
[docs]
def validate_joint_covariance_blocks(
cov_gm_gm: Any,
cov_gg_gg: Any,
cov_gm_gg: Any,
) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]:
"""Validate and return covariance blocks for a joint Delta Sigma covariance.
Args:
cov_gm_gm: Auto-covariance block for the galaxy-matter signal.
cov_gg_gg: Auto-covariance block for the galaxy-galaxy signal.
cov_gm_gg: Cross-covariance block between the two signals.
Returns:
Validated covariance blocks as finite float arrays.
Raises:
ValueError: If any block has an invalid shape.
"""
cov_gm_gm_arr = as_2d_float_array(cov_gm_gm, "cov_gm_gm")
cov_gg_gg_arr = as_2d_float_array(cov_gg_gg, "cov_gg_gg")
cov_gm_gg_arr = as_2d_float_array(cov_gm_gg, "cov_gm_gg")
if cov_gm_gm_arr.shape[0] != cov_gm_gm_arr.shape[1]:
raise ValueError("cov_gm_gm must be square.")
if cov_gg_gg_arr.shape[0] != cov_gg_gg_arr.shape[1]:
raise ValueError("cov_gg_gg must be square.")
expected_cross_shape = (
cov_gm_gm_arr.shape[0],
cov_gg_gg_arr.shape[0],
)
if cov_gm_gg_arr.shape != expected_cross_shape:
raise ValueError(
"cov_gm_gg has incompatible shape. "
f"Expected {expected_cross_shape}, got {cov_gm_gg_arr.shape}."
)
return cov_gm_gm_arr, cov_gg_gg_arr, cov_gm_gg_arr
[docs]
def normalize_axis(axis: int, ndim: int) -> int:
"""Return a non-negative axis index for an array with ``ndim`` dimensions.
Args:
axis: Axis index, allowing negative indexing.
ndim: Number of dimensions of the target array.
Returns:
Equivalent non-negative axis index.
Raises:
ValueError: If ``ndim`` is not positive or if ``axis`` is out of bounds.
"""
if ndim <= 0:
raise ValueError("ndim must be positive.")
if not -ndim <= axis < ndim:
raise ValueError(f"axis {axis} is out of bounds for an array with {ndim} dimensions.")
return axis % ndim
[docs]
def validate_integration_axis(
values: Any,
x: Any,
*,
axis: int = -1,
) -> None:
"""Validate that ``x`` matches an integration axis of ``values``.
Args:
values: Array of values to integrate.
x: Coordinate array for the integration axis.
axis: Integration axis.
Raises:
ValueError: If ``x`` is invalid or does not match the chosen axis.
"""
values_arr = np.asarray(values, dtype=float)
x_arr = validate_strictly_increasing(x, "x", min_size=2)
if values_arr.ndim == 0:
raise ValueError("values must have at least one dimension.")
axis = normalize_axis(axis, values_arr.ndim)
if values_arr.shape[axis] != x_arr.size:
raise ValueError(
"x length must match the integration axis of values. "
f"Got x.size={x_arr.size} and values.shape[{axis}]="
f"{values_arr.shape[axis]}."
)
[docs]
def validate_redshift_edges(
z_min: float,
z_max: float,
*,
z_min_name: str = "z_min",
z_max_name: str = "z_max",
) -> None:
"""Validate lower and upper redshift edges.
Args:
z_min: Lower redshift edge.
z_max: Upper redshift edge.
z_min_name: Name of the lower edge for error messages.
z_max_name: Name of the upper edge for error messages.
Raises:
ValueError: If the redshift edges are invalid.
"""
validate_nonnegative_scalar(z_min, z_min_name)
validate_finite_scalar(z_max, z_max_name)
if z_max <= z_min:
raise ValueError(f"{z_max_name} must be greater than {z_min_name}.")
[docs]
def validate_redshift_distribution(
z: Any,
nz: Any,
*,
name: str = "nz",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Validate and return a redshift distribution.
Args:
z: Redshift grid.
nz: Redshift distribution evaluated on ``z``.
name: Name used in error messages.
Returns:
Redshift grid and redshift distribution as float arrays.
Raises:
ValueError: If the inputs are invalid.
"""
z_arr = validate_strictly_increasing(z, "z", min_size=2)
nz_arr = validate_nonnegative_1d_array(nz, name, min_size=2)
if z_arr.shape != nz_arr.shape:
raise ValueError(
f"z and {name} must have matching shapes. Got {z_arr.shape} and {nz_arr.shape}."
)
if np.any(z_arr < 0.0):
raise ValueError("z must be non-negative.")
return z_arr, nz_arr
[docs]
def validate_integration_params(params: dict[str, Any]) -> None:
"""Validate lens-magnification integration settings.
Args:
params: Integration settings dictionary.
Raises:
ValueError: If any setting is invalid.
"""
if int(params["n_ell"]) <= 0:
raise ValueError("n_ell must be positive.")
if float(params["ell_min"]) <= 0.0:
raise ValueError("ell_min must be positive.")
if float(params["ell_max"]) <= float(params["ell_min"]):
raise ValueError("ell_max must be greater than ell_min.")
if float(params["z_stepsize"]) <= 0.0:
raise ValueError("z_stepsize must be positive.")
if float(params["z_min"]) < 0.0:
raise ValueError("z_min must be non-negative.")
if float(params["delta_z_source"]) <= 0.0:
raise ValueError("delta_z_source must be positive.")
[docs]
def is_non_negative_integer(value: float | int) -> bool:
"""Return whether a value is a non-negative integer."""
value_float = float(value)
return value_float >= 0.0 and value_float.is_integer()
[docs]
def occupied_redshift_range_from_nz(
z: Any,
nz: Any,
*,
threshold: float = 0.0,
name: str = "nz",
) -> tuple[float, float]:
"""Return the redshift range occupied by a redshift distribution.
The occupied range is defined by the first and last redshift values where
the redshift distribution is above ``threshold``. This is useful when an
approximate tomographic-bin extent is needed but explicit bin edges are not
available.
Args:
z: Redshift grid.
nz: Redshift distribution evaluated on ``z``.
threshold: Minimum ``n(z)`` value used to define the occupied range.
name: Name of the redshift distribution used in error messages.
Returns:
Lower and upper occupied redshift values.
Raises:
ValueError: If the redshift distribution is invalid, if ``threshold`` is
negative, or if fewer than two redshift cells are occupied.
"""
validate_nonnegative_scalar(threshold, "threshold")
z_arr, nz_arr = validate_redshift_distribution(
z,
nz,
name=name,
)
occupied = nz_arr > threshold
if np.count_nonzero(occupied) < 2:
raise ValueError(f"{name} must be non-zero in at least two redshift cells.")
z_used = z_arr[occupied]
return float(z_used[0]), float(z_used[-1])
[docs]
def validate_forecast_vector_and_covariance(
data_vector: Any,
covariance: Any,
*,
data_vector_name: str = "data_vector",
covariance_name: str = "covariance",
) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
"""Validate and return a forecast data vector and matching covariance.
Args:
data_vector: Forecast data vector.
covariance: Forecast covariance matrix.
data_vector_name: Name of the data vector used in error messages.
covariance_name: Name of the covariance matrix used in error messages.
Returns:
Validated one-dimensional data vector and two-dimensional covariance
matrix.
Raises:
ValueError: If the data vector or covariance is invalid, or if the
covariance shape does not match the data-vector length.
"""
data_vector_arr = as_1d_float_array(data_vector, data_vector_name, min_size=1)
covariance_arr = as_2d_float_array(covariance, covariance_name)
expected_shape = (data_vector_arr.size, data_vector_arr.size)
if covariance_arr.shape != expected_shape:
raise ValueError(
f"{covariance_name} shape does not match {data_vector_name}. "
f"Expected {expected_shape}, got {covariance_arr.shape}."
)
return data_vector_arr, covariance_arr
[docs]
def validate_parameter_names(
parameter_names: Any,
theta0: Any,
*,
parameter_names_name: str = "parameter_names",
theta0_name: str = "theta0",
) -> tuple[str, ...]:
"""Validate parameter names against a fiducial parameter vector.
Args:
parameter_names: Names associated with the forecast parameters.
theta0: Fiducial parameter vector.
parameter_names_name: Name of the parameter-name sequence used in error
messages.
theta0_name: Name of the fiducial parameter vector used in error
messages.
Returns:
Parameter names as a tuple of strings.
Raises:
ValueError: If the number of names does not match the number of
fiducial parameters.
TypeError: If any parameter name is not a string.
"""
theta0_arr = as_1d_float_array(theta0, theta0_name, min_size=1)
names = tuple(parameter_names or ())
if any(not isinstance(name, str) for name in names):
raise TypeError(f"{parameter_names_name} must contain only strings.")
if names and len(names) != theta0_arr.size:
raise ValueError(
f"{parameter_names_name} must have the same length as {theta0_name}. "
f"Got {len(names)} names for {theta0_arr.size} parameters."
)
return names
[docs]
def redshift_window_mask(
z: NDArray[np.float64],
*,
z_min: float | None = None,
z_max: float | None = None,
) -> NDArray[np.bool_]:
"""Return a mask selecting redshifts inside optional window limits."""
z_arr = validate_nonnegative_1d_array(z, "z", min_size=2)
if z_min is not None:
validate_nonnegative_scalar(z_min, "z_min")
if z_max is not None:
validate_nonnegative_scalar(z_max, "z_max")
if z_min is not None and z_max is not None:
validate_redshift_edges(z_min, z_max)
mask = np.ones(z_arr.shape, dtype=bool)
if z_min is not None:
mask &= z_arr >= z_min
if z_max is not None:
mask &= z_arr <= z_max
return mask
[docs]
def validate_redshift_distribution_support(
z,
nz,
*,
name="redshift_distribution",
trim_edge_points=0,
):
"""Validate and select the positive support of a redshift distribution.
Leading and trailing zero-density points are removed. Interior non-positive
values inside the selected support are rejected.
"""
z_arr, nz_arr = validate_redshift_distribution(z, nz, name=name)
if not is_non_negative_integer(trim_edge_points):
raise ValueError("trim_edge_points must be a non-negative integer.")
trim_edge_points = int(trim_edge_points)
positive_support = np.where(nz_arr > 0.0)[0]
if positive_support.size == 0:
raise ValueError(f"{name} normalization must be finite and positive.")
first_support = int(positive_support[0])
last_support = int(positive_support[-1]) + 1
z_use = z_arr[first_support:last_support]
nz_use = nz_arr[first_support:last_support]
if z_use.size < 2:
raise ValueError(
f"{name} support must contain at least two redshift values "
"after support/window filtering."
)
if np.any(nz_use <= 0.0):
raise ValueError(f"{name} normalization must be finite and positive.")
if trim_edge_points > 0:
if z_use.size <= 2 * trim_edge_points:
raise ValueError(
f"trim_edge_points removes all {name} support. "
f"support size = {z_use.size}, "
f"trim_edge_points = {trim_edge_points}."
)
z_use = z_use[trim_edge_points:-trim_edge_points]
nz_use = nz_use[trim_edge_points:-trim_edge_points]
if z_use.size < 2:
raise ValueError(
f"{name} support must contain at least two redshift values "
"after support/window filtering."
)
norm = np.trapezoid(nz_use, z_use)
if not np.isfinite(norm) or norm <= 0.0:
raise ValueError(f"{name} normalization must be finite and positive.")
return z_use, nz_use, norm