import numpy as np
from scipy.interpolate import InterpolatedUnivariateSpline as spline
from .parent import Match
from ..geometry import units_bank, convert_units
from ..catalog import ClData
from ..utils import veclen, str2dataunit
[docs]class ProximityMatch(Match):
def __init__(self, ):
Match.__init__(self)
self.type = 'Proximity'
[docs] def multiple(self, cat1, cat2, radius_selection='max', verbose=True):
"""
Make the one way multiple matching
Parameters
----------
cat1: clevar.ClCatalog
Base catalog
cat2: clevar.ClCatalog
Target catalog
verbose: bool
Print result for individual matches.
radius_selection: str (optional)
Case of radius to be used, can be: max, min, self, other.
"""
if cat1.mt_input is None:
raise AttributeError('cat1.mt_input is None, run prep_cat_for_match first.')
if cat2.mt_input is None:
raise AttributeError('cat2.mt_input is None, run prep_cat_for_match first.')
self.cat1_mmt = np.zeros(cat1.size, dtype=bool) # To add flag in multi step matching
ra2, dec2, sk2 = (cat2[c] for c in ('ra', 'dec', 'SkyCoord'))
ang2, z2min, z2max = (cat2.mt_input[c] for c in ('ang', 'zmin', 'zmax'))
ang2max = ang2.max()
print(f'Finding candidates ({cat1.name})')
for i, (ra1, dec1, sk1, ang1, z1min, z1max) in enumerate(zip(*(
[cat1[c] for c in ('ra', 'dec', 'SkyCoord')]+
[cat1.mt_input[c] for c in ('ang', 'zmin', 'zmax')]
))):
# crop in redshift range
mask = (z2max>=z1min)*(z2min<=z1max)
if mask.any():
# makes square crop with radius
Da = max(ang2max, ang1)
DaCos = Da / np.cos(np.radians(dec1))
mask *= (ra2>=ra1-DaCos)*(ra2<ra1+DaCos)*(dec2>=dec1-Da)*(dec2<dec1+Da)
if mask.any():
# makes circular crop
dist = sk1.separation(sk2[mask]).value
max_dist = self._max_mt_distance(ang1, ang2[mask], radius_selection=radius_selection)
for id2 in cat2['id'][mask][dist<=max_dist]:
cat1['mt_multi_self'][i].append(id2)
i2 = int(cat2.id_dict[id2])
cat2['mt_multi_other'][i2].append(cat1['id'][i])
self.cat1_mmt[i] = True
if verbose:
print(f" {i:,}({cat1.size:,}) - {len(cat1['mt_multi_self'][i]):,} candidates",
end='\r')
print(f'* {(veclen(cat1["mt_multi_self"])>0).sum():,}/{cat1.size:,} objects matched.')
cat1.remove_multiple_duplicates()
cat2.remove_multiple_duplicates()
self.history.append({
'func':'multiple', 'cats': f'{cat1.name}, {cat2.name}',
'radius_selection':radius_selection})
cat1._set_mt_hist(self.history)
cat2._set_mt_hist(self.history)
[docs] def prep_cat_for_match(self, cat, delta_z, match_radius, n_delta_z=1, n_match_radius=1,
cosmo=None):
"""
Adds zmin, zmax and radius to cat.mt_input
Parameters
----------
cat: clevar.ClCatalog
Input ClCatalog
delta_z: float, string
Defines the zmin, zmax for matching. Options are:
* `'cat'` - uses redshift properties of the catalog
* `'spline.filename'` - interpolates data in 'filename' (z, zmin, zmax) fmt
* `float` - uses delta_z*(1+z)
* `None` - does not use z
match_radius: string
Defines the radius for matching. Options are:
* `'cat'` - uses the radius in the catalog
* `'value unit'` - used fixed value (ex: `1 arcsec`, `1 Mpc`)
n_delta_z: float
Number of delta_z to be used in the matching
n_match_radius: float
Multiplies the radius of the matchingi
cosmo: clevar.Cosmology object
Cosmology object for when radius has physical units
"""
print('## Prep mt_cols')
cat.mt_input = ClData()
# Set zmin, zmax
if delta_z is None:
# No z use
print('* zmin|zmax set to -1|10')
cat.mt_input['zmin'] = -1*np.ones(cat.size)
cat.mt_input['zmax'] = 10*np.ones(cat.size)
elif delta_z == 'cat':
# Values from catalog
if 'zmin' in cat.tags and 'zmax' in cat.tags:
print('* zmin|zmax from cat cols (zmin, zmax)')
cat.mt_input['zmin'] = self._rescale_z(cat['z'], cat['zmin'], n_delta_z)
cat.mt_input['zmax'] = self._rescale_z(cat['z'], cat['zmax'], n_delta_z)
# create zmin/zmax if not there
elif 'z_err' in cat.tags:
print('* zmin|zmax from cat cols (err)')
cat.mt_input['zmin'] = cat['z']-n_delta_z*cat['z_err']
cat.mt_input['zmax'] = cat['z']+n_delta_z*cat['z_err']
else:
raise ValueError('ClCatalog must contain zmin, zmax or z_err for this matching.')
elif isinstance(delta_z, str):
# zmin/zmax in auxiliar file
print('* zmin|zmax from aux file')
k = 3
zv, zvmin, zvmax = np.loadtxt(delta_z)
zvmin = self._rescale_z(zv, zvmin, n_delta_z)
zvmax = self._rescale_z(zv, zvmax, n_delta_z)
cat.mt_input['zmin'] = spline(zv, zvmin, k=k)(cat['z'])
cat.mt_input['zmax'] = spline(zv, zvmax, k=k)(cat['z'])
elif isinstance(delta_z, (int, float)):
# zmin/zmax from sigma_z*(1+z)
print('* zmin|zmax from config value')
cat.mt_input['zmin'] = cat['z']-delta_z*n_delta_z*(1.0+cat['z'])
cat.mt_input['zmax'] = cat['z']+delta_z*n_delta_z*(1.0+cat['z'])
# Set angular radius
if match_radius == 'cat':
print('* ang radius from cat')
in_rad, in_rad_unit = cat['radius'], cat.radius_unit
# when mass is passed to radius: m#b or m#c - background/critical
if in_rad_unit.lower()!='mpc' and in_rad_unit[0].lower()=='m':
print(f" * Converting mass ({in_rad_unit}) ->radius")
delta, mtyp = str2dataunit(in_rad_unit[1:], ['b', 'c'],
err_msg=f"Mass unit ({in_rad_unit}) must be in format 'm#b' (background) or 'm#c' (critical)")
in_rad = cosmo.eval_mass2radius(in_rad, cat['z'], delta,
mass_type={'b':'background', 'c':'critical'}[mtyp])
in_rad_unit = 'mpc'
else:
print('* ang radius from set scale')
in_rad, in_rad_unit = str2dataunit(match_radius, units_bank.keys())
in_rad *= np.ones(cat.size)
# convert to degrees
cat.mt_input['ang'] = convert_units(
in_rad*n_match_radius, in_rad_unit, 'degrees',
redshift=cat['z'] if 'z' in cat.tags else None,
cosmo=cosmo)
self.history.append({
'func':'prep_cat_for_match', 'cat': cat.name, 'delta_z': delta_z, 'match_radius': match_radius,
'n_delta_z': n_delta_z, 'n_match_radius': n_match_radius,
'cosmo': cosmo if cosmo is None else cosmo.get_desc()})
def _rescale_z(self, z, zlim, n):
"""Rescale zmin/zmax by a factor n
Parameters
----------
z: float, array
Redshift value
zlim: float, array
Redshift limit
n: float
Value to rescale zlim
Returns
-------
float, array
Rescaled z limit
"""
return z+n*(zlim-z)
def _max_mt_distance(self, radius1, radius2, radius_selection):
"""Get maximum angular distance allowed for the matching
Parameters
----------
radius1: float, array
Radius to be used for catalog 1
radius2: float, array
Radius to be used for catalog 2
radius_selection: str
Case of radius to be used, can be: self, other, min, max.
Returns
-------
float, array
Maximum angular distance allowed for matching
"""
if radius_selection=='self':
f1 = np.ones(radius1.size)
f2 = np.zeros(radius2.size)
elif radius_selection=='other':
f1 = np.zeros(radius1.size)
f2 = np.ones(radius2.size)
elif radius_selection=='max':
f1 = (radius1 >= radius2)
f2 = (radius1 < radius2)
elif radius_selection=='min':
f1 = (radius1 < radius2)
f2 = (radius1 >= radius2)
return f1 * radius1 + f2 * radius2
[docs] def match_from_config(self, cat1, cat2, match_config, cosmo=None):
"""
Make matching of catalogs based on a configuration dictionary
Parameters
----------
cat1: clevar.ClCatalog
ClCatalog 1
cat2: clevar.ClCatalog
ClCatalog 2
match_config: dict
Dictionary with the matching configuration. Keys must be:
* `type`: type of matching, can be: `cat1`, `cat2`, `cross`.
* `catalog1`: `kwargs` used in `prep_cat_for_match(cat1, **kwargs)` (minus `cosmo`).
* `catalog2`: `kwargs` used in `prep_cat_for_match(cat2, **kwargs)` (minus `cosmo`).
* `which_radius`: Case of radius to be used, can be: `cat1`, `cat2`, `min`, `max`.
* `preference`: Preference to set best match, can be: `more_massive`,
`angular_proximity`, `redshift_proximity`, `shared_member_fraction`.
* `verbose`: Print result for individual matches (default=`True`).
cosmo: clevar.Cosmology object
Cosmology object for when radius has physical units
"""
if match_config['type'] not in ('cat1', 'cat2', 'cross'):
raise ValueError("config type must be cat1, cat2 or cross")
if match_config['type'] in ('cat1', 'cross'):
print("\n## ClCatalog 1")
self.prep_cat_for_match(cat1, cosmo=cosmo, **match_config['catalog1'])
if match_config['type'] in ('cat2', 'cross'):
print("\n## ClCatalog 2")
self.prep_cat_for_match(cat2, cosmo=cosmo, **match_config['catalog2'])
verbose = match_config.get('verbose', True)
if match_config['type'] in ('cat1', 'cross'):
print("\n## Multiple match (catalog 1)")
if match_config['which_radius'] == 'cat1':
radius_selection = 'self'
elif match_config['which_radius'] == 'cat2':
radius_selection = 'other'
else:
radius_selection = match_config['which_radius']
self.multiple(cat1, cat2, radius_selection, verbose=verbose)
if match_config['type'] in ('cat2', 'cross'):
print("\n## Multiple match (catalog 2)")
if match_config['which_radius'] == 'cat1':
radius_selection = 'other'
elif match_config['which_radius'] == 'cat2':
radius_selection = 'self'
else:
radius_selection = match_config['which_radius']
self.multiple(cat2, cat1, radius_selection, verbose=verbose)
if match_config['type'] in ('cat1', 'cross'):
print("\n## Finding unique matches of catalog 1")
self.unique(cat1, cat2, match_config['preference'])
if match_config['type'] in ('cat2', 'cross'):
print("\n## Finding unique matches of catalog 2")
self.unique(cat2, cat1, match_config['preference'])
if match_config['type'] == 'cross':
self.cross_match(cat1)
self.cross_match(cat2)