Source code for txpipe.metadata
import numpy as np
import yaml
from .base_stage import PipelineStage
from .data_types import TomographyCatalog, MapsFile, HDFFile, YamlFile, ShearCatalog
from .utils.calibration_tools import read_shear_catalog_type
from .utils import choose_pixelization
[docs]class TXTracerMetadata(PipelineStage):
"""
This stage doesn't actually calculate anything, it just
collates together metadata about our sources, so that we
don't need to pass around catalog sized objects as much.
"""
name = "TXTracerMetadata"
inputs = [
("shear_catalog", ShearCatalog),
("shear_tomography_catalog", TomographyCatalog),
("lens_tomography_catalog", TomographyCatalog),
("mask", MapsFile),
]
outputs = [
("tracer_metadata", HDFFile),
("tracer_metadata_yml", YamlFile), # human-readable version
]
def run(self):
# Read the area
area = self.read_area()
shear_catalog_type = read_shear_catalog_type(self)
area_sq_arcmin = area * 60 ** 2
shear_tomo_file = self.open_input("shear_tomography_catalog")
lens_tomo_file = self.open_input("lens_tomography_catalog")
meta_file = self.open_output("tracer_metadata")
metadata = {}
def copy(tomo, in_section, out_section, name):
x = tomo[f"{in_section}/{name}"][:]
meta_file.create_dataset(f"{out_section}/{name}", data=x)
metadata[name] = x.tolist()
def copy_attrs(tomo, name, out_name):
for k, v in tomo[name].attrs.items():
meta_file[out_name].attrs[k] = v
if isinstance(v, np.ndarray):
v = v.tolist()
elif isinstance(v, np.float64):
v = float(v)
elif isinstance(v, np.int64):
v = int(v)
metadata[k] = v
if shear_catalog_type == "metacal":
copy(shear_tomo_file, "metacal_response", "tracers", "R_gamma_mean")
copy(shear_tomo_file, "metacal_response", "tracers", "R_S")
copy(shear_tomo_file, "metacal_response", "tracers", "R_total")
copy(shear_tomo_file, "metacal_response", "tracers", "R_gamma_mean_2d")
copy(shear_tomo_file, "metacal_response", "tracers", "R_S_2d")
copy(shear_tomo_file, "metacal_response", "tracers", "R_total_2d")
elif shear_catalog_type == 'lensfit':
copy(shear_tomo_file, "response", "tracers", "K")
copy(shear_tomo_file, "response", "tracers", "C")
copy(shear_tomo_file, "response", "tracers", "K_2d")
copy(shear_tomo_file, "response", "tracers", "C_2d")
elif shear_catalog_type == 'hsc':
copy(shear_tomo_file, "response", "tracers", "R")
copy(shear_tomo_file, "response", "tracers", "K")
copy(shear_tomo_file, "response", "tracers", "R_mean_2d")
copy(shear_tomo_file, "response", "tracers", "K_2d")
copy(shear_tomo_file, "tomography", "tracers", "N_eff")
copy(lens_tomo_file, "tomography", "tracers", "lens_counts")
copy(shear_tomo_file, "tomography", "tracers", "sigma_e")
copy(shear_tomo_file, "tomography", "tracers", "mean_e1")
copy(shear_tomo_file, "tomography", "tracers", "mean_e2")
copy(shear_tomo_file, "tomography", "tracers", "source_counts")
copy(shear_tomo_file, "tomography", "tracers", "N_eff_2d")
copy(lens_tomo_file, "tomography", "tracers", "lens_counts_2d")
copy(shear_tomo_file, "tomography", "tracers", "sigma_e_2d")
copy(shear_tomo_file, "tomography", "tracers", "mean_e1_2d")
copy(shear_tomo_file, "tomography", "tracers", "mean_e2_2d")
copy(shear_tomo_file, "tomography", "tracers", "source_counts_2d")
N_eff = shear_tomo_file["tomography/N_eff"][:]
N_eff_2d = shear_tomo_file["tomography/N_eff_2d"][:]
n_eff = N_eff / area_sq_arcmin
n_eff_2d = N_eff_2d / area_sq_arcmin
lens_counts = lens_tomo_file["tomography/lens_counts"][:]
source_counts = shear_tomo_file["tomography/source_counts"][:]
lens_counts_2d = lens_tomo_file["tomography/lens_counts_2d"][:]
source_counts_2d = shear_tomo_file["tomography/source_counts_2d"][:]
lens_density = lens_counts / area_sq_arcmin
source_density = source_counts / area_sq_arcmin
lens_density_2d = lens_counts_2d / area_sq_arcmin
source_density_2d = source_counts_2d / area_sq_arcmin
meta_file.create_dataset("tracers/n_eff", data=n_eff)
meta_file.create_dataset("tracers/lens_density", data=lens_density)
meta_file.create_dataset("tracers/source_density", data=source_density)
meta_file.create_dataset("tracers/lens_density_2d", data=lens_density_2d)
meta_file.create_dataset("tracers/source_density_2d", data=source_density_2d)
meta_file["tracers"].attrs["area"] = area
meta_file["tracers"].attrs["area_unit"] = "deg^2"
meta_file["tracers"].attrs["density_unit"] = "arcmin^{-2}"
metadata['n_eff'] = n_eff.tolist()
metadata['n_eff_2d'] = n_eff_2d.tolist()
metadata['lens_density'] = lens_density.tolist()
metadata['source_density'] = source_density.tolist()
metadata['lens_density_2d'] = lens_density_2d.tolist()
metadata['source_density_2d'] = source_density_2d.tolist()
metadata['area'] = area
metadata['area_unit'] = "deg^2"
metadata['density_unit'] = "arcmin^{-2}"
copy_attrs(shear_tomo_file, "tomography", "tracers")
copy_attrs(lens_tomo_file, "tomography", "tracers")
meta_file.close()
yaml_out = self.open_output("tracer_metadata_yml", wrapper=True)
yaml_out.write(metadata)
yaml_out.close()
shear_tomo_file.close()
lens_tomo_file.close()
def read_area(self):
with self.open_input("mask", wrapper=True) as f:
m = f.read_map("mask")
pixel_scheme = choose_pixelization(**f.read_map_info("mask"))
num_hit = (m > 0).sum()
area_sq_deg = pixel_scheme.pixel_area(degrees=True) * num_hit
f_sky = float(area_sq_deg) / 41252.96125
print(f"Area = {area_sq_deg:.2f} deg^2")
return float(area_sq_deg)