Source code for txpipe.noise_maps

from .base_stage import PipelineStage
from .maps import TXBaseMaps
from .data_types import ShearCatalog, TomographyCatalog, MapsFile, \
                        LensingNoiseMaps, ClusteringNoiseMaps, HDFFile
import numpy as np
from .utils.mpi_utils import mpi_reduce_large
from .utils import choose_pixelization
from .utils.calibration_tools import read_shear_catalog_type


class TXNoiseMaps(PipelineStage):
    """
    Generate a suite of random noise maps by randomly rotating individual
    galaxy measurements.

    Only works if the source and lens catalogs are the same length.
    Otherwise use TXSourceNoiseMaps and TXLensNoiseMaps below.
    """
    # TODO rewrite this as a TXBaseMaps subclass
    # like the two below
    name = 'TXNoiseMaps'

    inputs = [
        ('shear_catalog', ShearCatalog),
        ('lens_tomography_catalog', TomographyCatalog),
        ('shear_tomography_catalog', TomographyCatalog),
        # We get the pixelization info from the diagnostic maps
        ('mask', MapsFile),
        ('lens_maps', MapsFile),
    ]

    outputs = [
        ('source_noise_maps', LensingNoiseMaps),
        ('lens_noise_maps', ClusteringNoiseMaps),
    ]

    config_options = {
        'chunk_rows': 100000,
        'lensing_realizations': 30,
        'clustering_realizations': 1,
    }

    def run(self):
        from .utils import choose_pixelization

        # get the number of bins.
        nbin_source, nbin_lens, ngal_maps, mask, map_info = self.read_inputs()

        pixel_scheme = choose_pixelization(**map_info)
        lensing_realizations = self.config['lensing_realizations']
        clustering_realizations = self.config['clustering_realizations']

        # The columns we will need
        shear_cols = ['ra', 'dec', 'weight', 'mcal_g1', 'mcal_g2']

        # Make the iterators
        chunk_rows = self.config['chunk_rows']

        it = self.combined_iterators(
            chunk_rows,
            'shear_catalog', 'shear', shear_cols,
            'shear_tomography_catalog', 'tomography', ['source_bin'],
            'lens_tomography_catalog', 'tomography', ['lens_bin'],
        )

        # Get a mapping from healpix indices to masked pixel indices.
        # This reduces memory usage.  We could use a healsparse array
        # here, but I'm not sure how to do that best with our
        # many realizations.  Possibly a recarray?
        index_map = np.zeros(pixel_scheme.npix, dtype=np.int64) - 1
        c = 0
        for i in range(pixel_scheme.npix):
            if mask[i] > 0:
                index_map[i] = c
                c += 1

        # Number of unmasked pixels
        npix = c

        if self.rank == 0:
            nmaps = nbin_source * (2 * lensing_realizations + 1) + nbin_lens * clustering_realizations * 2
            nGB = (npix * nmaps * 8) / 1000.**3
            print(f"Allocating maps of size {nGB:.2f} GB")

        # lensing g1, g2
        G1 = np.zeros((npix, nbin_source, lensing_realizations))
        G2 = np.zeros((npix, nbin_source, lensing_realizations))
        # lensing weight
        GW = np.zeros((npix, nbin_source))
        # clustering map - n_gal to start with
        ngal_split = np.zeros((npix, nbin_lens, clustering_realizations, 2), dtype=np.int32)
        # TODO: Clustering weights go here

        # Loop through the data
        for (s, e, data) in it:
            print(f"Rank {self.rank} processing rows {s} - {e}")
            source_bin = data['source_bin']
            lens_bin = data['lens_bin']
            ra = data['ra']
            dec = data['dec']
            orig_pixels = pixel_scheme.ang2pix(ra, dec)
            pixels = index_map[orig_pixels]
            n = e - s

            w = data['weight']
            g1 = data['mcal_g1'] * w
            g2 = data['mcal_g2'] * w

            # randomly select a half for each object
            split = np.random.binomial(1, 0.5, (n, clustering_realizations))

            # random rotations of the g1, g2 values
            phi = np.random.uniform(0, 2 * np.pi, (n, lensing_realizations))
            c = np.cos(phi)
            s = np.sin(phi)
            g1r = c * g1[:, np.newaxis] + s * g2[:, np.newaxis]
            g2r = -s * g1[:, np.newaxis] + c * g2[:, np.newaxis]

            for i in range(n):
                # convert to the index in the partial space
                pix = pixels[i]
                if pix < 0:
                    continue

                sb = source_bin[i]
                lb = lens_bin[i]

                # build up the rotated map for each bin
                if sb >= 0:
                    G1[pix, sb, :] += g1r[i]
                    G2[pix, sb, :] += g2r[i]
                    GW[pix, sb] += w[i]

                # Build up the ngal for the random half for each bin
                for j in range(clustering_realizations):
                    if lb >= 0:
                        ngal_split[pix, lb, j, split[i, j]] += 1
                        # TODO add to clustering weight too

        # Sum everything at root
        if self.comm is not None:
            mpi_reduce_large(G1, self.comm)
            mpi_reduce_large(G2, self.comm)
            mpi_reduce_large(GW, self.comm)
            mpi_reduce_large(ngal_split, self.comm)
            if self.rank != 0:
                del G1, G2, GW, ngal_split

        if self.rank == 0:
            print("Saving maps")
            outfile = self.open_output('source_noise_maps', wrapper=True)

            # The top section has the metadata in
            group = outfile.file.create_group("maps")
            group.attrs['nbin_source'] = nbin_source
            group.attrs['lensing_realizations'] = lensing_realizations

            metadata = {**self.config, **map_info}

            pixels = np.where(mask > 0)[0]

            for b in range(nbin_source):
                for i in range(lensing_realizations):
                    bin_mask = np.where(GW[:, b] > 0)

                    g1 = G1[:, b, i] / GW[:, b]
                    g2 = G2[:, b, i] / GW[:, b]

                    outfile.write_map(f"rotation_{i}/g1_{b}",
                                      pixels[bin_mask], g1[bin_mask], metadata)
                    outfile.write_map(f"rotation_{i}/g2_{b}",
                                      pixels[bin_mask], g2[bin_mask], metadata)

            outfile = self.open_output('lens_noise_maps', wrapper=True)
            group = outfile.file.create_group("maps")
            group.attrs['nbin_lens'] = nbin_lens
            group.attrs['clustering_realizations'] = clustering_realizations

            for b in range(nbin_lens):
                for i in range(clustering_realizations):
                    # We have computed the first half already,
                    # and we have the total map from an earlier stage
                    half1 = ngal_split[:, b, i, 0]
                    half2 = ngal_split[:, b, i, 1]

                    # Convert to overdensity.  I thought about
                    # using half the mean from the full map to reduce
                    # noise, but thought that might add covariance
                    # to the two maps, and this shouldn't be that noisy
                    mu1 = np.average(half1, weights=mask[pixels])
                    mu2 = np.average(half2, weights=mask[pixels])

                    # This will produce some mangled sentinel values
                    # but they will be masked out
                    rho1 = (half1 - mu1) / mu1
                    rho2 = (half2 - mu2) / mu2

                    # Write both overdensity and count maps
                    # for each bin for each split
                    outfile.write_map(f"split_{i}/rho1_{b}", pixels, rho1, metadata)
                    outfile.write_map(f"split_{i}/rho2_{b}", pixels, rho2, metadata)
                    # counts
                    outfile.write_map(f"split_{i}/ngal1_{b}", pixels, half1, metadata)
                    outfile.write_map(f"split_{i}/ngal2_{b}", pixels, half2, metadata)

    def read_inputs(self):
        with self.open_input('mask', wrapper=True) as f:
            mask = f.read_map('mask')
            # pixelization etc
            map_info = f.read_map_info('mask')

        with self.open_input('lens_maps', wrapper=True) as f:
            nbin_lens = f.file['maps'].attrs['nbin_lens']
            ngal_maps = [f.read_map(f'ngal_{b}') for b in range(nbin_lens)]

        with self.open_input('shear_tomography_catalog') as f:
            nbin_source = f['tomography'].attrs['nbin_source']
            sz1 = f['tomography/source_bin'].size

        with self.open_input('lens_tomography_catalog') as f:
            sz2 = f['tomography/lens_bin'].size

        if sz1 != sz2:
            raise ValueError("Lens and source catalogs are different sizes in "
                             "TXNoiseMaps. In this case run TXSourceNoiseMaps "
                             "and TXLensNoiseMaps separately.")

        return nbin_source, nbin_lens, ngal_maps, mask, map_info
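

# ----------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline source): the compact
# pixel indexing used by the stages in this module.  A full-sky healpix
# map is mostly masked, so the stages only allocate rows for unmasked
# pixels and keep two lookup arrays: index_map (healpix index ->
# compact index, with -1 as the sentinel for masked pixels) and
# reverse_map (compact index -> healpix index).  The tiny npix_full
# and mask values below are hypothetical.
def _sketch_compact_pixel_index():
    npix_full = 12          # hypothetical tiny "sky"
    mask = np.zeros(npix_full)
    mask[[2, 3, 7]] = 1.0   # three unmasked pixels

    # healpix indices of the unmasked pixels, in order
    reverse_map = np.where(mask > 0)[0]          # array([2, 3, 7])

    # healpix index -> compact index, -1 where masked
    index_map = np.zeros(npix_full, dtype=np.int64) - 1
    index_map[reverse_map] = np.arange(reverse_map.size)

    # round trip: healpix pixel 7 has compact index 2,
    # and reverse_map[2] takes us back to 7
    assert index_map[7] == 2
    assert reverse_map[index_map[7]] == 7
    # masked pixels map to the sentinel value
    assert index_map[0] == -1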


class TXSourceNoiseMaps(TXBaseMaps):
    name = 'TXSourceNoiseMaps'

    inputs = [
        ('shear_catalog', ShearCatalog),
        ('shear_tomography_catalog', TomographyCatalog),
        # We get the pixelization info from the diagnostic maps
        ('mask', MapsFile),
    ]

    outputs = [
        ('source_noise_maps', LensingNoiseMaps),
    ]

    config_options = {
        'chunk_rows': 100000,
        'lensing_realizations': 30,
    }

    # instead of reading from config we match the basic maps
    def choose_pixel_scheme(self):
        with self.open_input("mask", wrapper=True) as maps_file:
            pix_info = maps_file.read_map_info("mask")
        return choose_pixelization(**pix_info)

    def prepare_mappers(self, pixel_scheme):
        read_shear_catalog_type(self)

        with self.open_input("mask", wrapper=True) as maps_file:
            mask = maps_file.read_map("mask")

        with self.open_input('shear_tomography_catalog', wrapper=True) as f:
            nbin_source = f.file['tomography'].attrs['nbin_source']

        # Mapping from 0 .. nhit - 1 to healpix indices
        reverse_map = np.where(mask > 0)[0]

        # Get a mapping from healpix indices to masked pixel indices.
        # This reduces memory usage.  We could use a healsparse array
        # here, but I'm not sure how to do that best with our
        # many realizations.  Possibly a recarray?
        index_map = np.zeros(pixel_scheme.npix, dtype=np.int64) - 1
        index_map[reverse_map] = np.arange(reverse_map.size)

        # Number of unmasked pixels
        npix = reverse_map.size

        lensing_realizations = self.config["lensing_realizations"]

        # lensing g1, g2
        G1 = np.zeros((npix, nbin_source, lensing_realizations))
        G2 = np.zeros((npix, nbin_source, lensing_realizations))
        # lensing weight
        GW = np.zeros((npix, nbin_source))

        return (npix, G1, G2, GW, index_map, reverse_map, nbin_source)

    def data_iterator(self):
        if self.config["shear_catalog_type"] == "metacal":
            shear_cols = ['ra', 'dec', 'weight', 'mcal_g1', 'mcal_g2']
        else:
            shear_cols = ['ra', 'dec', 'weight', 'g1', 'g2']

        it = self.combined_iterators(
            self.config["chunk_rows"],
            'shear_catalog', 'shear', shear_cols,
            'shear_tomography_catalog', 'tomography', ['source_bin'],
        )
        return it

    def accumulate_maps(self, pixel_scheme, data, mappers):
        npix, G1, G2, GW, index_map, _, _ = mappers
        lensing_realizations = self.config['lensing_realizations']

        if self.config['shear_catalog_type'] == 'metacal':
            data['g1'] = data['mcal_g1']
            data['g2'] = data['mcal_g2']

        source_bin = data['source_bin']

        # Get the pixel index for each object and convert
        # to the reduced index
        ra = data['ra']
        dec = data['dec']
        orig_pixels = pixel_scheme.ang2pix(ra, dec)
        pixels = index_map[orig_pixels]

        # Pull out some columns we need
        n = len(ra)
        w = data['weight']

        # Pre-weight the g1 values so we don't have to
        # weight each realization again
        g1 = data['g1'] * w
        g2 = data['g2'] * w

        # random rotations of the g1, g2 values
        phi = np.random.uniform(0, 2 * np.pi, (n, lensing_realizations))
        c = np.cos(phi)
        s = np.sin(phi)
        g1r = c * g1[:, np.newaxis] + s * g2[:, np.newaxis]
        g2r = -s * g1[:, np.newaxis] + c * g2[:, np.newaxis]

        for i in range(n):
            sb = source_bin[i]

            # Skip objects we don't use
            if sb < 0:
                continue

            # convert to the index in the partial space
            pix = pixels[i]

            # The sentinel value for pixels is -1
            if pix < 0:
                continue

            # build up the rotated map for each bin
            G1[pix, sb, :] += g1r[i]
            G2[pix, sb, :] += g2r[i]
            GW[pix, sb] += w[i]

    def finalize_mappers(self, pixel_scheme, mappers):
        # only one mapper here - we call its finalize method
        # to collect everything
        npix, G1, G2, GW, index_map, reverse_map, nbin_source = mappers
        lensing_realizations = self.config["lensing_realizations"]

        # Sum everything at root
        if self.comm is not None:
            mpi_reduce_large(G1, self.comm)
            mpi_reduce_large(G2, self.comm)
            mpi_reduce_large(GW, self.comm)
            if self.rank != 0:
                del G1, G2, GW

        # build up output
        maps = {}

        # only master gets full stuff
        if self.rank != 0:
            return maps

        for b in range(nbin_source):
            for i in range(lensing_realizations):
                bin_mask = np.where(GW[:, b] > 0)

                g1 = G1[:, b, i] / GW[:, b]
                g2 = G2[:, b, i] / GW[:, b]

                maps["source_noise_maps", f"rotation_{i}/g1_{b}"] = (
                    reverse_map[bin_mask], g1[bin_mask])

                maps["source_noise_maps", f"rotation_{i}/g2_{b}"] = (
                    reverse_map[bin_mask], g2[bin_mask])

        return maps
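

# ----------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline source): the random
# spin-2 rotation applied in accumulate_maps above.  Rotating (g1, g2)
# by a random angle phi preserves the shear amplitude but scrambles its
# orientation, so the rotated realizations keep the shape-noise level
# per pixel while destroying the cosmological signal on average.  The
# sample shear values and the fixed seed below are hypothetical.
def _sketch_random_rotation(n_realizations=1000):
    rng = np.random.default_rng(12345)
    g1, g2 = 0.03, -0.01

    phi = rng.uniform(0, 2 * np.pi, n_realizations)
    c = np.cos(phi)
    s = np.sin(phi)
    g1r = c * g1 + s * g2
    g2r = -s * g1 + c * g2

    # the amplitude is preserved for every realization ...
    assert np.allclose(g1r**2 + g2r**2, g1**2 + g2**2)
    # ... but the mean rotated shear is consistent with zero
    assert abs(g1r.mean()) < 5 * np.hypot(g1, g2) / np.sqrt(n_realizations)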


class TXExternalLensNoiseMaps(TXBaseMaps):
    name = 'TXExternalLensNoiseMaps'

    inputs = [
        ('lens_tomography_catalog', TomographyCatalog),
        ('lens_catalog', HDFFile),
        ('mask', MapsFile),
    ]

    outputs = [
        ('lens_noise_maps', ClusteringNoiseMaps),
    ]

    config_options = {
        'chunk_rows': 100000,
        'clustering_realizations': 1,
    }

    # instead of reading from config we match the basic maps
    def choose_pixel_scheme(self):
        with self.open_input("mask", wrapper=True) as maps_file:
            pix_info = maps_file.read_map_info("mask")
        return choose_pixelization(**pix_info)

    def prepare_mappers(self, pixel_scheme):
        with self.open_input("mask", wrapper=True) as maps_file:
            mask = maps_file.read_map("mask")

        with self.open_input('lens_tomography_catalog', wrapper=True) as f:
            nbin_lens = f.file['tomography'].attrs['nbin_lens']

        # Mapping from 0 .. nhit - 1 to healpix indices
        reverse_map = np.where(mask > 0)[0]

        # Get a mapping from healpix indices to masked pixel indices.
        # This reduces memory usage.  We could use a healsparse array
        # here, but I'm not sure how to do that best with our
        # many realizations.  Possibly a recarray?
        index_map = np.zeros(pixel_scheme.npix, dtype=np.int64) - 1
        index_map[reverse_map] = np.arange(reverse_map.size)

        # Number of unmasked pixels
        npix = reverse_map.size

        clustering_realizations = self.config["clustering_realizations"]

        ngal_split = np.zeros((npix, nbin_lens, clustering_realizations, 2), dtype=np.int32)
        # TODO: Clustering weights go here

        return (npix, ngal_split, index_map, reverse_map, mask, nbin_lens)

    def data_iterator(self):
        it = self.combined_iterators(
            self.config["chunk_rows"],
            'lens_catalog', 'lens', ['ra', 'dec'],
            'lens_tomography_catalog', 'tomography', ['lens_bin'],
        )
        return it

    def accumulate_maps(self, pixel_scheme, data, mappers):
        npix, ngal_split, index_map, _, _, _ = mappers
        clustering_realizations = self.config['clustering_realizations']

        # Tomographic bin
        lens_bin = data['lens_bin']

        # Get the pixel index for each object and convert
        # to the reduced index
        ra = data['ra']
        dec = data['dec']
        orig_pixels = pixel_scheme.ang2pix(ra, dec)
        pixels = index_map[orig_pixels]

        n = len(ra)

        # randomly select a half for each object
        split = np.random.binomial(1, 0.5, (n, clustering_realizations))

        for i in range(n):
            lb = lens_bin[i]

            # Skip objects we don't use
            if lb < 0:
                continue

            # convert to the index in the partial space
            pix = pixels[i]

            # The sentinel value for pixels is -1
            if pix < 0:
                continue

            for j in range(clustering_realizations):
                ngal_split[pix, lb, j, split[i, j]] += 1

    def finalize_mappers(self, pixel_scheme, mappers):
        # only one mapper here - we call its finalize method
        # to collect everything
        npix, ngal_split, index_map, reverse_map, mask, nbin_lens = mappers
        clustering_realizations = self.config["clustering_realizations"]

        # Sum everything at root
        if self.comm is not None:
            mpi_reduce_large(ngal_split, self.comm)
            if self.rank != 0:
                del ngal_split

        # build up output
        maps = {}

        # only master gets full stuff
        if self.rank != 0:
            return maps

        for b in range(nbin_lens):
            for i in range(clustering_realizations):
                # We have computed the first half already,
                # and we have the total map from an earlier stage
                half1 = ngal_split[:, b, i, 0]
                half2 = ngal_split[:, b, i, 1]

                # Convert to overdensity.  I thought about
                # using half the mean from the full map to reduce
                # noise, but thought that might add covariance
                # to the two maps, and this shouldn't be that noisy
                mu1 = np.average(half1, weights=mask[reverse_map])
                mu2 = np.average(half2, weights=mask[reverse_map])

                # This will produce some mangled sentinel values
                # but they will be masked out
                rho1 = (half1 - mu1) / mu1
                rho2 = (half2 - mu2) / mu2

                # Save four maps - density splits and ngal splits
                maps['lens_noise_maps', f"split_{i}/rho1_{b}"] = (reverse_map, rho1)
                maps['lens_noise_maps', f"split_{i}/rho2_{b}"] = (reverse_map, rho2)
                maps['lens_noise_maps', f"split_{i}/ngal1_{b}"] = (reverse_map, half1)
                maps['lens_noise_maps', f"split_{i}/ngal2_{b}"] = (reverse_map, half2)

        return maps
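

# ----------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline source): the split-count
# to overdensity conversion in finalize_mappers above.  In the pipeline
# each galaxy is assigned to one of two halves with a fair coin flip;
# splitting a pixel's total count with a single binomial draw, as done
# here for brevity, is statistically equivalent.  Each half-count map
# is then turned into an overdensity rho = (n - mu) / mu, where mu is
# the mask-weighted mean count per unmasked pixel.  The counts, mask
# weights, and seed below are hypothetical.
def _sketch_count_split_overdensity():
    rng = np.random.default_rng(67890)

    # hypothetical galaxy counts in 5 unmasked pixels
    ngal = np.array([8, 12, 10, 9, 11])
    weights = np.ones(ngal.size)   # stand-in for mask[reverse_map]

    # split each pixel's galaxies into two halves with p = 0.5
    half1 = rng.binomial(ngal, 0.5)
    half2 = ngal - half1

    # convert each half to an overdensity around its own mean
    mu1 = np.average(half1, weights=weights)
    mu2 = np.average(half2, weights=weights)
    rho1 = (half1 - mu1) / mu1
    rho2 = (half2 - mu2) / mu2

    # both overdensity maps average to zero by construction
    assert np.isclose(np.average(rho1, weights=weights), 0.0)
    assert np.isclose(np.average(rho2, weights=weights), 0.0)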