"""General utility functions that are used in multiple modules"""
import warnings
import numpy as np
from scipy.integrate import simps
from scipy.interpolate import interp1d
def _integ_pzfuncs(pzpdf, pzbins, zmin=0.0, zmax=5, kernel=lambda z: 1.0, ngrid=1000):
r"""
Integrates the product of a photo-z pdf with a given kernel.
This function was created to allow for data with different photo-z binnings.
Parameters
----------
pzpdf : list of arrays
Photometric probablility density functions of the source galaxies.
pzbins : list of arrays
Redshift axis on which the individual photoz pdf is tabulated.
zmin : float, optional
Minimum redshift for integration. Default: 0
zmax : float, optional
Maximum redshift for integration. Default: 5
kernel : function, optional
Function to be integrated with the pdf, must be f(z_array) format.
Default: kernel(z)=1
ngrid : int, optional
Number of points for the interpolation of the redshift pdf.
Returns
-------
numpy.ndarray
Kernel integrated with the pdf of each galaxy.
Notes
-----
Will be replaced by qp at some point.
"""
# adding these lines to interpolate CLMM redshift grid for each galaxies
# to a constant redshift grid for all galaxies. If there is a constant grid for all galaxies
# these lines are not necessary and z_grid, pz_matrix = pzbins, pzpdf
if hasattr(pzbins[0], "__len__"):
# First need to interpolate on a fixed grid
z_grid = np.linspace(zmin, zmax, ngrid)
pdf_interp_list = [
interp1d(pzbin, pdf, bounds_error=False, fill_value=0.0)
for pzbin, pdf in zip(pzbins, pzpdf)
]
pz_matrix = np.array([pdf_interp(z_grid) for pdf_interp in pdf_interp_list])
kernel_matrix = kernel(z_grid)
else:
# OK perform the integration directly from the pdf binning common to all galaxies
mask = (pzbins >= zmin) * (pzbins <= zmax)
z_grid = pzbins[mask]
pz_matrix = np.array(pzpdf)[:, mask]
kernel_matrix = kernel(z_grid)
return simps(pz_matrix * kernel_matrix, x=z_grid, axis=1)
[docs]
def compute_for_good_redshifts(
function,
z1,
z2,
bad_value,
warning_message,
z1_arg_name="z1",
z2_arg_name="z2",
r_proj=None,
**kwargs,
):
"""Computes function only for `z1` < `z2`, the rest is filled with `bad_value`
Parameters
----------
function: function
Function to be executed
z1: float, array_like
Redshift lower
z2: float, array_like
Redshift higher
bad_value: any
Value to fill when `z1` >= `z2`
warning_message: str
Warning message to be displayed when `z1` >= `z2`
z1_arg_name: str, optional
Name of the keyword argument that `z1` is passed to. Default: 'z1'
z2_arg_name: str, optional
Name of the keyword argument that `z2` is passed to. Default: 'z2'
r_proj: float, array_like, optional
Value to be passed to keyword argument `r_proj` of `function`. Default: None
Returns
-------
Return type of `function`
Output of `function` with value for `z1` >= `z2` replaced by `bad_value`
"""
kwargs = {z1_arg_name: locals()["z1"], z2_arg_name: locals()["z2"], **kwargs}
z_good = np.less(z1, z2)
if r_proj is not None:
r_proj = np.array(r_proj) * np.full_like(z_good, True)
z_good = z_good * r_proj.astype(bool)
kwargs.update({"r_proj": r_proj[z_good] if np.iterable(r_proj) else r_proj})
if not np.all(z_good):
warnings.warn(warning_message, stacklevel=2)
if np.iterable(z_good):
res = np.full(z_good.shape, bad_value)
if np.any(z_good):
kwargs[z1_arg_name] = np.array(z1)[z_good] if np.iterable(z1) else z1
kwargs[z2_arg_name] = np.array(z2)[z_good] if np.iterable(z2) else z2
res[z_good] = function(**kwargs)
else:
res = bad_value
else:
res = function(**kwargs)
return res