Source code for txpipe.metacal_gcr_input
from .base_stage import PipelineStage
from .data_types import ShearCatalog, HDFFile
from .utils.calibration_tools import band_variants, metacal_variants
import numpy as np
import glob
import re
class TXMetacalGCRInput(PipelineStage):
"""
This stage simulates metacal data and metacalibrated
    photometry measurements, starting from a cosmology catalog
of the kind used as an input to DC2 image and obs-catalog simulations.
This is mainly useful for testing infrastructure in advance
of the DC2 catalogs being available, but might also be handy
for starting from a purer simulation.
"""
name='TXMetacalGCRInput'
inputs = []
outputs = [
('shear_catalog', ShearCatalog),
('photometry_catalog', HDFFile),
]
config_options = {
'cat_name': str,
'single_tract': '',
'length': 0,
'table_dir': '',
'data_release': '',
}
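    # Notes on the options above: cat_name is required; single_tract, if set,
    # restricts the ingestion to a single tract; length overrides the catalog
    # size (0 means use the full length); table_dir and data_release, if set,
    # override the corresponding GCRCatalogs configuration entries.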
def run(self):
import GCRCatalogs
import GCR
import h5py
# Open input data. We do not treat this as a formal "input"
        # since it's the starting point of the whole pipeline and so is
# not in a TXPipe format.
cat_name = self.config['cat_name']
# If set, override some keys from the config
config_overwrite={}
for key in ['table_dir', 'data_release']:
if self.config[key]:
config_overwrite[key] = self.config[key]
print(f"Loading catalog {cat_name} + {config_overwrite}")
cat = GCRCatalogs.load_catalog(cat_name, config_overwrite=config_overwrite)
        # The total catalog size is needed to set up the output file,
        # although for larger catalogs it is a little slow to compute.
if self.config['length'] == 0:
n = len(cat)
print(f"Total catalog size = {n}")
else:
n = self.config['length']
print(f"Using fixed specified size = {n}")
# This option, which prevented memory leaks with a previous
# catalog format, appears not to work for Parquet catalogs,
# but the memory leaks don't happen in that case either.
try:
cat.master.use_cache = False
except AttributeError:
pass
available = cat.list_all_quantities()
bands = []
for b in 'ugrizy':
if f'mcal_mag_{b}' in available:
bands.append(b)
# Columns that we will need.
shear_cols = (['id', 'ra', 'dec', 'mcal_psf_g1', 'mcal_psf_g2', 'mcal_psf_T_mean', 'mcal_flags']
+ metacal_variants('mcal_g1', 'mcal_g2', 'mcal_T', 'mcal_s2n')
            + band_variants(bands, 'mcal_mag', 'mcal_mag_err', shear_catalog_type='metacal')
)
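        # metacal_variants and band_variants expand each base name into its
        # metacalibration variants. Assuming the standard TXPipe suffix scheme,
        # e.g. metacal_variants('mcal_g1') -> ['mcal_g1', 'mcal_g1_1p',
        # 'mcal_g1_1m', 'mcal_g1_2p', 'mcal_g1_2m'], and band_variants produces
        # a per-band version of each, e.g. 'mcal_mag_u', 'mcal_mag_u_1p', ...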
# Input columns for photometry
photo_cols = ['id', 'ra', 'dec', 'extendedness', 'tract']
# Photometry columns (non-metacal)
for band in 'ugrizy':
photo_cols.append(f'mag_{band}')
photo_cols.append(f'magerr_{band}')
photo_cols.append(f'snr_{band}_cModel')
# For shear we just add a weight column, and the non-rounded PSF estimates
shear_out_cols = shear_cols + ['weight', 'psf_g1', 'psf_g2']
# We want these in the input but not the output as we construct
# other values from them instead
shear_cols += ['IxxPSF', 'IxyPSF', 'IyyPSF']
        # For the photometry output we strip off the _cModel suffix.
photo_out_cols = [col[:-7] if col.endswith('_cModel') else col
for col in photo_cols]
# eliminate duplicates before loading
cols = list(set(shear_cols + photo_cols))
start = 0
shear_output = None
photo_output = None
        # Loop through the data, as chunked natively by GCRCatalogs
single_tract = self.config['single_tract']
if single_tract:
kwargs = {'native_filters': f'tract == {single_tract}'}
print(f"Selecting one tract only: {single_tract}")
else:
kwargs = {}
for data in cat.get_quantities(cols, return_iterator=True, **kwargs):
# Some columns have different names in input than output
self.rename_columns(data)
self.add_weight_column(data)
            # We use the first chunk of data to set up the output files.
            # It is easier this way (no need to check types etc.)
            # if we change the column list.
if shear_output is None:
shear_output = self.setup_output('shear_catalog', 'shear', data, shear_out_cols, n)
photo_output = self.setup_output('photometry_catalog', 'photometry', data, photo_out_cols, n)
# Write out this chunk of data to HDF
end = start + len(data['ra'])
print(f" Saving {start} - {end}")
self.write_output(shear_output, 'shear', shear_out_cols, start, end, data)
self.write_output(photo_output, 'photometry', photo_out_cols, start, end, data)
start = end
# All done!
photo_output.close()
shear_output.close()
def rename_columns(self, data):
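        """
        Rename input columns to their output names, in place.

        The cModel signal-to-noise columns are renamed from snr_{band}_cModel
        to snr_{band}, and the PSF second moments (IxxPSF, IxyPSF, IyyPSF)
        are converted to the ellipticities psf_g1, psf_g2.
        """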
for band in 'ugrizy':
data[f'snr_{band}'] = data[f'snr_{band}_cModel']
del data[f'snr_{band}_cModel']
Ixx = data['IxxPSF']
Ixy = data['IxyPSF']
Iyy = data['IyyPSF']
data['psf_g1'], data['psf_g2'] = moments_to_shear(Ixx, Iyy, Ixy)
def setup_output(self, name, group, cat, cols, n):
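        """
        Open the output file for the given tag, create the group, and create
        one fixed-length dataset per column, sized for the full catalog and
        typed from the first chunk of data.
        """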
import h5py
f = self.open_output(name)
g = f.create_group(group)
        for col in cols:
            g.create_dataset(col, shape=(n,), dtype=cat[col].dtype)
return f
def add_weight_column(self, data):
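        """Add a weight column of ones, one entry per object in the chunk."""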
n = len(data['ra'])
data['weight'] = np.ones(n)
def write_output(self, output_file, group_name, cols, start, end, data):
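        """Write one chunk of data to rows start:end of each column dataset in the named group."""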
g = output_file[group_name]
for name in cols:
g[name][start:end] = data[name]
class TXIngestStars(PipelineStage):
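    """
    Ingest PSF star data from a GCR catalog.

    Objects flagged as used or reserved for PSF estimation are selected,
    and their measured and PSF-model moments are converted to ellipticities
    and sizes for the star catalog output.
    """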
name = "TXIngestStars"
inputs = []
outputs = [
('star_catalog', HDFFile),
]
config_options = {
'single_tract': '',
'cat_name': str,
'length': 0,
}
def run(self):
import GCRCatalogs
import GCR
import h5py
from .utils.hdf_tools import repack
cat_name = self.config['cat_name']
cat = GCRCatalogs.load_catalog(cat_name)
        # This is the maximum possible number of stars;
        # the actual number selected will of course be much smaller.
if self.config['length']:
n = self.config['length']
print(f"Using fixed size {n}")
else:
n = len(cat)
print(f"Full catalog size = {n}")
        # Columns we need to load for the star data -
        # the measured object moments and the flags telling us
        # whether each object was used in or reserved from PSF measurement
star_cols = [
'id',
'ra',
'dec',
'calib_psf_used',
'calib_psf_reserved',
'extendedness',
'tract',
'mag_u', 'mag_g', 'mag_r', 'mag_i', 'mag_z', 'mag_y',
'Ixx',
'Ixy',
'Iyy',
'IxxPSF',
'IxyPSF',
'IyyPSF',
]
# The star output names are mostly different to the input names
star_out_cols = [
# These are read directly
'id', 'ra', 'dec',
'calib_psf_used',
'calib_psf_reserved',
'extendedness',
'tract',
'mag_u', 'mag_g', 'mag_r', 'mag_i', 'mag_z', 'mag_y',
# These are calculated
'measured_e1', 'measured_e2',
'model_e1', 'model_e2',
'measured_T', 'model_T',
]
single_tract = self.config['single_tract']
if single_tract:
kwargs = {'native_filters': f'tract == {single_tract}'}
print(f"Selecting one tract only: {single_tract}")
else:
kwargs = {}
# As with the galaxy ingestion, this option doesn't
# work with Parquet catalogs.
try:
cat.master.use_cache = False
except AttributeError:
pass
start = 0
star_start = 0
star_output = None
for data in cat.get_quantities(star_cols, return_iterator=True, **kwargs):
end = start + len(data['ra'])
print(f"Reading data {start:,} - {end:,}")
            # Select the stars and compute the derived output columns
star_data = self.compute_star_data(data)
star_end = star_start + len(star_data['ra'])
if star_output is None:
star_output = self.setup_output('star_catalog', 'stars', star_data, star_out_cols, n)
self.write_output(star_output, 'stars', star_out_cols, star_start, star_end, star_data)
start = end
star_start = star_end
        # Trim the datasets down to the actual number of stars found.
for col in star_out_cols:
h5py_shorten(star_output['stars'], col, star_end)
star_output.close()
# Run h5repack on the file
repack(self.get_output('star_catalog'))
def setup_output(self, name, group, cat, cols, n):
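        """
        Open the output file for the given tag, create the group, and create
        one fixed-length dataset per column, sized for the full catalog and
        typed from the first chunk of data.
        """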
import h5py
f = self.open_output(name)
g = f.create_group(group)
        for col in cols:
            g.create_dataset(col, shape=(n,), dtype=cat[col].dtype)
return f
def write_output(self, output_file, group_name, cols, start, end, data):
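        """Write one chunk of data to rows start:end of each column dataset in the named group."""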
g = output_file[group_name]
for name in cols:
g[name][start:end] = data[name]
def compute_star_data(self, data):
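        """
        Select the PSF stars from a chunk of data and build the output columns,
        converting the measured and PSF-model moments into e1, e2, and T.
        """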
star_data = {}
# We specifically use the stars chosen for PSF measurement
star = data['calib_psf_used'] | data['calib_psf_reserved']
for col in ['id', 'ra', 'dec',
'calib_psf_used',
'calib_psf_reserved',
'extendedness',
'tract']:
star_data[col] = data[col][star]
for b in 'ugrizy':
star_data[f'mag_{b}'] = data[f'mag_{b}'][star]
# HSM reports moments. We convert these into
# ellipticities. We do this for both the star shape
# itself and the PSF model.
kinds = [
('', 'measured_'),
('PSF', 'model_')
]
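        # The empty suffix selects the measured object moments (Ixx, Ixy, Iyy)
        # and the 'PSF' suffix the PSF model moments (IxxPSF, IxyPSF, IyyPSF).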
for in_name, out_name in kinds:
# Pulling out the correct moment columns
Ixx = data[f'Ixx{in_name}'][star]
Iyy = data[f'Iyy{in_name}'][star]
Ixy = data[f'Ixy{in_name}'][star]
# Conversion of moments to e1, e2
T = Ixx + Iyy
e1, e2 = moments_to_shear(Ixx, Iyy, Ixy)
# save to output
star_data[f'{out_name}e1'] = e1
star_data[f'{out_name}e2'] = e2
star_data[f'{out_name}T'] = T
return star_data
def moments_to_shear(Ixx, Iyy, Ixy):
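    """
    Convert second moments to ellipticity (shear-type) components:

        e1 = (Ixx - Iyy) / b
        e2 = 2 * Ixy / b

    where b = Ixx + Iyy + 2 * sqrt(Ixx * Iyy - Ixy**2).
    """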
b = Ixx + Iyy + 2 * np.sqrt(Ixx * Iyy - Ixy**2)
e1 = (Ixx - Iyy) / b
e2 = 2 * Ixy / b
return e1, e2
# response to an old Stack Overflow question of mine:
# https://stackoverflow.com/questions/33529057/indices-that-intersect-and-sort-two-numpy-arrays
def intersecting_indices(x, y):
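    """
    Return index arrays idx_x, idx_y such that x[idx_x] and y[idx_y] both
    equal the sorted values common to x and y, using the first occurrence
    of any duplicated value.
    """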
u_x, u_idx_x = np.unique(x, return_index=True)
u_y, u_idx_y = np.unique(y, return_index=True)
i_xy = np.intersect1d(u_x, u_y, assume_unique=True)
i_idx_x = u_idx_x[np.in1d(u_x, i_xy, assume_unique=True)]
i_idx_y = u_idx_y[np.in1d(u_y, i_xy, assume_unique=True)]
return i_idx_x, i_idx_y
def h5py_shorten(group, name, n):
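    """Truncate the named dataset in an HDF5 group to its first n rows, in place."""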
tmp_name = name + "_tmp_5kb04scgllj"
group[tmp_name] = group[name][:n]
del group[name]
group[name] = group[tmp_name]
del group[tmp_name]
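# The outputs of these stages are ordinary HDF5 files with one group per
# catalog and one dataset per column. A minimal sketch of reading them back
# (the file name here is hypothetical; the real path comes from the pipeline
# configuration):
#
#     import h5py
#     with h5py.File("shear_catalog.hdf5", "r") as f:
#         ra = f["shear/ra"][:]
#         g1 = f["shear/mcal_g1"][:]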