Source code for clevar.footprint.footprint

#!/usr/bin/env python
import warnings
import numpy as np

from ..catalog import ClData, TagData, ClCatalog
from ..geometry import convert_units, angular_bank, physical_bank
from ..utils import none_val, hp, updated_dict
from astropy.coordinates import SkyCoord
from astropy import units as u
from .nfw_funcs import nfw2D_profile_flatcore
from ..match_metrics import plot_helper as ph
from ..match_metrics.plot_helper import plt

[docs]class Footprint(TagData): ''' Functions for footprint management Attributes ---------- nside: int Heapix NSIDE nest: bool If ordering is nested data: clevar.ClData Data with columns: pixel, detfrac, zmax pixel_dict: dict Dictionary to point to pixel in data size: int Number of objects in the catalog tags: LoweCaseDict Tag for main quantities used in matching and plots (ex: pixel, detfrac, zmax) ''' @property def pixel_dict(self): return self.__pixel_dict @property def nside(self): return self.data.meta['nside'] @property def nest(self): return self.data.meta['nest'] @nside.setter def nside(self, nside): self.data.meta['nside'] = nside @nest.setter def nest(self, nest): self.data.meta['nest'] = nest def __init__(self, nside=None, tags=None, nest=False, **kwargs): ''' Parameters ---------- nside: int Heapix NSIDE nest: bool If ordering is nested pixel: array Pixels inside the footprint detfrac_vals: array, None Detection fraction, if None is set to 1 zmax_vals: array, None Zmax, if None is set to 99 ''' self.__pixel_dict = {} tags = updated_dict({'pixel':'pixel'}, tags) if len(kwargs)>0: kwargs['nside'] = nside kwargs['nest'] = nest TagData.__init__(self, tags=tags, default_tags=['pixel', 'detfrac', 'zmax'], **kwargs) def _add_values(self, nside=None, nest=False, **columns): ''' Adds provided values for attribues and assign default values to rest Parameters ---------- nside: int Heapix NSIDE nest: bool If ordering is nested pixel: array Pixels inside the footprint detfrac: array Detection fraction. If None, value 1 is assigned. zmax: array Zmax. If None, value 99 is assigned. ''' if not isinstance(nside, int) or (nside&(nside-1)!=0) or nside==0: raise ValueError(f'nside (={nside}) must be a power of 2.') self.nside = nside self.nest = nest TagData._add_values(self, **columns) self['pixel'] = self['pixel'].astype(int) if self.tags.get('detfrac', None) not in self.colnames: self['detfrac'] = 1. if self.tags.get('zmax', None) not in self.colnames: self['zmax'] = 99. ra, dec = hp.pix2ang(nside, self['pixel'], lonlat=True, nest=nest) self['SkyCoord'] = SkyCoord(ra*u.deg, dec*u.deg, frame='icrs') self.pixel_dict.update(self._make_col_dict('pixel'))
[docs] def get_map(self, data, bad_val=0): ''' Transforms a internal quantity into a map Parameters ---------- data: str Name of internal data to be used. bad_val: float, None Values for pixels outside footprint. Returns ------- Map Healpix map with the given values, other pixels are zero ''' return hp.pix2map(self.nside, self['pixel'], self[data], bad_val)
[docs] def get_values_in_pixels(self, data, pixels, bad_val, transform=lambda x:x): ''' Transforms a internal quantity into a map Parameters ---------- values: array Values in each pixel. data: str Name of internal data to be used. pixels: list List of pixels bad_val: float, None Values for pixels outside footprint. transform: function Function to be applied to data for value in pixel. Default is f(data)=data. Returns ------- Map Healpix map with the given values, other pixels are zero ''' return np.array([transform(self[data][self.pixel_dict[p]]) if p in self.pixel_dict else bad_val for p in pixels])
[docs] def zmax_masks_from_footprint(self, ra, dec, z): ''' Create a zmax mask for a catalog based on a footprint Parameters ---------- ra: numpy array Ra array in degrees dec: numpy array Dec array in degrees z: numpy array Redshift array Returns ------- ndarray Arrays of booleans of objects in footprint ''' pixels = hp.ang2pix(self.nside, ra, dec, nest=self.nest, lonlat=True) print("## creating visibility mask ##") #zmax_vals = self.get_map(self['zmax'])[pixels] old method zmax_vals = self.get_values_in_pixels('zmax', pixels, 0) return z<=zmax_vals
[docs] @classmethod def read(self, filename, nside, tags, nest=False, full=False): """ Loads fits file and converst to FootprintZmax object Parameters ---------- filename: str Name of input file nside: int Healpix nside pixel_name: str Name of pixel column detfrac_name: str, None Name of detection fraction colum. If None value 1 is assigned. zmax_name: Name of maximum redshit colum. If None value 99 is assigned. tags: LoweCaseDict, None Tags for table (required if full=False). full: bool Reads all columns of the catalog """ if not isinstance(tags, dict): raise ValueError(f'tags (={tags}) must be a dictionary.') elif 'pixel' not in tags: raise ValueError(f'pixel must be a key in tags (={tags})') data = ClData.read(filename) if not full: data._check_cols(tags.values()) data = data[list(tags.values())] return self(nside=nside, nest=nest, data=data, tags=tags)
def __repr__(self): out = f"FootprintZmax object with {len(self.data):,} pixels\n" out += "zmax: [%g, %g]\n"%(self['zmax'].min(), self['zmax'].max()) out += "detfrac: [%g, %g]"%(self['detfrac'].min(), self['detfrac'].max()) return out def _get_coverfrac(self, cl_sk, cl_z, aperture_radius, aperture_radius_unit, cosmo=None, wtfunc=lambda pixels, sk: np.ones(len(pixels))): ''' Get cover fraction Parameters ---------- cl_sk: astropy.coordinates.SkyCoord Cluster SkyCoord [degrees, frame='icrs'] cl_z: float Cluster redshift aperture_radius: float Radius of aperture aperture_radius_unit: str Unit of aperture radius cosmo: clevar.Cosmology object Cosmology object for when aperture has physical units. wtfunc: function Window function, must take (pixels, SkyCoord) as input. Default is flat window. Returns ------- float Cover fraction ''' if aperture_radius_unit in physical_bank and cosmo is None: raise TypeError('A cosmology is necessary if aperture in physical units.') pix_list = hp.query_disc( nside=self.nside, inclusive=True, nest=self.nest, vec=hp.pixelfunc.ang2vec(cl_sk.ra.value, cl_sk.dec.value, lonlat=True), radius=convert_units(aperture_radius, aperture_radius_unit, 'radians', redshift=cl_z, cosmo=cosmo) ) weights = np.array(wtfunc(pix_list, cl_sk)) detfrac_vals = self.get_values_in_pixels('detfrac', pix_list, 0) zmax_vals = self.get_values_in_pixels('zmax', pix_list, 0) values = detfrac_vals*np.array(cl_z<=zmax_vals, dtype=float) return (weights*values).sum()/weights.sum() def _get_coverfrac_nfw2D(self, cl_sk, cl_z, cl_radius, cl_radius_unit, aperture_radius, aperture_radius_unit, cosmo): ''' Cover fraction with NFW 2D flatcore window. It is computed using: Parameters ---------- cl_sk: astropy.coordinates.SkyCoord Cluster SkyCoord [degrees, frame='icrs'] cl_z: float Cluster redshift cl_radius: float Cluster radius cl_radius_unit: str Unit of cluster radius aperture_radius: float Radius of aperture aperture_radius_unit: str Unit of aperture radius cosmo: clevar.Cosmology object Cosmology object. Returns ------- float ''' cl_radius_mpc = convert_units(cl_radius, cl_radius_unit, 'mpc', redshift=cl_z, cosmo=cosmo) return self._get_coverfrac(cl_sk, cl_z, aperture_radius, aperture_radius_unit, cosmo=cosmo, wtfunc=lambda pix_list, cl_sk: self._nfw_flatcore_window_func( pix_list, cl_sk, cl_z, cl_radius_mpc, cosmo))
[docs] def get_coverfrac(self, cl_ra, cl_dec, cl_z, aperture_radius, aperture_radius_unit, cosmo=None, wtfunc=lambda pixels, sk: np.ones(len(pixels))): r''' Get cover fraction with a given window. .. math:: CF(R) = \frac{\sum_{r_i<R}w(r_i)df(r_i)}{\sum_{r_i<R}w(r_i)} where the index `i` represents pixels of the footprint, :math:`r_i` is the distance between the cluster center and the pixel center, :math:`R` is the aperture radius to be considered and :math:`w` is the window function. Parameters ---------- cl_ra: float Cluster RA in deg cl_dec: float Cluster DEC in deg cl_z: float Cluster redshift aperture_radius: float Radius of aperture aperture_radius_unit: str Unit of aperture radius cosmo: clevar.Cosmology object Cosmology object for when aperture has physical units. wtfunc: function Window function, must take (pixels, SkyCoord) as input. Default is flat window. Returns ------- float Cover fraction ''' return self._get_coverfrac(SkyCoord(cl_ra*u.deg, cl_dec*u.deg, frame='icrs'), cl_z, aperture_radius, aperture_radius_unit, cosmo=cosmo, wtfunc=wtfunc)
[docs] def get_coverfrac_nfw2D(self, cl_ra, cl_dec, cl_z, cl_radius, cl_radius_unit, aperture_radius, aperture_radius_unit, cosmo): r''' Cover fraction with NFW 2D flatcore window. .. math:: CF(R) = \frac{\sum_{r_i<R}w_{NFW}(r_i)df(r_i)}{\sum_{r_i<R}w_{NFW}(r_i)} where the index `i` represents pixels of the footprint, :math:`r_i` is the distance between the cluster center and the pixel center, :math:`R` is the aperture radius to be considered and :math:`w_{nfw}` is the NFW 2D flatcore window function. Parameters ---------- cl_ra: float Cluster RA in deg cl_dec: float Cluster DEC in deg cl_z: float Cluster redshift cl_radius: float Cluster radius cl_radius_unit: str Unit of cluster radius aperture_radius: float Radius of aperture aperture_radius_unit: str Unit of aperture radius cosmo: clevar.Cosmology object Cosmology object. Returns ------- float Notes ----- The NFW 2D function was taken from section 3.1 of arXiv:1104.2089 and was validated with Rs = 0.15 Mpc/h (0.214 Mpc) and Rcore = 0.1 Mpc/h (0.142 Mpc). ''' return self._get_coverfrac_nfw2D(SkyCoord(cl_ra*u.deg, cl_dec*u.deg, frame='icrs'), cl_z, cl_radius, cl_radius_unit, aperture_radius, aperture_radius_unit, cosmo=cosmo)
def _nfw_flatcore_window_func(self, pix_list, cl_sk, cl_z, cl_radius, cosmo): ''' Get aperture function for NFW 2D Profile with a top-hat core Parameters ---------- pix_list: list List of pixels in the aperture cl_sk: astropy.coordinates.SkyCoord Cluster SkyCoord [degrees, frame='icrs'] cl_z: float Cluster redshift cl_radius: float Cluster radius in Mpc Returns ------- array Value of the aperture function at each pixel ''' Rs = 0.15/cosmo['h'] # 0.214Mpc Rcore = 0.1/cosmo['h'] # 0.142Mpc R = self.get_values_in_pixels('SkyCoord', pix_list, Rcore, transform=lambda x: convert_units(cl_sk.separation(x).value, 'degrees', 'mpc', cl_z, cosmo) ) return nfw2D_profile_flatcore(R, cl_radius, Rs, Rcore)
[docs] def plot(self, data, bad_val=hp.UNSEEN, auto_lim=False, ra_lim=None, dec_lim=None, cluster=None, cluster_kwargs=None, cosmo=None, fig=None, figsize=None, **kwargs): """ Plot footprint. It can also overlay clusters with their radial sizes. Parameters ---------- data: str Name of internal data to be used. bad_val: float Values for pixels outside footprint. auto_lim: bool Set automatic limits for ra/dec. ra_lim: list RA limits in degrees. dec_lim: list DEC limits in degrees. cluster: clevar.ClCatalog Clusters to be overlayed on footprint. cluster_kwargs: dict, None Keyword arguments to plot clusters. If cluster radius used, arguments for plt.Circle function, if not arguments for plt.scatter. cosmo: clevar.Cosmology object Cosmology object for when cluster radius has physical units. fig: matplotlib.figure.Figure, None Matplotlib figure object. If not provided a new one is created. figsize: tuple Width, height in inches (float, float). Default value from hp.cartview. **kwargs: Extra arguments for hp.cartview: * xsize (int) : The size of the image. Default: 800 * title (str) : The title of the plot. Default: None * min (float) : The minimum range value * max (float) : The maximum range value * remove_dip (bool) : If :const:`True`, remove the dipole+monopole * remove_mono (bool) : If :const:`True`, remove the monopole * gal_cut (float, scalar) : Symmetric galactic cut for \ the dipole/monopole fit. Removes points in latitude range \ [-gal_cut, +gal_cut] * format (str) : The format of the scale label. Default: '%g' * cbar (bool) : Display the colorbar. Default: True * notext (bool) : If True, no text is printed around the map * norm ({'hist', 'log', None}) : Color normalization, \ hist= histogram equalized color mapping, log= logarithmic color \ mapping, default: None (linear color mapping) * cmap (a color map) : The colormap to use (see matplotlib.cm) * badcolor (str) : Color to use to plot bad values * bgcolor (str) : Color to use for background * margins (None or sequence) : Either None, or a \ sequence (left,bottom,right,top) giving the margins on \ left,bottom,right and top of the axes. Values are relative to \ figure (0-1). Default: None Returns ------- fig: matplotlib.pyplot.figure Figure of the plot. The main can be accessed at fig.axes[0], and the colorbar at fig.axes[1]. """ fig, ax, cb = ph.plot_healpix_map( self.get_map(data, bad_val), nest=self.nest, auto_lim=auto_lim, bad_val=bad_val, ra_lim=ra_lim, dec_lim=dec_lim, fig=fig, figsize=figsize, **kwargs) if cb: cb.set_xlabel(data) ax.xaxis.tick_top() ax.xaxis.set_label_position('top') xlim, ylim = ax.get_xlim(), ax.get_ylim() if cluster is not None: if isinstance(cluster, ClCatalog): if 'radius' in cluster.tags and cluster.radius_unit is not None: if (cluster.radius_unit in physical_bank and (cosmo is None or 'z' not in cluster.tags)): raise TypeError( 'A cosmology and cluster redsift is necessary if ' 'cluster radius in physical units.') theta = np.linspace(0, 2*np.pi, 50) sin, cos = np.sin(theta), np.cos(theta) rad_deg = convert_units(cluster['radius'], cluster.radius_unit, 'degrees', redshift=cluster['z'], cosmo=cosmo) plt_cl = lambda ra, dec, radius: [ ax.plot(ra_+radius_*sin/np.cos(np.radians(dec_)), dec_+radius_*cos, **updated_dict({'color':'b', 'lw':1}, cluster_kwargs)) for ra_, dec_, radius_ in np.transpose([ra, dec, radius])[ (ra+radius>=xlim[0])*(ra-radius<xlim[1]) *(dec+radius>=ylim[0])*(dec-radius<ylim[1])] ] else: warnings.warn("Column 'radius' or radius_unit of cluster not set up. " "Plotting clusters as points with plt.scatter.") rad_deg = np.ones(cluster.size) lims_mask = lambda ra, dec: ((ra>=xlim[0])*(ra<xlim[1])*(dec>=ylim[0])*(dec<ylim[1])) plt_cl = lambda ra, dec, radius: \ ax.scatter(*np.transpose([ra, dec])[lims_mask(ra, dec)].T, **updated_dict({'color':'b', 's':5}, cluster_kwargs)) # Plot clusters in regular range plt_cl(cluster['ra'], cluster['dec'], rad_deg) # Plot clusters using -180<ra<0 if ax.get_xlim()[0]<=0.: ra2, dec2, r2 = np.transpose([cluster['ra'], cluster['dec'], rad_deg])[ (cluster['ra']>=180)].T ra2 -= 360. plt_cl(ra2, dec2, r2) # Plot clusters using 180<ra<360 if ax.get_xlim()[1]>=180.: ra2, dec2, r2 = np.transpose([cluster['ra'], cluster['dec'], rad_deg])[ (cluster['ra']<=0)].T ra2 += 360. plt_cl(ra2, dec2, r2) # Plot clusters using ra>360 (for xlim [360<, >0]) if ax.get_xlim()[1]>=360.: ra2, dec2, r2 = np.transpose([cluster['ra'], cluster['dec'], rad_deg])[ (cluster['ra']>=0)].T ra2 += 360. plt_cl(ra2, dec2, r2) else: raise TypeError(f'cluster argument (={cluster}) must be a ClCatalog.') return fig