from .base_stage import PipelineStage
from .data_types import ShearCatalog, HDFFile
from .utils.calibration_tools import band_variants, metacal_variants
import numpy as np
from .utils.timer import Timer
class TXCosmoDC2Mock(PipelineStage):
"""
This stage simulates metacal data and metacalibrated
photometry measurements, starting from a cosmology catalog
of the kind used as an input to the DC2 image and obs-catalog simulations.
This is mainly useful for testing infrastructure in advance
of the DC2 catalogs being available, but might also be handy
for starting from a purer simulation.
"""
name='TXCosmoDC2Mock'
inputs = [
('response_model', HDFFile)
]
outputs = [
('shear_catalog', ShearCatalog),
('photometry_catalog', HDFFile),
]
config_options = {
'cat_name':'cosmoDC2',
'visits_per_band':165, # used in the noise simulation
'snr_limit': 4.0, # used to decide what objects to cut out
'max_size': 99999999999999, #for testing on smaller catalogs
'extra_cols': "", # space-separated list of columns to include
'max_npix':99999999999999,
'unit_response': False,
'cat_size': 0,
'flip_g2': True, # this matches the metacal definition, and the treecorr/namaster one
'apply_mag_cut': False, #used when comparing to descqa measurements
'Mag_r_limit': -19, # used to decide what objects to cut out
}
def data_iterator(self, gc):
# Columns we need from the cosmo simulation
cols = ['mag_true_u_lsst', 'mag_true_g_lsst',
'mag_true_r_lsst', 'mag_true_i_lsst',
'mag_true_z_lsst', 'mag_true_y_lsst',
'ra', 'dec',
'ellipticity_1_true', 'ellipticity_2_true',
'shear_1', 'shear_2',
'size_true',
'galaxy_id',
'redshift_true',
]
# Add any extra requested columns
cols += self.config['extra_cols'].split()
it = gc.get_quantities(cols, return_iterator=True)
nfile = len(gc._file_list) if hasattr(gc, '_file_list') else 0
for i, data in enumerate(it):
if nfile:
j = i+1
print(f"Loading chunk {j}/{nfile}")
yield data
def run(self):
import GCRCatalogs
cat_name = self.config['cat_name']
self.bands = ('u', 'g', 'r', 'i', 'z', 'y')
print(f"Loading from catalog {cat_name}")
gc = GCRCatalogs.load_catalog(cat_name)
# GCR sometimes tries to read the entire catalog
# to measure its length rather than looking at the metadata,
# which can take a very long time.
# Allow the user to specify the size if they already know it.
N = self.config['cat_size']
if N == 0:
N = len(gc)
print(f"Rank {self.rank} loaded: length = {N}.")
target_size = min(N, self.config['max_size'])
select_fraction = target_size / N
if target_size != N:
print(f"Will select approx {100*select_fraction:.2f}% of objects ({target_size})")
# Prepare output files
metacal_file = self.open_output('shear_catalog', parallel=self.is_mpi())
photo_file = self.open_output('photometry_catalog', parallel=self.is_mpi())
photo_cols = self.setup_photometry_output(photo_file, target_size)
metacal_cols = self.setup_metacal_output(metacal_file, target_size)
# Load the metacal response file
self.load_metacal_response_model()
# Keep track of catalog position
start = 0
count = 0
# Loop through chunks of the input catalog
for data in self.data_iterator(gc):
# The initial chunk size, of all the input data.
# This will be reduced later as we remove objects
some_col = list(data.keys())[0]
chunk_size = len(data[some_col])
print(f"Process {self.rank} read chunk {count} - {count+chunk_size} of {N}")
count += chunk_size
# Select a random fraction of the catalog if we are cutting down
# We can't just take the earliest galaxies because they are ordered
# by redshift
if target_size != N:
select = np.random.uniform(size=chunk_size) < select_fraction
nselect = select.sum()
print(f"Cutting down to {nselect}/{chunk_size} objects")
for name in list(data.keys()):
data[name] = data[name][select]
# Simulate the various output data sets
mock_photometry = self.make_mock_photometry(data)
# Cut out any objects too faint to be detected and measured.
# We have to do this after the photometry, so that we know if
# the object is detected, but we can do it before making the mock
# metacal info, saving us some time simulating un-needed objects
self.remove_undetected(data, mock_photometry)
if self.config['apply_mag_cut']:
self.apply_magnitude_cut(data)
mock_metacal = self.make_mock_metacal(data, mock_photometry)
# The chunk size has now changed
some_col = list(mock_photometry.keys())[0]
chunk_size = len(mock_photometry[some_col])
# start is where this process should start writing this
# chunk of data. end is where the final process will finish
# writing, becoming the starting point for the whole next
# chunk over all the processes
start, end = self.next_output_indices(start, chunk_size)
# Save all output
self.write_output(start, target_size, photo_cols, metacal_cols, photo_file, mock_photometry, metacal_file, mock_metacal)
# The next iteration starts writing where the current one ends.
start = end
if start >= target_size:
break
# Tidy up
photo_file.close()
metacal_file.close()
self.truncate_outputs(end)
def truncate_outputs(self, n):
import h5py
if self.comm is not None:
self.comm.Barrier()
if self.rank == 0:
# all files should now be closed for all procs
print(f"Resizing all outupts to size {n}")
f = h5py.File(self.get_output('photometry_catalog'), 'r+')
g = f['photometry']
for col in list(g.keys()):
print(col)
g[col].resize((n,))
f.close()
f = h5py.File(self.get_output('shear_catalog'), 'r+')
g = f['shear']
for col in g.keys():
    g[col].resize((n,))
f.close()
def next_output_indices(self, start, chunk_size):
if self.comm is None:
end = start + chunk_size
else:
all_indices = self.comm.allgather(chunk_size)
starting_points = np.concatenate(([0], np.cumsum(all_indices)))
# use the old start to find the end point.
# the final starting point (not used below, since it is larger
# than the largest self.rank value) is the total data length
end = start + starting_points[-1]
start = start + starting_points[self.rank]
print(f"- Rank {self.rank} writing output to {start}-{start+chunk_size}")
return start, end
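# A worked example of the bookkeeping above, with hypothetical numbers:
# if three ranks read chunks of 100, 80 and 120 rows and the previous
# global start was 500, allgather returns [100, 80, 120] on every rank
# and starting_points is [0, 100, 180, 300]. Rank 0 then writes rows
# 500-600, rank 1 writes 600-680, rank 2 writes 680-800, and every rank
# agrees that the next chunk begins at end = 500 + 300 = 800.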
def setup_photometry_output(self, photo_file, target_size):
# Get a list of all the column names
cols = ['ra', 'dec', 'extendedness']
for band in self.bands:
cols.append(f'mag_{band}')
cols.append(f'mag_{band}_err')
cols.append(f'snr_{band}')
for col in self.config['extra_cols'].split():
cols.append(col)
# Make group for all the photometry
group = photo_file.create_group('photometry')
# Extensible columns because we don't know the size yet.
# We will cut down the size at the end.
for col in cols:
group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype='f8')
# The only non-float column for now
group.create_dataset('id', (target_size,), maxshape=(target_size,), dtype='i8')
return cols + ['id']
def setup_metacal_output(self, metacal_file, target_size):
# Get a list of all the column names
cols = (
['ra', 'dec', 'psf_g1', 'psf_g2', 'mcal_psf_g1', 'mcal_psf_g2', 'mcal_psf_T_mean']
+ metacal_variants('mcal_g1', 'mcal_g2', 'mcal_T', 'mcal_s2n', 'mcal_T_err')
+ band_variants('riz', 'mcal_mag', 'mcal_mag_err',shear_catalog_type='metacal')
+ ['weight']
)
cols += ['true_g1', 'true_g2', 'redshift_true']
# Make group for all the shear data
group = metacal_file.create_group('shear')
# Extensible columns because we don't know the size yet.
# We will cut down the size at the end.
for col in cols:
group.create_dataset(col, (target_size,), maxshape=(target_size,), dtype='f8')
group.create_dataset('id', (target_size,), maxshape=(target_size,), dtype='i8')
group.create_dataset('mcal_flags', (target_size,), maxshape=(target_size,), dtype='i8')
return cols + ['id', 'mcal_flags']
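# For reference, and assuming the helpers behave as the rest of this file
# implies (an illustration, not a statement of their API):
# metacal_variants('mcal_g1') is expected to expand to the unsheared column
# plus its four sheared variants, i.e.
#     ['mcal_g1', 'mcal_g1_1p', 'mcal_g1_1m', 'mcal_g1_2p', 'mcal_g1_2m']
# and band_variants('riz', 'mcal_mag', ...) to per-band versions such as
# 'mcal_mag_r', 'mcal_mag_r_1p', and so on, giving one dataset per band
# and shear variant in the output group.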
def write_output(self, start, target_size, photo_cols, metacal_cols, photo_file, photo_data, metacal_file, metacal_data):
"""
Save the photometry we have just simulated to disc
Parameters
----------
photo_file: HDF File object
metacal_file: HDF File object
photo_data: dict
Dictionary of simulated photometry
metacal_data: dict
Dictionary of simulated metacal data
"""
# Work out the range of data to output (since we will be
# doing this in chunks). If we have cut down to a random
# subset of the catalog then we may have gone over the
# target length, depending on the random selection
n = len(photo_data['id'])
end = min(start + n, target_size)
#assert photo_data['id'].min()>0
# Save each column
for name in photo_cols:
photo_file[f'photometry/{name}'][start:end] = photo_data[name]
for name in metacal_cols:
metacal_file[f'shear/{name}'][start:end] = metacal_data[name]
def make_mock_photometry(self, data):
# The visit count affects the overall noise levels
n_visit = self.config['visits_per_band']
# Do all the work in the function below
photo = make_mock_photometry(n_visit, self.bands, data, self.config['unit_response'])
for col in self.config['extra_cols'].split():
photo[col] = data[col]
return photo
def apply_magnitude_cut(self, data):
"""
Allow for a cut in absolute magnitude.
"""
mag_limit = self.config['Mag_r_limit']
sel = data['Mag_true_r_sdss_z0']< mag_limit
ndet = sel.sum()
ntot = sel.size
fract = ndet*100./ntot
print(f"{ndet} objects pass magnitude cut out of {ntot} objects ({fract:.1f}%)")
# Remove all objects not selected
for key in list(data.keys()):
data[key] = data[key][sel]
def remove_undetected(self, data, photo):
"""
Strip out any undetected objects from the two
simulated data sets.
Use a configuration parameter snr_limit to decide
on the detection limit.
"""
snr_limit = self.config['snr_limit']
# This will become a boolean array in a minute when
# we OR it with an array
detected = False
# Check if detected in any band. Makes a boolean array
# Even though we started with just a single False.
for band in self.bands:
detected_in_band = photo[f'snr_{band}'] > snr_limit
not_detected_in_band = ~detected_in_band
# Set objects not detected in one band that are detected in another
# to inf magnitude in that band, and the SNR to zero.
# We have to do this for each of the variants also, because otherwise
# we end up with wildly different final SNR values later.
# This is the metadetection issue really!
for v in metacal_variants(f'snr_{band}'):
photo[v][not_detected_in_band] = 0.0
for v in metacal_variants(f'mag_{band}'):
photo[v][not_detected_in_band] = np.inf
# Record that we have detected this object at all
detected |= detected_in_band
# the protoDC2 sims have an edge with zero shear.
# Remove it.
zero_shear_edge = (abs(data['shear_1'])==0) & (abs(data['shear_2'])==0)
print("Removing {} objects with identically zero shear in both terms".format(zero_shear_edge.sum()))
detected &= (~zero_shear_edge)
# Print out interesting information
ndet = detected.sum()
ntot = detected.size
fract = ndet*100./ntot
print(f"Detected {ndet} out of {ntot} objects ({fract:.1f}%)")
# Remove all objects not detected in *any* band
# make a copy of the keys with list(photo.keys()) so we are not
# modifying during the iteration
for key in list(photo.keys()):
photo[key] = photo[key][detected]
for key in list(data.keys()):
data[key] = data[key][detected]
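# A minimal, self-contained sketch (not used by the pipeline) of the
# detection-mask idiom in remove_undetected above: OR-ing a plain False
# with a boolean array promotes it to an array, so the per-band loop can
# accumulate "detected in any band" without pre-allocating the mask.
def _demo_detection_mask(snr_limit=4.0):
    # Two hypothetical bands with three objects each
    snr_g = np.array([5.0, 1.0, 0.5])
    snr_r = np.array([2.0, 6.0, 1.0])
    detected = False
    for snr in (snr_g, snr_r):
        detected |= (snr > snr_limit)
    # -> array([ True,  True, False])
    return detected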
class TXBuzzardMock(TXCosmoDC2Mock):
"""
This stage simulates metacal data and metacalibrated
photometry measurements, starting from a cosmology catalog
of the kind used as an input to the DC2 image and obs-catalog simulations.
This is mainly useful for testing infrastructure in advance
of the DC2 catalogs being available, but might also be handy
for starting from a purer simulation.
"""
name='TXBuzzardMock'
inputs = [
('response_model', HDFFile)
]
outputs = [
('shear_catalog', ShearCatalog),
('photometry_catalog', HDFFile),
]
config_options = {
'cat_name':'buzzard',
'visits_per_band':165, # used in the noise simulation
'snr_limit':4.0, # used to decide what objects to cut out
'max_size': 99999999999999, #for testing on smaller catalogs
'extra_cols': "", # space-separated list of columns to include
'max_npix':99999999999999,
'unit_response': False,
'flip_g2': True, # this matches the metacal definition, and the treecorr/namaster one
}
def make_mock_photometry(n_visit, bands, data, unit_response):
"""
Generate a mock photometric table with noise added
This is mostly from LSE-40 by
Zeljko Ivezic, Lynne Jones, and Robert Lupton
retrieved here:
http://faculty.washington.edu/ivezic/Teaching/Astr511/LSST_SNRdoc.pdf
"""
output = {}
nobj = data['galaxy_id'].size
output['ra'] = data['ra']
output['dec'] = data['dec']
output['id'] = data['galaxy_id']
output['extendedness'] = np.ones(nobj)
# Sky background, seeing FWHM, and system throughput,
# all from table 2 of Ivezic, Jones, & Lupton
B_b = np.array([85.07, 467.9, 1085.2, 1800.3, 2775.7, 3614.3])
fwhm = np.array([0.77, 0.73, 0.70, 0.67, 0.65, 0.63])
T_b = np.array([0.0379, 0.1493, 0.1386, 0.1198, 0.0838, 0.0413])
# effective pixel size for a Gaussian PSF, from equation
# 27 of Ivezic, Jones, & Lupton
pixel = 0.2 # arcsec
N_eff = 2.436 * (fwhm/pixel)**2
# other numbers from Ivezic, Jones, & Lupton
sigma_inst2 = 10.0**2 # squared instrumental noise in photons per pixel, just below eq 42
gain = 1 # ADU units per photon, also just below eq 42
D = 6.5 # primary mirror diameter in meters, from LSST key numbers page (effective clear diameter)
# Not sure what effective clear diameter means but it's closer to the number used in the paper
time = 30. # seconds per exposure, from LSST key numbers page
sigma_b2 = 0.0 # squared error on the background, just above eq 42
# combination of these used below, from various equations
factor = 5455./gain * (D/6.5)**2 * (time/30.)
# Fake some metacal responses
if unit_response:
mag_responses = [0.0 for i in bands]
else:
mag_responses = generate_mock_metacal_mag_responses(bands, nobj)
delta_gamma = 0.01 # this is the half-delta gamma, i.e. gamma_+ - gamma_0
# that's the right thing to use here because we are doing m_plus = m_0 + (dm/dgamma) * delta_gamma
# Use the same response for gamma1 and gamma2
for band, b_b, t_b, n_eff, mag_resp in zip(bands, B_b, T_b, N_eff, mag_responses):
# truth magnitude
mag = data[f'mag_true_{band}_lsst']
output[f'mag_true_{band}_lsst'] = mag
# expected signal photons, over all visits
c_b = factor * 10**(0.4*(25-mag)) * t_b * n_visit
# background noise term (sqrt of the expected background photons), over all visits
background = np.sqrt((b_b + sigma_inst2 + sigma_b2) * n_eff * n_visit)
# mean used for the Poisson draw below: signal plus the background term
mu = c_b + background
# Observed number of photons in excess of the expected background.
# This can go negative for faint magnitudes, indicating that the object is
# not going to be detected
n_obs = np.random.poisson(mu) - background
n_obs_err = np.sqrt(mu)
# signal to noise, true and estimated values
true_snr = c_b / background
obs_snr = n_obs / background
# observed magnitude from inverting c_b expression above
mag_obs = 25 - 2.5*np.log10(n_obs/factor/t_b/n_visit)
# converting error on n_obs to error on mag
mag_err = 2.5/np.log(10.) / obs_snr
output[f'true_snr_{band}'] = true_snr
output[f'snr_{band}'] = obs_snr
output[f'mag_{band}'] = mag_obs
output[f'mag_err_{band}'] = mag_err
output[f'mag_{band}_err'] = mag_err
m = mag_resp*delta_gamma
m1 = mag_obs + m
m2 = mag_obs - m
mag_obs_1p = m1
mag_obs_1m = m2
output[f'mag_{band}_1p'] = m1
output[f'mag_{band}_1m'] = m2
output[f'mag_{band}_2p'] = m1
output[f'mag_{band}_2m'] = m2
# Scale the SNR values according to the change in magnitude.
s = np.power(10., -0.4*m)
snr1 = obs_snr * s
snr2 = obs_snr / s
output[f'snr_{band}_1p'] = snr1
output[f'snr_{band}_1m'] = snr2
output[f'snr_{band}_2p'] = snr1
output[f'snr_{band}_2m'] = snr2
return output
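# A small illustrative helper (an addition for clarity, not part of the
# pipeline): the magnitude error used above is the standard conversion
# from a fractional flux error of 1/SNR, mag_err = 2.5 / ln(10) / SNR,
# so for example SNR = 5 corresponds to roughly 0.22 magnitudes.
def _demo_snr_to_mag_err(snr):
    # e.g. _demo_snr_to_mag_err(np.array([5.0, 10.0, 100.0]))
    #   -> approximately [0.217, 0.109, 0.011]
    return 2.5 / np.log(10.0) / snr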
def generate_mock_metacal_mag_responses(bands, nobj):
nband = len(bands)
mu = np.zeros(nband) # zero is approximately the mean response across bands, from an HSC tract
rho = 0.25 # approx correlation between response in bands, from HSC tract
sigma2 = 1.7**2 # approx variance of response, from HSC tract
covmat = np.full((nband,nband), rho*sigma2)
np.fill_diagonal(covmat, sigma2)
mag_responses = np.random.multivariate_normal(mu, covmat, nobj).T
return mag_responses
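# A minimal sketch (illustrative only) of the covariance structure used in
# generate_mock_metacal_mag_responses: sigma2 on the diagonal and
# rho * sigma2 everywhere else, so every pair of bands has correlation rho.
def _demo_response_covariance(nband=3, rho=0.25, sigma2=1.7**2):
    covmat = np.full((nband, nband), rho * sigma2)
    np.fill_diagonal(covmat, sigma2)
    # Correlation matrix recovered from the covariance: ones on the
    # diagonal, rho off the diagonal.
    corr = covmat / sigma2
    return covmat, corr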
def test():
    import pylab
    # Simulate a fixed number of identical objects at the listed depths
    nobj = 10000
    data = {
        'ra': np.zeros(nobj),
        'dec': np.zeros(nobj),
        'galaxy_id': np.arange(nobj),
    }
    # Only five depths are listed below, so restrict to the ugriz bands
    bands = 'ugriz'
    n_visit = 165
    M5 = [24.22, 25.17, 24.74, 24.38, 23.80]
    for b, m5 in zip(bands, M5):
        data[f'mag_true_{b}_lsst'] = np.repeat(m5, nobj)
    results = make_mock_photometry(n_visit, bands, data, True)
    pylab.hist(results['snr_r'], bins=50, histtype='step')
    pylab.savefig('snr_r.png')