from .base_stage import PipelineStage
from .data_types import SACCFile
import numpy as np
import warnings
import os
"""
.. module:: txblinding
"""
[docs]class TXBlinding(PipelineStage):
"""
Blinding the data vectors.
"""
name='TXBlinding'
inputs = [
('twopoint_data_real_raw', SACCFile),
]
outputs = [
('twopoint_data_real', SACCFile),
]
config_options = {
'seed': 1972, ## seed uniquely specifies the shift in parameters
'Omega_b': [0.0485, 0.001], ## fiducial_model_value, shift_sigma
'Omega_c': [0.2545, 0.01],
'w0': [-1.0, 0.1],
'h': [0.682, 0.02],
'sigma8': [0.801, 0.01],
'n_s': [0.971, 0.03],
'b0': 0.95, ### we assume bias to be of the form b0/growth
'delete_unblinded': False,
}
[docs] def run(self):
"""
Run the analysis for this stage.
- Load two point SACC file
- Blinding it
- Output blinded data
- Optionally deletete unblinded data
"""
import pyccl
import firecrown
import sacc
unblinded_fname = self.get_input('twopoint_data_real_raw')
# We may have blinded already. The error message would be quite
# obscure, so check here and say something more straightforward.
size = os.stat(unblinded_fname).st_size
if size == 0:
raise ValueError("The raw 2point file you specified has zero size. "
"Maybe you already run the blinding stage? "
"It clears the input file to enforce blindness.")
# Load
sack = sacc.Sacc.load_fits(unblinded_fname)
# Blind
blinded_sack = self.blind_muir(sack)
# Save
blinded_sack.save_fits(self.get_output('twopoint_data_real'), overwrite=True)
# Optionally make sure we stay blind by deleting the pre-blinding
# file.
if self.config['delete_unblinded']:
print(f"Replacing {unblinded_fname} with empty file.")
open(unblinded_fname,'w').close()
def blind_muir(self, sack):
# Get the parameters, fiducial and offset.
# The signature is a short sequence that means we can
# check that it is unchanged
signature, fid_params, offset_params = self.get_parameters()
# Save the signature into the output
sack.metadata['blinding_signature'] = signature
# Turn the b0 parameter into the b(z) per lens bin.
bias = self.compute_bias(sack, fid_params)
# Get the configuration dictionary
firecrown_config, types = self.make_firecrown_config(sack, bias)
## now try to get predictions
print("Computing fiducial theory")
fid_theory = self.compute_theory_vector(firecrown_config,
fid_params, types)
print("Computing offset theory")
offset_theory = self.compute_theory_vector(firecrown_config,
offset_params,types)
# Get the parameter shift vector
diff_vec = offset_theory - fid_theory
# And add this to the original data.
for p, delta in zip(sack.data, diff_vec):
p.value += delta
print(f"Congratulations you are now blind.")
return sack
def get_parameters(self):
seed = self.config["seed"]
print(f"Blinding with seed {seed}")
# This is the legacy random number generator
# which will always produce the same values
rng = np.random.RandomState(seed=seed)
# blind signature -- this ensures seed is consistent across
# numpy versions. We do this before and after
signature_bytes = rng.bytes(4)
signature = ''.join(format(x, '02x') for x in signature_bytes)
if self.rank==0:
print(f"Blinding signature: {signature}")
# Pull out the fiducial parameters from the config
fid_params = {p: self.config[p][0] for p in
['Omega_b', 'Omega_c', 'h', 'w0', 'sigma8', 'n_s']
}
# Get the parameters in the offset space
offset_params = fid_params.copy()
# This should have a standard order since python dictionaries
# now maintain their order.
for par in fid_params.keys():
offset_params[par] += self.config[par][1] * rng.normal(0., 1.)
return signature, fid_params, offset_params
def compute_bias(self, sack, fid_params):
import pyccl as ccl
print("Computing bias values b(z)")
# now get bias as a function of redshift for each lens bin
bias = {}
fid_cosmo = ccl.Cosmology(**fid_params)
b0 = self.config['b0']
for key, tracer in sack.tracers.items():
if 'lens' in key:
z_eff = (tracer.z*tracer.nz).sum() / tracer.nz.sum()
a_eff = 1 / (1 + z_eff)
bias[key] = b0 / ccl.growth_factor(fid_cosmo, a_eff)
return bias
def make_firecrown_config(self, sack, bias):
from sacc.utils import unique_list
# We will run firecrown with the old and new parameters,
# but otherwise the same configuration
firecrown_config = {
'parameters': {
'Omega_k': 0.0,
'wa': 0.0,
'one': 1
},
'two_point': {
'module': 'firecrown.ccl.two_point',
'sacc_data': sack,
'systematics': {
'dummy': {
'kind': 'PhotoZShiftBias',
'delta_z': 'one'
}
},
'sources': {},
'statistics': {}
}
}
for k, v in bias.items():
firecrown_config['parameters'][f'bias_{k}'] = v
# Tell FireCrown to use the sources we have here, and
# their types.
firecrown_sources = firecrown_config['two_point']['sources']
for key, tracer in sack.tracers.items():
# Assume the word source or lens will be in the name.
# Bit of a hack.
if 'source' in key:
firecrown_sources[key] = {
'kind' : 'WLSource',
'sacc_tracer' : key
}
if 'lens' in key:
firecrown_sources[key] = {
'kind' : 'NumberCountsSource',
'bias' : f'bias_{key}',
'sacc_tracer' : key
}
## Make a list of the unique tracer/bin groups
types = []
for p in sack.data:
name = f"{p.data_type}_{p.tracers[0]}_{p.tracers[1]}"
types.append((p.data_type, p.tracers, name))
types = unique_list(types)
# Collect together the statistics we will need
firecrown_stats = firecrown_config['two_point']['statistics']
for dtype, (tracer1,tracer2), name in types:
firecrown_stats[name] = {
'sources' : [tracer1, tracer2],
'sacc_data_type' : dtype
}
return firecrown_config, types
def compute_theory_vector(self, config, params, types):
import firecrown
# Set up the firecrown configuration
config['parameters'].update(params)
config2, data = firecrown.parse(config)
# Run the cosmology. The loglike also as a by-product
# fills in the predictions deep inside the data dictionary.
cosmo = firecrown.get_ccl_cosmology(config2['parameters'])
firecrown.compute_loglike(cosmo=cosmo, data=data)
# Pull out and stack the theory predictions at this point
results = data['two_point']['data']['statistics']
return np.hstack(
[results[stat_name].predicted_statistic_
for (_, _, stat_name) in types]
)
[docs]class TXNullBlinding(PipelineStage):
"""
A null stage to trivially copy the real raw data to real without blinding,
for use with simulations data.
"""
name='TXNullBlinding'
inputs = [
('twopoint_data_real_raw', SACCFile),
]
outputs = [
('twopoint_data_real', SACCFile),
]
config_options = {
}
[docs] def run(self):
"""
Run the analysis for this stage.
- Load two point SACC file
- Copy two point SACC file twopoint_data_real_raw to twopoint_data_real
"""
import shutil
unblinded_fname = self.get_input('twopoint_data_real_raw')
shutil.copyfile(unblinded_fname, self.get_output('twopoint_data_real'))