# Source code for snat_sim.pipeline

"""The ``pipeline`` module defines the ``FittingPipeline`` class, which
is built to provide a parallelized approach to simulating and fitting
light-curves with atmospheric effects.

Submodules
----------

Although the ``pipeline`` module provides a prebuilt data analysis pipeline,
you can also build customized pipelines using any of the included nodes.
Relevant documentation can be found in the following pages:

.. autosummary::
   :nosignatures:

   data_model
   nodes

Usage Example
-------------

Instances of the ``FittingPipeline`` class can be run synchronously
(by calling ``FittingPipeline.run``) or asynchronously (with
``FittingPipeline.run_async``). A pipeline instance can be created as
follows:

.. doctest:: python

   >>> from snat_sim.models import SNModel
   >>> from snat_sim.pipeline import FittingPipeline

   >>> pipeline = FittingPipeline(
   ...     cadence='alt_sched',
   ...     sim_model=SNModel('salt2'),
   ...     fit_model=SNModel('salt2'),
   ...     vparams=['x0', 'x1', 'c'],
   ...     out_path='./demo_out_path.h5',
   ...     fitting_pool=6,
   ...     simulation_pool=3
   ... )

Module Docs
-----------
"""

from __future__ import annotations

from numbers import Number
from pathlib import Path
from typing import *
from typing import Dict, List

from egon.pipeline import Pipeline

from . import nodes
from .nodes import FitLightCurves, LoadPlasticcCadence, SimulateLightCurves, WritePipelinePacket
from ..models import SNModel, VariableCatalog
from ..plasticc import PLAsTICC


class FittingPipeline(Pipeline):
    """Pipeline of parallel processes for simulating and fitting light-curves"""

    def __init__(
            self,
            cadence: str,
            sim_model: SNModel,
            fit_model: SNModel,
            vparams: List[str],
            out_path: Union[str, Path],
            fitting_pool: int = 1,
            simulation_pool: int = 1,
            writing_pool: int = 1,
            bounds: Optional[Dict[str, Tuple[Number, Number]]] = None,
            max_queue: int = 100,
            iter_lim: Union[int, float] = float('inf'),
            catalog: Optional[VariableCatalog] = None,
            add_scatter: bool = True,
            fixed_snr: Optional[float] = None,
            write_lc_sims: bool = False
    ) -> None:
        """Fit light-curves using multiple processes and combine results into an output file

        Args:
            cadence: Name of the PLAsTICC cadence to use when simulating light-curves
            sim_model: Model to use when simulating light-curves
            fit_model: Model to use when fitting light-curves
            vparams: List of parameter names to vary in the fit
            out_path: Path to write results to (missing parent directories are created)
            fitting_pool: Number of child processes allocated to fitting light-curves
            simulation_pool: Number of child processes allocated to simulating light-curves
            writing_pool: Number of child processes allocated to writing results to disk
            bounds: Bounds to impose on ``fit_model`` parameters when fitting light-curves
            max_queue: Maximum number of light-curves queued for simulation at once.
                The fitting and writing queues are sized ``max_queue * simulation_pool``.
                A falsy value (e.g., ``0`` or ``None``) leaves all node queue sizes
                at their defaults.
            iter_lim: Limit number of processed light-curves (Useful for profiling)
            catalog: Reference star catalog to calibrate simulated supernova with
            add_scatter: Add randomly generated scatter to simulated light-curve points
            fixed_snr: Simulate light-curves with a fixed signal to noise ratio
            write_lc_sims: Include simulated light-curves in the data written to ``out_path``
        """

        out_path = Path(out_path).resolve()

        # ``parents=True`` so a nested, not-yet-existing output directory does not
        # raise ``FileNotFoundError``.
        out_path.parent.mkdir(parents=True, exist_ok=True)

        # Define the nodes of the analysis pipeline.
        # NOTE(review): PLAsTICC model number 11 is hard-coded here.
        plasticc_cadence = PLAsTICC(cadence, model=11)

        # The attribute name ``load_plastic`` is kept as-is for backward compatibility.
        self.load_plastic = LoadPlasticcCadence(plasticc_cadence, iter_lim=iter_lim)
        self.write_to_disk = WritePipelinePacket(out_path, write_lc_sims, num_processes=writing_pool)
        self.simulate_light_curves = SimulateLightCurves(
            sn_model=sim_model,
            catalog=catalog,
            num_processes=simulation_pool,
            add_scatter=add_scatter,
            fixed_snr=fixed_snr
        )
        self.fit_light_curves = FitLightCurves(
            sn_model=fit_model,
            vparams=vparams,
            bounds=bounds,
            num_processes=fitting_pool
        )

        # Connect pipeline nodes together. Failed simulations skip the fitting
        # step and go straight to the writer, along with fit successes/failures.
        self.load_plastic.output.connect(self.simulate_light_curves.input)
        self.simulate_light_curves.success_output.connect(self.fit_light_curves.input)
        self.simulate_light_curves.failure_output.connect(self.write_to_disk.input)
        self.fit_light_curves.success_output.connect(self.write_to_disk.input)
        self.fit_light_curves.failure_output.connect(self.write_to_disk.input)

        if max_queue:
            # Limit the number of light-curves fed into the pipeline. Downstream
            # queues are scaled by the number of simulation processes feeding them.
            self.simulate_light_curves.input.maxsize = max_queue
            self.fit_light_curves.input.maxsize = max_queue * simulation_pool
            self.write_to_disk.input.maxsize = max_queue * simulation_pool

        # Initialize the parent last, after every node is defined and connected
        # (presumably so egon's ``Pipeline`` can discover the nodes — confirm).
        super().__init__()