# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.
"""Collection of scikit-learn extensions for model selection techniques."""
from inspect import signature
import inspect
import numbers
from typing import List, Optional
import warnings
import abc
import numpy as np
from collections.abc import Iterable
import scipy.sparse as sp
import sklearn
from joblib import Parallel, delayed
from sklearn.base import BaseEstimator, clone, is_classifier
from sklearn.ensemble import (GradientBoostingClassifier, GradientBoostingRegressor,
RandomForestClassifier, RandomForestRegressor)
from sklearn.exceptions import FitFailedWarning
from sklearn.linear_model import (ElasticNet, ElasticNetCV, Lasso, LassoCV, MultiTaskElasticNet, MultiTaskElasticNetCV,
MultiTaskLasso, MultiTaskLassoCV, Ridge, RidgeCV, RidgeClassifier, RidgeClassifierCV,
LogisticRegression, LogisticRegressionCV)
from sklearn.model_selection import (BaseCrossValidator, GridSearchCV, GroupKFold, KFold,
RandomizedSearchCV, StratifiedKFold,
check_cv)
# TODO: consider working around relying on sklearn implementation details
from sklearn.model_selection._validation import (_check_is_permutation,
_fit_and_predict)
from sklearn.neural_network import MLPClassifier, MLPRegressor
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import LabelEncoder, PolynomialFeatures, StandardScaler
from sklearn.utils import check_random_state, indexable
from sklearn.utils.multiclass import type_of_target
from sklearn.utils.validation import _num_samples
from .linear_model import WeightedLassoCVWrapper, WeightedLassoWrapper
def _split_weighted_sample(self, X, y, sample_weight, is_stratified=False):
random_state = self.random_state if self.shuffle else None
if is_stratified:
kfold_model = StratifiedKFold(n_splits=self.n_splits, shuffle=self.shuffle,
random_state=random_state)
else:
kfold_model = KFold(n_splits=self.n_splits, shuffle=self.shuffle,
random_state=random_state)
if sample_weight is None:
return kfold_model.split(X, y)
else:
random_state = self.random_state
kfold_model.shuffle = True
kfold_model.random_state = random_state
weights_sum = np.sum(sample_weight)
max_deviations = []
all_splits = []
for _ in range(self.n_trials + 1):
splits = [test for (train, test) in list(kfold_model.split(X, y))]
weight_fracs = np.array([np.sum(sample_weight[split]) / weights_sum for split in splits])
if np.all(weight_fracs > .95 / self.n_splits):
# Found a good split, return.
return self._get_folds_from_splits(splits, X.shape[0])
# Record all splits in case the stratification by weight yields a worse partition
all_splits.append(splits)
max_deviation = np.max(np.abs(weight_fracs - 1 / self.n_splits))
max_deviations.append(max_deviation)
# Reseed random generator and try again
if isinstance(kfold_model.random_state, numbers.Integral):
kfold_model.random_state = kfold_model.random_state + 1
elif kfold_model.random_state is not None:
kfold_model.random_state = np.random.RandomState(kfold_model.random_state.randint(np.iinfo(np.int32).max))
# If KFold fails after n_trials, we try the next best thing: stratifying by weight groups
warnings.warn("The KFold algorithm failed to find a weight-balanced partition after " +
"{n_trials} trials. Falling back on a weight stratification algorithm.".format(
n_trials=self.n_trials), UserWarning)
if is_stratified:
stratified_weight_splits = [[]] * self.n_splits
for y_unique in np.unique(y.flatten()):
class_inds = np.argwhere(y == y_unique).flatten()
class_splits = self._get_splits_from_weight_stratification(sample_weight[class_inds])
stratified_weight_splits = [split + list(class_inds[class_split]) for split, class_split in zip(
stratified_weight_splits, class_splits)]
else:
stratified_weight_splits = self._get_splits_from_weight_stratification(sample_weight)
weight_fracs = np.array([np.sum(sample_weight[split]) / weights_sum for split in stratified_weight_splits])
if np.all(weight_fracs > .95 / self.n_splits):
# Found a good split, return.
return self._get_folds_from_splits(stratified_weight_splits, X.shape[0])
else:
# Did not find a good split
# Record the deviation for the weight-stratified split to compare with KFold splits
all_splits.append(stratified_weight_splits)
max_deviation = np.max(np.abs(weight_fracs - 1 / self.n_splits))
max_deviations.append(max_deviation)
# Return most weight-balanced partition
min_deviation_index = np.argmin(max_deviations)
return self._get_folds_from_splits(all_splits[min_deviation_index], X.shape[0])
class WeightedKFold:
"""K-Folds cross-validator for weighted data.
Provides train/test indices to split data in train/test sets.
Split dataset into k folds of roughly equal size and equal total weight.
The default is to try sklearn.model_selection.KFold for a number of trials to find
a weight-balanced k-way split. If it cannot find such a split, it will fall back
onto a more rigorous weight stratification algorithm.
Parameters
----------
n_splits : int, default 3
Number of folds. Must be at least 2.
n_trials : int, default 10
Number of times to try sklearn.model_selection.KFold before falling back to another
weight stratification algorithm.
shuffle : bool, optional
Whether to shuffle the data before splitting into batches.
random_state : int, RandomState instance, or None, default None
If int, random_state is the seed used by the random number generator;
If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator;
If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used
by :mod:`np.random<numpy.random>`. Used when ``shuffle`` == True.
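
Examples
--------
A minimal usage sketch (synthetic data, shown for illustration only)::

    import numpy as np

    X = np.random.normal(size=(100, 3))
    y = np.random.normal(size=(100,))
    sample_weight = np.random.uniform(size=(100,))
    cv = WeightedKFold(n_splits=3, n_trials=10, shuffle=True, random_state=0)
    for train_index, test_index in cv.split(X, y, sample_weight=sample_weight):
        # each test fold carries roughly 1/n_splits of the total sample weight
        pass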
"""
def __init__(self, n_splits=3, n_trials=10, shuffle=False, random_state=None):
self.n_splits = n_splits
self.shuffle = shuffle
self.n_trials = n_trials
self.random_state = random_state
def split(self, X, y, sample_weight=None):
"""Generate indices to split data into training and test set.
Parameters
----------
X : array_like, shape (n_samples, n_features)
Training data, where n_samples is the number of samples
and n_features is the number of features.
y : array_like, shape (n_samples,)
The target variable for supervised learning problems.
sample_weight : array_like, shape (n_samples,)
Weights associated with the training data.
"""
return _split_weighted_sample(self, X, y, sample_weight, is_stratified=False)
def get_n_splits(self, X, y, groups=None):
"""Return the number of splitting iterations in the cross-validator.
Parameters
----------
X : object
Always ignored, exists for compatibility.
y : object
Always ignored, exists for compatibility.
groups : object
Always ignored, exists for compatibility.
Returns
-------
n_splits : int
Returns the number of splitting iterations in the cross-validator.
"""
return self.n_splits
def _get_folds_from_splits(self, splits, sample_size):
folds = []
sample_indices = np.arange(sample_size)
for it in range(self.n_splits):
folds.append([np.setdiff1d(sample_indices, splits[it], assume_unique=True), splits[it]])
return folds
def _get_splits_from_weight_stratification(self, sample_weight):
# Weight stratification algorithm
# Sort weights for weight strata search
random_state = check_random_state(self.random_state)
sorted_inds = np.argsort(sample_weight)
sorted_weights = sample_weight[sorted_inds]
max_split_size = sorted_weights.shape[0] // self.n_splits
max_divisible_length = max_split_size * self.n_splits
sorted_inds_subset = np.reshape(sorted_inds[:max_divisible_length], (max_split_size, self.n_splits))
shuffled_sorted_inds_subset = np.apply_along_axis(random_state.permutation, axis=1, arr=sorted_inds_subset)
splits = [list(shuffled_sorted_inds_subset[:, i]) for i in range(self.n_splits)]
if max_divisible_length != sorted_weights.shape[0]:
# There are some leftover indices that have yet to be assigned
subsample = sorted_inds[max_divisible_length:]
if self.shuffle:
random_state.shuffle(subsample)
new_splits = np.array_split(subsample, self.n_splits)
random_state.shuffle(new_splits)
# Append stratum splits to overall splits
splits = [split + list(new_split) for split, new_split in zip(splits, new_splits)]
return splits
class WeightedStratifiedKFold(WeightedKFold):
"""Stratified K-Folds cross-validator for weighted data.
Provides train/test indices to split data in train/test sets.
Split dataset into k folds of roughly equal size and equal total weight.
The default is to try sklearn.model_selection.StratifiedKFold for a number of trials to find
a weight-balanced k-way split. If it cannot find such a split, it will fall back
onto a more rigorous weight stratification algorithm.
Parameters
----------
n_splits : int, default 3
Number of folds. Must be at least 2.
n_trials : int, default 10
Number of times to try sklearn.model_selection.StratifiedKFold before falling back to another
weight stratification algorithm.
shuffle : bool, optional
Whether to shuffle the data before splitting into batches.
random_state : int, RandomState instance, or None, default None
If int, random_state is the seed used by the random number generator;
If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator;
If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used
by :mod:`np.random<numpy.random>`. Used when ``shuffle`` == True.
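
Examples
--------
An illustrative sketch with a binary label, analogous to :class:`WeightedKFold`::

    import numpy as np

    X = np.random.normal(size=(100, 3))
    y = np.random.randint(0, 2, size=100)
    sample_weight = np.random.uniform(size=(100,))
    cv = WeightedStratifiedKFold(n_splits=3, shuffle=True, random_state=0)
    folds = cv.split(X, y, sample_weight=sample_weight)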
"""
def split(self, X, y, sample_weight=None):
"""Generate indices to split data into training and test set.
Parameters
----------
X : array_like, shape (n_samples, n_features)
Training data, where n_samples is the number of samples
and n_features is the number of features.
y : array_like, shape (n_samples,)
The target variable for supervised learning problems.
sample_weight : array_like, shape (n_samples,)
Weights associated with the training data.
"""
return _split_weighted_sample(self, X, y, sample_weight, is_stratified=True)
def get_n_splits(self, X, y, groups=None):
"""Return the number of splitting iterations in the cross-validator.
Parameters
----------
X : object
Always ignored, exists for compatibility.
y : object
Always ignored, exists for compatibility.
groups : object
Always ignored, exists for compatibility.
Returns
-------
n_splits : int
Returns the number of splitting iterations in the cross-validator.
"""
return self.n_splits
class ModelSelector(metaclass=abc.ABCMeta):
"""
This class enables a two-stage fitting process, where first a model is selected
by calling `train` with `is_selecting=True`, and then the selected model is fit (presumably
on a different data set) by calling train with `is_selecting=False`.
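
For example, with a concrete subclass such as :class:`FixedModelSelector`, the two stages
look roughly like this (illustrative sketch; ``folds`` is any list of ``(train, test)``
index pairs and the data arrays are placeholders)::

    selector = FixedModelSelector(Lasso(), score_during_selection=True)
    selector.train(True, folds, X_sel, y_sel)     # selection stage: score on the folds
    selector.train(False, None, X_fit, y_fit)     # fitting stage: fit the chosen model
    y_pred = selector.predict(X_new)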
"""
@abc.abstractmethod
def train(self, is_selecting: bool, folds: Optional[List], *args, **kwargs):
"""
Either selects a model or fits a model, depending on the value of `is_selecting`.
If `is_selecting` is `False`, then `folds` should not be provided because they are only used during selection.
"""
raise NotImplementedError("Abstract method")
@abc.abstractmethod
def predict(self, *args, **kwargs):
"""
Predicts using the selected model; should not be called until after `train` has been used
both to select a model and to fit it.
"""
raise NotImplementedError("Abstract method")
@abc.abstractmethod
def score(self, *args, **kwargs):
"""
Gets the score of the selected model on the given data; should not be called until after `train` has been used
both to select a model and to fit it.
"""
raise NotImplementedError("Abstract method")
class SingleModelSelector(ModelSelector):
"""
A model selection class that selects a single best model;
this encompasses random search, grid search, ensembling, etc.
"""
@property
@abc.abstractmethod
def best_model(self):
raise NotImplementedError("Abstract method")
@property
@abc.abstractmethod
def best_score(self):
raise NotImplementedError("Abstract method")
def predict(self, *args, **kwargs):
return self.best_model.predict(*args, **kwargs)
# only expose predict_proba if best_model has predict_proba
# used because logic elsewhere uses hasattr predict proba to check if model is a classifier
def __getattr__(self, name):
if name == 'predict_proba':
return getattr(self.best_model, name)
else:
raise AttributeError(name)
def score(self, *args, **kwargs):
if hasattr(self.best_model, 'score'):
return self.best_model.score(*args, **kwargs)
else:
return None
def _fit_with_groups(model, X, y, *, sub_model=None, groups, **kwargs):
"""
Fits a model while correctly handling grouping if necessary.
This enables us to perform an inner-loop cross-validation of a model
which handles grouping correctly, which is not easy using typical sklearn models.
For example, GridSearchCV and RandomizedSearchCV both support passing `groups` to fit,
but other CV-related estimators (e.g. LassoCV) do not, which means that GroupKFold
cannot be used as the cv instance, because the `groups` argument will never be passed through
to GroupKFold's `split` method.
The hacky workaround here is to explicitly set the `cv` attribute to the set of
rows that GroupKFold would have generated rather than using GroupKFold as the cv instance.
"""
if groups is not None:
if sub_model is None:
sub_model = model
if hasattr(sub_model, 'cv'):
old_cv = sub_model.cv
# logic copied from check_cv
cv = 5 if old_cv is None else old_cv
if isinstance(cv, numbers.Integral):
cv = GroupKFold(cv)
# otherwise we will assume the user already set the cv attribute to something
# compatible with splitting with a `groups` argument
splits = list(cv.split(X, y, groups=groups))
try:
sub_model.cv = splits
return model.fit(X, y, **kwargs) # drop groups from arg list
finally:
sub_model.cv = old_cv
# drop groups from arg list, which were already used at the outer level and may not be supported by the model
return model.fit(X, y, **kwargs)
class FixedModelSelector(SingleModelSelector):
"""
Model selection class that always selects the given sklearn-compatible model
"""
def __init__(self, model, score_during_selection):
self.model = clone(model, safe=False)
self.score_during_selection = score_during_selection
def train(self, is_selecting, folds: Optional[List], X, y, groups=None, **kwargs):
if is_selecting:
if self.score_during_selection:
# the score needs to be compared to another model's
# so we don't need to fit the model itself on all of the data, just get the out-of-sample score
assert hasattr(self.model, 'score'), (f"Can't select between a fixed {type(self.model)} model "
"and others because it doesn't have a score method")
scores = []
for train, test in folds:
# use _fit_with_groups instead of just fit to handle nested grouping
_fit_with_groups(self.model, X[train], y[train],
groups=None if groups is None else groups[train],
**{key: val[train] for key, val in kwargs.items()})
scores.append(self.model.score(X[test], y[test]))
self._score = np.mean(scores)
else:
# we need to train the model on the data
_fit_with_groups(self.model, X, y, groups=groups, **kwargs)
return self
@property
def best_model(self):
return self.model
@property
def best_score(self):
if hasattr(self, '_score'):
return self._score
else:
raise ValueError("No score was computed during selection")
def _copy_to(m1, m2, attrs, insert_underscore=False):
for attr in attrs:
setattr(m2, attr, getattr(m1, attr + "_" if insert_underscore else attr))
def _convert_linear_model(model, new_cls):
new_model = new_cls()
# copy common parameters
_copy_to(model, new_model, ["fit_intercept"])
# copy common fitted variables
_copy_to(model, new_model, ["coef_", "intercept_", "n_features_in_"])
return new_model
def _to_logisticRegression(model: LogisticRegressionCV):
lr = _convert_linear_model(model, LogisticRegression)
_copy_to(model, lr, ["penalty", "dual", "intercept_scaling",
"class_weight",
"solver", "multi_class",
"verbose", "n_jobs",
"tol", "max_iter", "random_state", "n_iter_"])
_copy_to(model, lr, ["classes_"])
_copy_to(model, lr, ["C", "l1_ratio"], True) # these are arrays in LogisticRegressionCV, need to convert them next
# make sure all classes agree on best c/l1 combo
assert np.isclose(lr.C, lr.C.flatten()[0]).all()
assert np.equal(lr.l1_ratio, None).all() or np.isclose(lr.l1_ratio, lr.l1_ratio.flatten()[0]).all()
lr.C = lr.C[0]
lr.l1_ratio = lr.l1_ratio[0]
avg_scores = np.average([v for k, v in model.scores_.items()], axis=1) # average over folds
best_scores = np.max(avg_scores, axis=tuple(range(1, avg_scores.ndim))) # average score of best c/l1 combo
assert np.isclose(best_scores, best_scores.flatten()[0]).all()  # make sure all classes agree on the best score
return lr, best_scores[0]
def _convert_linear_regression(model, new_cls, extra_attrs=["positive"]):
new_model = _convert_linear_model(model, new_cls)
# copy any extra parameters and fitted attributes requested by the caller
_copy_to(model, new_model, extra_attrs)
_copy_to(model, new_model, ["alpha"], True)
return new_model
def _to_elasticNet(model: ElasticNetCV, args, kwargs, is_lasso=False, cls=None, extra_attrs=[]):
# We need an R^2 score to compare to other models; ElasticNetCV doesn't provide it,
# but we can calculate it ourselves from the MSE plus the variance of the target y
y = signature(model.fit).bind(*args, **kwargs).arguments["y"]
cls = cls or (Lasso if is_lasso else ElasticNet)
new_model = _convert_linear_regression(model, cls, extra_attrs + ['selection', 'warm_start', 'dual_gap_',
'tol', 'max_iter', 'random_state', 'n_iter_',
'copy_X'])
if not is_lasso:
# l1 ratio doesn't apply to Lasso, only ElasticNet
_copy_to(model, new_model, ["l1_ratio"], True)
# max R^2 corresponds to min MSE
min_mse = np.min(np.mean(model.mse_path_, axis=-1)) # last dimension in mse_path is folds, so average over that
r2 = 1 - min_mse / np.var(y) # R^2 = 1 - MSE / Var(y)
return new_model, r2
def _to_ridge(model, cls=Ridge, extra_attrs=["positive"]):
ridge = _convert_linear_regression(model, cls, extra_attrs + ["_normalize", "solver"])
best_score = model.best_score_
return ridge, best_score
class SklearnCVSelector(SingleModelSelector):
"""
Wraps one of sklearn's CV classes in the ModelSelector interface
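
For example (illustrative sketch; ``folds`` is a list of ``(train, test)`` index pairs)::

    selector = SklearnCVSelector(LassoCV())
    selector.train(True, folds, X, y)   # selection: fits LassoCV using the provided folds
    selector.best_model                 # a plain Lasso with the selected alpha
    selector.best_score                 # an R^2-style score derived from the CV MSE path
    selector.train(False, None, X, y)   # refit the selected Lasso on the full data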
"""
def __init__(self, searcher):
self.searcher = clone(searcher)
@staticmethod
def convertible_types():
return {GridSearchCV, RandomizedSearchCV} | SklearnCVSelector._model_mapping().keys()
@staticmethod
def can_wrap(model):
if isinstance(model, Pipeline):
return SklearnCVSelector.can_wrap(model.steps[-1][1])
return any(isinstance(model, model_type) for model_type in SklearnCVSelector.convertible_types())
@staticmethod
def _model_mapping():
return {LogisticRegressionCV: lambda model, _args, _kwargs: _to_logisticRegression(model),
ElasticNetCV: lambda model, args, kwargs: _to_elasticNet(model, args, kwargs),
LassoCV: lambda model, args, kwargs: _to_elasticNet(model, args, kwargs, True, None, ["positive"]),
RidgeCV: lambda model, _args, _kwargs: _to_ridge(model),
RidgeClassifierCV: lambda model, _args, _kwargs: _to_ridge(model, RidgeClassifier,
["positive", "class_weight",
"_label_binarizer"]),
MultiTaskElasticNetCV: lambda model, args, kwargs: _to_elasticNet(model, args, kwargs,
False, MultiTaskElasticNet,
extra_attrs=[]),
MultiTaskLassoCV: lambda model, args, kwargs: _to_elasticNet(model, args, kwargs,
True, MultiTaskLasso, extra_attrs=[]),
WeightedLassoCVWrapper: lambda model, args, kwargs: _to_elasticNet(model, args, kwargs,
True, WeightedLassoWrapper,
extra_attrs=[]),
}
@staticmethod
def _convert_model(model, args, kwargs):
if isinstance(model, Pipeline):
name, inner_model = model.steps[-1]
best_model, score = SklearnCVSelector._convert_model(inner_model, args, kwargs)
return Pipeline(steps=[*model.steps[:-1], (name, best_model)]), score
if isinstance(model, (GridSearchCV, RandomizedSearchCV)):
return model.best_estimator_, model.best_score_
for known_type in SklearnCVSelector._model_mapping().keys():
if isinstance(model, known_type):
converter = SklearnCVSelector._model_mapping()[known_type]
return converter(model, args, kwargs)
def train(self, is_selecting: bool, folds: Optional[List], *args, groups=None, **kwargs):
if is_selecting:
sub_model = self.searcher
if isinstance(self.searcher, Pipeline):
sub_model = self.searcher.steps[-1][1]
init_params = inspect.signature(sub_model.__init__).parameters
if 'cv' in init_params:
default_cv = init_params['cv'].default
else:
# the constructor doesn't list cv explicitly (e.g., it's absorbed by **kwargs), so pull the default out of a new instance
default_cv = type(sub_model)().cv
if sub_model.cv != default_cv:
warnings.warn(f"Model {sub_model} has a non-default cv attribute, which will be ignored")
sub_model.cv = folds
self.searcher.fit(*args, **kwargs)
self._best_model, self._best_score = self._convert_model(self.searcher, args, kwargs)
else:
self.best_model.fit(*args, **kwargs)
return self
@property
def best_model(self):
return self._best_model
@property
def best_score(self):
return self._best_score
class ListSelector(SingleModelSelector):
"""
Model selection class that selects the best model from a list of model selectors
Parameters
----------
models : list of ModelSelector
The list of model selectors to choose from
unwrap : bool, default True
Whether to return the best model's best model, rather than just the outer best model selector
"""
def __init__(self, models, unwrap=True):
self.models = [clone(model, safe=False) for model in models]
self.unwrap = unwrap
def train(self, is_selecting, folds: Optional[List], *args, **kwargs):
assert len(self.models) > 0, "ListSelector must have at least one model"
if is_selecting:
scores = []
for model in self.models:
model.train(is_selecting, folds, *args, **kwargs)
scores.append(model.best_score)
self._all_scores = scores
self._best_score = np.max(scores)
self._best_model = self.models[np.argmax(scores)]
else:
self._best_model.train(is_selecting, folds, *args, **kwargs)
@property
def best_model(self):
"""
Gets the best model; note that if we were selecting over SingleModelSelectors and `unwrap` is `False`,
we will return the SingleModelSelector instance, not its best model.
"""
return self._best_model.best_model if self.unwrap else self._best_model
@property
def best_score(self):
return self._best_score
def get_selector(input, is_discrete, *, random_state=None, cv=None, wrapper=GridSearchCV, needs_scoring=False):
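# Illustrative examples of the inputs this factory accepts (hypothetical calls):
#   get_selector('linear', is_discrete=False, cv=3)            # -> SklearnCVSelector over WeightedLassoCVWrapper
#   get_selector(['linear', 'forest'], is_discrete=True)       # -> ListSelector comparing the two selectors
#   get_selector(RandomForestRegressor(), is_discrete=False)   # -> FixedModelSelector around the given model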
named_models = {
'linear': (LogisticRegressionCV(random_state=random_state, cv=cv) if is_discrete
else WeightedLassoCVWrapper(random_state=random_state, cv=cv)),
'poly': ([make_pipeline(PolynomialFeatures(d),
(LogisticRegressionCV(random_state=random_state, cv=cv) if is_discrete
else WeightedLassoCVWrapper(random_state=random_state, cv=cv)))
for d in range(1, 4)]),
'forest': (GridSearchCV(RandomForestClassifier(random_state=random_state) if is_discrete
else RandomForestRegressor(random_state=random_state),
param_grid={}, cv=cv)),
'gbf': (GridSearchCV(GradientBoostingClassifier(random_state=random_state) if is_discrete
else GradientBoostingRegressor(random_state=random_state),
param_grid={}, cv=cv)),
'nnet': (GridSearchCV(MLPClassifier(random_state=random_state) if is_discrete
else MLPRegressor(random_state=random_state),
param_grid={}, cv=cv)),
'automl': ["poly", "forest", "gbf", "nnet"],
}
if isinstance(input, ModelSelector): # we've already got a model selector, don't need to do anything
return input
elif isinstance(input, list): # we've got a list; call get_selector on each element, then wrap in a ListSelector
models = [get_selector(model, is_discrete,
random_state=random_state, cv=cv, wrapper=wrapper,
needs_scoring=True) # we need to score to compare outputs to each other
for model in input]
return ListSelector(models)
elif isinstance(input, str): # we've got a string; look it up
if input in named_models:
return get_selector(named_models[input], is_discrete,
random_state=random_state, cv=cv, wrapper=wrapper,
needs_scoring=needs_scoring)
else:
raise ValueError(f"Unknown model type: {input}, must be one of {named_models.keys()}")
elif SklearnCVSelector.can_wrap(input):
return SklearnCVSelector(input)
else: # assume this is an sklearn-compatible model
return FixedModelSelector(input, needs_scoring)
class GridSearchCVList(BaseEstimator):
""" An extension of GridSearchCV that allows for passing a list of estimators each with their own
parameter grid and returns the best among all estimators in the list and hyperparameter in their
corresponding grid. We are only changing the estimator parameter to estimator_list and the param_grid
parameter to be a list of parameter grids. The rest of the parameters are the same as in
:meth:`~sklearn.model_selection.GridSearchCV`. See the documentation of that class
for explanation of the remaining parameters.
Parameters
----------
estimator_list : list of estimator objects
Each estimator in the list is assumed to implement the scikit-learn estimator interface.
Each estimator needs to provide a ``score`` function,
or ``scoring`` must be passed.
param_grid_list : list of dict or list of lists of dict
For each estimator, a dictionary with parameter names (`str`) as keys and lists of
parameter settings to try as values, or a list of such
dictionaries, in which case the grids spanned by each dictionary
in the list are explored. This enables searching over any sequence
of parameter settings.
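
Examples
--------
An illustrative sketch selecting between a random forest and a gradient boosting model
(``X`` and ``y`` are any regression data)::

    from sklearn.ensemble import GradientBoostingRegressor, RandomForestRegressor

    est = GridSearchCVList([RandomForestRegressor(), GradientBoostingRegressor()],
                           param_grid_list=[{'n_estimators': [50, 100]},
                                            {'max_depth': [3, 5]}],
                           cv=3)
    est.fit(X, y)
    y_pred = est.predict(X)
    best = est.best_estimator_   # the winner across both grids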
"""
def __init__(self, estimator_list, param_grid_list, scoring=None,
n_jobs=None, refit=True, cv=None, verbose=0, pre_dispatch='2*n_jobs',
error_score=np.nan, return_train_score=False):
self.estimator_list = estimator_list
self.param_grid_list = param_grid_list
self.scoring = scoring
self.n_jobs = n_jobs
self.refit = refit
self.cv = cv
self.verbose = verbose
self.pre_dispatch = pre_dispatch
self.error_score = error_score
self.return_train_score = return_train_score
return
def fit(self, X, y=None, **fit_params):
self._gcv_list = [GridSearchCV(estimator, param_grid, scoring=self.scoring,
n_jobs=self.n_jobs, refit=self.refit, cv=self.cv, verbose=self.verbose,
pre_dispatch=self.pre_dispatch, error_score=self.error_score,
return_train_score=self.return_train_score)
for estimator, param_grid in zip(self.estimator_list, self.param_grid_list)]
self.best_ind_ = np.argmax([gcv.fit(X, y, **fit_params).best_score_ for gcv in self._gcv_list])
self.best_estimator_ = self._gcv_list[self.best_ind_].best_estimator_
self.best_score_ = self._gcv_list[self.best_ind_].best_score_
self.best_params_ = self._gcv_list[self.best_ind_].best_params_
return self
def predict(self, X):
return self.best_estimator_.predict(X)
def predict_proba(self, X):
return self.best_estimator_.predict_proba(X)
def _cross_val_predict(estimator, X, y=None, *, groups=None, cv=None,
n_jobs=None, verbose=0, fit_params=None,
pre_dispatch='2*n_jobs', method='predict', safe=True):
"""This is a fork from :meth:`~sklearn.model_selection.cross_val_predict` to allow for
non-safe cloning of the models for each fold.
Parameters
----------
estimator : estimator object implementing 'fit' and 'predict'
The object to use to fit the data.
X : array_like of shape (n_samples, n_features)
The data to fit. Can be, for example, a list or an array of at least 2 dimensions.
y : array_like of shape (n_samples,) or (n_samples, n_outputs), \
default=None
The target variable to try to predict in the case of
supervised learning.
groups : array_like of shape (n_samples,), optional
Group labels for the samples used while splitting the dataset into
train/test set. Only used in conjunction with a "Group" :term:`cv`
instance (e.g., :class:`GroupKFold`).
cv : int, cross-validation generator or an iterable, default None
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 5-fold cross validation,
- int, to specify the number of folds in a `(Stratified)KFold`,
- CV splitter,
- An iterable yielding (train, test) splits as arrays of indices.
For int/None inputs, if the estimator is a classifier and ``y`` is
either binary or multiclass, :class:`StratifiedKFold` is used. In all
other cases, :class:`KFold` is used.
Refer :ref:`User Guide <cross_validation>` for the various
cross-validation strategies that can be used here.
.. versionchanged:: 0.22
``cv`` default value if None changed from 3-fold to 5-fold.
n_jobs : int, default None
The number of CPUs to use to do the computation.
``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors. See :term:`Glossary <n_jobs>`
for more details.
verbose : int, default 0
The verbosity level.
fit_params : dict, default None
Parameters to pass to the fit method of the estimator.
pre_dispatch : int or str, default '2*n_jobs'
Controls the number of jobs that get dispatched during parallel
execution. Reducing this number can be useful to avoid an
explosion of memory consumption when more jobs get dispatched
than CPUs can process. This parameter can be:
- None, in which case all the jobs are immediately
created and spawned. Use this for lightweight and
fast-running jobs, to avoid delays due to on-demand
spawning of the jobs
- An int, giving the exact number of total jobs that are
spawned
- A str, giving an expression as a function of n_jobs,
as in '2*n_jobs'
method : str, default 'predict'
Invokes the passed method name of the passed estimator. For
method='predict_proba', the columns correspond to the classes
in sorted order.
safe : bool, default True
Whether to clone the estimator with the ``safe`` option (if False, non-estimators are deep-copied).
Returns
-------
predictions : ndarray
This is the result of calling ``method``
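
Examples
--------
Illustrative sketch (mirrors :func:`sklearn.model_selection.cross_val_predict` except for ``safe``)::

    preds = _cross_val_predict(model, X, y, cv=5, method='predict', safe=False)

Passing ``safe=False`` lets the per-fold clone fall back to a deep copy for estimators
that do not implement ``get_params``.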
"""
X, y, groups = indexable(X, y, groups)
cv = check_cv(cv, y, classifier=is_classifier(estimator))
splits = list(cv.split(X, y, groups))
test_indices = np.concatenate([test for _, test in splits])
if not _check_is_permutation(test_indices, _num_samples(X)):
raise ValueError('cross_val_predict only works for partitions')
# If classification methods produce multiple columns of output,
# we need to manually encode classes to ensure consistent column ordering.
encode = method in ['decision_function', 'predict_proba',
'predict_log_proba'] and y is not None
if encode:
y = np.asarray(y)
if y.ndim == 1:
le = LabelEncoder()
y = le.fit_transform(y)
elif y.ndim == 2:
y_enc = np.zeros_like(y, dtype=int)
for i_label in range(y.shape[1]):
y_enc[:, i_label] = LabelEncoder().fit_transform(y[:, i_label])
y = y_enc
# We clone the estimator to make sure that all the folds are
# independent, and that it is pickle-able.
parallel = Parallel(n_jobs=n_jobs, verbose=verbose,
pre_dispatch=pre_dispatch)
from packaging.version import parse
# verbose was removed from sklearn's non-public _fit_and_predict method in 1.4
if parse(sklearn.__version__) < parse("1.4"):
predictions = parallel(delayed(_fit_and_predict)(
clone(estimator, safe=safe), X, y, train, test, verbose, fit_params, method)
for train, test in splits)
else:
predictions = parallel(delayed(_fit_and_predict)(
clone(estimator, safe=safe), X, y, train, test, fit_params, method)
for train, test in splits)
inv_test_indices = np.empty(len(test_indices), dtype=int)
inv_test_indices[test_indices] = np.arange(len(test_indices))
if sp.issparse(predictions[0]):
predictions = sp.vstack(predictions, format=predictions[0].format)
elif encode and isinstance(predictions[0], list):
# `predictions` is a list of method outputs from each fold.
# If each of those is also a list, then treat this as a
# multioutput-multiclass task. We need to separately concatenate
# the method outputs for each label into an `n_labels` long list.
n_labels = y.shape[1]
concat_pred = []
for i_label in range(n_labels):
label_preds = np.concatenate([p[i_label] for p in predictions])
concat_pred.append(label_preds)
predictions = concat_pred
else:
predictions = np.concatenate(predictions)
if isinstance(predictions, list):
return [p[inv_test_indices] for p in predictions]
else:
return predictions[inv_test_indices]