from .base_stage import PipelineStage
from .data_types import HDFFile, ShearCatalog, TomographyCatalog, RandomsCatalog, FiducialCosmology, SACCFile, PhotozPDFFile, PNGFile, TextFile
from .utils.calibration_tools import apply_metacal_response, apply_lensfit_calibration 
from .utils.calibration_tools import read_shear_catalog_type
import numpy as np
import random
import collections
import sys
import os
import pathlib
from time import perf_counter
import gc

# This creates a little mini-type, like a struct,
# for holding individual measurements
Measurement = collections.namedtuple(
    ['corr_type', 'object', 'i', 'j'])


[docs]class TXTwoPoint(PipelineStage): r""" This is the base stage for real-space two-point correlations. Input files: * shear_tomography_catalog: Shear catalog. * shear_photoz_stack: Shear photoz information. * lens_tomography_catalog: Lens catalog. * lens_photoz_stack: Lens photoz information. * random_cats: Random catalog for lenses. * patch_centers: Patch centers for Jackknife. * tracer_metadata: Metadata for tracers. Return files: * twopoint_data_real_raw: Sacc file with all two-point correlations including jackknife covariance matrices. * twopoint_gamma_x: This is the gamma_x component, for null tests, its kept seperate since it is only needed for null tests. """ name='TXTwoPoint' inputs = [ ('binned_shear_catalog', ShearCatalog), ('binned_lens_catalog', HDFFile), ('binned_random_catalog', HDFFile), ('shear_photoz_stack', HDFFile), ('lens_photoz_stack', HDFFile), ('patch_centers', TextFile), ('tracer_metadata', HDFFile), ] outputs = [ ('twopoint_data_real_raw', SACCFile), ('twopoint_gamma_x', SACCFile) ] # Add values to the config file that are not previously defined config_options = { # TODO: Allow more fine-grained selection of 2pt subsets to compute 'calcs':[0,1,2], 'min_sep':0.5, 'max_sep':300., 'nbins':9, 'bin_slop':0.0, 'sep_units':'arcmin', 'flip_g1':False, 'flip_g2':True, 'cores_per_task':20, 'verbose':1, 'source_bins':[-1], 'lens_bins':[-1], 'reduce_randoms_size':1.0, 'do_shear_shear': True, 'do_shear_pos': True, 'do_pos_pos': True, 'var_method': 'jackknife', 'use_randoms': True, 'low_mem': False, 'patch_dir': './cache/patches', }
[docs] def run(self): """ Run the analysis for this stage. """ import sacc import healpy import treecorr # Binning information source_list, lens_list = self.read_nbin() # Calculate metadata like the area and related # quantities meta = self.read_metadata() # Choose which pairs of bins to calculate calcs = self.select_calculations(source_list, lens_list) sys.stdout.flush() # Split the catalogs into patch files self.prepare_patches(calcs) results = [] for i,j,k in calcs: result = self.call_treecorr(i, j, k) results.append(result) # Save the results if self.rank==0: self.write_output(source_list, lens_list, meta, results)
def select_calculations(self, source_list, lens_list): calcs = [] # For shear-shear we omit pairs with j>i if self.config['do_shear_shear']: k = SHEAR_SHEAR for i in source_list: for j in range(i+1): if j in source_list: calcs.append((i,j,k)) # For shear-position we use all pairs if self.config['do_shear_pos']: k = SHEAR_POS for i in source_list: for j in lens_list: calcs.append((i,j,k)) # For position-position we omit pairs with j>i if self.config['do_pos_pos']: if not self.config['use_randoms']: raise ValueError("You need to have a random catalog to calculate position-position correlations") k = POS_POS for i in lens_list: for j in range(i+1): if j in lens_list: calcs.append((i,j,k)) if self.rank==0: print(f"Running {len(calcs)} calculations: {calcs}") return calcs
[docs] def read_nbin(self): """ Determine the bins to use in this analysis, either from the input file or from the configuration. """ if self.config['source_bins'] == [-1] and self.config['lens_bins'] == [-1]: source_list, lens_list = self._read_nbin_from_tomography() else: source_list, lens_list = self._read_nbin_from_config() ns = len(source_list) nl = len(lens_list) if self.rank == 0: print(f'Running with {ns} source bins and {nl} lens bins') return source_list, lens_list
# These two functions can be combined into a single one. def _read_nbin_from_tomography(self): with self.open_input('binned_shear_catalog') as f: nbin_source = f['shear'].attrs['nbin_source'] with self.open_input('binned_lens_catalog') as f: nbin_lens = f['lens'].attrs['nbin_lens'] source_list = range(nbin_source) lens_list = range(nbin_lens) return source_list, lens_list def _read_nbin_from_config(self): # TODO handle the case where the user only specefies # bins for only sources or only lenses source_list = self.config['source_bins'] lens_list = self.config['lens_bins'] # catch bad input tomo_source_list, tomo_lens_list = self._read_nbin_from_tomography() tomo_nbin_source = len(tomo_source_list) tomo_nbin_lens = len(tomo_lens_list) nbin_source = len(source_list) nbin_lens = len(lens_list) if source_list == [-1]: source_list = tomo_source_list if lens_list == [-1]: lens_list = tomo_lens_list # if more bins are input than exist, raise an error if not nbin_source <= tomo_nbin_source: raise ValueError(f'Requested too many source bins in the config ({nbin_source}): max is {tomo_nbin_source}') if not nbin_lens <= tomo_nbin_lens: raise ValueError(f'Requested too many lens bins in the config ({nbin_lens}): max is {tomo_nbin_lens}') # make sure the bin numbers actually exist for i in source_list: if i not in tomo_source_list: raise ValueError(f"Requested source bin {i} that is not in the input file") for i in lens_list: if i not in tomo_lens_list: raise ValueError(f"Requested lens bin {i} that is not in the input file") return source_list, lens_list def write_output(self, source_list, lens_list, meta, results): import sacc import treecorr XI = "combined" XIP = sacc.standard_types.galaxy_shear_xi_plus XIM = sacc.standard_types.galaxy_shear_xi_minus GAMMAT = sacc.standard_types.galaxy_shearDensity_xi_t GAMMAX = sacc.standard_types.galaxy_shearDensity_xi_x WTHETA = sacc.standard_types.galaxy_density_xi S = sacc.Sacc() if self.config['do_shear_pos'] == True: S2 = sacc.Sacc() # We include the n(z) data in the output. # So here we load it in and add it to the data f = self.open_input('shear_photoz_stack') # Load the tracer data N(z) from an input file and # copy it to the output, for convenience for i in source_list: z = f['n_of_z/source/z'][:] Nz = f[f'n_of_z/source/bin_{i}'][:] S.add_tracer('NZ', f'source_{i}', z, Nz) if self.config['do_shear_pos'] == True: S2.add_tracer('NZ', f'source_{i}',z, Nz) f = self.open_input('lens_photoz_stack') # For both source and lens for i in lens_list: z = f['n_of_z/lens/z'][:] Nz = f[f'n_of_z/lens/bin_{i}'][:] S.add_tracer('NZ', f'lens_{i}', z, Nz) if self.config['do_shear_pos'] == True: S2.add_tracer('NZ', f'lens_{i}',z, Nz) # Closing n(z) file f.close() # Now build up the collection of data points, adding them all to # the sacc data one by one. comb = [] for d in results: # First the tracers and generic tags tracer1 = f'source_{d.i}' if d.corr_type in [XI, GAMMAT] else f'lens_{d.i}' tracer2 = f'source_{d.j}' if d.corr_type in [XI] else f'lens_{d.j}' # We build up the comb list to get the covariance of it later # in the same order as our data points comb.append(d.object) theta = np.exp(d.object.meanlogr) npair = d.object.npairs weight = d.object.weight # xip / xim is a special case because it has two observables. # the other two are together below if d.corr_type == XI: xip = d.object.xip xim = d.object.xim xiperr = np.sqrt(d.object.varxip) ximerr = np.sqrt(d.object.varxim) n = len(xip) # add all the data points to the sacc for i in range(n): S.add_data_point(XIP, (tracer1,tracer2), xip[i], theta=theta[i], error=xiperr[i], npair=npair[i], weight= weight[i]) for i in range(n): S.add_data_point(XIM, (tracer1,tracer2), xim[i], theta=theta[i], error=ximerr[i], npair=npair[i], weight= weight[i]) else: xi = d.object.xi err = np.sqrt(d.object.varxi) n = len(xi) for i in range(n): S.add_data_point(d.corr_type, (tracer1,tracer2), xi[i], theta=theta[i], error=err[i], weight=weight[i]) # Add the covariance. There are several different jackknife approaches # available - see the treecorr docs cov = treecorr.estimate_multi_cov(comb, self.config['var_method']) S.add_covariance(cov) # Our data points may currently be in any order depending on which processes # ran which calculations. Re-order them. S.to_canonical_order() self.write_metadata(S,meta) # Finally, save the output to Sacc file S.save_fits(self.get_output('twopoint_data_real_raw'), overwrite=True) # Adding the gammaX calculation: if self.config['do_shear_pos'] == True: comb = [] for d in results: tracer1 = f'source_{d.i}' if d.corr_type in [XI, GAMMAT] else f'lens_{d.i}' tracer2 = f'source_{d.j}' if d.corr_type in [XI] else f'lens_{d.j}' if d.corr_type == GAMMAT: theta = np.exp(d.object.meanlogr) npair = d.object.npairs weight = d.object.weight xi_x = d.object.xi_im covX = d.object.estimate_cov('shot') comb.append(covX) err = np.sqrt(np.diag(covX)) n = len(xi_x) for i in range(n): S2.add_data_point(GAMMAX, (tracer1,tracer2), xi_x[i], theta=theta[i], error=err[i], weight=weight[i]) S2.add_covariance(comb) S2.to_canonical_order self.write_metadata(S2,meta) S2.save_fits(self.get_output('twopoint_gamma_x'), overwrite=True) def write_metadata(self, S, meta): # We also save the associated metadata to the file for k,v in meta.items(): if np.isscalar(v): S.metadata[k] = v else: for i, vi in enumerate(v): S.metadata[f'{k}_{i}'] = vi # Add provenance metadata. In managed formats this is done # automatically, but because the Sacc library is external # we do it manually here. provenance = self.gather_provenance() provenance.update(SACCFile.generate_provenance()) for key, value in provenance.items(): if isinstance(value, str) and '\n' in value: values = value.split("\n") for i,v in enumerate(values): S.metadata[f'provenance/{key}_{i}'] = v else: S.metadata[f'provenance/{key}'] = value
[docs] def call_treecorr(self, i, j, k): """ This is a wrapper for interaction with treecorr. """ import sacc if k==SHEAR_SHEAR: xx = self.calculate_shear_shear(i, j) xtype = "combined" elif k==SHEAR_POS: xx = self.calculate_shear_pos(i, j) xtype = sacc.standard_types.galaxy_shearDensity_xi_t elif k==POS_POS: xx = self.calculate_pos_pos(i, j) xtype = sacc.standard_types.galaxy_density_xi else: raise ValueError(f"Unknown correlation function {k}") # Force garbage collection here to make sure all the # catalogs are definitely freed gc.collect() # The measurement object collects the results and type info. # we use it because the ordering will not be simple if we have # parallelized, so it's good to keep explicit track. result = Measurement(xtype, xx, i, j) sys.stdout.flush() return result
[docs] def prepare_patches(self, calcs): """ For each catalog to be generated, have one process load the catalog and write its patch files out to disc. These are then re-used later by all the different processes. Parameters ---------- calcs: list A list of (bin1, bin2, bin_type) where bin1 and bin2 are indices or bin labels and bin_type is one of the constants SHEAR_SHEAR, SHEAR_POS, or POS_POS. """ # Make the full list of catalogs to run cats = set() # Use shear-shear and pos-pos only here as they represent # catalogs not pairs. for i, j, k in calcs: if k == SHEAR_SHEAR: cats.add((i, SHEAR_SHEAR)) cats.add((j, SHEAR_SHEAR)) elif k == SHEAR_POS: cats.add((i, SHEAR_SHEAR)) cats.add((j, POS_POS)) elif k == POS_POS: cats.add((i, POS_POS)) cats.add((j, POS_POS)) cats = list(cats) cats.sort(key=str) # This does a round-robin assignment to processes for (h, k) in self.split_tasks_by_rank(cats): print(f"Rank {self.rank} making patches for {k}-type bin {h}") # For shear we just have the one catalog. For position we may # have randoms also. We explicitly delete catalogs after loading # them to ensure we don't have two in memory at once. if k == SHEAR_SHEAR: cat = self.get_shear_catalog(h) cat.get_patches(low_mem=False) del cat else: cat = self.get_lens_catalog(h) cat.get_patches(low_mem=False) del cat ran_cat = self.get_random_catalog(h) # support use_randoms = False if ran_cat is None: continue ran_cat.get_patches(low_mem=False) del ran_cat # stop other processes progressing to the rest of the code and # trying to load things we have not written yet if self.comm is not None: self.comm.Barrier()
[docs] def get_patch_dir(self, input_tag, b): """ Select a patch directory for the file with the given input tag and with a bin number/label. To ensure that if you change the catalog the patch dir will also change, the directory path includes the unique ID of the input file. Parameters ---------- input_tag: str One of the tags in the class's inputs attribute b: any An additional label used as the last component in the returned directory Returns ------- str: a directory, which has been created if it did not exist already. """ # start from a user-specified base directory patch_base = self.config['patch_dir'] # append the unique identifier for the parent catalog file with self.open_input(input_tag, wrapper=True) as f: p = f.read_provenance() uuid = p['uuid'] pth = pathlib.Path(f.path).resolve() ctime = os.stat(pth).st_ctime # We expect the input files to be generated within a pipeline and so always # have input files to have a unique ID. But if for some reason it doesn't # have one we handle that too. if uuid == 'UNKNOWN': ident = hash(f"{pth}{ctime}").to_bytes(8, 'big', signed=True).hex() name = f"{input_tag}_{ident}" else: name = f"{input_tag}_{uuid}" # And finally append the bin name or number patch_dir = pathlib.Path(patch_base) / name / str(b) # Make the directory and return it pathlib.Path(patch_dir).mkdir(exist_ok=True, parents=True) return patch_dir
def get_shear_catalog(self, i): import treecorr # Load and calibrate the appropriate bin data cat = treecorr.Catalog( self.get_input("binned_shear_catalog"), ext = f"/shear/bin_{i}", g1_col = "g1", g2_col = "g2", ra_col = "ra", dec_col = "dec", w_col = "weight", ra_units='degree', dec_units='degree', patch_centers=self.get_input('patch_centers'), save_patch_dir=self.get_patch_dir("binned_shear_catalog", i), flip_g1 = self.config["flip_g1"], flip_g2 = self.config["flip_g2"], ) return cat def get_lens_catalog(self, i): import treecorr # Load and calibrate the appropriate bin data cat = treecorr.Catalog( self.get_input("binned_lens_catalog"), ext = f"/lens/bin_{i}", ra_col = "ra", dec_col = "dec", w_col = "weight", ra_units='degree', dec_units='degree', patch_centers=self.get_input('patch_centers'), save_patch_dir=self.get_patch_dir('binned_lens_catalog', i), ) return cat def get_random_catalog(self, i): import treecorr if not self.config["use_randoms"]: return None rancat = treecorr.Catalog( self.get_input("binned_random_catalog"), ext = f"/randoms/bin_{i}", ra_col = "ra", dec_col = "dec", ra_units='degree', dec_units='degree', patch_centers=self.get_input('patch_centers'), save_patch_dir=self.get_patch_dir('binned_random_catalog', i), ) return rancat def calculate_shear_shear(self, i, j): import treecorr cat_i = self.get_shear_catalog(i) n_i = cat_i.nobj if i==j: cat_j = None n_j = n_i else: cat_j = self.get_shear_catalog(j) n_j = cat_j.nobj if self.rank == 0: print(f"Calculating shear-shear bin pair ({i},{j}): {n_i} x {n_j} objects using MPI") gg = treecorr.GGCorrelation(self.config) t1 = perf_counter() gg.process(cat_i, cat_j, low_mem=self.config["low_mem"], comm=self.comm) t2 = perf_counter() if self.rank == 0: print(f"Processing took {t2 - t1:.1f} seconds") return gg def calculate_shear_pos(self, i, j): import treecorr cat_i = self.get_shear_catalog(i) n_i = cat_i.nobj cat_j = self.get_lens_catalog(j) rancat_j = self.get_random_catalog(j) n_j = cat_j.nobj n_rand_j = rancat_j.nobj if rancat_j is not None else 0 if self.rank == 0: print(f"Calculating shear-position bin pair ({i},{j}): {n_i} x {n_j} objects, {n_rand_j} randoms") ng = treecorr.NGCorrelation(self.config) t1 = perf_counter() ng.process(cat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) if rancat_j: rg = treecorr.NGCorrelation(self.config) rg.process(rancat_j, cat_i, comm=self.comm, low_mem=self.config["low_mem"]) else: rg = None if self.rank == 0: ng.calculateXi(rg=rg) t2 = perf_counter() print(f"Processing took {t2 - t1:.1f} seconds") return ng def calculate_pos_pos(self, i, j): import treecorr cat_i = self.get_lens_catalog(i) rancat_i = self.get_random_catalog(i) n_i = cat_i.nobj n_rand_i = rancat_i.nobj if rancat_i is not None else 0 if i==j: cat_j = None rancat_j = rancat_i n_j = n_i n_rand_j = n_rand_i else: cat_j = self.get_lens_catalog(j) rancat_j = self.get_random_catalog(j) n_j = cat_j.nobj n_rand_j = rancat_j.nobj if self.rank == 0: print(f"Calculating position-position bin pair ({i}, {j}): {n_i} x {n_j} objects, {n_rand_i} x {n_rand_j} randoms") t1 = perf_counter() nn = treecorr.NNCorrelation(self.config) nn.process(cat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) nr = treecorr.NNCorrelation(self.config) nr.process(cat_i, rancat_j, comm=self.comm, low_mem=self.config["low_mem"]) # The next calculation is faster if we explicitly tell TreeCorr # that its two catalogs here are the same one. if i == j: rancat_j = None rr = treecorr.NNCorrelation(self.config) rr.process(rancat_i, rancat_j, comm=self.comm, low_mem=self.config["low_mem"]) if i==j: rn = None else: rn = treecorr.NNCorrelation(self.config) rn.process(rancat_i, cat_j, comm=self.comm, low_mem=self.config["low_mem"]) if self.rank == 0: t2 = perf_counter() nn.calculateXi(rr, dr=nr, rd=rn) print(f"Processing took {t2 - t1:.1f} seconds") return nn def read_metadata(self): meta_data = self.open_input('tracer_metadata') area = meta_data['tracers'].attrs['area'] sigma_e = meta_data['tracers/sigma_e'][:] N_eff = meta_data['tracers/N_eff'][:] mean_e1 = meta_data['tracers/mean_e1'][:] mean_e2 = meta_data['tracers/mean_e2'][:] meta = {} meta["neff"] = N_eff meta["area"] = area meta["sigma_e"] = sigma_e meta["mean_e1"] = mean_e1 meta["mean_e2"] = mean_e2 return meta
if __name__ == '__main__': PipelineStage.main()