Source code for txpipe.maps

from .base_stage import PipelineStage
from .data_types import TomographyCatalog, MapsFile, HDFFile, ShearCatalog
import numpy as np
from .utils import unique_list, choose_pixelization
from .utils.calibration_tools import read_shear_catalog_type, apply_metacal_response
from .utils.calibrators import LensfitCalibrator
from .mapping import Mapper, FlagMapper


SHEAR_SHEAR = 0
SHEAR_POS = 1
POS_POS = 2

# These generic mapping options are used by multiple different
# map types.
# TODO: consider dropping support for gnomonic maps.
# Also consider adding support for pixell
map_config_options = {
    "chunk_rows": 100000,  # The number of rows to read in each chunk of data at a time
    "pixelization": "healpix",  # The pixelization scheme to use, currently just healpix
    "nside": 0,  # The Healpix resolution parameter for the generated maps. Only req'd if using healpix
    "sparse": True,  # Whether to generate sparse maps - faster and less memory for small sky areas,
    "ra_cent": np.nan,  # These parameters are only required if pixelization==tan
    "dec_cent": np.nan,
    "npix_x": -1,
    "npix_y": -1,
    "pixel_size": np.nan,  # Pixel size of pixelization scheme
}
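
# A minimal sketch of how these options are consumed (assuming the
# txpipe.utils.choose_pixelization helper imported above accepts them
# as keyword arguments, as in TXBaseMaps.run below):
#
#     scheme = choose_pixelization(pixelization="healpix", nside=512)
#     # scheme.metadata then carries the pixelization details that
#     # get attached to the output maps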


class TXBaseMaps(PipelineStage):
    """
    This is an abstract base mapping class, which other subclasses
    inherit from to use the same basic structure, which is:

    - select pixelization
    - prepare some mapper objects
    - iterate through selected columns
    - update each mapper with each chunk
    - finalize the mappers
    - save the maps
    """

    name = "TXBaseMaps"
    inputs = []
    outputs = []
    config_options = {}

    def run(self):
        # Read input configuration information
        # and select pixel scheme. Also save the scheme
        # metadata so we can save it later
        pixel_scheme = self.choose_pixel_scheme()
        self.config.update(pixel_scheme.metadata)

        # Initialize our maps
        mappers = self.prepare_mappers(pixel_scheme)

        # Loop through the data
        for s, e, data in self.data_iterator():
            # Give an idea of progress
            print(f"Process {self.rank} read data chunk {s:,} - {e:,}")
            # Build up any maps we are using
            self.accumulate_maps(pixel_scheme, data, mappers)

        # Finalize and output the maps
        maps = self.finalize_mappers(pixel_scheme, mappers)
        if self.rank == 0:
            self.save_maps(pixel_scheme, maps)

    def choose_pixel_scheme(self):
        """
        Subclasses can override to instead load pixelization from
        an existing map
        """
        return choose_pixelization(**self.config)

    def prepare_mappers(self, pixel_scheme):
        """
        Subclasses must override to init any mapper objects
        """
        raise RuntimeError("Do not use TXBaseMaps - use a subclass")

    def data_iterator(self):
        """
        Subclasses must override to create an iterator looping over
        input data
        """
        raise RuntimeError("Do not use TXBaseMaps - use a subclass")

    def accumulate_maps(self, pixel_scheme, data, mappers):
        """
        Subclasses must override to supply the next chunk "data" to
        their mappers
        """
        raise RuntimeError("Do not use TXBaseMaps - use a subclass")

    def finalize_mappers(self, pixel_scheme, mappers):
        """
        Subclasses must override to finalize their maps and return
        a dictionary of (output_tag, map_name) -> (pixels, values)
        """
        raise RuntimeError("Do not use TXBaseMaps - use a subclass")
    def save_maps(self, pixel_scheme, maps):
        """
        Subclasses can use this directly, by generating maps as
        described in finalize_mappers
        """
        # Find and open all output files
        tags = unique_list([outfile for outfile, _ in maps.keys()])
        output_files = {
            tag: self.open_output(tag, wrapper=True)
            for tag in tags
            if tag.endswith("maps")
        }

        # add a maps section to each output file
        for output_file in output_files.values():
            output_file.file.create_group("maps")
            output_file.file["maps"].attrs.update(self.config)

        # save the relevant maps in each output file
        for (tag, map_name), (pix, map_data) in maps.items():
            output_files[tag].write_map(map_name, pix, map_data, self.config)
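
# A minimal sketch of the subclassing pattern described above. The
# stage name, input tags, and column names here are hypothetical
# illustrations, not part of TXPipe; real subclasses follow below.
#
#     class TXExampleMaps(TXBaseMaps):
#         name = "TXExampleMaps"
#         inputs = [("example_catalog", HDFFile)]
#         outputs = [("example_maps", MapsFile)]
#         config_options = {**map_config_options}
#
#         def prepare_mappers(self, pixel_scheme):
#             # one mapper counting objects in a single lens bin
#             return [Mapper(pixel_scheme, [0], [], do_g=False,
#                            sparse=self.config["sparse"])]
#
#         def data_iterator(self):
#             # the catalog must supply the columns that
#             # Mapper.add_data expects
#             return self.iterate_hdf("example_catalog", "catalog",
#                                     ["ra", "dec", "lens_bin"],
#                                     self.config["chunk_rows"])
#
#         def accumulate_maps(self, pixel_scheme, data, mappers):
#             mappers[0].add_data(data)
#
#         def finalize_mappers(self, pixel_scheme, mappers):
#             pix, ngal, weighted_ngal, *_ = mappers[0].finalize(self.comm)
#             if self.rank != 0:
#                 return {}
#             return {("example_maps", "ngal_0"): (pix, ngal[0])}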
class TXSourceMaps(TXBaseMaps):
    """
    Make g1, g2, var(g1), var(g2), and lensing weight maps from
    shear catalogs and tomography
    """

    name = "TXSourceMaps"

    inputs = [
        ("shear_catalog", ShearCatalog),
        ("shear_tomography_catalog", TomographyCatalog),
    ]

    outputs = [
        ("source_maps", MapsFile),
    ]

    # Generic mapping options + one option
    # to use the truth shear columns
    config_options = {"true_shear": False, **map_config_options}

    def prepare_mappers(self, pixel_scheme):
        # read shear cols and calibration info
        nbin_source, cal = self.get_calibrators()

        # store in config so it is saved later
        self.config["nbin_source"] = nbin_source

        # create basic mapper object
        source_bins = list(range(nbin_source))
        lens_bins = []
        mapper = Mapper(
            pixel_scheme,
            lens_bins,
            source_bins,
            do_lens=False,
            sparse=self.config["sparse"],
        )
        return [mapper, cal]
    def get_calibrators(self):
        shear_catalog_type = read_shear_catalog_type(self)
        with self.open_input("shear_tomography_catalog") as f:
            nbin_source = f["tomography"].attrs["nbin_source"]

            if shear_catalog_type == "metacal":
                R = f["/metacal_response/R_total"][:]  # nbin x 2 x 2
                cal = {i: R[i] for i in range(nbin_source)}
                cal["2D"] = f["/metacal_response/R_total_2d"][:]
            elif shear_catalog_type == "lensfit":
                K = f["/response/K"][:]
                c = f["/response/C"][:]
                cal = {i: (K[i], c[i]) for i in range(nbin_source)}
                cal["2D"] = (
                    f["/response/K_2d"][0],
                    f["/response/C_2d"][:],
                )
            elif shear_catalog_type == "hsc":
                R = f["/response/R"][:]
                K = f["/response/K"][:]
                cal = {i: (R[i], K[i]) for i in range(nbin_source)}
                cal["2D"] = (
                    f["/response/R_2d"][0],
                    f["/response/K_2d"][:],
                )
            else:
                raise ValueError(f"Unknown shear catalog type {shear_catalog_type}")
        return nbin_source, cal
    def data_iterator(self):
        # can optionally read truth values
        if self.config["true_shear"]:
            shear_cols = ["true_g1", "true_g2", "ra", "dec", "weight"]
        elif self.config["shear_catalog_type"] == "metacal":
            shear_cols = ["mcal_g1", "mcal_g2", "ra", "dec", "weight"]
        else:
            shear_cols = ["g1", "g2", "ra", "dec", "weight"]

        # use utility function that combines data chunks
        # from different files. Reading from n file sections
        # takes 3n+1 arguments
        return self.combined_iterators(
            self.config["chunk_rows"],  # number of rows to iterate at once
            # first file info
            "shear_catalog",  # tag of input file to iterate through
            "shear",  # data group within file to look at
            shear_cols,  # column(s) to read
            # next file
            "shear_tomography_catalog",  # tag of input file to iterate through
            "tomography",  # data group within file to look at
            ["source_bin"],  # column(s) to read
        )
    def accumulate_maps(self, pixel_scheme, data, mappers):
        # rename columns, if needed
        if self.config["true_shear"]:
            data["g1"] = data["true_g1"]
            data["g2"] = data["true_g2"]
        elif self.config["shear_catalog_type"] == "metacal":
            data["g1"] = data["mcal_g1"]
            data["g2"] = data["mcal_g2"]
        # for other catalogs they are just called g1, g2 already

        # send data to the mapper
        mapper = mappers[0]
        mapper.add_data(data)
    def calibrate_map_metacal(self, g1, g2, var_g1, var_g2, R):
        # Apply the response matrix to the shear maps, and propagate
        # it through the variance maps via the standard deviations
        g1, g2 = apply_metacal_response(R, 0, g1, g2)

        std_g1 = np.sqrt(var_g1)
        std_g2 = np.sqrt(var_g2)
        std_g1, std_g2 = apply_metacal_response(R, 0, std_g1, std_g2)
        var_g1 = std_g1 ** 2
        var_g2 = std_g2 ** 2

        return g1, g2, var_g1, var_g2

    def calibrate_map_lensfit(self, g1, g2, var_g1, var_g2, K, c):
        calib = LensfitCalibrator(K, c)
        g1, g2 = calib.apply(g1, g2, subtract_mean=True)
        # calibrate the variance maps too, without the
        # mean-shear subtraction
        var_g1, var_g2 = calib.apply(var_g1, var_g2, subtract_mean=False)

        return g1, g2, var_g1, var_g2

    def calibrate_map_hsc(self, g1, g2, var_g1, var_g2, R, K):
        # HSC map-level calibration has not been implemented yet
        raise NotImplementedError("calibrate_map_hsc is not yet written")

    def calibrate_maps(self, g1, g2, var_g1, var_g2, cal):
        import healpy

        # We will return dictionaries of calibrated maps, keyed by
        # tomographic bin, including the non-tomographic "2D" case
        g1_out = {}
        g2_out = {}
        var_g1_out = {}
        var_g2_out = {}

        for i in g1.keys():
            # we want to avoid accidentally calibrating any pixels
            # that should be masked.
            mask = (
                (g1[i] == healpy.UNSEEN)
                | (g2[i] == healpy.UNSEEN)
                | (var_g1[i] == healpy.UNSEEN)
                | (var_g2[i] == healpy.UNSEEN)
            )

            if self.config["shear_catalog_type"] == "metacal":
                out = self.calibrate_map_metacal(
                    g1[i], g2[i], var_g1[i], var_g2[i], cal[i]
                )
            elif self.config["shear_catalog_type"] == "lensfit":
                K, c = cal[i]
                out = self.calibrate_map_lensfit(
                    g1[i], g2[i], var_g1[i], var_g2[i], K, c
                )
            elif self.config["shear_catalog_type"] == "hsc":
                R, K = cal[i]
                out = self.calibrate_map_hsc(
                    g1[i], g2[i], var_g1[i], var_g2[i], R, K
                )
            else:
                raise ValueError("Unknown calibration")

            # re-apply the masking, just to make sure
            for x in out:
                x[mask] = healpy.UNSEEN

            # store our results for this tomographic bin
            g1_out[i] = out[0]
            g2_out[i] = out[1]
            var_g1_out[i] = out[2]
            var_g2_out[i] = out[3]

        return g1_out, g2_out, var_g1_out, var_g2_out
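
    # A toy illustration of the mean-response calibration that
    # apply_metacal_response is assumed to perform above: invert the
    # per-bin 2x2 response matrix R and apply it to the shears.
    # The numbers are made up for illustration.
    #
    #     R = np.array([[0.7, 0.0],
    #                   [0.0, 0.7]])
    #     g_obs = np.array([0.014, -0.007])
    #     g_cal = np.linalg.solve(R, g_obs)  # -> [0.02, -0.01]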
    def finalize_mappers(self, pixel_scheme, mappers):
        # only one mapper here - we call its finalize method
        # to collect everything
        mapper, cal = mappers
        pix, _, _, g1, g2, var_g1, var_g2, weights_g = mapper.finalize(self.comm)

        # build up output
        maps = {}

        # only the root process gets the full maps
        if self.rank != 0:
            return maps

        # Calibrate the maps
        g1, g2, var_g1, var_g2 = self.calibrate_maps(g1, g2, var_g1, var_g2, cal)

        for b in mapper.source_bins:
            # keys are the output tag and the map name
            maps["source_maps", f"g1_{b}"] = (pix, g1[b])
            maps["source_maps", f"g2_{b}"] = (pix, g2[b])
            maps["source_maps", f"var_g1_{b}"] = (pix, var_g1[b])
            maps["source_maps", f"var_g2_{b}"] = (pix, var_g2[b])
            maps["source_maps", f"lensing_weight_{b}"] = (pix, weights_g[b])

        return maps
class TXLensMaps(TXBaseMaps):
    """
    Make galaxy number count maps from photometry and lens tomography.

    Density maps are made later once masks are generated.
    """

    name = "TXLensMaps"

    inputs = [
        ("photometry_catalog", HDFFile),
        ("lens_tomography_catalog", TomographyCatalog),
    ]

    outputs = [
        ("lens_maps", MapsFile),
    ]

    config_options = {**map_config_options}

    def prepare_mappers(self, pixel_scheme):
        # read nbin_lens and save
        with self.open_input("lens_tomography_catalog") as f:
            nbin_lens = f["tomography"].attrs["nbin_lens"]
        self.config["nbin_lens"] = nbin_lens

        # create lone mapper
        lens_bins = list(range(nbin_lens))
        source_bins = []
        mapper = Mapper(
            pixel_scheme,
            lens_bins,
            source_bins,
            do_g=False,
            sparse=self.config["sparse"],
        )
        return [mapper]
    def data_iterator(self):
        print("TODO: add use of lens weights here")
        # see TXSourceMaps above for info on this
        return self.combined_iterators(
            self.config["chunk_rows"],
            # first file
            "photometry_catalog",
            "photometry",
            ["ra", "dec"],
            # next file
            "lens_tomography_catalog",
            "tomography",
            ["lens_bin"],
        )
    def accumulate_maps(self, pixel_scheme, data, mappers):
        # no need to rename cols here since just ra, dec, lens_bin
        mapper = mappers[0]
        mapper.add_data(data)

    def finalize_mappers(self, pixel_scheme, mappers):
        # Again just the one mapper
        mapper = mappers[0]

        # Ignored return values are empty dicts for shear
        pix, ngal, weighted_ngal, _, _, _, _, _ = mapper.finalize(self.comm)
        maps = {}

        if self.rank != 0:
            return maps

        for b in mapper.lens_bins:
            # keys are the output tag and the map name
            maps["lens_maps", f"ngal_{b}"] = (pix, ngal[b])
            maps["lens_maps", f"weighted_ngal_{b}"] = (pix, weighted_ngal[b])

        return maps
class TXExternalLensMaps(TXLensMaps):
    """
    Same as TXLensMaps, except it reads from an external lens catalog.
    """

    name = "TXExternalLensMaps"

    inputs = [
        ("lens_catalog", HDFFile),
        ("lens_tomography_catalog", TomographyCatalog),
    ]

    config_options = {**map_config_options}
    def data_iterator(self):
        # See TXSourceMaps for an explanation of this
        return self.combined_iterators(
            self.config["chunk_rows"],
            # first file
            "lens_catalog",
            "lens",
            ["ra", "dec"],
            # next file
            "lens_tomography_catalog",
            "tomography",
            ["lens_bin", "lens_weight"],  # both columns from the same section
        )
class TXMainMaps(TXSourceMaps, TXLensMaps):
    """
    Combined source and photometric lens maps, from the same
    photometry catalog. This might be slightly faster than running
    the two map stages separately, but it only works if the source
    and lens catalogs are the same set of objects. Otherwise use
    TXSourceMaps and TXLensMaps.
    """

    name = "TXMainMaps"

    inputs = [
        ("photometry_catalog", HDFFile),
        ("lens_tomography_catalog", TomographyCatalog),
        ("shear_tomography_catalog", TomographyCatalog),
        ("shear_catalog", ShearCatalog),
    ]

    outputs = [
        ("lens_maps", MapsFile),
        ("source_maps", MapsFile),
    ]

    config_options = {"true_shear": False, **map_config_options}
    def data_iterator(self):
        # This is just the combination of
        # the source and lens map columns
        print("TODO: no lens weights here")

        # Check that the two catalogs really do contain the same
        # objects, since we read them in lockstep
        with self.open_input("photometry_catalog") as f:
            sz1 = f["photometry/ra"].size
        with self.open_input("shear_catalog") as f:
            sz2 = f["shear/ra"].size
        if sz1 != sz2:
            raise ValueError(
                "Shear and photometry catalogs in TXMainMaps are "
                "different sizes. To use separate source and lens "
                "samples use TXSourceMaps and TXLensMaps separately."
            )

        # metacal, lensfit, etc.
        shear_catalog_type = read_shear_catalog_type(self)

        # can optionally read truth values, or otherwise will look
        # for lensfit or metacal column names
        if self.config["true_shear"]:
            shear_cols = ["true_g1", "true_g2", "ra", "dec", "weight"]
        elif shear_catalog_type == "metacal":
            shear_cols = ["mcal_g1", "mcal_g2", "ra", "dec", "weight"]
        else:
            shear_cols = ["g1", "g2", "ra", "dec", "weight"]

        return self.combined_iterators(
            self.config["chunk_rows"],
            # first file
            "photometry_catalog",
            "photometry",
            ["ra", "dec"],
            # next file
            "shear_catalog",
            "shear",
            shear_cols,
            # next file
            "lens_tomography_catalog",
            "tomography",
            ["lens_bin", "lens_weight"],
            # next file
            "shear_tomography_catalog",
            "tomography",
            ["source_bin"],
        )
    def prepare_mappers(self, pixel_scheme):
        nbin_source, cal = self.get_calibrators()

        with self.open_input("lens_tomography_catalog") as f:
            nbin_lens = f["tomography"].attrs["nbin_lens"]

        self.config["nbin_source"] = nbin_source
        self.config["nbin_lens"] = nbin_lens

        source_bins = list(range(nbin_source))
        lens_bins = list(range(nbin_lens))

        # still a single mapper doing source and lens
        mapper = Mapper(
            pixel_scheme, lens_bins, source_bins, sparse=self.config["sparse"]
        )
        return [mapper, cal]
    # accumulate_maps is inherited from TXSourceMaps because
    # that class appears first in the method resolution order
    def finalize_mappers(self, pixel_scheme, mappers):
        # Still one mapper, but now we read both source and
        # lens maps from it.
        mapper, cal = mappers
        pix, ngal, weighted_ngal, g1, g2, var_g1, var_g2, weights_g = mapper.finalize(
            self.comm
        )

        maps = {}

        if self.rank != 0:
            return maps

        g1, g2, var_g1, var_g2 = self.calibrate_maps(g1, g2, var_g1, var_g2, cal)

        # Now both loops, source and lens
        for b in mapper.lens_bins:
            maps["lens_maps", f"ngal_{b}"] = (pix, ngal[b])
            maps["lens_maps", f"weighted_ngal_{b}"] = (pix, weighted_ngal[b])

        for b in mapper.source_bins:
            maps["source_maps", f"g1_{b}"] = (pix, g1[b])
            maps["source_maps", f"g2_{b}"] = (pix, g2[b])
            maps["source_maps", f"var_g1_{b}"] = (pix, var_g1[b])
            maps["source_maps", f"var_g2_{b}"] = (pix, var_g2[b])
            maps["source_maps", f"lensing_weight_{b}"] = (pix, weights_g[b])

        return maps
class TXDensityMaps(PipelineStage):
    """
    Convert n_gal maps to overdensity delta maps:

        delta = (ngal - <ngal>) / <ngal>

    This has to be separate from the lens mappers above
    because it requires the mask, which is created elsewhere
    (right now in masks.py)
    """

    name = "TXDensityMaps"

    inputs = [
        ("lens_maps", MapsFile),
        ("mask", MapsFile),
    ]

    outputs = [
        ("density_maps", MapsFile),
    ]

    def run(self):
        import healpy

        # Read the mask
        with self.open_input("mask", wrapper=True) as f:
            mask = f.read_map("mask")

        # set unseen pixels to weight zero
        mask[mask == healpy.UNSEEN] = 0
        mask[np.isnan(mask)] = 0
        mask = mask.flatten()
        pix = np.where(mask > 0)[0]

        # Read the count maps
        with self.open_input("lens_maps", wrapper=True) as f:
            meta = dict(f.file["maps"].attrs)
            nbin_lens = meta["nbin_lens"]
            ngal_maps = [
                f.read_map(f"weighted_ngal_{b}").flatten()
                for b in range(nbin_lens)
            ]

        # Convert count maps into density maps
        density_maps = []
        for i, ng in enumerate(ngal_maps):
            mask_copy = mask.copy()
            mask_copy[ng == healpy.UNSEEN] = 0
            ng[np.isnan(ng)] = 0.0
            ng[ng == healpy.UNSEEN] = 0

            # Convert the number count map to an overdensity map.
            # First compute the mean weighted count per pixel,
            # i.e. the mean number of clustering galaxies per
            # pixel in this map
            mu = np.average(ng, weights=mask_copy)
            print(f"Mean number density in bin {i} = {mu}")

            # and then use that to convert to overdensity
            d = (ng - mu) / mu

            # zero out the masked pixels
            d[mask == 0] = 0
            density_maps.append(d)

        # write output
        with self.open_output("density_maps", wrapper=True) as f:
            # create group and save metadata there too.
            f.file.create_group("maps")
            f.file["maps"].attrs.update(meta)

            # save each density map
            for i, rho in enumerate(density_maps):
                f.write_map(f"delta_{i}", pix, rho[pix], meta)
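
# A toy illustration of the overdensity computation above, with
# made-up numbers:
#
#     ng = np.array([3.0, 5.0, 4.0])     # weighted counts per pixel
#     mask = np.array([1.0, 1.0, 1.0])   # all pixels unmasked
#     mu = np.average(ng, weights=mask)  # 4.0
#     delta = (ng - mu) / mu             # [-0.25, 0.25, 0.0]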