from .base_stage import PipelineStage
from .data_types import ShearCatalog, HDFFile, FiducialCosmology, SACCFile
import numpy as np
import warnings
import os
import pickle
# require TJPCov to be in PYTHONPATH
d2r=np.pi/180
# Needed changes: 1) ell and theta spacing could be further optimized 2) coupling matrix
[docs]class TXFourierGaussianCovariance(PipelineStage):
name='TXFourierGaussianCovariance'
do_xi=False
inputs = [
('fiducial_cosmology', FiducialCosmology), # For the cosmological parameters
('twopoint_data_fourier', SACCFile), # For the binning information
('tracer_metadata', HDFFile), # For metadata
]
outputs = [
('summary_statistics_fourier', SACCFile),
]
config_options = {
'pickled_wigner_transform': '',
'use_true_shear': False,
'galaxy_bias': [0.],
}
def run(self):
import pyccl as ccl
import sacc
import tjpcov
import threadpoolctl
# read the fiducial cosmology
cosmo = self.read_cosmology()
# read binning
two_point_data = self.read_sacc()
# read the n(z) and f_sky from the source summary stats
meta = self.read_number_statistics()
# Binning choices. The ell binning is a linear piece with all the
# integer values up to 500 -- these are from firecrown, might need
# to change later
meta['ell'] = np.concatenate(
(np.linspace(2, 500-1, 500-2),
np.logspace(np.log10(500), np.log10(6e4), 500))
)
# Theta binning - log spaced between 1 .. 300 arcmin.
meta['theta'] = np.logspace(np.log10(1/60), np.log10(300./60), 3000)
#C_ell covariance
cov = self.compute_covariance(cosmo, meta, two_point_data=two_point_data)
self.save_outputs(two_point_data, cov)
def save_outputs(self, two_point_data, cov):
filename = self.get_output('summary_statistics_fourier')
two_point_data.add_covariance(cov)
two_point_data.save_fits(filename, overwrite=True)
def read_cosmology(self):
return self.open_input('fiducial_cosmology', wrapper=True).to_ccl()
def read_sacc(self):
import sacc
f = self.get_input('twopoint_data_fourier')
two_point_data = sacc.Sacc.load_fits(f)
# Remove the data types that we won't use for inference
mask = [
two_point_data.indices(sacc.standard_types.galaxy_shear_cl_ee),
two_point_data.indices(sacc.standard_types.galaxy_shearDensity_cl_e),
two_point_data.indices(sacc.standard_types.galaxy_density_cl),
# not doing b-modes, do we want to?
]
print("Length before cuts = ", len(two_point_data))
mask = np.concatenate(mask)
two_point_data.keep_indices(mask)
print("Length after cuts = ", len(two_point_data))
two_point_data.to_canonical_order()
return two_point_data
def read_number_statistics(self):
input_data = self.open_input('tracer_metadata')
# per-bin quantities
N_eff = input_data['tracers/N_eff'][:]
N_lens = input_data['tracers/lens_counts'][:]
if self.config['use_true_shear']:
nbins = len(input_data['tracers/sigma_e'][:])
sigma_e = np.array([0. for i in range(nbins)])
else:
sigma_e = input_data['tracers/sigma_e'][:]
# area in sq deg
area_deg2 = input_data['tracers'].attrs['area']
area_unit = input_data['tracers'].attrs['area_unit']
if area_unit != 'deg^2':
raise ValueError("Units of area have changed")
input_data.close()
# area in steradians and sky fraction
area = area_deg2 * np.radians(1)**2
area_arcmin2 = area_deg2 * 60**2
full_sky = 4*np.pi
f_sky = area / full_sky
# Density information from counts
n_eff = N_eff / area
n_lens = N_lens / area
# for printing out only
n_eff_arcmin = N_eff / area_arcmin2
n_lens_arcmin = N_lens / area_arcmin2
# Feedback
print(f"area = {area_deg2:.1f} deg^2")
print(f"f_sky: {f_sky}")
print(f"N_eff: {N_eff} (totals)")
print(f"N_lens: {N_lens} (totals)")
print(f"n_eff: {n_eff} / steradian")
print(f" = {np.around(n_eff_arcmin,2)} / sq arcmin")
print(f"lens density: {n_lens} / steradian")
print(f" = {np.around(n_lens_arcmin,2)} / arcmin")
# Pass all this back as a dictionary
meta = {
'f_sky': f_sky,
'sigma_e': sigma_e,
'n_eff': n_eff,
'n_lens': n_lens,
}
return meta
def get_tracer_info(self, cosmo, meta, two_point_data):
# Generates CCL tracers from n(z) information in the data file
import pyccl as ccl
ccl_tracers={}
tracer_noise={}
for tracer in two_point_data.tracers:
# Pull out the integer corresponding to the tracer index
tracer_dat = two_point_data.get_tracer(tracer)
nbin = int(two_point_data.tracers[tracer].name.split("_")[1])
z = tracer_dat.z.copy().flatten()
nz = tracer_dat.nz.copy().flatten()
# Identify source tracers and gnerate WeakLensingTracer objects
# based on them
if 'source' in tracer or 'src' in tracer:
sigma_e = meta['sigma_e'][nbin]
n_eff = meta['n_eff'][nbin]
ccl_tracers[tracer] = ccl.WeakLensingTracer(cosmo, dndz=(z, nz)) #CCL automatically normalizes dNdz
tracer_noise[tracer] = sigma_e**2 / n_eff
# or if it is a lens bin then generaete the corresponding
# CCL tracer class
elif 'lens' in tracer:
# Get galaxy bias for this sample. Default value = 1.
if self.config['galaxy_bias'] == [0.]:
b0 = 1
print(f"Using galaxy bias = 1 for {tracer} (since you didn't specify any biases)")
else:
b0 = self.config['galaxy_bias'][nbin]
print(f"Using galaxy bias = {b0} for {tracer}")
b = b0 * np.ones(len(z))
n_gal = meta['n_lens'][nbin]
tracer_noise[tracer] = 1 / n_gal
ccl_tracers[tracer] = ccl.NumberCountsTracer(cosmo, has_rsd=False, dndz=(z,nz), bias=(z,b))
return ccl_tracers, tracer_noise
def get_spins(self, tracer_comb):
# Get the Wigner Transform factors
WT_factors={}
WT_factors['lens','source'] = (0, 2)
WT_factors['source','lens'] = (2, 0) #same as (0,2)
WT_factors['source','source'] = {'plus':(2,2), 'minus':(2, -2)}
WT_factors['lens','lens'] = (0, 0)
tracers=[]
for i in tracer_comb:
if 'lens' in i:
tracers+=['lens']
if 'source' in i:
tracers+=['source']
return WT_factors[tuple(tracers)]
# compute a single covariance matrix for a given pair of C_ell or xi.
def compute_covariance_block(self, cosmo, meta, ell_bins,
tracer_comb1=None, tracer_comb2=None, ccl_tracers=None, tracer_Noise=None,
two_point_data=None,
xi_plus_minus1='plus', xi_plus_minus2='plus',
cache=None, WT=None,
):
import pyccl as ccl
from tjpcov import bin_cov
cl = {}
# tracers 1,2,3,4 = tracer_comb1[0], tracer_comb1[1], tracer_comb2[0], tracer_comb2[1]
# In the dicts below we use '13' to indicate C_ell_(1,3), etc.
# This index maps to this usae
reindex = {
(0, 0): 13,
(1, 1): 24,
(0, 1): 14,
(1, 0): 23,
}
ell = meta['ell']
# Getting all the C_ell that we need, saving the results in a cache
# for later re-use
for i in (0,1):
for j in (0,1):
local_key = reindex[(i,j)]
# For symmetric pairs we may have saved the C_ell the other
# way around, so try both keys
cache_key1 = (tracer_comb1[i], tracer_comb2[j])
cache_key2 = (tracer_comb2[j], tracer_comb1[i])
if cache_key1 in cache:
cl[local_key] = cache[cache_key1]
elif cache_key2 in cache:
cl[local_key] = cache[cache_key2]
else:
# If not cached then we must compute
t1 = tracer_comb1[i]
t2 = tracer_comb2[j]
c = ccl.angular_cl(cosmo, ccl_tracers[t1], ccl_tracers[t2], ell)
print("Computed C_ell for ", cache_key1)
cache[cache_key1] = c
cl[local_key] = c
# The shape noise C_ell values.
# These are zero for cross bins and as computed earlier for auto bins
SN={}
SN[13] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[0] else 0
SN[24] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[1] else 0
SN[14] = tracer_Noise[tracer_comb1[0]] if tracer_comb1[0] == tracer_comb2[1] else 0
SN[23] = tracer_Noise[tracer_comb1[1]] if tracer_comb1[1] == tracer_comb2[0] else 0
# The overall normalization factor at the front of the matrix
if self.do_xi:
norm = np.pi * 4 * meta['f_sky']
else:
norm = (2*ell + 1) * np.gradient(ell) * meta['f_sky']
# The coupling is an identity matrix at least when we neglect
# the mask
coupling_mat = {}
coupling_mat[1324] = np.eye(len(ell))
coupling_mat[1423] = np.eye(len(ell))
# Initial covariance of C_ell components
cov = {}
cov[1324] = np.outer(cl[13] + SN[13], cl[24] + SN[24]) * coupling_mat[1324]
cov[1423] = np.outer(cl[14] + SN[14], cl[23] + SN[23]) * coupling_mat[1423]
# for shear-shear components we also add a B-mode contribution
first_is_shear_shear = ('source' in tracer_comb1[0]) and ('source' in tracer_comb1[1])
second_is_shear_shear = ('source' in tracer_comb2[0]) and ('source' in tracer_comb2[1])
if self.do_xi and (first_is_shear_shear or second_is_shear_shear):
# this adds the B-mode shape noise contribution.
# We assume B-mode power (C_ell) is 0
Bmode_F = 1
if xi_plus_minus1 != xi_plus_minus2:
# in the cross term, this contribution is subtracted.
# eq. 29-31 of https://arxiv.org/pdf/0708.0387.pdf
Bmode_F=-1
# below the we multiply zero to maintain the shape of the Cl array, these are effectively
# B-modes
cov[1324] += np.outer(cl[13]*0 + SN[13], cl[24]*0 + SN[24]) * coupling_mat[1324] * Bmode_F
cov[1423] += np.outer(cl[14]*0 + SN[14], cl[23]*0 + SN[23]) * coupling_mat[1423] * Bmode_F
cov['final']=cov[1423]+cov[1324]
if self.do_xi:
s1_s2_1 = self.get_spins(tracer_comb1)
s1_s2_2 = self.get_spins(tracer_comb2)
# For the shear-shear we have two sets of spins, plus and minus,
# which are returned as a dict, so we need to pull out the one we need
# Otherwise it's just specified as a tuple, e.g. (2,0)
if isinstance(s1_s2_1, dict):
s1_s2_1 = s1_s2_1[xi_plus_minus1]
if isinstance(s1_s2_2, dict):
s1_s2_2 = s1_s2_2[xi_plus_minus2]
# Use these terms to project the covariance from C_ell to xi(theta)
th, cov['final']=WT.projected_covariance2(
l_cl=ell, s1_s2=s1_s2_1, s1_s2_cross=s1_s2_2, cl_cov=cov['final'])
# Normalize
cov['final'] /= norm
# Put the covariance into bins.
# This is optional in the case of a C_ell covariance (only if bins in ell are
# supplied, otherwise the matrix is for each ell value individually). It is
# required for real-space covariances since these are always binned.
if self.do_xi:
thb, cov['final_b'] = bin_cov(r=th/d2r, r_bins=ell_bins, cov=cov['final'])
else:
if ell_bins is not None:
lb, cov['final_b'] = bin_cov(r=ell, r_bins=ell_bins, cov=cov['final'])
return cov
def get_angular_bins(self, two_point_data):
# Assume that the ell binning is the same for each of the bins.
# This is true in the current pipeline.
X = two_point_data.get_data_points('galaxy_shear_cl_ee',i=0,j=0)
# Further assume that the ell ranges are contiguous, so that
# the max value of one window is the min value of the next.
# So we just need the lower edges of each bin and then the
# final maximum value of the last bin
ell_edges = [x['window'].min for x in X]
ell_edges.append(X[-1]['window'].max)
return np.array(ell_edges)
def make_wigner_transform(self, meta):
import threadpoolctl
from tjpcov import wigner_transform
path = self.config['pickled_wigner_transform']
if path:
if os.path.exists(path):
print(f"Loading precomputed wigner transform from {path}")
WT = pickle.load(open(path, 'rb'))
return WT
else:
print(f"Precomputed wigner transform {path} not found.")
print("Will compute it and then save it.")
# We don't want to use n processes with n threads each by accident,
# where n is the number of CPUs we have
# so for this bit of the code, which uses python's multiprocessing,
# we limit the number of threads that numpy etc can use.
# After this is finished this will switch back to allowing all the CPUs
# to be used for threading instead.
num_processes = int(os.environ.get("OMP_NUM_THREADS", 1))
print("Generating Wigner Transform.")
with threadpoolctl.threadpool_limits(1):
WT = wigner_transform(
l = meta['ell'],
theta = meta['theta'] * d2r,
s1_s2 = [(2,2), (2,-2), (0,2), (2,0), (0,0)],
ncpu = num_processes,
)
print("Computed Wigner Transform.")
if path:
try:
pickle.dump(WT, open(path, 'wb'))
except OSError:
sys.stderr.write(f"Could not save wigner transform to {path}")
return WT
#compute all the covariances and then combine them into one single giant matrix
def compute_covariance(self, cosmo, meta, two_point_data):
from tjpcov import bin_cov
ccl_tracers,tracer_Noise = self.get_tracer_info(cosmo, meta, two_point_data=two_point_data)
# we will loop over all these
tracer_combs = two_point_data.get_tracer_combinations()
N2pt = len(tracer_combs)
WT = self.make_wigner_transform(meta)
# the bit below is just counting the number of 2pt functions, and accounting
# for the fact that xi needs to be double counted
N2pt0 = 0
if self.do_xi:
N2pt0 = N2pt
tracer_combs_temp = tracer_combs.copy()
for combo in tracer_combs:
if ('source' in combo[0]) and ('source' in combo[1]):
N2pt += 1
tracer_combs_temp += [combo]
tracer_combs = tracer_combs_temp.copy()
ell_bins = self.get_angular_bins(two_point_data)
Nell_bins = len(ell_bins) - 1
cov_full=np.zeros((Nell_bins*N2pt, Nell_bins*N2pt))
count_xi_pm1 = 0
count_xi_pm2 = 0
cl_cache = {}
xi_pm = [[('plus','plus'), ('plus', 'minus')], [('minus','plus'), ('minus', 'minus')]]
print("Total number of 2pt functions:", N2pt)
print("Number of 2pt functions without xim:", N2pt0)
# Look through the chunk of matrix, tracer pair by tracer pair
# Order of the covariance needs to be the cannonical order of saac. For a 3x2pt matrix that is:
# -galaxy_density_xi
# -galaxy_shearDensity_xi_t
# -galaxy_shear_xi_minus
# -galaxy_shear_xi_plus
xim_start = N2pt0-(N2pt-N2pt0)
xim_end = N2pt0
for i in range(N2pt):
tracer_comb1 = tracer_combs[i]
count_xi_pm1 = 1 if i in range(xim_start, xim_end) else 0
for j in range(i, N2pt):
tracer_comb2 = tracer_combs[j]
print(f"Computing {tracer_comb1} x {tracer_comb2}: chunk ({i},{j}) of ({N2pt},{N2pt})")
count_xi_pm2 = 1 if j in range(xim_start, xim_end) else 0
if self.do_xi and ('source' in tracer_comb1[0] and 'source' in tracer_comb1[1]) or ('source' in tracer_comb2[0] and 'source' in tracer_comb2[1]):
cov_ij = self.compute_covariance_block(
cosmo,
meta,
ell_bins,
tracer_comb1=tracer_comb1,
tracer_comb2=tracer_comb2,
ccl_tracers=ccl_tracers,
tracer_Noise=tracer_Noise,
two_point_data=two_point_data,
xi_plus_minus1=xi_pm[count_xi_pm1][count_xi_pm2][0],
xi_plus_minus2=xi_pm[count_xi_pm1][count_xi_pm2][1],
cache=cl_cache,
WT=WT,
)
else:
cov_ij = self.compute_covariance_block(
cosmo,
meta,
ell_bins,
tracer_comb1=tracer_comb1,
tracer_comb2=tracer_comb2,
ccl_tracers=ccl_tracers,
tracer_Noise=tracer_Noise,
two_point_data=two_point_data,
cache=cl_cache,
WT=WT,
)
# Fill in this chunk of the matrix
cov_ij = cov_ij['final_b']
# Find the right location in the matrix
start_i = i * Nell_bins
start_j = j * Nell_bins
end_i = start_i + Nell_bins
end_j = start_j + Nell_bins
# and fill it in, and the transpose component
cov_full[start_i:end_i, start_j:end_j] = cov_ij
cov_full[start_j:end_j, start_i:end_i] = cov_ij.T
try:
np.linalg.cholesky(cov_full)
except:
print("liAnalg.LinAlgError: Covariance not positive definite! "
"Most likely this is a problem in xim. "
"We will continue for now but this needs to be fixed.")
return cov_full
[docs]class TXRealGaussianCovariance(TXFourierGaussianCovariance):
name='TXRealGaussianCovariance'
do_xi = True
inputs = [
('fiducial_cosmology', FiducialCosmology), # For the cosmological parameters
('twopoint_data_real', SACCFile), # For the binning information
('tracer_metadata', HDFFile), # For metadata
]
outputs = [
('summary_statistics_real', SACCFile),
]
config_options = {
'min_sep':2.5, # arcmin
'max_sep':250,
'nbins':20,
'pickled_wigner_transform': '',
'use_true_shear': False,
'galaxy_bias': [0.],
}
def run(self):
super().run()
def get_angular_bins(self, two_point_data):
# this should be changed to read from sacc file
th_arcmin = np.logspace(np.log10(self.config['min_sep']), np.log10(self.config['max_sep']), self.config['nbins']+1)
return th_arcmin/60.0
def read_sacc(self):
import sacc
f = self.get_input('twopoint_data_real')
two_point_data = sacc.Sacc.load_fits(f)
mask = [
two_point_data.indices(sacc.standard_types.galaxy_density_xi),
two_point_data.indices(sacc.standard_types.galaxy_shearDensity_xi_t),
two_point_data.indices(sacc.standard_types.galaxy_shear_xi_plus),
two_point_data.indices(sacc.standard_types.galaxy_shear_xi_minus),
]
mask = np.concatenate(mask)
two_point_data.keep_indices(mask)
two_point_data.to_canonical_order()
return two_point_data
def save_outputs(self, two_point_data, cov):
filename = self.get_output('summary_statistics_real')
two_point_data.add_covariance(cov)
two_point_data.save_fits(filename, overwrite=True)