"""
Utility module wrapping scikit-learn classifiers and providing helper functions for classification and performance evaluation.
"""
from __future__ import division
from past.builtins import basestring
import numpy as np
import os
if 'DISPLAY' not in os.environ:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from sklearn import svm
from sklearn import neighbors
#GridSearchCV lives in sklearn.model_selection from scikit-learn 0.18 onwards; fall back to the
#older sklearn.grid_search module for earlier versions
try:
    from sklearn.model_selection import GridSearchCV
except ImportError:
    from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from scipy.integrate import trapz
from astropy.table import Table,join
import sys, collections,time
from functools import partial
from multiprocessing import Pool
from sklearn.preprocessing import StandardScaler
#This allows the user to easily loop through all possible classifiers
choice_of_classifiers=['svm', 'knn', 'random_forest', 'decision_tree','boost_dt','boost_rf', 'nb']
#boost_rf is a set of boosted random forests which Max came up with.
try:
from sklearn.neural_network import MLPClassifier
choice_of_classifiers.append('neural_network')
except ImportError:
    print('Neural networks are not available in this version of scikit-learn; they require scikit-learn 0.18 or later.')
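#A minimal, illustrative sketch of how this list can be used: each name can be looped over and
#passed to OptimisedClassifier (defined below) or given in the classifiers argument of run_pipeline, e.g.
#    for cls_name in choice_of_classifiers:
#        clf = OptimisedClassifier(cls_name)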
def roc(pr, Yt, true_class=0):
"""
Produce the false positive rate and true positive rate required to plot
a ROC curve, and the area under that curve.
Parameters
----------
pr : array
An array of probability scores, either a 1d array of size N_samples or an nd array,
in which case the column corresponding to the true class will be used.
Yt : array
An array of class labels, of size (N_samples,)
true_class : int
        Which class is taken to be the "true class" (e.g. Ia vs everything else)
Returns
-------
fpr : array
An array containing the false positive rate at each probability threshold
tpr : array
An array containing the true positive rate at each probability threshold
auc : float
The area under the ROC curve
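    Examples
    --------
    A minimal, purely illustrative sketch; the probabilities and class labels below are invented:

    >>> import numpy as np
    >>> probs = np.array([[0.9, 0.1], [0.2, 0.8], [0.7, 0.3], [0.4, 0.6]])
    >>> labels = np.array([0, 1, 0, 1])
    >>> fpr, tpr, auc = roc(probs, labels, true_class=0)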
"""
probs = pr.copy()
Y_test = Yt.copy()
    min_class = int(Y_test.min())  # This is to deal with starting class assignment at 1.
Y_test = Y_test.squeeze()
if len(pr.shape)>1:
probs_1 = probs[:, true_class-min_class]
else:
probs_1 = probs
    threshold = np.linspace(0., 1., 50)  # 50 evenly spaced thresholds between 0 and 1
# This creates an array where each column is the prediction for each threshold
preds = np.tile(probs_1, (len(threshold), 1)).T >= np.tile(threshold, (len(probs_1), 1))
Y_bool = (Y_test == true_class)
Y_bool = np.tile(Y_bool, (len(threshold), 1)).T
TP = (preds & Y_bool).sum(axis=0)
FP = (preds & ~Y_bool).sum(axis=0)
TN = (~preds & ~Y_bool).sum(axis=0)
FN = (~preds & Y_bool).sum(axis=0)
tpr = np.zeros(len(TP))
tpr[TP != 0] = TP[TP != 0]/(TP[TP != 0] + FN[TP != 0])
fpr = FP/(FP+TN)
fpr = np.array(fpr)[::-1]
tpr = np.array(tpr)[::-1]
auc = trapz(tpr, fpr)
return fpr, tpr, auc
def plot_roc(fpr, tpr, auc, labels=[], cols=[], label_size=26, tick_size=18, line_width=3, figsize=(8,6)):
"""
Plots a ROC curve or multiple curves. Can plot the results from multiple classifiers if fpr and tpr are arrays
where each column corresponds to a different classifier.
Parameters
----------
fpr : array
An array containing the false positive rate at each probability threshold
tpr : array
An array containing the true positive rate at each probability threshold
auc : float
The area under the ROC curve
labels : list, optional
Labels of each curve (e.g. ML algorithm names)
cols : list, optional
Colors of the line(s)
label_size : float, optional
Size of x and y axis labels.
    tick_size : float, optional
Size of tick labels.
line_width : float, optional
Line width
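    Examples
    --------
    A minimal sketch plotting a single curve, assuming probs and y_test have already been
    produced by a classifier:

    >>> fpr, tpr, auc = roc(probs, y_test, true_class=1)   # doctest: +SKIP
    >>> plot_roc(fpr, tpr, auc, labels=['svm'])            # doctest: +SKIP
    >>> plt.show()                                         # doctest: +SKIP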
"""
#Automatically fill in the colors if not supplied
if not isinstance(cols, basestring) and len(cols)==0:
cols=['#185aa9','#008c48','#ee2e2f','#f47d23','#662c91','#a21d21','#b43894','#010202']
#This should work regardless of whether it's one or many roc curves
#fig=plt.figure(figsize=figsize)
fig=plt.gcf()
ax=fig.add_subplot(111)
    ax.set_prop_cycle(color=cols)
ax.plot(fpr, tpr, lw=line_width)
# ax.plot(fpr, tpr)
ax.tick_params(axis='both', which='major', labelsize=tick_size)
ax.xaxis.set_major_locator(plt.MaxNLocator(6))
ax.yaxis.set_major_locator(plt.MaxNLocator(6))
plt.xlabel('False positive rate (contamination)', fontsize=label_size)
plt.ylabel('True positive rate (completeness)', fontsize=label_size)
# plt.xlabel('False positive rate (contamination)')
# plt.ylabel('True positive rate (completeness)')
#Robust against the possibility of AUC being a single number instead of list
if not isinstance(auc, collections.Sequence):
auc=[auc]
if len(labels)>0:
labs=[]
for i in range(len(labels)):
labs.append(labels[i]+' (%.3f)' %(auc[i]))
else:
        #No labels supplied: label each curve by its index followed by its AUC
        labs = [str(i)+' (%.3f)' %(auc[i]) for i in range(len(ax.lines))]
#plt.legend(labs, loc='lower right', fancybox=True, bbox_to_anchor=(0.95, 0.05), fontsize=label_size)
plt.legend(labs, loc='lower right', bbox_to_anchor=(0.95, 0.05))
plt.tight_layout()
#plt.show()
def F1(pr, Yt, true_class, full_output=False):
"""
Calculate an F1 score for many probability threshold increments
and select the best one.
Parameters
----------
pr : array
An array of probability scores, either a 1d array of size N_samples or an nd array,
in which case the column corresponding to the true class will be used.
Yt : array
An array of class labels, of size (N_samples,)
true_class : int
        Which class is taken to be the "true class" (e.g. Ia vs everything else)
full_output : bool, optional
        If True, returns two vectors giving F1 as a function of threshold, instead of only the best value.
Returns
-------
best_F1 : float
(If full_output=False) The largest F1 value
best_threshold : array
(If full_output=False) The probability threshold corresponding to best_F1
f1 : array
(If full_output=True) F1 as a function of threshold.
threshold : array
(If full_output=True) Vector of thresholds (from 0 to 1)
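    Examples
    --------
    Purely illustrative; the probabilities and labels below are invented:

    >>> import numpy as np
    >>> probs = np.array([0.9, 0.2, 0.7, 0.4])
    >>> labels = np.array([1, 0, 1, 0])
    >>> best_f1, best_threshold = F1(probs, labels, true_class=1)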
"""
probs=pr.copy()
Y_test=Yt.copy()
    min_class=int(Y_test.min()) #This is to deal with starting class assignment at 1.
Y_test = Y_test.squeeze()
if len(pr.shape)>1:
probs_1=probs[:, true_class-min_class]
else:
probs_1=probs
threshold = np.arange(0, 1, 0.01)
#This creates an array where each column is the prediction for each threshold
preds=np.tile(probs_1, (len(threshold), 1)).T>np.tile(threshold, (len(probs_1), 1))
Y_bool=(Y_test==true_class)
Y_bool=np.tile(Y_bool, (len(threshold), 1)).T
TP=(preds & Y_bool).sum(axis=0)
FP=(preds & ~Y_bool).sum(axis=0)
TN=(~preds & ~Y_bool).sum(axis=0)
FN=(~preds & Y_bool).sum(axis=0)
f1=np.zeros(len(TP))
f1[TP!=0]=2*TP[TP!=0]/(2*TP[TP!=0]+FN[TP!=0]+FP[TP!=0])
if full_output:
return f1, threshold
else:
best_F1 = f1.max()
best_threshold_index = np.argmax(f1)
best_threshold = threshold[best_threshold_index]
return best_F1, best_threshold
def FoM(pr, Yt, true_class=1, full_output=False):
"""
Calculate a Kessler FoM for many probability threshold increments
and select the best one.
FoM is defined as:
FoM = TP^2/((TP+FN)(TP+3*FP))
Parameters
----------
pr : array
An array of probability scores, either a 1d array of size N_samples or an nd array,
in which case the column corresponding to the true class will be used.
Yt : array
An array of class labels, of size (N_samples,)
true_class : int
        Which class is taken to be the "true class" (e.g. Ia vs everything else)
full_output : bool, optional
        If True, returns two vectors giving FoM as a function of threshold, instead of only the best value.
Returns
-------
best_FoM : float
(If full_output=False) The largest FoM value
best_threshold : array
(If full_output=False) The probability threshold corresponding to best_FoM
fom : array
(If full_output=True) FoM as a function of threshold.
threshold : array
(If full_output=True) Vector of thresholds (from 0 to 1)
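    Examples
    --------
    Purely illustrative; with full_output=True the whole FoM curve is returned rather than only
    the optimum:

    >>> import numpy as np
    >>> probs = np.array([0.9, 0.2, 0.7, 0.4])
    >>> labels = np.array([1, 0, 1, 0])
    >>> fom, thresholds = FoM(probs, labels, true_class=1, full_output=True)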
"""
weight = 3.0
probs=pr.copy()
Y_test=Yt.copy()
    min_class=int(Y_test.min()) #This is to deal with starting class assignment at 1.
Y_test = Y_test.squeeze()
if len(pr.shape)>1:
probs_1=probs[:, true_class-min_class]
else:
probs_1=probs
threshold = np.arange(0, 1, 0.01)
#This creates an array where each column is the prediction for each threshold
preds=np.tile(probs_1, (len(threshold), 1)).T>np.tile(threshold, (len(probs_1), 1))
Y_bool=(Y_test==true_class)
Y_bool=np.tile(Y_bool, (len(threshold), 1)).T
TP=(preds & Y_bool).sum(axis=0)
FP=(preds & ~Y_bool).sum(axis=0)
TN=(~preds & ~Y_bool).sum(axis=0)
FN=(~preds & Y_bool).sum(axis=0)
fom=np.zeros(len(TP))
fom[TP!=0]=TP[TP!=0]**2/(TP[TP!=0]+FN[TP!=0])/(TP[TP!=0]+weight*FP[TP!=0])
if full_output:
return fom, threshold
else:
best_FoM = fom.max()
best_threshold_index = np.argmax(fom)
best_threshold = threshold[best_threshold_index]
return best_FoM, best_threshold
class OptimisedClassifier():
"""Implements an optimised classifier (although it can be run without optimisation). Equipped with interfaces to several
sklearn classes and functions.
"""
NB_param_dict = {}
KNN_param_dict = {'n_neighbors':list(range(1, 180, 5)), 'weights':['distance']}
SVM_param_dict = {'C':np.logspace(-2, 5, 5), 'gamma':np.logspace(-8, 3, 5)}
NN_param_dict={'hidden_layer_sizes':[(l,) for l in range(80, 120, 5)]}
DT_param_dict={'criterion':['gini','entropy'],'min_samples_leaf':list(range(1,400,25))}
RF_param_dict = {'n_estimators':list(range(200, 900, 100)), 'criterion':['gini', 'entropy']}
ests=[DecisionTreeClassifier(criterion='entropy',min_samples_leaf=l) for l in range(5, 55, 10)]
Boost_param_dict={'base_estimator':ests,'n_estimators':list(range(5, 85, 10))}
    #This is a strange boosted random forest classifier that Max came up with; it works quite well,
    #but is likely biased in general
Boost_RF_param_dict = {'base_estimator':[RandomForestClassifier(400, 'entropy'),
RandomForestClassifier(600, 'entropy')],'n_estimators':list([2, 3, 5, 10])}
#Dictionary to hold good default ranges for parameters for each classifier.
params={'svm':SVM_param_dict, 'knn':KNN_param_dict, 'decision_tree':DT_param_dict,'random_forest':RF_param_dict,
'boost_dt':Boost_param_dict, 'boost_rf':Boost_RF_param_dict, 'nb':NB_param_dict, 'neural_network':NN_param_dict}
def __init__(self, classifier, optimise=True, **kwargs):
"""
Wrapper around sklearn classifiers
Parameters
----------
classifier : str or sklearn.Classifier
Either a string (one of choice_of_classifiers) or sklearn classifier object
optimise : bool, optional
Whether or not to optimise the parameters
kwargs : optional
Keyword arguments passed directly to the classifier
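        Examples
        --------
        A minimal sketch using the built-in classifier names; the keyword arguments are passed
        through to the underlying sklearn classifiers:

        >>> clf_svm = OptimisedClassifier('svm', kernel='linear')            # doctest: +SKIP
        >>> clf_rf = OptimisedClassifier('random_forest', n_estimators=200)  # doctest: +SKIP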
"""
self.classifier_name=classifier
if isinstance(classifier, basestring):
#Choose from available classifiers, with default parameters best for SNe (unless overridden)
if 'probability' in kwargs:
self.prob=kwargs.pop('probability')
else:
self.prob=True #By default we want to always return probabilities. User can override this.
try:
if classifier=='svm':
if 'kernel' in kwargs:
kern=kwargs.pop('kernel') #Removes this from kwargs
else:
kern='rbf'
self.clf=svm.SVC(kernel=kern, probability = self.prob, **kwargs)
elif classifier=='knn':
if 'n_neighbours' in kwargs:
n=kwargs.pop('n_neighbours')
else:
n=5
if 'weights' in kwargs:
wgts=kwargs.pop('weights')
else:
wgts='distance'
self.clf=neighbors.KNeighborsClassifier(n_neighbors = n, weights=wgts, **kwargs)
elif classifier=='random_forest':
self.clf = RandomForestClassifier(**kwargs)
elif classifier=='decision_tree':
self.clf = DecisionTreeClassifier(**kwargs)
elif classifier=='boost_dt' or classifier=='boost_rf':
self.clf = AdaBoostClassifier(**kwargs)
elif classifier=='nb':
self.clf = GaussianNB(**kwargs)
elif classifier=='neural_network':
if 'hidden_layer_sizes' in kwargs:
l=kwargs.pop('hidden_layer_sizes')
else:
l=(5, 2)
if 'algorithm' in kwargs:
algo=kwargs.pop('algorithm')
else:
algo='adam'
if 'activation' in kwargs:
activation=kwargs.pop('activation')
else:
activation='tanh'
self.clf = MLPClassifier(solver=algo, hidden_layer_sizes=l,activation=activation, **kwargs)
elif classifier=='multiple_classifier':
pass
else:
print ('Requested classifier not recognised.' )
print ('Choice of built-in classifiers:')
print (choice_of_classifiers)
sys.exit(0)
except TypeError:
#Gracefully catch errors of sending the wrong kwargs to the classifiers
print()
print ('Error')
print ('One of the kwargs below:')
print (kwargs.keys())
print ('Does not belong to classifier', classifier)
print()
sys.exit(0)
else:
#This is already some sklearn classifier or an object that behaves like one
self.clf=classifier
print ('Created classifier of type:')
print (self.clf)
print()
    def classify(self, X_train, y_train, X_test):
"""
Run unoptimised classifier with initial parameters.
Parameters
----------
X_train : array
Array of training features of shape (n_train,n_features)
y_train : array
Array of known classes of shape (n_train)
X_test : array
Array of validation features of shape (n_test,n_features)
Returns
-------
Yfit : array
Predicted classes for X_test
probs : array
(If self.prob=True) Probability for each object to belong to each class.
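        Examples
        --------
        A sketch assuming X_train, y_train and X_test are numpy arrays that have already been
        built (and, ideally, scaled) elsewhere:

        >>> clf = OptimisedClassifier('random_forest')              # doctest: +SKIP
        >>> y_fit, probs = clf.classify(X_train, y_train, X_test)   # doctest: +SKIP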
"""
self.clf.fit(X_train, y_train)
if self.prob: #Probabilities requested
probs=self.clf.predict_proba(X_test)
Yfit=probs.argmax(axis=1) #Each object is assigned the class with highest probability
classes=np.unique(y_train)
classes.sort()
Yfit=classes[Yfit]
return Yfit, probs
else:
Yfit=self.clf.predict(X_test)
#scr=self.clf.score(X_test, y_test)
return Yfit
def __custom_auc_score(self, estimator, X, Y):
"""
Custom scoring method for use with GridSearchCV.
Parameters
----------
estimator : sklearn.Classifier
The current classifier (used by GridSearchCV).
X : array
Array of training features of shape (n_train,n_features).
Y : array
Array of known classes of shape (n_train)
Returns
-------
auc : float
AUC score
"""
probs=estimator.predict_proba(X)
fpr, tpr, auc=roc(probs, Y, true_class=1)
return auc
    def optimised_classify(self, X_train, y_train, X_test, **kwargs):
"""
Run optimised classifier using grid search with cross validation to choose optimal classifier parameters.
Parameters
----------
X_train : array
Array of training features of shape (n_train,n_features)
y_train : array
Array of known classes of shape (n_train)
X_test : array
Array of validation features of shape (n_test,n_features)
params : dict, optional
Allows the user to specify which parameters and over what ranges to optimise. If not set,
defaults will be used.
true_class : int, optional
The class determined to be the desired class (e.g. Ias, which might correspond to class 1). This allows
the user to optimise for different classes (based on ROC curve AUC).
Returns
-------
Yfit : array
Predicted classes for X_test
probs : array
(If self.prob=True) Probability for each object to belong to each class.
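        Examples
        --------
        A sketch assuming X_train, y_train and X_test already exist; the params dictionary shown
        simply narrows the default random forest grid:

        >>> clf = OptimisedClassifier('random_forest')                   # doctest: +SKIP
        >>> y_fit, probs = clf.optimised_classify(X_train, y_train, X_test,
        ...                                       params={'n_estimators': [200, 400]},
        ...                                       true_class=1)          # doctest: +SKIP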
"""
if 'params' in kwargs:
params=kwargs['params']
else:
params=self.params[self.classifier_name]
# if self.classifier_name=='svm':
# n_features=len(X_train[0, :]) #Update now the number of features is known
# params['gamma']=[1/(n_features**2), 1/n_features, 1/np.sqrt(n_features)]
if 'true_class' in kwargs:
self.true_class=kwargs['true_class']
else:
self.true_class=1
        self.clf=GridSearchCV(self.clf, params, scoring=self.__custom_auc_score, cv=5)
self.clf.fit(X_train, y_train) #This actually does the grid search
best_params=self.clf.best_params_
print ('Optimised parameters:', best_params)
for k in best_params.keys():
#This is the safest way to check if something is a number
try:
float(best_params[k])
if best_params[k]<=min(params[k]):
print()
print('WARNING: Lower boundary on parameter', k, 'may be too high. Optimum may not have been reached.')
print()
elif best_params[k]>=max(params[k]):
print()
print('WARNING: Upper boundary on parameter', k, 'may be too low. Optimum may not have been reached.')
print()
except (ValueError, TypeError):
pass #Ignore a parameter that isn't numerical
if self.prob: #Probabilities requested
probs=self.clf.predict_proba(X_test)
Yfit=probs.argmax(axis=1).tolist() #Each object is assigned the class with highest probability
classes=np.unique(y_train)
classes.sort()
Yfit=classes[Yfit]
return Yfit, probs
else:
Yfit=self.clf.predict(X_test)
return Yfit
def __call_classifier(classifier, X_train, y_train, X_test, param_dict,return_classifier):
"""Specifically designed to run with multiprocessing"""
c=OptimisedClassifier(classifier)
if classifier in param_dict.keys():
y_fit, probs=c.optimised_classify(X_train, y_train, X_test,params=param_dict[classifier])
else:
y_fit, probs=c.optimised_classify(X_train, y_train, X_test)
if return_classifier:
return probs,c.clf #Returns the best fitting sklearn classifier object
else:
return probs
def run_pipeline(features,types,output_name='',columns=[],classifiers=['nb','knn','svm','neural_network','boost_dt'],
training_set=0.7, param_dict={}, nprocesses=1, scale=True,
plot_roc_curve=True,return_classifier=False):
"""
    Utility function to classify a dataset with a number of classification methods. This assumes
    the test set has known classes to compare against. If requested, the fitted classifier objects
    are returned so they can be applied to future test sets.
Parameters
----------
features : astropy.table.Table or array
Features either in the form of a table or array
types : astropy.table.Table or array
Classes, either in the form of a table or array
output_name : str, optional
Full root path and name for output (e.g. '<output_path>/salt2-run-')
columns : list, optional
If you want to run a subset of columns
classifiers : list, optional
Which available ML classifiers to use
training_set : float or list, optional
        If a float, this is the fraction of objects that will be used as the training set. If a list, it is
        assumed to contain the IDs of the objects to be used for training.
param_dict : dict, optional
Use to run different ranges of hyperparameters for the classifiers when optimising
nprocesses : int, optional
Number of processors for multiprocessing (shared memory only). Each classifier will then be run in parallel.
scale : bool, optional
        Rescale features using sklearn's preprocessing StandardScaler class (leaving this True is highly recommended)
plot_roc_curve : bool, optional
Whether or not to plot the ROC curve at the end
return_classifier : bool, optional
Whether or not to return the actual classifier objects (due to the limitations of multiprocessing, this can't
be done in parallel at the moment).
Returns
-------
dict
(If return_classifier=True) Dictionary of fitted sklearn Classifier objects
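    Examples
    --------
    A sketch of a typical call, assuming features is an astropy Table with an 'Object' column and
    types holds the matching known classes; the output path is a placeholder:

    >>> classifier_objects = run_pipeline(features, types,
    ...                                   output_name='output/run1-',
    ...                                   classifiers=['knn', 'random_forest'],
    ...                                   training_set=0.7,
    ...                                   return_classifier=True)   # doctest: +SKIP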
"""
t1= time.time()
if isinstance(features,Table):
#The features are in astropy table format and must be converted to a numpy array before passing to sklearn
#We need to make sure we match the correct Y values to X values. The safest way to do this is to make types an
#astropy table as well.
if not isinstance(types,Table):
types=Table(data=[features['Object'],types],names=['Object','Type'])
feats=join(features,types,'Object')
if len(columns)==0:
columns=feats.columns[1:-1]
#Split into training and validation sets
if np.isscalar(training_set):
objs=feats['Object']
objs=np.random.permutation(objs)
training_set=objs[:(int)(training_set*len(objs))]
#Otherwise a training set has already been provided as a list of Object names and we can continue
feats_train=feats[np.in1d(feats['Object'],training_set)]
feats_test=feats[~np.in1d(feats['Object'],training_set)]
X_train=np.array([feats_train[c] for c in columns]).T
y_train=np.array(feats_train['Type'])
X_test=np.array([feats_test[c] for c in columns]).T
y_test=np.array(feats_test['Type'])
else:
#Otherwise the features are already in the form of a numpy array
if np.isscalar(training_set):
inds=np.random.permutation(range(len(features)))
train_inds=inds[:(int)(len(inds)*training_set)]
test_inds=inds[(int)(len(inds)*training_set):]
else:
#We assume the training set has been provided as indices
train_inds=training_set
            test_inds=np.arange(len(types))[~np.in1d(np.arange(len(types)),training_set)]
X_train=features[train_inds]
y_train=types[train_inds]
X_test=features[test_inds]
y_test=types[test_inds]
#Rescale the data (highly recommended)
if scale:
scaler = StandardScaler()
scaler.fit(np.vstack((X_train, X_test)))
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)
probabilities={}
classifier_objects={}
if nprocesses>1 and return_classifier:
print ("Due to limitations with python's multiprocessing module, classifier objects cannot be returned if " \
"multiple processors are used. Continuing serially...")
print()
if nprocesses>1 and not return_classifier:
partial_func=partial(__call_classifier,X_train=X_train, y_train=y_train, X_test=X_test,
param_dict=param_dict,return_classifier=False)
p=Pool(nprocesses, maxtasksperchild=1)
result=p.map(partial_func,classifiers)
for i in range(len(result)):
cls=classifiers[i]
probabilities[cls]=result[i]
else:
for cls in classifiers:
retval = __call_classifier(cls, X_train, y_train, X_test, param_dict,return_classifier)
if return_classifier:
probabilities[cls] = retval[0]
classifier_objects[cls] = retval[1]
else:
                    probabilities[cls] = retval
for i in range(len(classifiers)):
cls=classifiers[i]
probs=probabilities[cls]
fpr, tpr, auc=roc(probs, y_test, true_class=1)
fom, thresh_fom=FoM(probs, y_test, true_class=1, full_output=False)
print ('Classifier', cls+':', 'AUC =', auc, 'FoM =', fom)
if i==0:
FPR=fpr
TPR=tpr
AUC=[auc]
else:
FPR=np.column_stack((FPR, fpr))
TPR=np.column_stack((TPR, tpr))
AUC.append(auc)
        if len(output_name)!=0: #Only save results if an output name is supplied
typs=np.unique(y_train)
typs.sort()
typs=np.array(typs,dtype='str').tolist()
if isinstance(features,Table):
index_column=feats_test['Object']
else:
index_column=np.array(test_inds,dtype='str')
dat=np.column_stack((index_column,probs))
nms=['Object']+typs
tab=Table(dat,dtype=['S64']+['f']*probs.shape[1],names=nms)
flname=output_name+cls+'.probs'
tab.write(flname,format='ascii')
tab=Table(np.column_stack((fpr, tpr)),names=['FPR','TPR'])
tab.write(output_name+cls+'.roc',format='ascii')
np.savetxt(output_name+cls+'.auc',[auc])
print()
print ('Time taken ', (time.time()-t1)/60., 'minutes')
if plot_roc_curve:
plot_roc(FPR, TPR, AUC, labels=classifiers,label_size=16,tick_size=12,line_width=1.5)
plt.show()
if return_classifier:
return classifier_objects