import numpy as np
from parallel_statistics import ParallelMeanVariance, ParallelMean
def read_shear_catalog_type(stage):
    """
    Find out which kind of shear catalog a stage has as input.

    Returns a string such as "metacal" or "lensfit".  The value is also
    recorded in the stage's configuration under "shear_catalog_type",
    so it is available later and saved with the stage's output.

    Parameters
    ----------
    stage: PipelineStage
        A stage with a "shear_catalog" input to inspect.

    Returns
    -------
    str
        The catalog type string read from the catalog wrapper.
    """
    with stage.open_input('shear_catalog', wrapper=True) as catalog:
        catalog_type = catalog.catalog_type
    stage.config['shear_catalog_type'] = catalog_type
    return catalog_type
def metacal_variants(*names):
    """
    Expand column names into their metacal variants.

    For each input name, produce the plain name plus the four sheared
    versions (suffixes _1p, _1m, _2p, _2m).  All names for one suffix
    are grouped together, in the order the names were given.

    Parameters
    ----------
    *names: str
        Base column names.

    Returns
    -------
    list of str
        The expanded list of column names.
    """
    suffixes = ['', '_1p', '_1m', '_2p', '_2m']
    variants = []
    for suffix in suffixes:
        for base in names:
            variants.append(base + suffix)
    return variants
def band_variants(bands, *names, shear_catalog_type='metacal'):
    """
    Expand per-band column names, including metacal variants if needed.

    Each name is combined with each band as "name_band".  For metacal
    catalogs the four sheared variants (_1p, _1m, _2p, _2m) are added
    as well; for any other catalog type only the plain names appear.

    Parameters
    ----------
    bands: iterable of str
        Band identifiers, e.g. "riz" or ["r", "i", "z"].
    *names: str
        Base column names.
    shear_catalog_type: str
        Catalog convention; "metacal" triggers the sheared variants.

    Returns
    -------
    list of str
        The expanded list of column names.
    """
    if shear_catalog_type == 'metacal':
        suffixes = ['', '_1p', '_1m', '_2p', '_2m']
    else:
        suffixes = ['']
    columns = []
    for suffix in suffixes:
        for band in bands:
            for base in names:
                columns.append(f"{base}_{band}{suffix}")
    return columns
def calculate_selection_response(g1, g2, sel_1p, sel_2p, sel_1m, sel_2m, delta_gamma):
    """
    Compute the 2x2 metacal selection response matrix S.

    Each entry is a finite difference of the mean shear between the
    samples selected under positively and negatively sheared variants
    of the selection, divided by the shear step delta_gamma.

    Parameters
    ----------
    g1, g2: array
        Shear component columns for the full sample.
    sel_1p, sel_2p, sel_1m, sel_2m: array or index
        Selections made under each sheared variant of the catalog.
    delta_gamma: float
        The applied shear step (difference between +/- variants).

    Returns
    -------
    S: 2x2 array
        The selection response matrix.
    """
    # Uses the module-level numpy import; the previous redundant local
    # import and intermediate S_xx scalars have been removed.
    S = np.empty((2, 2))
    S[0, 0] = (g1[sel_1p].mean() - g1[sel_1m].mean()) / delta_gamma
    S[0, 1] = (g1[sel_2p].mean() - g1[sel_2m].mean()) / delta_gamma
    S[1, 0] = (g2[sel_1p].mean() - g2[sel_1m].mean()) / delta_gamma
    S[1, 1] = (g2[sel_2p].mean() - g2[sel_2m].mean()) / delta_gamma
    return S
def calculate_shear_response(g1_1p, g1_2p, g1_1m, g1_2m, g2_1p, g2_2p, g2_1m, g2_2m, delta_gamma):
    """
    Compute the mean 2x2 metacal shear response matrix R.

    The per-object responses are finite differences of the sheared
    shear measurements divided by delta_gamma; the returned matrix is
    their unweighted mean over the sample.

    Parameters
    ----------
    g1_1p, g1_2p, g1_1m, g1_2m: array
        g1 measured under each sheared image variant.
    g2_1p, g2_2p, g2_1m, g2_2m: array
        g2 measured under each sheared image variant.
    delta_gamma: float
        The applied shear step (difference between +/- variants).

    Returns
    -------
    R: 2x2 array
        The mean shear response matrix.
    """
    # Fixed the duplicated "R = R = np.zeros(...)" typo and removed the
    # redundant local numpy import (numpy is imported at module level).
    n = len(g1_1p)
    R = np.zeros((n, 2, 2))
    R[:, 0, 0] = (g1_1p - g1_1m) / delta_gamma
    R[:, 0, 1] = (g1_2p - g1_2m) / delta_gamma
    R[:, 1, 0] = (g2_1p - g2_1m) / delta_gamma
    R[:, 1, 1] = (g2_2p - g2_2m) / delta_gamma
    return np.mean(R, axis=0)
def apply_metacal_response(R, S, g1, g2):
    """
    Calibrate metacal shears with the total response matrix.

    The total response is the shear response R plus the selection
    response S; its pseudo-inverse is applied to the shear pairs.
    The values of R are assumed to already have had appropriate
    weights included.

    Parameters
    ----------
    R: 2x2 array
        Shear response matrix.
    S: 2x2 array
        Selection response matrix.
    g1, g2: array
        Raw shear components, one value per object.

    Returns
    -------
    g1, g2: array
        The calibrated shear components.
    """
    # Removed the redundant local numpy import; np is the module-level
    # import.  pinv is used (rather than inv) so a near-singular total
    # response does not raise.
    mcal_g = np.stack([g1, g2], axis=1)
    R_total = R + S
    R_inv = np.linalg.pinv(R_total)
    # (2,2) @ (2,n) -> (2,n): row 0 is calibrated g1, row 1 is g2
    calibrated = R_inv @ mcal_g.T
    return calibrated[0], calibrated[1]
def apply_lensfit_calibration(g1, g2, weight, c1=0, c2=0, sigma_e=0, m=0):
    """
    Apply lensfit-convention calibration to shear columns.

    The multiplicative correction 1+K is the weighted mean of m (with
    the default m=0 this is exactly 1, i.e. no correction), and the
    responsivity R comes from the weighted mean of sigma_e.  The
    additive c terms are subtracted after dividing by R.

    Parameters
    ----------
    g1, g2: array
        Raw shear components.
    weight: array
        Per-object weights.
    c1, c2: array or float
        Additive bias terms (default 0).
    sigma_e: array or float
        Ellipticity dispersion entering the responsivity (default 0).
    m: array or float
        Multiplicative bias (default 0).

    Returns
    -------
    g1, g2, weight, one_plus_K
        Calibrated shears, the (unchanged) weights, and the
        multiplicative factor 1+K.
    """
    total_weight = np.sum(weight)
    # Weighted mean multiplicative bias over the sample
    mean_m = np.sum(weight * m) / total_weight
    one_plus_K = 1. + mean_m
    # Responsivity correction from the weighted sigma_e average
    R = 1. - np.sum(weight * sigma_e) / total_weight
    g1 = (1. / one_plus_K) * (g1 / R - c1)
    g2 = (1. / one_plus_K) * (g2 / R - c2)
    return g1, g2, weight, one_plus_K
class _DataWrapper:
"""
This little helper class wraps dictionaries
or other mappings, and whenever something is
looked up in it, it first checks if there is
a column with the specified suffix present instead
and returns that if so.
"""
def __init__(self, data, suffix):
"""Create
Parameters
----------
data: dict
suffix: str
"""
self.suffix = suffix
self.data = data
def __getitem__(self, name):
variant_name = name + self.suffix
if variant_name in self.data:
return self.data[variant_name]
else:
return self.data[name]
def __contains__(self, name):
return (name in self.data)
class LensfitCalculator:
    """
    Accumulate lensfit-convention shear calibration factors chunk by chunk.

    The additive c terms are derived from the data itself (in contrast
    to averaging values derived from simulations and stored in the
    catalog).  Once all chunks have been added, an MPI communicator can
    be supplied to combine the results from the different processes.
    """

    def __init__(self, selector, input_m_is_weighted=False):
        """
        Initialize the Calibrator with the selection function to use.

        The selector is called as selector(data, *args, **kwargs) with
        each chunk and must return an index/mask picking out the
        objects to include.

        Parameters
        ----------
        selector: function
            Function that selects objects.
        input_m_is_weighted: bool
            If True, the catalog m values already include the weights,
            so they are averaged without additional weighting.
        """
        self.selector = selector
        # Parallel accumulators for the weighted means we need:
        # K holds the multiplicative bias m, C the two additive terms.
        # Data is added chunk by chunk below and combined in collect().
        self.K = ParallelMean(1)
        self.C = ParallelMean(2)
        self.count = 0
        self.input_m_is_weighted = input_m_is_weighted

    def add_data(self, data, *args, **kwargs):
        """Select objects from a new chunk of data and tally their responses

        Parameters
        ----------
        data: dict
            Dictionary of data columns to select on and add
        *args
            Positional arguments to be passed to the selection function
        **kwargs
            Keyword arguments to be passed to the selection function
        """
        # Wrap the chunk so selection tools can use the variant-lookup
        # interface (no suffix is applied here).
        wrapped = _DataWrapper(data, '')
        sel = self.selector(wrapped, *args, **kwargs)

        # Pull out the calibration columns
        weight = wrapped['weight']
        m = wrapped['m']
        g1 = wrapped['g1']
        g2 = wrapped['g2']

        # Record how many objects this chunk contributed
        self.count += g1[sel].size

        # Accumulate the calibration quantities so that the weighted
        # means can be computed later
        if self.input_m_is_weighted:
            # m values are pre-weighted: average them directly
            self.K.add_data(0, m[sel])
        else:
            self.K.add_data(0, m[sel], weight[sel])
        self.C.add_data(0, g1[sel], weight[sel])
        self.C.add_data(1, g2[sel], weight[sel])
        return sel

    def collect(self, comm=None, allgather=False):
        """
        Finalize and combine the accumulated values, returning the
        calibration quantities.

        Parameters
        ----------
        comm: MPI Communicator or None
            If supplied, values from all processes are combined.
        allgather: bool
            If True every process receives the final values; otherwise
            only the root process does.

        Returns
        -------
        K: float
            Weighted mean multiplicative bias m (used as 1+K downstream)
        C: float array
            c1, c2 additive biases (weighted averages of g1 and g2)
        count: int
            Total number of selected objects
        """
        # Total object count is the sum over processes
        if comm is None:
            count = self.count
        elif allgather:
            count = comm.allreduce(self.count)
        else:
            count = comm.reduce(self.count)

        # Combine the weighted means from all chunks and processes
        mode = "allgather" if allgather else "gather"
        _, K = self.K.collect(comm, mode)
        _, C = self.C.collect(comm, mode)
        return K, C, count
class HSCCalculator:
    """
    Accumulate HSC-convention shear calibration factors chunk by chunk.

    At the end an MPI communicator can be supplied to collect together
    the results from the different processes.
    """

    def __init__(self, selector):
        """
        Initialize the Calibrator with the selection function to use.

        The selector is called as selector(data, *args, **kwargs) with
        each chunk and must return an index/mask picking out the
        objects to include.

        Parameters
        ----------
        selector: function
            Function that selects objects.
        """
        self.selector = selector
        # Parallel accumulators for the weighted means of the
        # multiplicative bias (K) and responsivity (R).  Data is added
        # chunk by chunk and combined in collect().
        self.K = ParallelMean(1)
        self.R = ParallelMean(1)
        self.count = 0

    def add_data(self, data, *args, **kwargs):
        """Select objects from a new chunk of data and tally their responses

        Parameters
        ----------
        data: dict
            Dictionary of data columns to select on and add
        *args
            Positional arguments to be passed to the selection function
        **kwargs
            Keyword arguments to be passed to the selection function
        """
        # Wrap the chunk so selection tools can use the variant-lookup
        # interface (no suffix is applied here).
        data = _DataWrapper(data, '')
        sel = self.selector(data, *args, **kwargs)

        # Extract the calibration quantities
        w = data['weight']
        K = data['m']
        # Responsivity from the per-object ellipticity dispersion
        R = 1.0 - data['sigma_e'] ** 2

        w = w[sel]
        # Bug fix: count only the selected objects, consistent with
        # LensfitCalculator.  Previously the whole chunk size was added
        # regardless of the selection.
        self.count += w.size

        # Accumulate weighted means for later collection
        self.R.add_data(0, R[sel], w)
        self.K.add_data(0, K[sel], w)
        return sel

    def collect(self, comm=None, allgather=False):
        """
        Finalize and combine the accumulated values, returning the
        calibration quantities.

        Parameters
        ----------
        comm: MPI Communicator or None
            If supplied, values from all processes are combined.
        allgather: bool
            If True every process receives the final values; otherwise
            only the root process does.

        Returns
        -------
        R: float
            R calibration factor (weighted mean responsivity)
        K: float
            Weighted mean multiplicative bias m (used as 1+K downstream)
        count: int
            Total number of selected objects
        """
        # Total object count is the sum over processes
        if comm is None:
            count = self.count
        elif allgather:
            count = comm.allreduce(self.count)
        else:
            count = comm.reduce(self.count)

        # Combine the weighted means from all chunks and processes
        mode = "allgather" if allgather else "gather"
        _, R = self.R.collect(comm, mode)
        _, K = self.K.collect(comm, mode)
        return R, K, count
class MeanShearInBins:
    """
    Compute calibrated mean shears in bins of some quantity x.

    One calibrator of the kind matching the configured catalog type is
    kept per bin.  Chunks of data are accumulated with add_data, and
    collect() combines the results (optionally over MPI) and applies
    the appropriate calibration per bin.
    """

    def __init__(self, x_name, limits, delta_gamma, cut_source_bin=False, shear_catalog_type='metacal'):
        """
        Parameters
        ----------
        x_name: str
            Name of the column to bin on.
        limits: sequence of float
            Bin edges; defines len(limits)-1 bins.
        delta_gamma: float
            Shear step used by the metacal response calculation.
        cut_source_bin: bool
            If True, exclude objects with source_bin == -1.
        shear_catalog_type: str
            One of "metacal", "lensfit", or "hsc".

        Raises
        ------
        ValueError
            If shear_catalog_type is not one of the known types.
        """
        self.x_name = x_name
        self.limits = limits
        self.delta_gamma = delta_gamma
        self.cut_source_bin = cut_source_bin
        self.shear_catalog_type = shear_catalog_type
        self.size = len(self.limits) - 1

        # Parallel statistics for the mean/variance of g1, g2 and the
        # mean of x, one slot per bin
        self.g1 = ParallelMeanVariance(self.size)
        self.g2 = ParallelMeanVariance(self.size)
        self.x = ParallelMean(self.size)

        # One calibrator per bin, matching the catalog convention
        if shear_catalog_type == 'metacal':
            self.calibrators = [MetacalCalculator(self.selector, delta_gamma) for i in range(self.size)]
        elif shear_catalog_type == 'lensfit':
            self.calibrators = [LensfitCalculator(self.selector) for i in range(self.size)]
        elif shear_catalog_type == 'hsc':
            self.calibrators = [HSCCalculator(self.selector) for i in range(self.size)]
        else:
            # Fixed: this message has no placeholders, so the f-prefix
            # was spurious
            raise ValueError("Please specify metacal, lensfit or hsc for shear_catalog in config.")

    def selector(self, data, i):
        """Return the indices of objects falling in bin i, optionally
        requiring a valid source_bin as well."""
        x = data[self.x_name]
        w = (x > self.limits[i]) & (x < self.limits[i+1])
        if self.cut_source_bin:
            w &= data['source_bin'] != -1
        return np.where(w)

    def add_data(self, data):
        """Accumulate one chunk of data into every bin.

        Parameters
        ----------
        data: dict
            Dictionary of data columns including the binning column,
            weights, and the shear columns for the catalog type.
        """
        for i in range(self.size):
            # The calibrator applies the bin selection and returns it
            w = self.calibrators[i].add_data(data, i)
            weight = data['weight'][w]
            if self.shear_catalog_type == 'metacal':
                self.g1.add_data(i, data['mcal_g1'][w], weight)
                self.g2.add_data(i, data['mcal_g2'][w], weight)
            elif self.shear_catalog_type == 'lensfit':
                self.g1.add_data(i, data['g1'][w], weight)
                self.g2.add_data(i, data['g2'][w], weight)
            elif self.shear_catalog_type == 'hsc':
                # HSC additive c terms are subtracted per object
                self.g1.add_data(i, data['g1'][w] - data['c1'][w], weight)
                self.g2.add_data(i, data['g2'][w] - data['c2'][w], weight)
            self.x.add_data(i, data[self.x_name][w], weight)

    def collect(self, comm=None):
        """
        Combine all accumulated data and apply the per-bin calibration.

        Parameters
        ----------
        comm: MPI Communicator or None
            If supplied, data from all processes is combined; only the
            root process computes and returns the final values.

        Returns
        -------
        mu, g1, g2, sigma1, sigma2: arrays
            Mean x, calibrated mean shears, and errors on those means,
            per bin.  Non-root processes return five Nones.
        """
        count1, g1, var1 = self.g1.collect(comm, mode='gather')
        count2, g2, var2 = self.g2.collect(comm, mode='gather')
        _, mu = self.x.collect(comm, mode='gather')

        # Now we have the complete sample we can get the calibration
        # factors to apply to it.  These collect() calls are collective,
        # so every rank must make them.
        R = []
        K = []
        C = []
        for i in range(self.size):
            if self.shear_catalog_type == 'metacal':
                # Shear response + selection response, a 2x2 matrix
                r, s, _ = self.calibrators[i].collect(comm)
                R.append(r + s)
            elif self.shear_catalog_type == 'lensfit':
                k, c, _ = self.calibrators[i].collect(comm)
                K.append(k)
                C.append(c)
            else:
                r, k, _ = self.calibrators[i].collect(comm)
                K.append(k)
                R.append(r)

        # Only the root process does the rest
        if (comm is not None) and (comm.Get_rank() != 0):
            return None, None, None, None, None

        sigma1 = np.zeros(self.size)
        sigma2 = np.zeros(self.size)
        for i in range(self.size):
            # Shears and the errors on their means in this bin
            g = [g1[i], g2[i]]
            sigma = np.sqrt([var1[i] / count1[i], var2[i] / count2[i]])

            if self.shear_catalog_type == 'metacal':
                # Apply the full inverse response matrix to both the
                # shears and their errors
                R_inv = np.linalg.inv(R[i])
                g1[i], g2[i] = R_inv @ g
                sigma1[i], sigma2[i] = R_inv @ sigma
            elif self.shear_catalog_type == 'lensfit':
                g1[i] = g1[i] * (1. / (1 + K[i]))
                g2[i] = g2[i] * (1. / (1 + K[i]))
                sigma1[i] = (1. / (1 + K[i])) * (sigma[0])
                sigma2[i] = (1. / (1 + K[i])) * (sigma[1])
            else:
                # HSC convention: divide by twice the responsivity and
                # by the multiplicative factor 1+K
                g1[i] = (g1[i] / (2 * R[i])) / (1 + K[i])
                g2[i] = (g2[i] / (2 * R[i])) / (1 + K[i])
                sigma1[i] = (sigma[0] / (2 * R[i])) / (1 + K[i])
                sigma2[i] = (sigma[1] / (2 * R[i])) / (1 + K[i])

        return mu, g1, g2, sigma1, sigma2