Source code for txpipe.diagnostics

from .base_stage import PipelineStage
from .data_types import Directory, ShearCatalog, HDFFile, PNGFile, TomographyCatalog
from parallel_statistics import ParallelMeanVariance
from .utils.calibration_tools import calculate_selection_response, calculate_shear_response, apply_metacal_response, apply_lensfit_calibration, MeanShearInBins
from .utils.fitting import fit_straight_line
from .plotting import manual_step_histogram
import numpy as np

class TXSourceDiagnosticPlots(PipelineStage):
    """
    Make diagnostic plots of the shear (source) catalog.
    """
    name = 'TXSourceDiagnosticPlots'

    inputs = [
        ('shear_catalog', ShearCatalog),
        ('shear_tomography_catalog', TomographyCatalog),
    ]

    outputs = [
        ('g_psf_T', PNGFile),
        ('g_psf_g', PNGFile),
        ('g1_hist', PNGFile),
        ('g2_hist', PNGFile),
        ('g_snr', PNGFile),
        ('g_T', PNGFile),
        ('source_snr_hist', PNGFile),
        ('source_mag_hist', PNGFile),
        ('response_hist', PNGFile),
    ]

    config_options = {
        'chunk_rows': 100000,
        'delta_gamma': 0.02,
        'shear_prefix': 'mcal_',
        'psf_prefix': 'mcal_psf_',
        'T_min': 0.2,
        'T_max': 0.28,
        'bands': 'riz',
    }

    def run(self):
        # PSF tests
        import matplotlib
        matplotlib.use('agg')

        with self.open_input('shear_catalog', wrapper=True) as f:
            self.config['shear_catalog_type'] = f.catalog_type

        # Collect together all the methods on this class called self.plot_*
        # They are all expected to be python coroutines - generators that
        # use the yield feature to pause and wait for more input.
        # We instantiate them all here.
        plotters = [getattr(self, f)() for f in dir(self) if f.startswith('plot_')]

        # Start off each of the plotters.  This will make them all run up to the
        # first yield statement, then pause and wait for the first chunk of data.
        for plotter in plotters:
            plotter.send(None)

        # Create an iterator for reading through the input data.
        # This method automatically splits up data among the processes,
        # so the plotters should handle this.
        chunk_rows = self.config['chunk_rows']
        psf_prefix = self.config['psf_prefix']
        shear_prefix = self.config['shear_prefix']

        if self.config['shear_catalog_type'] == 'metacal':
            shear_cols = [
                f'{psf_prefix}g1', f'{psf_prefix}g2', f'{psf_prefix}T_mean',
                'mcal_g1', 'mcal_g1_1p', 'mcal_g1_2p', 'mcal_g1_1m', 'mcal_g1_2m',
                'mcal_g2', 'mcal_g2_1p', 'mcal_g2_2p', 'mcal_g2_1m', 'mcal_g2_2m',
                'mcal_s2n', 'mcal_T', 'mcal_T_1p', 'mcal_T_2p', 'mcal_T_1m', 'mcal_T_2m',
                'mcal_s2n_1p', 'mcal_s2n_2p', 'mcal_s2n_1m', 'mcal_s2n_2m',
                'weight',
            ] + [f'mcal_mag_{b}' for b in self.config['bands']]
        else:
            shear_cols = [
                'psf_g1', 'psf_g2', 'g1', 'g2', 'psf_T_mean', 's2n', 'T',
                'weight', 'm', 'sigma_e', 'c1', 'c2',
            ] + [f'{shear_prefix}mag_{b}' for b in self.config['bands']]

        shear_tomo_cols = ['source_bin']

        if self.config['shear_catalog_type'] == 'metacal':
            more_iters = ['shear_tomography_catalog', 'metacal_response', ['R_gamma']]
        elif self.config['shear_catalog_type'] == 'lensfit':
            more_iters = []
        else:
            more_iters = ['shear_tomography_catalog', 'response', ['R']]

        it = self.combined_iterators(
            chunk_rows,
            'shear_catalog', 'shear', shear_cols,
            'shear_tomography_catalog', 'tomography', shear_tomo_cols,
            *more_iters
        )

        # Now loop through each chunk of input data, one at a time.
        # Each time we get a new segment of data, which goes to all the plotters.
        for (start, end, data) in it:
            print(f"Read data {start} - {end}")
            # This causes each data = yield statement in each plotter to
            # be given this data chunk as the variable data.
            for plotter in plotters:
                plotter.send(data)

        # Tell all the plotters to finish, collect together results from the
        # different processors, and make their final plots.  Plotters need to
        # respond to the None input by wrapping up.
        for plotter in plotters:
            try:
                plotter.send(None)
            except StopIteration:
                pass

    def plot_psf_shear(self):
        # mean shear in bins of PSF
        print("Making PSF shear plot")
        import matplotlib.pyplot as plt
        from scipy import stats
        from .utils.fitting import fit_straight_line

        delta_gamma = self.config['delta_gamma']
        size = 5
        gr = np.logspace(-3, -2, size // 2)
        psf_g_edges = np.concatenate([-gr[::-1], gr])
        print('psf_g_edges', psf_g_edges)
        psf_prefix = self.config['psf_prefix']

        p1 = MeanShearInBins(
            f'{psf_prefix}g1', psf_g_edges, delta_gamma,
            cut_source_bin=True, shear_catalog_type=self.config['shear_catalog_type'])
        p2 = MeanShearInBins(
            f'{psf_prefix}g2', psf_g_edges, delta_gamma,
            cut_source_bin=True, shear_catalog_type=self.config['shear_catalog_type'])

        psf_g_mid = 0.5 * (psf_g_edges[1:] + psf_g_edges[:-1])

        while True:
            data = yield

            if data is None:
                break

            p1.add_data(data)
            p2.add_data(data)

        mu1, mean11, mean12, std11, std12 = p1.collect(self.comm)
        mu2, mean21, mean22, std21, std22 = p2.collect(self.comm)

        if self.rank != 0:
            return

        fig = self.open_output('g_psf_g', wrapper=True)

        # Include a small shift to be able to see the g1 / g2 points on the plot
        dx = 0.1 * (mu1[1] - mu1[0])

        slope11, intercept11, mc_cov = fit_straight_line(mu1, mean11, y_err=std11, nan_error=True, skip_nan=True)
        std_err11 = mc_cov[0, 0]**0.5
        line11 = slope11 * mu1 + intercept11

        slope12, intercept12, mc_cov = fit_straight_line(mu1, mean12, y_err=std12, nan_error=True, skip_nan=True)
        std_err12 = mc_cov[0, 0]**0.5
        line12 = slope12 * mu1 + intercept12

        slope21, intercept21, mc_cov = fit_straight_line(mu2, mean21, y_err=std21, nan_error=True, skip_nan=True)
        std_err21 = mc_cov[0, 0]**0.5
        line21 = slope21 * mu2 + intercept21

        slope22, intercept22, mc_cov = fit_straight_line(mu2, mean22, y_err=std22, nan_error=True, skip_nan=True)
        std_err22 = mc_cov[0, 0]**0.5
        line22 = slope22 * mu2 + intercept22

        plt.subplot(2, 1, 1)
        plt.plot(mu1, line11, color='red', label=r"$m=%.4f \pm %.4f$" % (slope11, std_err11))
        plt.plot(mu1, [0] * len(line11), color='black')
        plt.plot(mu1, line12, color='blue', label=r"$m=%.4f \pm %.4f$" % (slope12, std_err12))
        plt.plot(mu1, [0] * len(line12), color='black')
        plt.errorbar(mu1 + dx, mean11, std11, label='g1', fmt='+', color='red')
        plt.errorbar(mu1 - dx, mean12, std12, label='g2', fmt='+', color='blue')
        plt.xlabel("PSF g1")
        plt.ylabel("Mean g")
        plt.legend()

        plt.subplot(2, 1, 2)
        plt.plot(mu2, line21, color='red', label=r"$m=%.4f \pm %.4f$" % (slope21, std_err21))
        plt.plot(mu2, [0] * len(line21), color='black')
        plt.plot(mu2, line22, color='blue', label=r"$m=%.4f \pm %.4f$" % (slope22, std_err22))
        plt.plot(mu2, [0] * len(line22), color='black')
        plt.errorbar(mu2 + dx, mean21, std21, label='g1', fmt='+', color='red')
        plt.errorbar(mu2 - dx, mean22, std22, label='g2', fmt='+', color='blue')
        plt.xlabel("PSF g2")
        plt.ylabel("Mean g")
        plt.legend()
        plt.tight_layout()

        # This also saves the figure
        fig.close()

    def plot_psf_size_shear(self):
        # mean shear in bins of PSF size
        print("Making PSF size plot")
        import matplotlib.pyplot as plt
        from scipy import stats

        psf_prefix = self.config['psf_prefix']
        delta_gamma = self.config['delta_gamma']
        size = 5
        T_min = self.config['T_min']
        T_max = self.config['T_max']
        psf_T_edges = np.linspace(T_min, T_max, size + 1)

        binnedShear = MeanShearInBins(
            f'{psf_prefix}T_mean', psf_T_edges, delta_gamma,
            cut_source_bin=True, shear_catalog_type=self.config['shear_catalog_type'])

        while True:
            data = yield

            if data is None:
                break

            binnedShear.add_data(data)

        mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)

        if self.rank != 0:
            return

        w = (mu != 0) & np.isfinite(std1)
        mu = mu[w]
        mean1 = mean1[w]
        mean2 = mean2[w]
        std1 = std1[w]
        std2 = std2[w]

        dx = 0.05 * (psf_T_edges[1] - psf_T_edges[0])

        slope1, intercept1, cov1 = fit_straight_line(mu, mean1, std1, skip_nan=True, nan_error=True)
        std_err1 = cov1[0, 0]**0.5
        line1 = slope1 * mu + intercept1

        slope2, intercept2, cov2 = fit_straight_line(mu, mean2, std2, skip_nan=True, nan_error=True)
        std_err2 = cov2[0, 0]**0.5
        line2 = slope2 * mu + intercept2

        fig = self.open_output('g_psf_T', wrapper=True)
        plt.plot(mu, line1, color='red', label=r"$m=%.4f \pm %.4f$" % (slope1, std_err1))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.errorbar(mu + dx, mean1, std1, label='g1', fmt='+', color='red')
        plt.legend(loc='best')
        plt.plot(mu - dx, line2, color='blue', label=r"$m=%.4f \pm %.4f$" % (slope2, std_err2))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.errorbar(mu - dx, mean2, std2, label='g2', fmt='+', color='blue')
        plt.xlabel("PSF T")
        plt.ylabel("Mean g")
        # plt.ylim(-0.0015, 0.0015)
        plt.legend(loc='best')
        plt.tight_layout()
        fig.close()

    def plot_snr_shear(self):
        # mean shear in bins of snr
        print("Making mean shear SNR plot")
        import matplotlib.pyplot as plt
        from scipy import stats

        # Parameters of the binning in SNR
        size = 5
        delta_gamma = self.config['delta_gamma']
        shear_prefix = self.config['shear_prefix']
        snr_edges = np.logspace(0.1, 2.5, size + 1)

        # This class includes all the cutting and calibration, both for
        # estimator and selection biases
        binnedShear = MeanShearInBins(
            f'{shear_prefix}s2n', snr_edges, delta_gamma,
            cut_source_bin=True, shear_catalog_type=self.config['shear_catalog_type'])

        while True:
            # This happens when we have loaded a new data chunk
            data = yield

            # Indicates the end of the data stream
            if data is None:
                break

            binnedShear.add_data(data)

        mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)

        if self.rank != 0:
            return

        # Get the error on the mean
        dx = 0.05 * (snr_edges[1] - snr_edges[0])

        good = (mu > 0) & np.isfinite(mean1)
        slope1, intercept1, r_value1, p_value1, std_err1 = stats.linregress(np.log10(mu[good]), mean1[good])
        line1 = slope1 * np.log10(mu) + intercept1

        good = (mu > 0) & np.isfinite(mean2)
        slope2, intercept2, r_value2, p_value2, std_err2 = stats.linregress(np.log10(mu[good]), mean2[good])
        line2 = slope2 * np.log10(mu) + intercept2

        fig = self.open_output('g_snr', wrapper=True)
        plt.plot(mu, line1, color='red', label=r"$m=%.4f \pm %.4f$" % (slope1, std_err1))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.errorbar(mu + dx, mean1, std1, label='g1', fmt='+', color='red')
        plt.plot(mu, line2, color='blue', label=r"$m=%.4f \pm %.4f$" % (slope2, std_err2))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.xscale('log')
        # plt.ylim(-0.0015, 0.0015)
        plt.errorbar(mu - dx, mean2, std2, label='g2', fmt='+', color='blue')
        plt.xlabel("SNR")
        plt.ylabel("Mean g")
        plt.legend()
        plt.tight_layout()
        fig.close()

    def plot_size_shear(self):
        # mean shear in bins of galaxy size
        print("Making mean shear galaxy size plot")
        import matplotlib.pyplot as plt
        from scipy import stats

        delta_gamma = self.config['delta_gamma']
        psf_prefix = self.config['psf_prefix']
        size = 5
        T_edges = np.linspace(0.1, 2.1, size + 1)
        shear_prefix = self.config['shear_prefix']

        binnedShear = MeanShearInBins(
            f'{shear_prefix}T', T_edges, delta_gamma,
            cut_source_bin=True, shear_catalog_type=self.config['shear_catalog_type'])

        while True:
            # This happens when we have loaded a new data chunk
            data = yield

            # Indicates the end of the data stream
            if data is None:
                break

            binnedShear.add_data(data)

        mu, mean1, mean2, std1, std2 = binnedShear.collect(self.comm)

        if self.rank != 0:
            return

        dx = 0.05 * (T_edges[1] - T_edges[0])
        slope1, intercept1, r_value1, p_value1, std_err1 = stats.linregress(np.log10(mu), mean1)
        line1 = slope1 * np.log10(mu) + intercept1
        slope2, intercept2, r_value2, p_value2, std_err2 = stats.linregress(np.log10(mu), mean2)
        line2 = slope2 * np.log10(mu) + intercept2

        fig = self.open_output('g_T', wrapper=True)
        plt.plot(mu, line1, color='red', label=r"$m=%.4f \pm %.4f$" % (slope1, std_err1))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.errorbar(mu + dx, mean1, std1, label='g1', fmt='+', color='red')
        plt.plot(mu, line2, color='blue', label=r"$m=%.4f \pm %.4f$" % (slope2, std_err2))
        plt.plot(mu, [0] * len(mu), color='black')
        plt.errorbar(mu - dx, mean2, std2, label='g2', fmt='+', color='blue')
        # plt.ylim(-0.0015, 0.0015)
        plt.xscale('log')
        plt.xlabel("galaxy size T")
        plt.ylabel("Mean g")
        plt.legend()
        plt.tight_layout()
        fig.close()

    def plot_g_histogram(self):
        print('plotting histogram')
        import matplotlib.pyplot as plt
        from scipy import stats

        delta_gamma = self.config['delta_gamma']
        bins = 10
        edges = np.linspace(-1, 1, bins + 1)
        mids = 0.5 * (edges[1:] + edges[:-1])
        calc1 = ParallelMeanVariance(bins)
        calc2 = ParallelMeanVariance(bins)

        while True:
            data = yield

            if data is None:
                break

            qual_cut = data['source_bin'] != -1
            # qual_cut |= data['lens_bin'] != -1

            if self.config['shear_catalog_type'] == 'metacal':
                b1 = np.digitize(data['mcal_g1'][qual_cut], edges) - 1
                b1_1p = np.digitize(data['mcal_g1_1p'][qual_cut], edges) - 1
                b1_2p = np.digitize(data['mcal_g1_2p'][qual_cut], edges) - 1
                b1_1m = np.digitize(data['mcal_g1_1m'][qual_cut], edges) - 1
                b1_2m = np.digitize(data['mcal_g1_2m'][qual_cut], edges) - 1
            else:
                b1 = np.digitize(data['g1'][qual_cut], edges) - 1

            if self.config['shear_catalog_type'] == 'metacal':
                b2 = np.digitize(data['mcal_g2'][qual_cut], edges) - 1
                b2_1p = np.digitize(data['mcal_g2_1p'][qual_cut], edges) - 1
                b2_2p = np.digitize(data['mcal_g2_2p'][qual_cut], edges) - 1
                b2_1m = np.digitize(data['mcal_g2_1m'][qual_cut], edges) - 1
                b2_2m = np.digitize(data['mcal_g2_2m'][qual_cut], edges) - 1
            else:
                b2 = np.digitize(data['g2'][qual_cut], edges) - 1

            for i in range(bins):
                w1 = np.where(b1 == i)
                if self.config['shear_catalog_type'] == 'metacal':
                    w1_1p = np.where(b1_1p == i)
                    w1_2p = np.where(b1_2p == i)
                    w1_1m = np.where(b1_1m == i)
                    w1_2m = np.where(b1_2m == i)
                    S = calculate_selection_response(
                        data['mcal_g1'][qual_cut], data['mcal_g2'][qual_cut],
                        w1_1p, w1_2p, w1_1m, w1_2m, delta_gamma)
                    R = calculate_shear_response(
                        data['mcal_g1_1p'][qual_cut], data['mcal_g1_2p'][qual_cut],
                        data['mcal_g1_1m'][qual_cut], data['mcal_g1_2m'][qual_cut],
                        data['mcal_g2_1p'][qual_cut], data['mcal_g2_2p'][qual_cut],
                        data['mcal_g2_1m'][qual_cut], data['mcal_g2_2m'][qual_cut],
                        delta_gamma)
                    g1, g2 = apply_metacal_response(
                        R, S, data['mcal_g1'][qual_cut][w1], data['mcal_g2'][qual_cut][w1])
                elif self.config['shear_catalog_type'] == 'lensfit':
                    g1, g2, weight, one_plus_K = apply_lensfit_calibration(
                        data['g1'][qual_cut][w1], data['g2'][qual_cut][w1],
                        data['weight'][qual_cut][w1])
                else:
                    raise ValueError("Please specify metacal, lensfit or hsc for shear_catalog in config.")

                # Accumulate the calibrated g1 values falling in this bin
                calc1.add_data(i, g1)

                w2 = np.where(b2 == i)
                if self.config['shear_catalog_type'] == 'metacal':
                    w2_1p = np.where(b2_1p == i)
                    w2_2p = np.where(b2_2p == i)
                    w2_1m = np.where(b2_1m == i)
                    w2_2m = np.where(b2_2m == i)
                    S = calculate_selection_response(
                        data['mcal_g1'][qual_cut], data['mcal_g2'][qual_cut],
                        w2_1p, w2_2p, w2_1m, w2_2m, delta_gamma)
                    R = calculate_shear_response(
                        data['mcal_g1_1p'][qual_cut], data['mcal_g1_2p'][qual_cut],
                        data['mcal_g1_1m'][qual_cut], data['mcal_g1_2m'][qual_cut],
                        data['mcal_g2_1p'][qual_cut], data['mcal_g2_2p'][qual_cut],
                        data['mcal_g2_1m'][qual_cut], data['mcal_g2_2m'][qual_cut],
                        delta_gamma)
                    g1, g2 = apply_metacal_response(
                        R, S, data['mcal_g1'][qual_cut][w2], data['mcal_g2'][qual_cut][w2])
                elif self.config['shear_catalog_type'] == 'lensfit':
                    g1, g2, weight, one_plus_K = apply_lensfit_calibration(
                        data['g1'][qual_cut][w2], data['g2'][qual_cut][w2],
                        data['weight'][qual_cut][w2])
                else:
                    raise ValueError("Please specify metacal, lensfit or hsc for shear_catalog in config.")

                # Accumulate the calibrated g2 values falling in this bin
                calc2.add_data(i, g2)

        count1, mean1, var1 = calc1.collect(self.comm, mode='gather')
        count2, mean2, var2 = calc2.collect(self.comm, mode='gather')

        if self.rank != 0:
            return

        std1 = np.sqrt(var1 / count1)
        std2 = np.sqrt(var2 / count2)

        fig = self.open_output('g1_hist', wrapper=True)
        plt.bar(mids, count1, width=edges[1] - edges[0], edgecolor='black', align='center', color='blue')
        plt.xlabel("g1")
        plt.ylabel(r'$N_{galaxies}$')
        plt.ylim(0, 1.1 * max(count1))
        fig.close()

        fig = self.open_output('g2_hist', wrapper=True)
        plt.bar(mids, count2, width=edges[1] - edges[0], align='center', edgecolor='black', color='purple')
        plt.ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
        plt.xlabel("g2")
        plt.ylabel(r'$N_{galaxies}$')
        plt.ylim(0, 1.1 * max(count2))
        fig.close()

    def plot_snr_histogram(self):
        print('plotting snr histogram')
        import matplotlib.pyplot as plt

        delta_gamma = self.config['delta_gamma']
        shear_prefix = self.config['shear_prefix']
        bins = 10
        edges = np.logspace(1, 3, bins + 1)
        mids = 0.5 * (edges[1:] + edges[:-1])
        calc1 = ParallelMeanVariance(bins)

        while True:
            data = yield

            if data is None:
                break

            qual_cut = data['source_bin'] != -1
            # qual_cut |= data['lens_bin'] != -1

            b1 = np.digitize(data[f'{shear_prefix}s2n'][qual_cut], edges) - 1

            for i in range(bins):
                w = np.where(b1 == i)
                # Accumulate the SNR values falling in this bin
                calc1.add_data(i, data[f'{shear_prefix}s2n'][qual_cut][w])

        count1, mean1, var1 = calc1.collect(self.comm, mode='gather')

        if self.rank != 0:
            return

        std1 = np.sqrt(var1 / count1)

        fig = self.open_output('source_snr_hist', wrapper=True)
        plt.bar(mids, count1, width=edges[1:] - edges[:-1], edgecolor='black', align='center', color='blue')
        plt.ticklabel_format(style='sci', axis='y', scilimits=(0, 0))
        plt.xscale('log')
        plt.xlabel("log(snr)")
        plt.ylabel(r'$N_{galaxies}$')
        plt.ylim(0, 1.1 * max(count1))
        fig.close()

    def plot_response_histograms(self):
        import matplotlib.pyplot as plt
        if self.comm:
            import mpi4py.MPI

        size = 10

        # This seems to be a reasonable range, though there are samples
        # with extremely high values
        edges = np.linspace(-3, 3, size + 1)
        mid = 0.5 * (edges[1:] + edges[:-1])
        width = edges[1] - edges[0]

        if self.config['shear_catalog_type'] == 'metacal':
            # count of objects
            counts = np.zeros((2, 2, size))
            # make a separate histogram of the shear-sample-selected objects
            counts_s = np.zeros((2, 2, size))
        else:
            # count of objects
            counts = np.zeros(size)
            # make a separate histogram of the shear-sample-selected objects
            counts_s = np.zeros(size)

        while True:
            data = yield

            if data is None:
                break

            # check if selected for any source bin
            in_shear_sample = data['source_bin'] != -1

            if self.config['shear_catalog_type'] == 'metacal':
                B = np.digitize(data['R_gamma'], edges) - 1
                # loop through this chunk of data
                for s, b in zip(in_shear_sample, B):
                    # for each element in the 2x2 matrix
                    for i in range(2):
                        for j in range(2):
                            bij = b[i, j]
                            # this will naturally filter out the nans
                            if (bij >= 0) and (bij < size):
                                counts[i, j, bij] += 1
                                if s:
                                    counts_s[i, j, bij] += 1
            elif self.config['shear_catalog_type'] == 'lensfit':
                B = np.digitize(data['m'], edges) - 1
                # loop through this chunk of data
                for s, b in zip(in_shear_sample, B):
                    if (b >= 0) and (b < size):
                        counts[b] += 1
                        if s:
                            counts_s[b] += 1
            else:
                B = np.digitize(data['R'], edges) - 1
                # loop through this chunk of data
                for s, b in zip(in_shear_sample, B):
                    if (b >= 0) and (b < size):
                        counts[b] += 1
                        if s:
                            counts_s[b] += 1

        # Sum from all processors, and then non-root ones return
        if self.comm is not None:
            if self.rank == 0:
                self.comm.Reduce(mpi4py.MPI.IN_PLACE, counts)
                self.comm.Reduce(mpi4py.MPI.IN_PLACE, counts_s)
            else:
                self.comm.Reduce(counts, None)
                self.comm.Reduce(counts_s, None)
                # only the root process makes plots
                return

        fig = self.open_output('response_hist', wrapper=True, figsize=(10, 5))

        plt.subplot(1, 2, 1)
        if self.config['shear_catalog_type'] == 'metacal':
            manual_step_histogram(edges, counts[0, 0], label='R00', color='#1f77b4')
            manual_step_histogram(edges, counts[1, 1], label='R11', color='#ff7f0e')
            manual_step_histogram(edges, counts[0, 1], label='R01', color='#2ca02c')
            manual_step_histogram(edges, counts[1, 0], label='R10', color='#d62728')
        else:
            manual_step_histogram(edges, counts, label='R', color='#1f77b4')
        plt.ylim(0, counts.max() * 1.1)
        plt.xlabel("Response")
        plt.ylabel("Count")
        plt.title("All flag=0")

        plt.subplot(1, 2, 2)
        if self.config['shear_catalog_type'] == 'metacal':
            manual_step_histogram(edges, counts_s[0, 0], label='R00', color='#1f77b4')
            manual_step_histogram(edges, counts_s[1, 1], label='R11', color='#ff7f0e')
            manual_step_histogram(edges, counts_s[0, 1], label='R01', color='#2ca02c')
            manual_step_histogram(edges, counts_s[1, 0], label='R10', color='#d62728')
        else:
            manual_step_histogram(edges, counts_s, label='R', color='#1f77b4')
        plt.ylim(0, counts_s.max() * 1.1)
        plt.xlabel("R_gamma")
        plt.ylabel("Count")
        plt.title("Source sample")

        plt.legend()
        plt.tight_layout()
        fig.close()

    def plot_mag_histograms(self):
        if self.comm:
            import mpi4py.MPI
        # magnitude histograms in each band, for the full and source samples
        print("Making mag histogram")
        prefix = self.config['shear_prefix']
        import matplotlib.pyplot as plt

        size = 10
        mag_min = 20
        mag_max = 30
        edges = np.linspace(mag_min, mag_max, size + 1)
        mid = 0.5 * (edges[1:] + edges[:-1])
        width = edges[1] - edges[0]
        bands = self.config['bands']
        shear_prefix = self.config['shear_prefix']
        nband = len(bands)

        full_hists = [np.zeros(size, dtype=int) for b in bands]
        source_hists = [np.zeros(size, dtype=int) for b in bands]

        while True:
            data = yield

            if data is None:
                break

            for (b, h1, h2) in zip(bands, full_hists, source_hists):
                b1 = np.digitize(data[f'{shear_prefix}mag_{b}'], edges) - 1

                for i in range(size):
                    w = b1 == i
                    count = w.sum()
                    h1[i] += count

                    w &= (data['source_bin'] >= 0)
                    count = w.sum()
                    h2[i] += count

        if self.comm is not None:
            full_hists = reduce(self.comm, full_hists)
            source_hists = reduce(self.comm, source_hists)

        if self.rank == 0:
            fig = self.open_output('source_mag_hist', wrapper=True, figsize=(4, nband * 3))
            for i, (b, h1, h2) in enumerate(zip(bands, full_hists, source_hists)):
                plt.subplot(nband, 1, i + 1)
                plt.bar(mid, h1, width=width, fill=False, label='Complete', edgecolor='r')
                plt.bar(mid, h2, width=width, fill=True, label='WL Source', color='g')
                plt.xlabel(f"Mag {b}")
                plt.ylabel("N")
                if i == 0:
                    plt.legend()
            plt.tight_layout()
            fig.close()
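
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): the coroutine pattern used
# by TXSourceDiagnosticPlots.run above.  Each plot_* method is a generator
# that pauses at "data = yield" until it is sent a chunk, and finishes when it
# is sent None.  The toy plotter and chunk contents below are invented purely
# for demonstration.
# ---------------------------------------------------------------------------
def _example_coroutine_plotter_pattern():
    def toy_plotter():
        n = 0
        while True:
            data = yield          # pause until .send(chunk) supplies a chunk
            if data is None:      # None marks the end of the data stream
                break
            n += len(data['g1'])
        print(f"toy_plotter saw {n} objects")

    chunks = [{'g1': np.zeros(100)}, {'g1': np.zeros(50)}]
    plotters = [toy_plotter()]

    # Prime each coroutine: run it up to its first yield
    for p in plotters:
        p.send(None)

    # Feed every chunk to every plotter
    for chunk in chunks:
        for p in plotters:
            p.send(chunk)

    # Tell the plotters to finish; they raise StopIteration when they return
    for p in plotters:
        try:
            p.send(None)
        except StopIteration:
            pass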

class TXLensDiagnosticPlots(PipelineStage):
    """
    Make diagnostic plots of the lens catalog.
    """
    name = 'TXLensDiagnosticPlots'

    # This tells ceci to launch under dask:
    dask_parallel = True

    inputs = [
        ('photometry_catalog', HDFFile),
        ('lens_tomography_catalog', TomographyCatalog),
    ]

    outputs = [
        ('lens_snr_hist', PNGFile),
        ('lens_mag_hist', PNGFile),
    ]

    config_options = {
        'block_size': 0,
        'delta_gamma': 0.02,
        'mag_min': 18,
        'mag_max': 28,
        'snr_min': 5,
        'snr_max': 200,
        'bands': 'ugrizy',
    }

    def run(self):
        import matplotlib
        matplotlib.use('agg')

        files, data, nbin = self.load_data()

        self.plot_snr_histograms(data, nbin)
        self.plot_mag_histograms(data, nbin)

        # These need to stay open until the end
        for f in files:
            f.close()

    def load_data(self):
        import dask.array as da

        bands = self.config['bands']

        # These need to stay open until dask has finished with them
        f = self.open_input("photometry_catalog")
        g = self.open_input("lens_tomography_catalog")

        # read nbin from metadata
        nbin = g['tomography'].attrs['nbin_lens']

        # Default to the automatic value but expose as an option
        block = self.config['block_size']
        if block == 0:
            block = 'auto'

        # Load data columns, lazily for dask
        data = {}
        for b in bands:
            data[f'mag_{b}'] = da.from_array(f[f'photometry/mag_{b}'], block)
            data[f'snr_{b}'] = da.from_array(f[f'photometry/snr_{b}'], block)
        data['bin'] = da.from_array(g['tomography/lens_bin'], block)

        # Avoid recomputing selections in each histogram by doing it externally here
        data['sel'] = da.nonzero(data['bin'] >= 0)
        for i in range(nbin):
            data[f'sel{i}'] = da.nonzero(data['bin'] == i)

        # Return the open files so they stay open until dask has finished with them,
        # along with the dict of lazy columns and the bin count
        return [f, g], data, nbin

    def plot_snr_histograms(self, data, nbin):
        smin = self.config['snr_min']
        smax = self.config['snr_max']
        bins = np.geomspace(smin, smax, 50)
        xlog = True
        self.plot_histograms(data, nbin, 'snr', xlog, bins)

    def plot_mag_histograms(self, data, nbin):
        # Histogram ranges are read from configuration choices
        mmin = self.config['mag_min']
        mmax = self.config['mag_max']
        # let's do this in 0.5 mag bins
        bins = np.arange(mmin, mmax + 1e-6, 0.5)
        xlog = False
        self.plot_histograms(data, nbin, 'mag', xlog, bins)

    def plot_histograms(self, data, nbin, name, xlog, bins):
        import dask
        import dask.array as da
        import matplotlib.pyplot as plt

        bands = self.config['bands']
        nband = len(bands)
        hists = {}

        # Do a different set of histograms for each band
        for i, b in enumerate(bands):
            sel = data['sel']
            # first do the global non-tomographic version (all selected objects)
            hists[b, -1] = da.histogram(data[f'{name}_{b}'][sel], bins=bins)

            # and also loop through tomo bins
            for j in range(nbin):
                sel = data[f'sel{j}']
                hists[b, j] = da.histogram(data[f'{name}_{b}'][sel], bins=bins)

        # Launch actual dask computations so we have the data ready to plot.
        # Doing these all at once seems to be faster.
        print(f"Beginning {name} histogram compute")
        hists, = dask.compute(hists)
        print("Done")

        # Now make all the panels.  The open_output method returns an object that
        # can be used as a context manager and will be saved automatically at the
        # end to the right location.
        figsize = (4 * nband, 4 * (nbin + 1))
        with self.open_output(f'lens_{name}_hist', wrapper=True, figsize=figsize) as fig:
            axes = fig.file.subplots(nbin + 1, nband, squeeze=False)

            for i, b in enumerate(bands):
                for j in range(-1, nbin):
                    bj = j if j >= 0 else '2D'
                    heights, edges = hists[b, j]
                    ax = axes[j + 1, i]
                    # This is my function to plot a line histogram from
                    # pre-computed edges and heights
                    manual_step_histogram(edges, heights, ax=ax)
                    if xlog:
                        ax.set_xscale('log')
                    ax.set_xlabel(f"Bin {bj} {b}-{name}")

            axes[0, 0].set_ylabel("Count")
            fig.file.tight_layout()
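
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): the lazy-histogram pattern
# used in TXLensDiagnosticPlots.plot_histograms above.  da.histogram only
# records a task graph; calling dask.compute on the whole dict evaluates every
# histogram in a single pass over the data.  The column values, selection
# threshold, and bin range here are invented for the example.
# ---------------------------------------------------------------------------
def _example_dask_histograms():
    import dask
    import dask.array as da

    # A fake magnitude column, chunked as dask would chunk an HDF5 dataset
    mag = da.from_array(np.random.uniform(20.0, 25.0, 10_000), chunks=1_000)
    bins = np.arange(20.0, 25.01, 0.5)

    # Precompute a selection once, as load_data does with da.nonzero
    sel = da.nonzero(mag < 22.0)

    # Build the histograms lazily first...
    hists = {
        'all': da.histogram(mag, bins=bins),
        'bright': da.histogram(mag[sel], bins=bins),
    }
    # ...then compute them together so dask can share the underlying reads
    hists, = dask.compute(hists)

    heights, edges = hists['all']
    return heights, edges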

def reduce(comm, H):
    H2 = []
    rank = comm.Get_rank()
    for h in H:
        if rank == 0:
            hsum = np.zeros_like(h)
        else:
            hsum = None
        comm.Reduce(h, hsum)
        H2.append(hsum)
    return H2
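
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the pipeline): how the reduce helper above
# is intended to be used.  Every rank passes in its local histograms; rank 0
# gets back the element-wise sums and the other ranks get None.  The array
# contents are made up, and this only does anything useful when launched under
# MPI (e.g. via mpiexec with several processes).
# ---------------------------------------------------------------------------
def _example_reduce_usage():
    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    # Each rank contributes its own partial counts
    local_hists = [np.arange(5, dtype=int), np.ones(5, dtype=int)]
    total_hists = reduce(comm, local_hists)

    if comm.Get_rank() == 0:
        # On rank 0 these are summed over all ranks
        print([h.tolist() for h in total_hists])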