Source code for snat_sim.cli

"""The `cli` module defines a command line interface for interacting with
the parent package.

Module Docs

import argparse
from pathlib import Path
from typing import Dict, Tuple, Union

from pwv_kpno.gps_pwv import GPSReceiver

from . import models
from .pipeline import FittingPipeline
from . import __version__

SALT2_PARAMS = ('z', 't0', 'x0', 'x1', 'c')
SUOMINET_VALUES = ('PWV', 'SrfcPress', 'SrfcTemp', 'SrfcRH', 'ZenithDelay')
AtmModels = Union[models.StaticPWVTrans, models.VariablePWVTrans, models.SeasonalPWVTrans]

[docs]class AdvancedNamespace(argparse.Namespace): """Represents parsed command line arguments cast into friendly object types""" def _create_pwv_effect(self, pwv_variability) -> AtmModels: """Create a PWV transmission effect for use with supernova models Note: ``pwv_variability`` should be a string! If ``pwv_variability`` represents a numerical value, return a ``StaticPWVTrans`` object set to the given PWV concentration (in mm). If ``pwv_variability`` equals ``epoch``, return a ``VariablePWVTrans`` object. If `pwv_variability`` equals ``seasonal``, return a ``SeasonalPWVTrans`` object. Args: pwv_variability (str): Command line value for how to vary PWV as a function of time Returns: A propagation effect usable with a supernova model object """ # The command line parser defaults to the pwv variability being a string # even if it is numeric. A typecast is sometimes in order. if pwv_variability.isnumeric(): transmission_effect = models.StaticPWVTrans() transmission_effect.set(pwv=float(pwv_variability)) return transmission_effect # Model PWV continuously over the year using CTIO data elif pwv_variability == 'epoch': return models.VariablePWVTrans(self.pwv_model) elif pwv_variability == 'seasonal': return models.SeasonalPWVTrans.from_pwv_model(self.pwv_model) raise NotImplementedError(f'Unknown variability: {pwv_variability}') @property def pwv_model(self) -> models.PWVModel: """Build a PWV model based on the command line argument""" print('Building PWV Model...') data_cuts = dict() for value in SUOMINET_VALUES: if param_bound := getattr(self, f'cut_{value}', None): data_cuts[value] = [param_bound, ] primary_year, *supp_years = self.pwv_model_years receiver = GPSReceiver(self.receiver_id, data_cuts=data_cuts) return models.PWVModel.from_suominet_receiver(receiver, primary_year, supp_years) @property def fitting_bounds(self) -> Dict[str, Tuple]: """Parameter boundaries to enforce when fitting light-curves""" fitting_bounds = dict() for param in SALT2_PARAMS: if param_bound := getattr(self, f'bound_{param}', None): fitting_bounds[param] = param_bound return fitting_bounds @property def simulation_model(self) -> models.SNModel: """Return the Supernova model used for fitting light-curves""" propagation_effect = self._create_pwv_effect(self.sim_variability) print('Building supernova simulation model...') return models.SNModel( source=self.sim_source, effects=[propagation_effect], effect_names=[''], effect_frames=['obs']) @property def fitting_model(self) -> models.SNModel: """Return the Supernova model used for simulating light-curves""" propagation_effect = self._create_pwv_effect(self.fit_variability) print('Building supernova fitting model...') return models.SNModel( source=self.fit_source, effects=[propagation_effect], effect_names=[''], effect_frames=['obs']) @property def catalog(self) -> models.VariableCatalog: """The reference star catalog to calibrate simulations with.""" return models.VariableCatalog(self.pwv_model, *self.ref_stars) @property def add_scatter(self) -> bool: """Whether to include added scatter in the light-curve simulations.""" return not self.no_scatter
[docs]class Parser(argparse.ArgumentParser): """Commandline parser with a pre-defined application interface"""
[docs] def __init__(self) -> None: """Instantiate the parser and define the commandline interface""" super().__init__() self.add_argument('-v', '--version', action='version', version=__version__) ####################################################################### # General Pipeline configuration ####################################################################### self.add_argument( '-c', '--cadence', type=str, required=True, help='Observational cadence to assume for the LSST.' ) self.add_argument( '-s', '--sim_pool_size', type=int, default=1, help='Total number of workers to spawn for simulating supernova light-curves.' ) self.add_argument( '-f', '--fit_pool_size', type=int, default=1, help='Total number of workers to spawn for fitting light-curves.' ) self.add_argument( '-w', '--write_pool_size', type=int, default=1, help='Total number of workers to spawn for writing data to disk.' ) self.add_argument( '-i', '--iter_lim', type=int, default=float('inf'), help='Exit pipeline after processing the given number of light-curves (Useful for profiling).' ) self.add_argument( '-o', '--out_path', type=Path, required=True, help='Output file path (a .h5 extension is enforced).' ) self.add_argument( '--write_lc_sims', action='store_true', help='Flag to include simulated light curves in the pipeline output file.' ' Note the added IO may noticeably increase the pipeline runtime.' ) ####################################################################### # Light-curve simulation ####################################################################### simulation_group = self.add_argument_group( 'Light-Curve Simulation', description='Options for simulating supernova light-curves.') simulation_group.add_argument( '--sim_source', type=str, default='salt2-extended', help='The name of the sncosmo spectral template to use when simulating supernova light-curves.' ) simulation_group.add_argument( '--sim_variability', type=str, required=True, help='Rate at which to vary PWV when simulating light-curves.' ' Specify a numerical value for a fixed PWV concentration.' ' Specify "epoch" to vary the PWV per observation.' ) simulation_group.add_argument( '--ref_stars', type=str, default=('G2', 'M5', 'K2'), nargs='+', help='Reference star(s) to calibrate simulated SNe against.' ) ####################################################################### # Light-curve fitting ####################################################################### fitting_group = self.add_argument_group( 'Light-Curve Fitting', description='Options for configuring supernova light-curve fits.') fitting_group.add_argument( '--fit_source', type=str, default='salt2-extended', help='The name of the sncosmo spectral template to use when fitting supernova light-curves.' ) fitting_group.add_argument( '--fit_variability', type=str, required=True, help='Rate at which to vary the assumed PWV when fitting light-curves.' ' Specify a numerical value for a fixed PWV concentration.' ' Specify "epoch" to vary the PWV per observation.' ) fitting_group.add_argument( '--vparams', type=str, default=('x0', 'x1', 'c'), nargs='+', help='Parameters to vary when fitting light-curves.' ) for param in SALT2_PARAMS: fitting_group.add_argument( f'--bound_{param}', type=float, default=None, nargs=2, help=f'Upper and lower bounds for {param} parameter when fitting light-curves (Optional).' ) ####################################################################### # PWV Modeling ####################################################################### pwv_modeling_group = self.add_argument_group( 'PWV Modeling', description='Options used when building the PWV variability model' ' from SuomiNet GPS data.' ) pwv_modeling_group.add_argument( '--receiver_id', type=str, default='ctio' ) pwv_modeling_group.add_argument( '--pwv_model_years', type=float, nargs='+', default=[2016, 2017] ) pwv_modeling_group.add_argument( '--cut_PWV', type=float, nargs=2, default=[0, 30], help='Only use measured data points with a PWV value within the given bounds (units of millimeters)' ) data_cut_names = ('surface pressure', 'temperature', 'relative humidity', 'zenith delay') data_cut_units = ('Millibars', 'Centigrade', 'Percentage', 'Millimeters') for arg, name, unit in zip(SUOMINET_VALUES[1:], data_cut_names, data_cut_units): pwv_modeling_group.add_argument( f'--cut_{arg}', type=float, nargs=2, help=f'Only use measured data points with a {name} value within the given bounds (units of {unit})' ) debugging_group = self.add_argument_group( 'Debugging / Validation', description='Options used when debugging pipeline behavior.' ) debugging_group.add_argument( '--no-scatter', action='store_true', help='Flag used to turn off added scatter when simulating light-curves.' ) debugging_group.add_argument( '--fixed-snr', type=float, default=None, help='Simulate light-curves with a fixed signal to noise ratio.' ) ####################################################################### # Pipeline Visualization ####################################################################### visualizer = self.add_argument_group( title='Pipeline Visualization', description='Optionally launch a server instance for monitoring the pipeline in real time.') visualizer.add_argument( '--visualize', action='store_true', help='Flag used to launch a web server for visualizing the current pipeline status.' ) visualizer.add_argument( '--host', type=str, default=None, help='Optionally define the host IP used to serve the application from.' ) visualizer.add_argument( '--port', type=int, default=None, help='Optionally define the port used to serve the application.' ) visualizer.add_argument( '--proxy', type=str, default=None, help='Optionally use a proxy to serve the application to a different URL "{input}::{output}".' )
[docs]class Application: """Executable command-line application. Launch the application by running ``Application.execute``. """
[docs] @staticmethod def execute(): """Parse any commandline arguments and run the analysis pipeline""" parsed_args = Parser().parse_args(namespace=AdvancedNamespace()) print(f'Instantiating pipeline (target: {parsed_args.out_path})') pipeline = FittingPipeline( cadence=parsed_args.cadence, sim_model=parsed_args.simulation_model, fit_model=parsed_args.fitting_model, vparams=parsed_args.vparams, out_path=parsed_args.out_path, simulation_pool=parsed_args.sim_pool_size, fitting_pool=parsed_args.fit_pool_size, writing_pool=parsed_args.write_pool_size, bounds=parsed_args.fitting_bounds, iter_lim=parsed_args.iter_lim, catalog=parsed_args.catalog, add_scatter=parsed_args.add_scatter, fixed_snr=parsed_args.fixed_snr, write_lc_sims=parsed_args.write_lc_sims ) pipeline.validate() if parsed_args.visualize: pipeline.run_async() pipeline.visualize() else: