Source code for snmachine.sndata

"""
Module containing Dataset classes. These read in data from various sources and turns the light curves into astropy tables that
can be read by the rest of the code.
"""
from __future__ import division
from past.builtins import basestring
import numpy as np
import os
if not 'DISPLAY' in os.environ:
    import matplotlib
    matplotlib.use('Agg')
import matplotlib.pyplot as plt
from astropy.table import Table, Column
from astropy.io import fits
import sys, sncosmo
from random import shuffle,sample
from scipy import interpolate
import sncosmo, math

#Colours for graphs
colours={'sdssu':'#6614de','sdssg':'#007718','sdssr':'#b30100','sdssi':'#d35c00','sdssz':'k','desg':'#007718','desr':'#b30100','desi':'#d35c00','desz':'k',
            'lssty':'#2727c1','lsstu':'#6614de','lsstg':'#007718','lsstr':'#b30100','lssti':'#d35c00','lsstz':'k','lsstY':'#2727c1'}
sntypes={1:'Ia',2:'II',21:'IIn',22:'IIP',23:'IIL',3:'Ibc',32:'Ib',33:'Ic',66:'other'}
markers={'desg':'^', 'desr':'o', 'desi':'s', 'desz':'*'}
labels={'desg':'g', 'desr':'r', 'desi':'i', 'desz':'z'}

[docs]def plot_lc(lc): """ External function to plot light curves. Parameters ---------- lc : astropy.table.Table Light curve """ """ @param fname The filename of the supernova (relative to data_root) """ #This selects the filters from the possible set that this object has measurements in and maintains the order filts=np.unique(lc['filter']) #Keep track of the min and max values on the plot for resizing axes min_x=np.inf max_x=-np.inf lines=[] for j in range(len(filts)): inds=np.where(lc['filter']==filts[j])[0] if 'flux_error' in lc.keys(): t, F, F_err=lc['mjd'][inds], lc['flux'][inds], lc['flux_error'][inds] error=True else: t, F=lc['mjd'][inds], lc['flux'][inds] error=False #tdelt=t-t.min() tdelt=t if filts[j] in markers.keys(): mkr=markers[filts[j]] else: mkr='o' if error: l=plt.errorbar(tdelt, F,yerr=F_err, marker=mkr,linestyle='none', color=colours[filts[j]], markersize=4) else: l=plt.plot(tdelt, F,lw=2, marker=mkr,color=colours[filts[j]]) lines.append(l) if tdelt.min()<min_x: min_x=tdelt.min() if tdelt.max()>max_x: max_x=tdelt.max() ext=0.05*(max_x-min_x) plt.xlim([min_x-ext, max_x+ext]) plt.xlabel('Time (days)', fontsize=16) plt.ylabel('Flux', fontsize=16) plt.legend(lines, filts, numpoints=1,loc='best')
[docs]class Dataset: """ Class to manage the files from a single dataset. The base class works with data from the SPCC. This class can be inherited and overridden to work a completely different kind of dataset. All this class really needs is a list of object names and a method, get_lightcurve, which takes an individual object name and returns a light curve. Other functions provided here are for plotting and convenience. """ def __init__(self, folder, subset='none', filter_set=['desg', 'desr', 'desi', 'desz']): """ Initialisation. Parameters ---------- folder : str Root folder containing the data subset : str or list-like, optional Something you pass to get_object_names to specify which objects you want filter_set : list, optional List of possible filters used """ self.filter_set=filter_set self.rootdir=folder self.survey_name=folder.split('/')[-2] self.object_names=self.get_object_names(subset=subset) #Get all the data as a list of astropy tables (this should not be memory intensive, even for large numbers of light curves) self.data={} invalid=0 #Some objects may have empty data print ('Reading data...') for i in range(len(self.object_names)): lc=self.get_lightcurve(self.object_names[i]) if len(lc['mjd']>0): self.data[self.object_names[i]]=lc else: invalid+=1 if invalid>0: print ('%d objects were invalid and not added to the dataset.' %invalid) print ('%d objects read into memory.' %len(self.data)) #We create an optional model set which can be set by whatever feature class used self.models={}
[docs] def get_object_names(self, subset='none'): """ Gets a list of the names of the files within the dataset. Parameters ---------- subset : str or list-like, optional Used to specify which files you want. Current setup is get_object_names will accept a list of indices, a list of actual object names as a subset or the keyword 'spectro'. """ if isinstance(subset,basestring): if subset=='spectro': object_names= np.genfromtxt(self.rootdir+'spectro.list', dtype='U').flatten() else: object_names= np.genfromtxt(self.rootdir+self.survey_name+'.LIST', dtype='U') elif all(isinstance(l,basestring) for l in subset): #We assume subset is a list of strings containing object names object_names= subset else: #Otherwise it must be a list of indices. Otherwise raise an error. names=np.genfromtxt(self.rootdir+self.survey_name+'.LIST', dtype='U') try: object_names= names[subset] except IndexError: print ('Invalid subset provided') sys.exit() return np.sort(object_names)
[docs] def get_max_length(self): """Gets the length (in days) of the longest observation in the dataset. """ max_obs=0 for n in self.object_names: times=self.data[n]['mjd'] dif=times.max()-times.min() if dif>max_obs: max_obs=dif return max_obs
[docs] def get_lightcurve(self, flname): """ Given a filename, returns a light curve astropy table that conforms with sncosmo requirements Parameters ---------- flname : str The filename of the supernova (relative to data_root) Returns ------- astropy.table.Table Light curve """ fl=open(self.rootdir+flname,'r') mjd=[] flt=[] flux=[] fluxerr=[] z=-9 z_err=-9 type=-9 for line in fl: s=line.split() if len(s)>0: if s[0]=='HOST_GALAXY_PHOTO-Z:': z=(float)(s[1]) z_err=(float)(s[3]) elif s[0]=='OBS:': mjd.append((float)(s[1])) flt.append('des'+s[2]) flux.append((float)(s[4])) fluxerr.append((float)(s[5])) elif s[0]=='SIM_COMMENT:': for k in sntypes.keys(): if sntypes[k] in s: type=(int)(k) #Zeropoint zp=np.array([27.5]*len(mjd)) zpsys=['ab']*len(mjd) #Make everything arrays mjd=np.array(mjd) flt=np.array(flt, dtype='str') flux=np.array(flux) fluxerr=np.array(fluxerr) start_mjd=mjd.min() mjd=mjd-start_mjd #We shift the times of the observations to all start at zero. If required, the mjd of the initial observation is stored in the metadata. #Note: obviously this will only zero the observations in one filter band, the others have to be zeroed if fitting functions. tab = Table([mjd, flt, flux, fluxerr, zp, zpsys], names=('mjd', 'filter', 'flux', 'flux_error', 'zp', 'zpsys'), meta={'name':flname,'z':z, 'z_err':z_err, 'type':type, 'initial_observation_time':start_mjd}) return tab
def __plot_this(self, fname, title=True, loc='best'): """ Internal function used by other functions to plot light curves. Parameters ---------- fname : str The filename of the supernova (relative to data_root) title : str, optional Put a title on the plot loc : str, optional Location of legend """ lc=self.data[fname] #This selects the filters from the possible set that this object has measurements in and maintains the order filts=sorted(set(self.filter_set) & set(np.unique(lc['filter'])), key = self.filter_set.index) #Keep track of the min and max values on the plot for resizing axes min_x=np.inf max_x=-np.inf lines=[] for j in range(len(filts)): inds=np.where(lc['filter']==filts[j])[0] t, F, F_err=lc['mjd'][inds], lc['flux'][inds], lc['flux_error'][inds] #tdelt=t-t.min() tdelt=t if filts[j] in markers.keys(): mkr=markers[filts[j]] else: mkr='o' #Plot the model, if it has been set if self.plot_model: if fname in self.models.keys(): mod=self.models[fname] if mod is not None: inds=np.where(mod['filter']==filts[j])[0] t_mod, F_mod=mod['mjd'][inds], mod['flux'][inds] plt.plot(t_mod, F_mod, color=colours[filts[j]]) l=plt.errorbar(tdelt, F,yerr=F_err, marker=mkr, linestyle='none', color=colours[filts[j]], markersize=4) lines.append(l) if tdelt.min()<min_x: min_x=tdelt.min() if tdelt.max()>max_x: max_x=tdelt.max() ext=0.05*(max_x-min_x) plt.xlim([min_x-ext, max_x+ext]) plt.xlabel('Time (days)') plt.ylabel('Flux') #plt.gca().tick_params(labelsize=8) if title: plt.title('Object: %s, z:%0.2f, Type:%s' %(fname, lc.meta['z'], lc.meta['type'])) labs=[] for f in filts: if f in labels.keys(): labs.append(labels[f]) else: labs.append(f) plt.legend(lines, labs, numpoints=1,loc=loc) #plt.subplots_adjust(left=0.3)
[docs] def set_model(self, fit_sn, *args): """ Can use any function to set the model for all objects in the data. Parameters ---------- fit_sn : function A function which can take a light curve (astropy table) argument and a list of arguments and returns an astropy table args : list, optional Whatever arguments fit_sn requires """ print ('Fitting supernova models...') for obj in self.object_names: self.models[obj]=fit_sn(self.data[obj], *args) print ('Models fitted.')
[docs] def plot_lc(self, fname, plot_model=True, title=True, loc='best'): """Public function to plot a single light curve. Parameters ---------- fname : str The filename of the supernova (relative to data_root) plot_model : bool, optional Whether or not to overplot the model title : str, optional Put a title on the plot loc : str, optional Location of the legend """ self.plot_model=plot_model self.__plot_this(fname, title=title, loc=loc) plt.show()
def __on_press(self, event): """ Allows one to cycle through the supernovae in the dataset by hitting the left or right arrow keys. Parameters ---------- event : keyboard event object Keyboard event (i.e. the left or right arrow button has been pressed) """ event.canvas.figure.clear() if event.key=='right' and self.__ind<len(self.object_names)-1: self.__ind+=1 elif event.key=='left' and self.__ind>0: self.__ind-=1 self.__plot_this(self.object_names[self.__ind]) event.canvas.draw()
[docs] def plot_all(self, plot_model=True): """ Plots all the supernovae in the dataset and allows the user to cycle through them with the left and right arrow keys. Parameters ---------- plot_model : bool, optional Whether or not to overplot the model. """ self.plot_model=plot_model #We use a class variable because this can't be passed directly to __on_press fig = plt.figure() self.__ind=-1 fig.canvas.mpl_connect('key_press_event', self.__on_press) plt.plot([0, 0]) #subplots_adjust(right=0.95, top=0.95) plt.show()
[docs] def get_types(self): """ Returns a list of the types of the entire dataset. Returns ------- `~numpy.ndarray` Array of types """ typs=[] for o in self.object_names: typs.append(self.data[o].meta['type']) tab=Table(data=[self.object_names,typs],names=['Object','Type']) return tab
[docs] def get_redshift(self): """ Returns a list of the redshifts of the entire dataset. Returns ------- `~numpy.ndarray` Array of redshifts """ z=[] for o in self.object_names: if 'z' in self.data[o].meta: z.append(self.data[o].meta['z']) else: z.append(-1) return np.array(z)
[docs] def sim_stats(self, **kwargs): """ Prints information about the survey/simulation. Parameters ---------- indices : list-like, optional List of indices to indicate which objects to consider. This allows you to, for example, see the statistics of a training subsample. plot_redshift : bool, optional Plots a histogram of the redshift distribution """ if 'indices' in kwargs: indices=kwargs['indices'] else: indices=np.arange(len(self.object_names)) if 'plot_redshifts' in kwargs: plot_redshifts=kwargs['plot_redshifts'] else: plot_redshifts=True N=len(indices) redshifts=np.zeros(N) types=np.zeros(N) for i in np.arange(N): ind=indices[i] lc=self.get_lightcurve(self.object_names[ind]) redshifts[i]=lc.meta['z'] types[i]=lc.meta['type'] print() print ('Total number of SNe: %d' %(N)) print() ks=self.sntypes.keys() ks.sort() for k in ks: nk=len(np.where(types==k)[0]) print ('Number of %s: %d (%0.2f%%)' %(self.sntypes[k],nk ,nk/N*100)) nk=len(np.where(types==-9)[0]) print ('Number of unknown: %d (%0.2f%%)' %(nk ,nk/N*100)) if plot_redshifts==True: plt.hist(redshifts[redshifts!=-9], 30, facecolor='#0057f6') plt.xlabel('Redshift', fontsize=16) plt.show()
[docs] def reduced_chi_squared(self, subset = 'none'): """ Returns the reduced chi squared for each object, once a model has been set. Parameters ---------- subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used Returns ------- dict Dictionary of reduced chi^2 for each object """ if subset == 'none': data_list = self.object_names else: data_list = subset chi_dict ={} for name in data_list: lc=self.data[name] #This selects the filters from the possible set that this object has measurements in and maintains the order filts=sorted(set(self.filter_set) & set(np.unique(lc['filter'])), key = self.filter_set.index) chi_2 = 0 N = 0 # counts total number of points for j in range(len(filts)): inds=np.where(lc['filter']==filts[j])[0] t, F, F_err = np.array(lc['mjd'][inds]), np.array(lc['flux'][inds]), np.array(lc['flux_error'][inds]) #tdelt=t-t.min() tdelt=t N = N + len(tdelt) if name in self.models: mod=self.models[name] if mod is not None: inds=np.where(mod['filter']==filts[j])[0] t_mod, F_mod = np.array(mod['mjd'][inds]), np.array(mod['flux'][inds]) #interpolate the model so that we can compare the model with the data int_f = interpolate.interp1d(t_mod, F_mod, kind='cubic') F_mod_mjd = np.array(int_f(tdelt)) summand=(F-F_mod_mjd)**2/(F_err**2) chi_2 = np.sum(summand) + chi_2 chi_dict[name] = chi_2 / N return chi_dict
[docs]class OpsimDataset(Dataset): """ Class to read in an LSST simulated dataset, based on OpSim runs and SNANA simulations. """ def __init__(self, folder, subset='none', mix=False, filter_set=['lsstu', 'lsstg', 'lsstr', 'lssti', 'lsstz', 'lssty']): """ Initialisation. Parameters ---------- folder : str Folder where simulations are located subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used mix : bool, optional The output of the simulations is often highly ordered, this randomly permutes the objects when they're read in filter_set : list-like, optional Set of possible filters """ self.survey_name='LSST' self.filter_set=filter_set self.get_data(folder, subset=subset) self.models={} if mix==True: shuffle(self.object_names)
[docs] def get_data(self, folder, subset='none'): """ Reads in the simulated data Parameters ---------- folder : str Folder where simulations are located subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used """ if ~isinstance(subset,basestring) and (all(isinstance(l,basestring) for l in subset)): #We have to deal with separate Ia and nIa fits files Ia_head=os.path.join(folder,'LSST_Ia_HEAD.FITS') nIa_head=os.path.join(folder,'LSST_NONIa_HEAD.FITS') df = fits.open(Ia_head)[1].data subset=np.char.strip(subset) #If these are read from the header they have to be stripped of white space Ia_ids=subset[np.in1d(subset,np.char.strip(df['SNID']))] df = fits.open(nIa_head)[1].data nIa_ids=subset[np.in1d(subset,np.char.strip(df['SNID']))] data_Ia = sncosmo.read_snana_fits(Ia_head, os.path.join(folder,'LSST_Ia_PHOT.FITS'),snids=Ia_ids) data_nIa = sncosmo.read_snana_fits(nIa_head, os.path.join(folder,'LSST_NONIa_PHOT.FITS'),snids=nIa_ids) else: data_Ia = sncosmo.read_snana_fits(os.path.join(folder,'LSST_Ia_HEAD.FITS'), os.path.join(folder,'LSST_Ia_PHOT.FITS')) data_nIa = sncosmo.read_snana_fits(os.path.join(folder,'LSST_NONIa_HEAD.FITS'), os.path.join(folder,'LSST_NONIa_PHOT.FITS')) if isinstance(subset,basestring)and subset=='Ia': all_data=data_Ia elif isinstance(subset,basestring) and subset=='nIa': all_data=data_nIa else: all_data=data_Ia+data_nIa self.data={} self.object_names=[] invalid=0 #Some objects may have empty data print ('Reading data...') for i in range(len(all_data)): snid=all_data[i].meta['SNID'] if isinstance(subset,basestring) or ((snid in subset) or (i in subset)): self.object_names.append((str)(snid)) lc=self.get_lightcurve(all_data[i]) if len(lc['mjd']>0): self.data[snid]=lc else: invalid+=1 if invalid>0: print ('%d objects were invalid and not added to the dataset.' %invalid) self.object_names=np.array(self.object_names, dtype='str') print ('%d objects read into memory.' %len(self.data))
[docs] def get_lightcurve(self, tab): """ Converts the sncosmo convention for the astropy tables to snmachine's. Parameters ---------- tab : astropy.table.Table Light curve """ tab_new=tab['MJD', 'FLUXCAL', 'FLUXCALERR', 'FLT'] tab_new.rename_column('MJD','mjd') start_mjd=(tab_new['mjd']).min() tab_new['mjd']=tab_new['mjd']-start_mjd tab_new.rename_column('FLUXCAL','flux') tab_new.rename_column('FLUXCALERR','flux_error') tab_new.rename_column('FLT','filter') tab_new=Table(tab_new, dtype=['f', 'f', 'f', 'S64']) old_filts=['u', 'g', 'r', 'i', 'z', 'Y'] for f in range(len(old_filts)): tab_new['filter'][tab_new['filter']==old_filts[f]]=self.filter_set[f] zp=Column(name='zp', data=np.array([27.5]*len(tab_new['mjd']))) zpsys=Column(name='zpsys', data=['ab']*len(tab_new['mjd'])) tab_new.add_column(zp) tab_new.add_column(zpsys) tab_new.meta={'name':tab.meta['SNID'], 'z':tab.meta['REDSHIFT_FINAL'], 'z_err':tab.meta['REDSHIFT_FINAL_ERR'], 'type':tab.meta['SNTYPE'], 'initial_observation_time':start_mjd} return tab_new
[docs]class SDSS_Data(Dataset): """ Class to read in the SDSS supernovae dataset """ def __init__(self, folder, subset='none', training_only=False, filter_set=['sdssu','sdssg', 'sdssr', 'sdssi', 'sdssz'], subset_length = False, classification = 'none'): """ Initialisation Parameters ---------- folder : str Folder where simulations are located subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used filter_set : list-like, optional Set of possible filters subset_length : bool or int, optional If supplied, will return this many random objects (can be used in conjunction with subset="spectro") classification : str, optional Can return one specific type of supernova. """ self.filter_set=filter_set self.rootdir=folder self.survey_name=folder.split(os.path.sep)[-2] # second to last / / /.. self.object_names=self.get_object_names(subset=subset,subset_length=subset_length,classification=classification) #Get all the data as a list of astropy tables (this should not be memory intensive, even for large numbers of light curves) self.data={} invalid=0 #Some objects may have empty data print ('Reading data...') for i in range(len(self.object_names)): lc=self.get_lightcurve(self.object_names[i]) if len(lc['mjd']>0): self.data[self.object_names[i]]=lc else: invalid+=1 if invalid>0: print ('%d objects were invalid and not added to the dataset.' %invalid) print ('%d objects read into memory.' %len(self.data)) #We create an optional model set which can be set by whatever feature class used self.models={}
[docs] def get_SNe(self,subset_length): """ Function to take all supernovae from Master SDSS data file and return a random sample of SNe of user-specified length if requested Parameters ---------- subset_length : int Number of objects to return Returns ------- list-like List of object names """ fl = open(self.rootdir+self.survey_name+'.LIST') SN = [] for line in fl: s = line.split() if (s[5] == "SNIa" or s[5] == "SNIb" or s[5] == "SNIc" or s[5] == "SNII" or s[5] == "SNIa?" or s[5] == "pSNIa" or s[5] == "pSNIbc" or s[5] == "pSNII" or s[5] == "zSNIa" or s[5] == "zSNIbc" or s[5] == "zSNII"): if len(str(s[0])) == 3: SN.append("SMP_000%s.dat" % s[0]) elif len(str(s[0])) == 4: SN.append("SMP_00%s.dat" % s[0]) elif len(str(s[0])) == 5: SN.append("SMP_0%s.dat" % s[0]) #SN now contains all file names for supernovae if subset_length != False: SN = [SN[i] for i in sorted(sample(range(len(SN)), subset_length)) ] return SN
[docs] def get_spectro(self,subset_length,classification): """ Function to take all spectroscopically confirmed supernovae from Master file and return a random sample of SNe of user-specified length if requested Parameters ---------- subset_length : bool or int Number of objects to return (False to return all) classification : str Can specify a particular type of supernova to return ('none' for all types) Returns ------- list-like List of object names """ fl = open(self.rootdir+self.survey_name+'.LIST') SN = [] classes = [] for line in fl: s = line.split() if (s[5] == "SNIa" or s[5] == "SNIb" or s[5] == "SNIc" or s[5] == "SNII"): classes.append(s[5]) if len(str(s[0])) == 3: SN.append("SMP_000%s.dat" % s[0]) elif len(str(s[0])) == 4: SN.append("SMP_00%s.dat" % s[0]) elif len(str(s[0])) == 5: SN.append("SMP_0%s.dat" % s[0]) #SN now contains all file names for spectroscopically confirmed supernovae if subset_length != False: x = sorted(sample(range(len(SN)), subset_length)) SN = [SN[i] for i in x ] classes = [classes[i] for i in x] if classification != 'none': # can specify classification of supernova if user-requested if classification == 'Ia' or classification == 'SNIa': SN = [SN[i] for i in range(len(SN)) if classes[i] =='SNIa'] elif classification == 'Ib' or classification == 'SNIb': SN = [SN[i] for i in range(len(SN)) if classes[i] =='SNIb'] elif classification == 'Ic' or classification == 'SNIc': SN = [SN[i] for i in range(len(SN)) if classes[i] =='SNIc'] elif classification == 'Ibc' or classification == 'SNIbc': SN = [SN[i] for i in range(len(SN)) if classes[i] == 'SNIb' or classes[i] == 'SNIc'] elif classification == 'II' or classification == 'SNII': SN = [SN[i] for i in range(len(SN)) if classes[i] == 'SNII'] else: print ('Invalid classification requested.') sys.exit() return SN
[docs] def get_photo(self,subset_length,classification): """ Function to take all purely photometric supernovae from Master file and return a random sample of SNe of user-specified length if requested Parameters ---------- subset_length : bool or int Number of objects to return (False to return all) classification : str Can specify a particular type of supernova to return ('none' for all types) Returns ------- list-like List of object names """ fl = open(self.rootdir+self.survey_name+'.LIST') SN = [] classes = [] for line in fl: s = line.split() if (s[5] == "pSNIa" or s[5] == "pSNIbc" or s[5] == "pSNII" or s[5] == "zSNIa" or s[5] == 'zSNIbc' or s[5] == 'zSNII' or s[5] == 'SNIa?'): classes.append(s[5]) if len(str(s[0])) == 3: SN.append("SMP_000%s.dat" % s[0]) elif len(str(s[0])) == 4: SN.append("SMP_00%s.dat" % s[0]) elif len(str(s[0])) == 5: SN.append("SMP_0%s.dat" % s[0]) #SN now contains all file names for spectroscopically confirmed supernovae if subset_length != False: x = sorted(sample(range(len(SN)), subset_length)) SN = [SN[i] for i in x ] classes = [classes[i] for i in x] if classification != 'none': # can specify classification of supernova if user-requested if classification =='Ia' or classification == 'SNIa': SN = [SN[i] for i in range(len(SN)) if classes[i] == 'SNIa?' or classes[i] == 'pSNIa' or classes[i] == 'zSNIa'] elif classification == 'Ibc' or classification == 'SNIbc': SN = [SN[i] for i in range(len(SN)) if classes[i] == 'pSNIbc' or classes[i] == 'zSNIbc'] elif classification == 'II' or classification == 'SNII': SN = [SN[i] for i in range(len(SN)) if classes[i] == 'pSNII' or classes[i] == 'zSNII'] else: print ('Invalid classification requested.') sys.exit() return SN
[docs] def get_object_names(self, subset='none',subset_length=False,classification='none'): """ Gets a list of the names of the files within the dataset. Parameters ---------- subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used subset_length : bool or int, optional Number of objects to return (False to return all) classification : str, optional Can specify a particular type of supernova to return ('none' for all types) Returns ------- list-like Object names """ if isinstance(subset,basestring): if subset=='spectro': return np.array(self.get_spectro(subset_length,classification)) # have to convert to numpy array for wavelet features to work #loads random sample of the spec confirmed data defaulted to whole elif subset == 'photo': ##only photometric data return np.array(self.get_photo(subset_length,classification)) elif subset=='none': return np.array(self.get_SNe(subset_length)) # loads random sample of SNe from whole master file - sample defaulted to whole elif all(isinstance(l,basestring) for l in subset): #We assume subset is a list of strings containing object names return subset else: #Otherwise it must be a list of indices. Otherwise raise an error. names=np.genfromtxt(self.rootdir+self.survey_name+'.LIST', dtype='str') try: return names[subset] except IndexError: print ('Invalid subset provided')
[docs] def get_info(self,flname): """ Function which takes file name of supernova and returns dictionary of spectroscopic and photometric redshifts and their errors when available as well as the type of the supernova Parameters ---------- flname : str Name of object Returns ------- list-like Redshift, redshift error, type """ fl = open(self.rootdir+self.survey_name+'.LIST') z = {'z_hel':float('nan'),'z_phot':float('nan')}# z_hel is spectroscopic heliocentric redshift and z_psnid uses zspec as prior but has many more?? z_err = {'z_hel_err':float('nan'),'z_phot_err':float('nan')} t = -9 for line in fl: s=line.split() if "SMP_000%s.dat" % s[0] == flname or "SMP_00%s.dat" % s[0] == flname or "SMP_0%s.dat" % s[0] == flname: # is this a bit slow? if s[103] != "\\N": z['z_phot']=float(s[103]) else: z['z_phot']= -9 if s[11] != "\\N": z['z_hel']=float(s[11]) else: z['z_hel']= -9 if s[104] != "\\N": z_err['z_phot_err']=float(s[104]) else: z_err['z_phot_err']= -9 if s[12] != "\\N": z_err['z_hel_err']=float(s[12]) else: z_err['z_hel_err']= -9 if s[5] == 'SNIa' or s[5] == 'pSNIa' or s[5] == 'zSNIa' or s[5] =='SNIa?': #all classifications of type Ia SNe - includes probable SNeIa t = 1 elif s[5] == 'SNIb' or s[5] == 'SNIc' or s[5] == 'pSNIbc' or s[5] == 'zSNIbc': t = 3 elif s[5] == 'SNII' or s[5] == 'pSNII' or s[5] == 'zSNII': t = 2 return z , z_err, t
[docs] def get_lightcurve(self, flname): """ Given a filename, returns a light curve astropy table that conforms with sncosmo requirements Parameters ---------- flname : str The filename of the supernova (relative to data_root) Returns ------- astropy.table.Table Light curve """ fl=open(self.rootdir+flname,'r') mjd=[] flt=[] # band flux=[] fluxerr=[] # error in flux mag = [] magerr = [] line_count = 0 for line in fl: s=line.split() line_count = line_count + 1 if len(s)>0 and line_count > 4 and float(s[0]) < 1024: # s[0] is flag - a flag of 1024 or greater indicates bad data ( http://arxiv.org/pdf/0908.4277v1.pdf page 18) mjd.append(s[1]) flux.append(s[7]) fluxerr.append(s[8]) mag.append(s[3]) magerr.append(s[4]) if s[2] == '0': flt.append('sdssu') elif s[2] == '1': flt.append('sdssg') elif s[2] == '2': flt.append('sdssr') elif s[2] == '3': flt.append('sdssi') elif s[2] == '4': flt.append('sdssz') mjd = [float(x) for x in mjd] # contains strings - convert to floats flux = [float(x) for x in flux] fluxerr = [float(x) for x in fluxerr] mag = [float(x) for x in mag] magerr = [float(x) for x in magerr] (Z, Z_err, type) = self.get_info(flname) if Z['z_hel'] != -9: z = Z['z_hel'] #use spectroscopic redshifts if available z_err = Z_err['z_hel_err'] elif Z['z_phot'] != -9: z = Z['z_phot'] #photometric redshift z_err = Z_err['z_phot_err'] else: z = -9 z_err = -9 # returns values of -9 if no redshift is available #Zeropoint zp=np.array([27.5]*len(mjd)) zpsys=['ab']*len(mjd) #Make everything arrays mjd=np.array(mjd) flt=np.array(flt, dtype='str') flux=np.array(flux) fluxerr=np.array(fluxerr) mag = np.array(mag) magerr = np.array(magerr) start_mjd=mjd.min() r_flux = np.array([flux[i] for i in range(len(flux)) if flt[i]=='sdssr']) if len(r_flux) > 0: peak_flux = r_flux.max() else: peak_flux = -9 mjd=mjd-start_mjd #We shift the times of the observations to all start at zero. If required, the mjd of the initial observation is stored in the metadata. #Note: obviously this will only zero the observations in one filter band, the others have to be zeroed if fitting functions. tab = Table([mjd, flt, flux, fluxerr, zp, zpsys, mag, magerr], names=('mjd', 'filter', 'flux', 'flux_error', 'zp', 'zpsys', 'mag', 'mag_error'), meta={'name':flname,'z':z, 'z_err':z_err, 'type':type, 'initial_observation_time':start_mjd, 'peak flux':peak_flux }) return tab
[docs]class SDSS_Simulations(Dataset): """ Class to read in the SDSS simulations dataset """ def __init__(self, folder, subset='none', training_only=False, filter_set=['sdssu','sdssg', 'sdssr', 'sdssi', 'sdssz'], subset_length = False, classification = 'none'): """ Initialisation Parameters ---------- folder : str Folder where simulations are located subset : str or list-like, optional List of a subset of object names. If not supplied, the full dataset will be used filter_set : list-like, optional Set of possible filters subset_length : bool or int, optional If supplied, will return this many random objects (can be used in conjunction with subset="spectro") classification : str, optional Can return one specific type of supernova. """ self.filter_set=filter_set self.rootdir=folder self.survey_name=folder.split(os.path.sep)[-2] # second to last / / /.. #Get all the data as a list of astropy tables (this should not be memory intensive, even for large numbers of light curves) self.data={} invalid=0 #Some objects may have empty data print ('Reading data..') (self.data, invalid) = self.get_data(subset, subset_length, classification) if invalid>0: print ('%d objects were invalid and not added to the dataset.' %invalid) print ('%d objects read into memory.' %len(self.data)) self.object_names = self.data.keys() #We create an optional model set which can be set by whatever feature class used self.models={}
[docs] def get_data(self, subset='none', subset_length=False, classification = 'none'): """ Function to get all data in same form as SDSS Data """ # read in data as snana files d_Ia = sncosmo.read_snana_fits(self.rootdir + "SDSS_Ia_HEAD.FITS",self.rootdir + "SDSS_Ia_PHOT.FITS") d_nIa = sncosmo.read_snana_fits(self.rootdir + "SDSS_NONIa_HEAD.FITS", self.rootdir + "SDSS_NONIa_PHOT.FITS") invalid = 0 # number of invalid LCs data = {} # allow user to specify dataset of entirely Ia, Ibc, non-Ia or II if classification == 'none': for i in range(len(d_Ia)): SN = self.get_lightcurve(d_Ia[i]) if len(SN['mjd']) > 0 : data['%s'%SN.meta['snid']] = SN else: invalid+=1 for i in range(len(d_nIa)): SN = self.get_lightcurve(d_nIa[i]) if len(SN['mjd']) > 0: data['%s'%SN.meta['snid']] = SN else: invalid+=1 elif classification == 'Ia' or classification == 'SNIa': for i in range(len(d_Ia)): SN = self.get_lightcurve(d_Ia[i]) if len(SN['mjd']) > 0 : data['%s'%SN.meta['snid']] = SN else: invalid+=1 elif classification == 'nIa' or classification == 'non-SNIa': for i in range(len(d_nIa)): SN = self.get_lightcurve(d_nIa[i]) if len(SN['mjd']) > 0: data['%s'%SN.meta['snid']] = SN else: invalid+=1 elif classification == 'Ibc' or classification == 'SNIbc': for i in range(len(d_nIa)): SN = self.get_lightcurve(d_nIa[i]) if len(SN['mjd']) > 0 : if SN.meta['type'] == 3: data['%s'%SN.meta['snid']] = SN else: invalid+=1 elif classification == 'II' or classification == 'SNII': for i in range(len(d_nIa)): SN = self.get_lightcurve(d_Ia[i]) if len(SN['mjd']) > 0: if SN.meta['type'] == 2: data['%s'%SN.meta['snid']] = SN else: invalid+=1 else: print ('Invalid classification requested') sys.exit() # allow user to request entirely spectroscopic data or photometric data if subset == 'spectro': data = dict((key,value) for (key,value) in data.items() if value.meta['data type'] == 'spec') if subset == 'photo': data = dict((key,value) for (key,value) in data.items() if value.meta['data type'] == 'phot') # allow user to request a shortened subset of the dataset if subset_length != False: data = dict(sample(data.items(), subset_length)) return data, invalid
[docs] def get_lightcurve(self, lc): mjd = np.array([lc['MJD'][i] for i in range(len(lc['MJD'])) if lc['FLUXCAL'][i] > 0]) flt = np.array(['sdss' + lc['FLT'][i] for i in range(len(lc['FLT'])) if lc['FLUXCAL'][i] > 0]) flux = np.array( [(lc['FLUXCAL'][i]*math.pow(10,-1.44)) for i in range(len(lc['FLUXCAL'])) if lc['FLUXCAL'][i] > 0]) # ignore negative flux values #FLUX: MULTIPLY BY 10^-1.44 (FLUX IN SDSS IS CALCULATED AS 10^(-0.4MAG +9.56) WHEREAS SIMULATED FLUXES ARE CALCULATED AS 10^(-0.4MAG + 11)- WE USE THE SDSS CONVENTION). fluxerr = np.array([(lc['FLUXCALERR'][i]*math.pow(10,-1.44)) for i in range(len(lc['FLUXCALERR'])) if lc['FLUXCAL'][i] > 0]) mag = np.array([lc['MAG'][i] for i in range(len(lc['MAGERR'])) if lc['FLUXCAL'][i] > 0]) magerr = np.array([lc['MAGERR'][i] for i in range(len(lc['MAGERR'])) if lc['FLUXCAL'][i] > 0]) start_mjd = mjd.min() r_flux = np.array([flux[i] for i in range(len(flux)) if flt[i]=='sdssr']) if len(r_flux) > 0: peak_flux = r_flux.max() else: peak_flux = -9 mjd=mjd-start_mjd #We shift the times of the observations to all start at zero. If required, the mjd of the initial observation is stored in the metadata. #Note: obviously this will only zero the observations in one filter band, the others have to be zeroed if fitting functions. # find supernova classification and whether it is spectroscopically classified or photometrically sntype = -9 dtype = -9 if lc.meta['SNTYPE'] == 120 : sntype = 1 dtype = 'spec' elif lc.meta['SNTYPE'] == 106: sntype =1 dtype = 'phot' elif lc.meta['SNTYPE'] == 32 or lc.meta['SNTYPE'] == 33: sntype =3 dtype = 'spec' elif lc.meta['SNTYPE'] == 132 or lc.meta['SNTYPE'] == 133: sntype = 3 dtype = 'phot' elif lc.meta['SNTYPE'] == 22: sntype = 2 dtype = 'spec' elif lc.meta['SNTYPE'] == 122: sntype = 2 dtype = 'phot' # get redshift - heliocentric is used where possible, otherwise a simulated heliocentric redshift is used z = -9 z_err = -9 if lc.meta['REDSHIFT_HELIO'] != -9 and lc.meta['REDSHIFT_HELIO_ERR'] != -9: z = lc.meta['REDSHIFT_HELIO'] z_err = lc.meta['REDSHIFT_HELIO_ERR'] else: z = lc.meta['SIM_REDSHIFT_HELIO'] # no simulated error in redshift available #supernova identifier (used as object name) snid = lc.meta['SNID'] #Zeropoint zp=np.array([27.5]*len(mjd)) zpsys=['ab']*len(mjd) # form astropy table tab = Table([mjd, flt, flux, fluxerr, zp, zpsys, mag, magerr], names=('mjd', 'filter', 'flux', 'flux_error', 'zp', 'zpsys', 'mag', 'mag_error'), meta={'snid': snid,'z':z, 'z_err':z_err, 'type':sntype, 'initial_observation_time':start_mjd, 'peak flux':peak_flux , 'data type':dtype }) return tab