# econml/sklearn_extensions/linear_model.py

# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.

"""Collection of scikit-learn extensions for linear models.

.. testsetup::

    # Our classes that derive from sklearn ones sometimes include
    # inherited docstrings that have embedded doctests; we need the following imports
    # so that they don't break.

    import numpy as np
    from sklearn.linear_model import lasso_path
"""

from __future__ import annotations  # needed to allow type signature to refer to containing type

import numbers
import numpy as np
import warnings
from collections.abc import Iterable
from scipy.stats import norm
from ..utilities import ndim, shape, reshape, _safe_norm_ppf, check_input_arrays
from sklearn import clone
from sklearn.linear_model import LinearRegression, LassoCV, MultiTaskLassoCV, Lasso, MultiTaskLasso
from sklearn.linear_model._base import _preprocess_data
from sklearn.metrics import r2_score
from sklearn.model_selection import KFold, StratifiedKFold
# TODO: consider working around relying on sklearn implementation details
from sklearn.model_selection._split import _CVIterableWrapper
from sklearn.multioutput import MultiOutputRegressor
from sklearn.utils import check_array, check_X_y
from sklearn.utils.multiclass import type_of_target
from sklearn.utils.validation import check_is_fitted
from sklearn.base import BaseEstimator
from statsmodels.tools.tools import add_constant
from statsmodels.api import RLM
import statsmodels
from joblib import Parallel, delayed
from typing import List


class _WeightedCVIterableWrapper(_CVIterableWrapper):
    def __init__(self, cv):
        super().__init__(cv)

    def get_n_splits(self, X=None, y=None, groups=None, sample_weight=None):
        if groups is not None and sample_weight is not None:
            raise ValueError("Cannot simultaneously use grouping and weighting")
        return super().get_n_splits(X, y, groups)

    def split(self, X=None, y=None, groups=None, sample_weight=None):
        if groups is not None and sample_weight is not None:
            raise ValueError("Cannot simultaneously use grouping and weighting")
        return super().split(X, y, groups)


def _weighted_check_cv(cv=5, y=None, classifier=False, random_state=None):
    # local import to avoid circular imports
    from .model_selection import WeightedKFold, WeightedStratifiedKFold
    cv = 5 if cv is None else cv
    if isinstance(cv, numbers.Integral):
        if (classifier and (y is not None) and
                (type_of_target(y) in ('binary', 'multiclass'))):
            return WeightedStratifiedKFold(cv, random_state=random_state)
        else:
            return WeightedKFold(cv, random_state=random_state)

    if not hasattr(cv, 'split') or isinstance(cv, str):
        if not isinstance(cv, Iterable) or isinstance(cv, str):
            raise ValueError("Expected cv as an integer, cross-validation "
                             "object (from sklearn.model_selection) "
                             "or an iterable. Got %s." % cv)
        return _WeightedCVIterableWrapper(cv)

    return cv  # New style cv objects are passed without any modification


class WeightedModelMixin:
    """Mixin class for weighted models.

    For linear models, weights are applied as reweighting of the data matrix X and targets y.
    """

    def _fit_weighted_linear_model(self, X, y, sample_weight, check_input=None):
        # Convert X, y into numpy arrays
        X, y = check_X_y(X, y, y_numeric=True, multi_output=True)
        # Define fit parameters
        fit_params = {'X': X, 'y': y}
        # Some algorithms don't have a check_input option
        if check_input is not None:
            fit_params['check_input'] = check_input

        if sample_weight is not None:
            # Check weights array
            if np.atleast_1d(sample_weight).ndim > 1:
                # Weights must be a scalar or a 1D array
                raise ValueError("Sample weights must be 1D array or scalar")
            if np.ndim(sample_weight) == 0:
                sample_weight = np.repeat(sample_weight, X.shape[0])
            else:
                sample_weight = check_array(sample_weight, ensure_2d=False, allow_nd=False)
                if sample_weight.shape[0] != X.shape[0]:
                    raise ValueError(
                        "Found array with {0} sample(s) while {1} samples were expected.".format(
                            sample_weight.shape[0], X.shape[0])
                    )

            # Normalize inputs
            X, y, X_offset, y_offset, X_scale = _preprocess_data(
                X, y, fit_intercept=self.fit_intercept,
                copy=self.copy_X, check_input=check_input if check_input is not None else True,
                sample_weight=sample_weight)
            # Weight inputs
            normalized_weights = X.shape[0] * sample_weight / np.sum(sample_weight)
            sqrt_weights = np.sqrt(normalized_weights)
            X_weighted = sqrt_weights.reshape(-1, 1) * X
            y_weighted = sqrt_weights.reshape(-1, 1) * y if y.ndim > 1 else sqrt_weights * y
            fit_params['X'] = X_weighted
            fit_params['y'] = y_weighted
            if self.fit_intercept:
                # Fit base class without intercept
                self.fit_intercept = False
                # Fit Lasso
                super().fit(**fit_params)
                # Reset intercept
                self.fit_intercept = True
                # The intercept is not calculated properly due to the sqrt(weights) factor,
                # so it must be recomputed
                self._set_intercept(X_offset, y_offset, X_scale)
            else:
                super().fit(**fit_params)
        else:
            # Fit lasso without weights
            super().fit(**fit_params)
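

# Illustrative sketch (not part of the library): why _fit_weighted_linear_model rescales
# rows by sqrt(weight).  For the squared loss, sum_i w_i * (y_i - x_i @ b)**2 equals
# ||sqrt(w) * y - (sqrt(w) * X) @ b||^2, so an unweighted solver applied to the rescaled
# data recovers the weighted fit.  Shown here for plain least squares with made-up data.
def _example_sqrt_weight_equivalence():
    rng = np.random.default_rng(0)
    X = rng.normal(size=(50, 3))
    y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(size=50)
    w = rng.uniform(0.5, 2.0, size=50)
    sw = np.sqrt(w)
    # unweighted least squares on sqrt(w)-rescaled data
    beta_rescaled, *_ = np.linalg.lstsq(sw[:, None] * X, sw * y, rcond=None)
    # direct weighted normal equations
    beta_weighted = np.linalg.solve(X.T @ (w[:, None] * X), X.T @ (w * y))
    assert np.allclose(beta_rescaled, beta_weighted)
    return beta_weighted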


[docs]class WeightedLasso(WeightedModelMixin, Lasso): """Version of sklearn Lasso that accepts weights. Parameters ---------- alpha : float, optional Constant that multiplies the L1 term. Defaults to 1.0. ``alpha = 0`` is equivalent to ordinary least squares, solved by the :class:`LinearRegression` object. For numerical reasons, using ``alpha = 0`` with Lasso is not advised. Given this, you should use the :class:`LinearRegression` object. fit_intercept : bool, default True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (e.g. data is expected to be already centered). precompute : True | False | array_like, default False Whether to use a precomputed Gram matrix to speed up calculations. If set to ``'auto'`` let us decide. The Gram matrix can also be passed as argument. For sparse input this option is always ``True`` to preserve sparsity. copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. max_iter : int, optional The maximum number of iterations tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. warm_start : bool, optional When set to True, reuse the solution of the previous call to fit as initialization, otherwise, just erase the previous solution. See :term:`the Glossary <warm_start>`. positive : bool, optional When set to ``True``, forces the coefficients to be positive. random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. Attributes ---------- coef_ : array, shape (n_features,) | (n_targets, n_features) parameter vector (w in the cost function formula) intercept_ : float | array, shape (n_targets,) independent term in decision function. n_iter_ : int | array_like, shape (n_targets,) number of iterations run by the coordinate descent solver to reach the specified tolerance. """
    def __init__(self, alpha=1.0, fit_intercept=True, precompute=False, copy_X=True,
                 max_iter=1000, tol=1e-4, warm_start=False, positive=False,
                 random_state=None, selection='cyclic'):
        super().__init__(
            alpha=alpha, fit_intercept=fit_intercept,
            precompute=precompute, copy_X=copy_X,
            max_iter=max_iter, tol=tol, warm_start=warm_start,
            positive=positive, random_state=random_state, selection=selection)
    def fit(self, X, y, sample_weight=None, check_input=True):
        """Fit model with coordinate descent.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Data

        y : ndarray, shape (n_samples,) or (n_samples, n_targets)
            Target. Will be cast to X's dtype if necessary

        sample_weight : numpy array of shape [n_samples]
            Individual weights for each sample.
            The weights will be normalized internally.

        check_input : bool, default True
            Allow to bypass several input checking.
            Don't use this parameter unless you know what you do.
        """
        self._fit_weighted_linear_model(X, y, sample_weight, check_input)
        return self
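

# Usage sketch (illustrative, made-up data; not taken from the library's docs): fit a
# WeightedLasso with per-sample weights and read off the fitted coefficients.
def _example_weighted_lasso_usage():
    rng = np.random.default_rng(1)
    X = rng.normal(size=(100, 5))
    y = X[:, 0] - 2 * X[:, 1] + 0.1 * rng.normal(size=100)
    weights = rng.uniform(0.1, 1.0, size=100)  # weights are normalized internally
    est = WeightedLasso(alpha=0.05).fit(X, y, sample_weight=weights)
    return est.coef_, est.intercept_

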
class WeightedMultiTaskLasso(WeightedModelMixin, MultiTaskLasso): """Version of sklearn MultiTaskLasso that accepts weights. Parameters ---------- alpha : float, optional Constant that multiplies the L1 term. Defaults to 1.0. ``alpha = 0`` is equivalent to an ordinary least square, solved by the :class:`LinearRegression` object. For numerical reasons, using ``alpha = 0`` with the ``Lasso`` object is not advised. Given this, you should use the :class:`LinearRegression` object. fit_intercept : bool, default True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (e.g. data is expected to be already centered). copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. max_iter : int, optional The maximum number of iterations tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. warm_start : bool, optional When set to True, reuse the solution of the previous call to fit as initialization, otherwise, just erase the previous solution. See :term:`the Glossary <warm_start>`. random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. Attributes ---------- coef_ : array, shape (n_features,) | (n_targets, n_features) parameter vector (w in the cost function formula) intercept_ : float | array, shape (n_targets,) independent term in decision function. n_iter_ : int | array_like, shape (n_targets,) number of iterations run by the coordinate descent solver to reach the specified tolerance. """ def __init__(self, alpha=1.0, fit_intercept=True, copy_X=True, max_iter=1000, tol=1e-4, warm_start=False, random_state=None, selection='cyclic'): super().__init__( alpha=alpha, fit_intercept=fit_intercept, copy_X=copy_X, max_iter=max_iter, tol=tol, warm_start=warm_start, random_state=random_state, selection=selection) def fit(self, X, y, sample_weight=None): """Fit model with coordinate descent. Parameters ---------- X : ndarray or scipy.sparse matrix, (n_samples, n_features) Data y : ndarray, shape (n_samples,) or (n_samples, n_targets) Target. Will be cast to X's dtype if necessary sample_weight : numpy array of shape [n_samples] Individual weights for each sample. The weights will be normalized internally. """ self._fit_weighted_linear_model(X, y, sample_weight) return self
[docs]class WeightedLassoCV(WeightedModelMixin, LassoCV): """Version of sklearn LassoCV that accepts weights. .. testsetup:: import numpy as np from sklearn.linear_model import lasso_path Parameters ---------- eps : float, optional Length of the path. ``eps=1e-3`` means that ``alpha_min / alpha_max = 1e-3``. n_alphas : int, optional Number of alphas along the regularization path alphas : numpy array, optional List of alphas where to compute the models. If ``None`` alphas are set automatically fit_intercept : bool, default True whether to calculate the intercept for this model. If set to false, no intercept will be used in calculations (e.g. data is expected to be already centered). precompute : True | False | 'auto' | array_like Whether to use a precomputed Gram matrix to speed up calculations. If set to ``'auto'`` let us decide. The Gram matrix can also be passed as argument. max_iter : int, optional The maximum number of iterations tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. cv : int, cross-validation generator or an iterable, optional Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold weighted cross-validation, - integer, to specify the number of folds. - :term:`CV splitter`, - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, :class:`WeightedKFold` is used. If None then 5 folds are used. verbose : bool or int Amount of verbosity. n_jobs : int or None, optional Number of CPUs to use during the cross validation. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. See :term:`Glossary <n_jobs>` for more details. positive : bool, optional If positive, restrict regression coefficients to be positive random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. """
    def __init__(self, eps=1e-3, n_alphas=100, alphas=None, fit_intercept=True,
                 precompute='auto', max_iter=1000, tol=1e-4,
                 copy_X=True, cv=None, verbose=False, n_jobs=None,
                 positive=False, random_state=None, selection='cyclic'):
        super().__init__(
            eps=eps, n_alphas=n_alphas, alphas=alphas,
            fit_intercept=fit_intercept, precompute=precompute,
            max_iter=max_iter, tol=tol, copy_X=copy_X,
            cv=cv, verbose=verbose, n_jobs=n_jobs, positive=positive,
            random_state=random_state, selection=selection)
    def fit(self, X, y, sample_weight=None):
        """Fit model with coordinate descent.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Data

        y : ndarray, shape (n_samples,) or (n_samples, n_targets)
            Target. Will be cast to X's dtype if necessary

        sample_weight : numpy array of shape [n_samples]
            Individual weights for each sample.
            The weights will be normalized internally.
        """
        # Make weighted splitter
        cv_temp = self.cv
        self.cv = _weighted_check_cv(self.cv, random_state=self.random_state).split(
            X, y, sample_weight=sample_weight)
        # Fit weighted model
        self._fit_weighted_linear_model(X, y, sample_weight)
        self.cv = cv_temp
        return self
[docs]class WeightedMultiTaskLassoCV(WeightedModelMixin, MultiTaskLassoCV): """Version of sklearn MultiTaskLassoCV that accepts weights. .. testsetup:: import numpy as np from sklearn.linear_model import lasso_path Parameters ---------- eps : float, optional Length of the path. ``eps=1e-3`` means that ``alpha_min / alpha_max = 1e-3``. n_alphas : int, optional Number of alphas along the regularization path alphas : array_like, optional List of alphas where to compute the models. If not provided, set automatically. fit_intercept : bool whether to calculate the intercept for this model. If set to false, no intercept will be used in calculations (e.g. data is expected to be already centered). max_iter : int, optional The maximum number of iterations. tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. cv : int, cross-validation generator or an iterable, optional Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold weighted cross-validation, - integer, to specify the number of folds. - :term:`CV splitter`, - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, :class:`WeightedKFold` is used. If None then 5-folds are used. verbose : bool or int Amount of verbosity. n_jobs : int or None, optional Number of CPUs to use during the cross validation. Note that this is used only if multiple values for l1_ratio are given. ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means using all processors. See :term:`Glossary <n_jobs>` for more details. random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. """
    def __init__(self, eps=1e-3, n_alphas=100, alphas=None, fit_intercept=True,
                 max_iter=1000, tol=1e-4, copy_X=True,
                 cv=None, verbose=False, n_jobs=None, random_state=None,
                 selection='cyclic'):
        super().__init__(
            eps=eps, n_alphas=n_alphas, alphas=alphas,
            fit_intercept=fit_intercept,
            max_iter=max_iter, tol=tol, copy_X=copy_X,
            cv=cv, verbose=verbose, n_jobs=n_jobs,
            random_state=random_state, selection=selection)
    def fit(self, X, y, sample_weight=None):
        """Fit model with coordinate descent.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Data

        y : ndarray, shape (n_samples,) or (n_samples, n_targets)
            Target. Will be cast to X's dtype if necessary

        sample_weight : numpy array of shape [n_samples]
            Individual weights for each sample.
            The weights will be normalized internally.
        """
        # Make weighted splitter
        cv_temp = self.cv
        self.cv = _weighted_check_cv(self.cv, random_state=self.random_state).split(
            X, y, sample_weight=sample_weight)
        # Fit weighted model
        self._fit_weighted_linear_model(X, y, sample_weight)
        self.cv = cv_temp
        return self
def _get_theta_coefs_and_tau_sq(i, X, sample_weight, alpha_cov, n_alphas_cov, max_iter, tol, random_state):
    n_samples, n_features = X.shape
    y = X[:, i]
    X_reduced = X[:, list(range(i)) + list(range(i + 1, n_features))]
    # Call weighted lasso on reduced design matrix
    if alpha_cov == 'auto':
        local_wlasso = WeightedLassoCV(cv=3, n_alphas=n_alphas_cov,
                                       fit_intercept=False,
                                       max_iter=max_iter,
                                       tol=tol, n_jobs=1,
                                       random_state=random_state)
    else:
        local_wlasso = WeightedLasso(alpha=alpha_cov,
                                     fit_intercept=False,
                                     max_iter=max_iter,
                                     tol=tol,
                                     random_state=random_state)
    local_wlasso.fit(X_reduced, y, sample_weight=sample_weight)
    coefs = local_wlasso.coef_
    # Weighted tau
    if sample_weight is not None:
        y_weighted = y * sample_weight / np.sum(sample_weight)
    else:
        y_weighted = y / n_samples
    tausq = np.dot(y - local_wlasso.predict(X_reduced), y_weighted)
    return coefs, tausq
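

# Illustrative sketch (made-up data): the helper above runs one "nodewise" lasso,
# regressing column i of X on the remaining columns; ``coefs`` are that regression's
# coefficients and ``tausq`` is the (weighted) normalizer <x_i - prediction, x_i> / n
# used to build the approximate precision matrix Theta in the debiased lasso below.
def _example_nodewise_regression():
    rng = np.random.default_rng(2)
    X = rng.normal(size=(200, 4))
    coefs, tausq = _get_theta_coefs_and_tau_sq(
        0, X, sample_weight=None, alpha_cov=0.01, n_alphas_cov=10,
        max_iter=1000, tol=1e-4, random_state=0)
    return coefs, tausq

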
[docs]class DebiasedLasso(WeightedLasso): """Debiased Lasso model. Implementation was derived from <https://arxiv.org/abs/1303.0518>. Only implemented for single-dimensional output. .. testsetup:: import numpy as np from sklearn.linear_model import lasso_path Parameters ---------- alpha : str | float, default 'auto'. Constant that multiplies the L1 term. Defaults to 'auto'. ``alpha = 0`` is equivalent to an ordinary least square, solved by the :class:`LinearRegression` object. For numerical reasons, using ``alpha = 0`` with the ``Lasso`` object is not advised. Given this, you should use the :class:`.LinearRegression` object. n_alphas : int, default 100 How many alphas to try if alpha='auto' alpha_cov : str | float, default 'auto' The regularization alpha that is used when constructing the pseudo inverse of the covariance matrix Theta used to for correcting the lasso coefficient. Each such regression corresponds to the regression of one feature on the remainder of the features. n_alphas_cov : int, default 10 How many alpha_cov to try if alpha_cov='auto'. fit_intercept : bool, default True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (e.g. data is expected to be already centered). precompute : True | False | array_like, default False Whether to use a precomputed Gram matrix to speed up calculations. If set to ``'auto'`` let us decide. The Gram matrix can also be passed as argument. For sparse input this option is always ``True`` to preserve sparsity. copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. max_iter : int, optional The maximum number of iterations tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. warm_start : bool, optional When set to True, reuse the solution of the previous call to fit as initialization, otherwise, just erase the previous solution. See :term:`the Glossary <warm_start>`. random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. n_jobs : int, optional How many jobs to use whenever parallelism is invoked Attributes ---------- coef_ : array, shape (n_features,) Parameter vector (w in the cost function formula). intercept_ : float Independent term in decision function. n_iter_ : int | array_like, shape (n_targets,) Number of iterations run by the coordinate descent solver to reach the specified tolerance. selected_alpha_ : float Penalty chosen through cross-validation, if alpha='auto'. coef_stderr_ : array, shape (n_features,) Estimated standard errors for coefficients (see ``coef_`` attribute). intercept_stderr_ : float Estimated standard error intercept (see ``intercept_`` attribute). """
    def __init__(self, alpha='auto', n_alphas=100, alpha_cov='auto', n_alphas_cov=10,
                 fit_intercept=True, precompute=False, copy_X=True, max_iter=1000,
                 tol=1e-4, warm_start=False,
                 random_state=None, selection='cyclic', n_jobs=None):
        self.n_jobs = n_jobs
        self.n_alphas = n_alphas
        self.alpha_cov = alpha_cov
        self.n_alphas_cov = n_alphas_cov
        super().__init__(
            alpha=alpha, fit_intercept=fit_intercept,
            precompute=precompute, copy_X=copy_X,
            max_iter=max_iter, tol=tol, warm_start=warm_start,
            positive=False, random_state=random_state, selection=selection)
[docs] def fit(self, X, y, sample_weight=None, check_input=True): """Fit debiased lasso model. Parameters ---------- X : ndarray or scipy.sparse matrix, (n_samples, n_features) Input data. y : array, shape (n_samples,) Target. Will be cast to X's dtype if necessary sample_weight : numpy array of shape [n_samples] Individual weights for each sample. The weights will be normalized internally. check_input : bool, default True Allow to bypass several input checking. Don't use this parameter unless you know what you do. """ self.selected_alpha_ = None if self.alpha == 'auto': # Select optimal penalty self.alpha = self._get_optimal_alpha(X, y, sample_weight) self.selected_alpha_ = self.alpha else: # Warn about consistency warnings.warn("Setting a suboptimal alpha can lead to miscalibrated confidence intervals. " "We recommend setting alpha='auto' for optimality.") # Convert X, y into numpy arrays X, y = check_X_y(X, y, y_numeric=True, multi_output=False) # Fit weighted lasso with user input super().fit(X, y, sample_weight, check_input) # Center X, y X, y, X_offset, y_offset, X_scale = _preprocess_data( X, y, fit_intercept=self.fit_intercept, copy=self.copy_X, check_input=check_input, sample_weight=sample_weight) # Calculate quantities that will be used later on. Account for centered data y_pred = self.predict(X) - self.intercept_ self._theta_hat = self._get_theta_hat(X, sample_weight) self._X_offset = X_offset # Calculate coefficient and error variance num_nonzero_coefs = np.count_nonzero(self.coef_) self._error_variance = np.average((y - y_pred)**2, weights=sample_weight) / \ (1 - num_nonzero_coefs / X.shape[0]) self._mean_error_variance = self._error_variance / X.shape[0] self._coef_variance = self._get_unscaled_coef_var( X, self._theta_hat, sample_weight) * self._error_variance # Add coefficient correction coef_correction = self._get_coef_correction( X, y, y_pred, sample_weight, self._theta_hat) self.coef_ += coef_correction # Set coefficients and intercept standard errors self.coef_stderr_ = np.sqrt(np.diag(self._coef_variance)) if self.fit_intercept: self.intercept_stderr_ = np.sqrt( self._X_offset @ self._coef_variance @ self._X_offset + self._mean_error_variance ) else: self.intercept_stderr_ = 0 # Set intercept self._set_intercept(X_offset, y_offset, X_scale) # Return alpha to 'auto' state if self.selected_alpha_ is not None: self.alpha = 'auto' return self
    def prediction_stderr(self, X):
        """Get the standard error of the predictions using the debiased lasso.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Samples.

        Returns
        -------
        prediction_stderr : array_like, shape (n_samples, )
            The standard error of each coordinate of the output at each point we predict
        """
        # Note that in the case of no intercept, X_offset is 0
        if self.fit_intercept:
            X = X - self._X_offset
        # Calculate the variance of the predictions
        var_pred = np.sum(np.matmul(X, self._coef_variance) * X, axis=1)
        if self.fit_intercept:
            var_pred += self._mean_error_variance

        pred_stderr = np.sqrt(var_pred)
        return pred_stderr
    def predict_interval(self, X, alpha=0.05):
        """Build prediction confidence intervals using the debiased lasso.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Samples.

        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        (y_lower, y_upper) : tuple of array, shape (n_samples, )
            Returns lower and upper interval endpoints.
        """
        lower = alpha / 2
        upper = 1 - alpha / 2
        y_pred = self.predict(X)
        # Calculate prediction confidence intervals
        sd_pred = self.prediction_stderr(X)
        y_lower = y_pred + \
            np.apply_along_axis(lambda s: norm.ppf(lower, scale=s), 0, sd_pred)
        y_upper = y_pred + \
            np.apply_along_axis(lambda s: norm.ppf(upper, scale=s), 0, sd_pred)
        return y_lower, y_upper
    def coef__interval(self, alpha=0.05):
        """Get a confidence interval bounding the fitted coefficients.

        Parameters
        ----------
        alpha : float, default 0.05
            The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile
            of the parameter distribution as confidence interval

        Returns
        -------
        (coef_lower, coef_upper) : tuple of array, shape (n_coefs, )
            Returns lower and upper interval endpoints for the coefficients.
        """
        lower = alpha / 2
        upper = 1 - alpha / 2
        return self.coef_ + np.apply_along_axis(lambda s: norm.ppf(lower, scale=s), 0, self.coef_stderr_), \
            self.coef_ + np.apply_along_axis(lambda s: norm.ppf(upper, scale=s), 0, self.coef_stderr_)
    def intercept__interval(self, alpha=0.05):
        """Get a confidence interval bounding the fitted intercept.

        Parameters
        ----------
        alpha : float, default 0.05
            The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile
            of the parameter distribution as confidence interval

        Returns
        -------
        (intercept_lower, intercept_upper) : tuple of floats
            Returns lower and upper interval endpoints for the intercept.
        """
        lower = alpha / 2
        upper = 1 - alpha / 2
        if self.fit_intercept:
            return self.intercept_ + norm.ppf(lower, scale=self.intercept_stderr_), \
                self.intercept_ + norm.ppf(upper, scale=self.intercept_stderr_)
        else:
            return 0.0, 0.0
def _get_coef_correction(self, X, y, y_pred, sample_weight, theta_hat): # Assumes flattened y n_samples, _ = X.shape y_res = np.ndarray.flatten(y) - y_pred # Compute weighted residuals if sample_weight is not None: y_res_scaled = y_res * sample_weight / np.sum(sample_weight) else: y_res_scaled = y_res / n_samples delta_coef = np.matmul( theta_hat, np.matmul(X.T, y_res_scaled)) return delta_coef def _get_optimal_alpha(self, X, y, sample_weight): # To be done once per target. Assumes y can be flattened. cv_estimator = WeightedLassoCV(cv=5, n_alphas=self.n_alphas, fit_intercept=self.fit_intercept, precompute=self.precompute, copy_X=True, max_iter=self.max_iter, tol=self.tol, random_state=self.random_state, selection=self.selection, n_jobs=self.n_jobs) cv_estimator.fit(X, y.flatten(), sample_weight=sample_weight) return cv_estimator.alpha_ def _get_theta_hat(self, X, sample_weight): # Assumes that X has already been offset n_samples, n_features = X.shape # Special case: n_features=1 if n_features == 1: C_hat = np.ones((1, 1)) tausq = (X.T @ X / n_samples).flatten() return np.diag(1 / tausq) @ C_hat # Compute Lasso coefficients for the columns of the design matrix results = Parallel(n_jobs=self.n_jobs)( delayed(_get_theta_coefs_and_tau_sq)(i, X, sample_weight, self.alpha_cov, self.n_alphas_cov, self.max_iter, self.tol, self.random_state) for i in range(n_features)) coefs, tausq = zip(*results) coefs = np.array(coefs) tausq = np.array(tausq) # Compute C_hat C_hat = np.diag(np.ones(n_features)) C_hat[0][1:] = -coefs[0] for i in range(1, n_features): C_hat[i][:i] = -coefs[i][:i] C_hat[i][i + 1:] = -coefs[i][i:] # Compute theta_hat theta_hat = np.diag(1 / tausq) @ C_hat return theta_hat def _get_unscaled_coef_var(self, X, theta_hat, sample_weight): if sample_weight is not None: norm_weights = sample_weight / np.sum(sample_weight) sigma = X.T @ (norm_weights.reshape(-1, 1) * X) else: sigma = np.matmul(X.T, X) / X.shape[0] _unscaled_coef_var = np.matmul( np.matmul(theta_hat, sigma), theta_hat.T) / X.shape[0] return _unscaled_coef_var
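

# Usage sketch (illustrative, made-up data and confidence level): fit a DebiasedLasso and
# query confidence intervals for the coefficients and for new predictions.
def _example_debiased_lasso_usage():
    rng = np.random.default_rng(3)
    X = rng.normal(size=(200, 10))
    y = 2 * X[:, 0] - X[:, 1] + rng.normal(scale=0.5, size=200)
    est = DebiasedLasso(alpha='auto', random_state=0).fit(X, y)
    coef_lo, coef_hi = est.coef__interval(alpha=0.1)     # 90% interval per coefficient
    pred_lo, pred_hi = est.predict_interval(X[:5], alpha=0.1)
    return est.coef_, coef_lo, coef_hi, pred_lo, pred_hi

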
[docs]class MultiOutputDebiasedLasso(MultiOutputRegressor): """Debiased MultiOutputLasso model. Implementation was derived from <https://arxiv.org/abs/1303.0518>. Applies debiased lasso once per target. If only a flat target is passed in, it reverts to the DebiasedLasso algorithm. Parameters ---------- alpha : str | float, optional. Default 'auto'. Constant that multiplies the L1 term. Defaults to 'auto'. ``alpha = 0`` is equivalent to an ordinary least square, solved by the :class:`LinearRegression` object. For numerical reasons, using ``alpha = 0`` with the ``Lasso`` object is not advised. Given this, you should use the :class:`LinearRegression` object. n_alphas : int, default 100 How many alphas to try if alpha='auto' alpha_cov : str | float, default 'auto' The regularization alpha that is used when constructing the pseudo inverse of the covariance matrix Theta used to for correcting the lasso coefficient. Each such regression corresponds to the regression of one feature on the remainder of the features. n_alphas_cov : int, default 10 How many alpha_cov to try if alpha_cov='auto'. fit_intercept : bool, default True Whether to calculate the intercept for this model. If set to False, no intercept will be used in calculations (e.g. data is expected to be already centered). precompute : True | False | array_like, default False Whether to use a precomputed Gram matrix to speed up calculations. If set to ``'auto'`` let us decide. The Gram matrix can also be passed as argument. For sparse input this option is always ``True`` to preserve sparsity. copy_X : bool, default True If ``True``, X will be copied; else, it may be overwritten. max_iter : int, optional The maximum number of iterations tol : float, optional The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. warm_start : bool, optional When set to True, reuse the solution of the previous call to fit as initialization, otherwise, just erase the previous solution. See :term:`the Glossary <warm_start>`. random_state : int, RandomState instance, or None, default None The seed of the pseudo random number generator that selects a random feature to update. If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. Used when ``selection='random'``. selection : str, default 'cyclic' If set to 'random', a random coefficient is updated every iteration rather than looping over features sequentially by default. This (setting to 'random') often leads to significantly faster convergence especially when tol is higher than 1e-4. n_jobs : int, optional How many jobs to use whenever parallelism is invoked Attributes ---------- coef_ : array, shape (n_targets, n_features) or (n_features,) Parameter vector (w in the cost function formula). intercept_ : array, shape (n_targets, ) or float Independent term in decision function. selected_alpha_ : array, shape (n_targets, ) or float Penalty chosen through cross-validation, if alpha='auto'. coef_stderr_ : array, shape (n_targets, n_features) or (n_features, ) Estimated standard errors for coefficients (see ``coef_`` attribute). 
intercept_stderr_ : array, shape (n_targets, ) or float Estimated standard error intercept (see ``intercept_`` attribute). """
    def __init__(self, alpha='auto', n_alphas=100, alpha_cov='auto', n_alphas_cov=10,
                 fit_intercept=True,
                 precompute=False, copy_X=True, max_iter=1000, tol=1e-4, warm_start=False,
                 random_state=None, selection='cyclic', n_jobs=None):
        self.estimator = DebiasedLasso(alpha=alpha, n_alphas=n_alphas,
                                       alpha_cov=alpha_cov, n_alphas_cov=n_alphas_cov,
                                       fit_intercept=fit_intercept,
                                       precompute=precompute, copy_X=copy_X,
                                       max_iter=max_iter, tol=tol,
                                       warm_start=warm_start,
                                       random_state=random_state, selection=selection,
                                       n_jobs=n_jobs)
        super().__init__(estimator=self.estimator, n_jobs=n_jobs)
    def fit(self, X, y, sample_weight=None):
        """Fit the multi-output debiased lasso model.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Input data.

        y : array, shape (n_samples, n_targets) or (n_samples, )
            Target. Will be cast to X's dtype if necessary

        sample_weight : numpy array of shape [n_samples]
            Individual weights for each sample.
            The weights will be normalized internally.
        """
        # Allow for single output as well
        # When only one output is passed in, the MultiOutputDebiasedLasso behaves like the DebiasedLasso
        self.flat_target = False
        if np.ndim(y) == 1:
            self.flat_target = True
            y = np.asarray(y).reshape(-1, 1)
        super().fit(X, y, sample_weight)
        # Set coef_ attribute
        self._set_attribute("coef_")
        # Set intercept_ attribute
        self._set_attribute("intercept_",
                            condition=self.estimators_[0].fit_intercept,
                            default=0.0)
        # Set selected_alpha_ attribute
        self._set_attribute("selected_alpha_",
                            condition=(self.estimators_[0].alpha == 'auto'))
        # Set coef_stderr_
        self._set_attribute("coef_stderr_")
        # intercept_stderr_
        self._set_attribute("intercept_stderr_")
        return self
    def predict(self, X):
        """Get the prediction using the debiased lasso.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Samples.

        Returns
        -------
        prediction : array_like, shape (n_samples, ) or (n_samples, n_targets)
            The prediction at each point.
        """
        pred = super().predict(X)
        if self.flat_target:
            pred = pred.flatten()
        return pred
    def prediction_stderr(self, X):
        """Get the standard error of the predictions using the debiased lasso.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Samples.

        Returns
        -------
        prediction_stderr : array_like, shape (n_samples, ) or (n_samples, n_targets)
            The standard error of each coordinate of the output at each point we predict
        """
        n_estimators = len(self.estimators_)
        X = check_array(X)
        pred_stderr = np.empty((X.shape[0], n_estimators))
        for i, estimator in enumerate(self.estimators_):
            pred_stderr[:, i] = estimator.prediction_stderr(X)
        if self.flat_target:
            pred_stderr = pred_stderr.flatten()
        return pred_stderr
    def predict_interval(self, X, alpha=0.05):
        """Build prediction confidence intervals using the debiased lasso.

        Parameters
        ----------
        X : ndarray or scipy.sparse matrix, (n_samples, n_features)
            Samples.

        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        (y_lower, y_upper) : tuple of array, shape (n_samples, n_targets) or (n_samples, )
            Returns lower and upper interval endpoints.
        """
        n_estimators = len(self.estimators_)
        X = check_array(X)
        y_lower = np.empty((X.shape[0], n_estimators))
        y_upper = np.empty((X.shape[0], n_estimators))
        for i, estimator in enumerate(self.estimators_):
            y_lower[:, i], y_upper[:, i] = estimator.predict_interval(X, alpha=alpha)
        if self.flat_target:
            y_lower = y_lower.flatten()
            y_upper = y_upper.flatten()
        return y_lower, y_upper
    def coef__interval(self, alpha=0.05):
        """Get a confidence interval bounding the fitted coefficients.

        Parameters
        ----------
        alpha : float, default 0.05
            The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile
            of the parameter distribution as confidence interval

        Returns
        -------
        (coef_lower, coef_upper) : tuple of array, shape (n_targets, n_coefs) or (n_coefs, )
            Returns lower and upper interval endpoints for the coefficients.
        """
        n_estimators = len(self.estimators_)
        coef_lower = np.empty((n_estimators, self.estimators_[0].coef_.shape[0]))
        coef_upper = np.empty((n_estimators, self.estimators_[0].coef_.shape[0]))
        for i, estimator in enumerate(self.estimators_):
            coef_lower[i], coef_upper[i] = estimator.coef__interval(alpha=alpha)
        if self.flat_target:
            coef_lower = coef_lower.flatten()
            coef_upper = coef_upper.flatten()
        return coef_lower, coef_upper
    def intercept__interval(self, alpha=0.05):
        """Get a confidence interval bounding the fitted intercept.

        Parameters
        ----------
        alpha : float, default 0.05
            The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile
            of the parameter distribution as confidence interval

        Returns
        -------
        (intercept_lower, intercept_upper) : tuple of array of size (n_targets, ) or tuple of floats
            Returns lower and upper interval endpoints for the intercept.
        """
        if len(self.estimators_) == 1:
            return self.estimators_[0].intercept__interval(alpha=alpha)
        else:
            intercepts = np.array([estimator.intercept__interval(alpha=alpha)
                                   for estimator in self.estimators_])
            return intercepts[:, 0], intercepts[:, 1]
    def get_params(self, deep=True):
        """Get parameters for this estimator."""
        return self.estimator.get_params(deep=deep)
    def set_params(self, **params):
        """Set parameters for this estimator."""
        self.estimator.set_params(**params)
        # follow the scikit-learn convention of returning self from set_params
        return self
    def _set_attribute(self, attribute_name, condition=True, default=None):
        if condition:
            if not self.flat_target:
                attribute_value = np.array([getattr(estimator, attribute_name)
                                            for estimator in self.estimators_])
            else:
                attribute_value = getattr(self.estimators_[0], attribute_name)
        else:
            attribute_value = default
        setattr(self, attribute_name, attribute_value)
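

# Usage sketch (illustrative, made-up data): with a two-column target the estimator fits
# one DebiasedLasso per column, so coef_ has shape (n_targets, n_features) and the
# interval methods return per-target arrays.
def _example_multi_output_debiased_lasso():
    rng = np.random.default_rng(4)
    X = rng.normal(size=(200, 5))
    Y = np.column_stack([X[:, 0] + rng.normal(scale=0.1, size=200),
                         -X[:, 1] + rng.normal(scale=0.1, size=200)])
    est = MultiOutputDebiasedLasso(random_state=0).fit(X, Y)
    lo, hi = est.coef__interval(alpha=0.05)  # each of shape (2, 5)
    return est.coef_.shape, lo.shape, hi.shape

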
class _PairedEstimatorWrapper: """Helper class to wrap two different estimators, one of which can be used only with single targets and the other which can be used on multiple targets. Not intended to be used directly by users.""" _SingleEst = None _MultiEst = None _known_params = [] _post_fit_attrs = [] def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs # set model to the single-target estimator by default so there's always a model to get and set attributes on self.model = self._SingleEst(*args, **kwargs) def fit(self, X, y, sample_weight=None): self._needs_unravel = False params = {key: value for (key, value) in self.get_params().items() if key in self._known_params} if ndim(y) == 2 and shape(y)[1] > 1: self.model = self._MultiEst(**params) else: if ndim(y) == 2 and shape(y)[1] == 1: y = np.ravel(y) self._needs_unravel = True self.model = self._SingleEst(**params) self.model.fit(X, y, sample_weight) for param in self._post_fit_attrs: setattr(self, param, getattr(self.model, param)) return self def predict(self, X): predictions = self.model.predict(X) return reshape(predictions, (-1, 1)) if self._needs_unravel else predictions def score(self, X, y, sample_weight=None): return self.model.score(X, y, sample_weight) def __getattr__(self, key): if key in self._known_params: return getattr(self.model, key) else: raise AttributeError("No attribute " + key) def __setattr__(self, key, value): if key in self._known_params: setattr(self.model, key, value) else: super().__setattr__(key, value) def get_params(self, deep=True): """Get parameters for this estimator.""" return {k: v for k, v in self.model.get_params(deep=deep).items() if k in self._known_params} def set_params(self, **params): """Set parameters for this estimator.""" self.model.set_params(**params)
class WeightedLassoCVWrapper(_PairedEstimatorWrapper):
    """Helper class to wrap either WeightedLassoCV or WeightedMultiTaskLassoCV depending on the shape of the target."""

    _SingleEst = WeightedLassoCV
    _MultiEst = WeightedMultiTaskLassoCV

    # whitelist known params because full set is not necessarily identical between LassoCV and MultiTaskLassoCV
    # (e.g. former has 'positive' and 'precompute' while latter does not)
    _known_params = set(['eps', 'n_alphas', 'alphas', 'fit_intercept', 'normalize', 'max_iter', 'tol', 'copy_X',
                         'cv', 'verbose', 'n_jobs', 'random_state', 'selection'])

    _post_fit_attrs = set(['alpha_', 'alphas_', 'coef_', 'dual_gap_', 'intercept_', 'n_iter_',
                           'n_features_in_', 'mse_path_'])
class WeightedLassoWrapper(_PairedEstimatorWrapper):
    """Helper class to wrap either WeightedLasso or WeightedMultiTaskLasso depending on the shape of the target."""

    _SingleEst = WeightedLasso
    _MultiEst = WeightedMultiTaskLasso
    _known_params = set(['alpha', 'fit_intercept', 'copy_X', 'max_iter', 'tol', 'random_state', 'selection'])
    _post_fit_attrs = set(['coef_', 'dual_gap_', 'intercept_', 'n_iter_', 'n_features_in_'])
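

# Illustrative sketch (made-up data): the paired wrappers pick the single-target or the
# multi-task estimator based on the shape of ``y`` passed to ``fit``.
def _example_paired_wrapper_dispatch():
    rng = np.random.default_rng(5)
    X = rng.normal(size=(60, 4))
    y1 = X[:, 0] + rng.normal(scale=0.1, size=60)
    Y2 = np.column_stack([y1, -X[:, 1]])
    single = WeightedLassoCVWrapper(cv=3).fit(X, y1)   # dispatches to WeightedLassoCV
    multi = WeightedLassoCVWrapper(cv=3).fit(X, Y2)    # dispatches to WeightedMultiTaskLassoCV
    return type(single.model).__name__, type(multi.model).__name__

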
[docs]class SelectiveRegularization: """ Estimator of a linear model where regularization is applied to only a subset of the coefficients. Assume that our loss is .. math:: \\ell(\\beta_1, \\beta_2) = \\lVert y - X_1 \\beta_1 - X_2 \\beta_2 \\rVert^2 + f(\\beta_2) so that we're regularizing only the coefficients in :math:`\\beta_2`. Then, since :math:`\\beta_1` doesn't appear in the penalty, the problem of finding :math:`\\beta_1` to minimize the loss once :math:`\\beta_2` is known reduces to just a normal OLS regression, so that: .. math:: \\beta_1 = (X_1^\\top X_1)^{-1}X_1^\\top(y - X_2 \\beta_2) Plugging this into the loss, we obtain .. math:: ~& \\lVert y - X_1 (X_1^\\top X_1)^{-1}X_1^\\top(y - X_2 \\beta_2) - X_2 \\beta_2 \\rVert^2 + f(\\beta_2) \\\\ =~& \\lVert (I - X_1 (X_1^\\top X_1)^{-1}X_1^\\top)(y - X_2 \\beta_2) \\rVert^2 + f(\\beta_2) But, letting :math:`M_{X_1} = I - X_1 (X_1^\\top X_1)^{-1}X_1^\\top`, we see that this is .. math:: \\lVert (M_{X_1} y) - (M_{X_1} X_2) \\beta_2 \\rVert^2 + f(\\beta_2) so finding the minimizing :math:`\\beta_2` can be done by regressing :math:`M_{X_1} y` on :math:`M_{X_1} X_2` using the penalized regression method incorporating :math:`f`. Note that these are just the residual values of :math:`y` and :math:`X_2` when regressed on :math:`X_1` using OLS. Parameters ---------- unpenalized_inds : list of int, other 1-dimensional indexing expression, or callable The indices that should not be penalized when the model is fit; all other indices will be penalized. If this is a callable, it will be called with the arguments to `fit` and should return a corresponding indexing expression. For example, ``lambda X, y: unpenalized_inds=slice(1,-1)`` will result in only the first and last indices being penalized. penalized_model : :term:`regressor` A penalized linear regression model fit_intercept : bool, default True Whether to fit an intercept; the intercept will not be penalized if it is fit Attributes ---------- coef_ : array, shape (n_features, ) or (n_targets, n_features) Estimated coefficients for the linear regression problem. If multiple targets are passed during the fit (y 2D), this is a 2D array of shape (n_targets, n_features), while if only one target is passed, this is a 1D array of length n_features. intercept_ : float or array of shape (n_targets) Independent term in the linear model. penalized_model : :term:`regressor` The penalized linear regression model, cloned from the one passed into the initializer """
    def __init__(self, unpenalized_inds, penalized_model, fit_intercept=True):
        self._unpenalized_inds_expr = unpenalized_inds
        self.penalized_model = clone(penalized_model)
        self._fit_intercept = fit_intercept
[docs] def fit(self, X, y, sample_weight=None): """ Fit the model. Parameters ---------- X : array_like, shape (n, d_x) The features to regress against y : array_like, shape (n,) or (n, d_y) The regression target sample_weight : array_like, shape (n,), optional Relative weights for each sample """ X, y = check_X_y(X, y, multi_output=True, estimator=self) if callable(self._unpenalized_inds_expr): if sample_weight is None: self._unpenalized_inds = self._unpenalized_inds_expr(X, y) else: self._unpenalized_inds = self._unpenalized_inds_expr(X, y, sample_weight=sample_weight) else: self._unpenalized_inds = self._unpenalized_inds_expr mask = np.ones(X.shape[1], dtype=bool) mask[self._unpenalized_inds] = False self._penalized_inds = np.arange(X.shape[1])[mask] X1 = X[:, self._unpenalized_inds] X2 = X[:, self._penalized_inds] X2_res = X2 - LinearRegression(fit_intercept=self._fit_intercept).fit(X1, X2, sample_weight=sample_weight).predict(X1) y_res = y - LinearRegression(fit_intercept=self._fit_intercept).fit(X1, y, sample_weight=sample_weight).predict(X1) if sample_weight is not None: self.penalized_model.fit(X2_res, y_res, sample_weight=sample_weight) else: self.penalized_model.fit(X2_res, y_res) # The unpenalized model can't contain an intercept, because in the analysis above # we rely on the fact that M(X beta) = (M X) beta, but M(X beta + c) is not the same # as (M X) beta + c, so the learned coef and intercept will be wrong intercept = self.penalized_model.predict(np.zeros_like(X2[0:1])) if not np.allclose(intercept, 0): raise AttributeError("The penalized model has a non-zero intercept; to fit an intercept " "you should instead either set fit_intercept to True when initializing the " "SelectiveRegression instance (for an unpenalized intercept) or " "explicitly add a column of ones to the data being fit and include that " "column in the penalized indices.") # now regress X1 on y - X2 * beta2 to learn beta1 self._model_X1 = LinearRegression(fit_intercept=self._fit_intercept) self._model_X1.fit(X1, y - self.penalized_model.predict(X2), sample_weight=sample_weight) # set coef_ and intercept_ attributes self.coef_ = np.empty(shape(y)[1:] + shape(X)[1:]) self.coef_[..., self._penalized_inds] = self.penalized_model.coef_ self.coef_[..., self._unpenalized_inds] = self._model_X1.coef_ # Note that the penalized model should *not* have an intercept self.intercept_ = self._model_X1.intercept_ return self
    def predict(self, X):
        """
        Make a prediction for each sample.

        Parameters
        ----------
        X : array_like, shape (m, d_x)
            The samples whose targets to predict

        Returns
        -------
        arr : array_like, shape (m,) or (m, d_y)
            The predicted targets
        """
        check_is_fitted(self, "coef_")
        X1 = X[:, self._unpenalized_inds]
        X2 = X[:, self._penalized_inds]
        return self._model_X1.predict(X1) + self.penalized_model.predict(X2)
    def score(self, X, y):
        """
        Score the predictions for a set of features to ground truth.

        Parameters
        ----------
        X : array_like, shape (m, d_x)
            The samples to predict
        y : array_like, shape (m,) or (m, d_y)
            The ground truth targets

        Returns
        -------
        score : float
            The model's score
        """
        check_is_fitted(self, "coef_")
        X, y = check_X_y(X, y, multi_output=True, estimator=self)
        return r2_score(y, self.predict(X))
known_params = {'known_params', 'coef_', 'intercept_', 'penalized_model', '_unpenalized_inds_expr', '_fit_intercept', '_unpenalized_inds', '_penalized_inds', '_model_X1'} def __getattr__(self, key): # don't proxy special methods if key.startswith('__'): raise AttributeError(key) # don't pass get_params through to model, because that will cause sklearn to clone this # regressor incorrectly if key != "get_params" and key not in self.known_params: return getattr(self.penalized_model, key) else: # Note: for known attributes that have been set this method will not be called, # so we should just throw here because this is an attribute belonging to this class # but which hasn't yet been set on this instance raise AttributeError("No attribute " + key) def __setattr__(self, key, value): if key not in self.known_params: setattr(self.penalized_model, key, value) else: super().__setattr__(key, value)
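

# Usage sketch (illustrative, made-up data): penalize every column except the first,
# using an sklearn Lasso (without its own intercept) as the penalized model.
def _example_selective_regularization():
    rng = np.random.default_rng(6)
    X = rng.normal(size=(100, 6))
    y = 3 * X[:, 0] + X[:, 1] + rng.normal(scale=0.1, size=100)
    est = SelectiveRegularization(unpenalized_inds=[0],
                                  penalized_model=Lasso(alpha=0.1, fit_intercept=False),
                                  fit_intercept=True)
    est.fit(X, y)
    # coef_[0] is unpenalized; the remaining entries come from the lasso on residualized data
    return est.coef_, est.intercept_, est.score(X, y)

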
class _StatsModelsWrapper(BaseEstimator): """ Parent class for statsmodels linear models. At init time each children class should set the boolean flag property fit_intercept. At fit time, each children class must calculate and set the following properties: _param: (m,) or (m, p) array Where m is number of features and p is number of outcomes, which corresponds to the coefficients of the linear model (including the intercept in the first column if fit_intercept=True). _param_var: (m, m) or (p, m, m) array Where m is number of features and p is number of outcomes, where each (m, m) matrix corresponds to the scaled covariance matrix of the parameters of the linear model. _n_out: the second dimension of the training y, or 0 if y is a vector """ def predict(self, X): """ Predicts the output given an array of instances. Parameters ---------- X : (n, d) array_like The covariates on which to predict Returns ------- predictions : {(n,) array, (n,p) array} The predicted mean outcomes """ if X is None: X = np.empty((1, 0)) if self.fit_intercept: X = add_constant(X, has_constant='add') return np.matmul(X, self._param) @property def coef_(self): """ Get the model's coefficients on the covariates. Returns ------- coef_ : {(d,), (p, d)} nd array_like The coefficients of the variables in the linear regression. If label y was p-dimensional, then the result is a matrix of coefficents, whose p-th row containts the coefficients corresponding to the p-th coordinate of the label. """ if self.fit_intercept: if self._n_out == 0: return self._param[1:] else: return self._param[1:].T else: if self._n_out == 0: return self._param else: return self._param.T @property def intercept_(self): """ Get the intercept(s) (or 0 if no intercept was fit). Returns ------- intercept_ : float or (p,) nd array_like The intercept of the linear regresion. If label y was p-dimensional, then the result is a vector whose p-th entry containts the intercept corresponding to the p-th coordinate of the label. """ return self._param[0] if self.fit_intercept else (0 if self._n_out == 0 else np.zeros(self._n_out)) @property def _param_stderr(self): """ The standard error of each parameter that was estimated. Returns ------- _param_stderr : {(d (+1),) (d (+1), p)} nd array_like The standard error of each parameter that was estimated. """ if self._n_out == 0: return np.sqrt(np.clip(np.diag(self._param_var), 0, np.inf)) else: return np.array([np.sqrt(np.clip(np.diag(v), 0, np.inf)) for v in self._param_var]).T @property def coef_stderr_(self): """ Gets the standard error of the fitted coefficients. Returns ------- coef_stderr_ : {(d,), (p, d)} nd array_like The standard error of the coefficients """ return self._param_stderr[1:].T if self.fit_intercept else self._param_stderr.T @property def intercept_stderr_(self): """ Gets the standard error of the intercept(s) (or 0 if no intercept was fit). Returns ------- intercept_stderr_ : float or (p,) nd array_like The standard error of the intercept(s) """ return self._param_stderr[0] if self.fit_intercept else (0 if self._n_out == 0 else np.zeros(self._n_out)) def prediction_stderr(self, X): """ Gets the standard error of the predictions. 
Parameters ---------- X : (n, d) array_like The covariates at which to predict Returns ------- prediction_stderr : (n, p) array_like The standard error of each coordinate of the output at each point we predict """ if X is None: X = np.empty((1, 0)) if self.fit_intercept: X = add_constant(X, has_constant='add') if self._n_out == 0: return np.sqrt(np.clip(np.sum(np.matmul(X, self._param_var) * X, axis=1), 0, np.inf)) else: return np.array([np.sqrt(np.clip(np.sum(np.matmul(X, v) * X, axis=1), 0, np.inf)) for v in self._param_var]).T def coef__interval(self, alpha=0.05): """ Gets a confidence interval bounding the fitted coefficients. Parameters ---------- alpha : float, default 0.05 The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile of the parameter distribution as confidence interval Returns ------- coef__interval : {tuple ((p, d) array, (p,d) array), tuple ((d,) array, (d,) array)} The lower and upper bounds of the confidence interval of the coefficients """ return np.array([_safe_norm_ppf(alpha / 2, loc=p, scale=err) for p, err in zip(self.coef_, self.coef_stderr_)]), \ np.array([_safe_norm_ppf(1 - alpha / 2, loc=p, scale=err) for p, err in zip(self.coef_, self.coef_stderr_)]) def intercept__interval(self, alpha=0.05): """ Gets a confidence interval bounding the intercept(s) (or 0 if no intercept was fit). Parameters ---------- alpha : float, default 0.05 The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile of the parameter distribution as confidence interval Returns ------- intercept__interval : {tuple ((p,) array, (p,) array), tuple (float, float)} The lower and upper bounds of the confidence interval of the intercept(s) """ if not self.fit_intercept: return (0 if self._n_out == 0 else np.zeros(self._n_out)), \ (0 if self._n_out == 0 else np.zeros(self._n_out)) if self._n_out == 0: return _safe_norm_ppf(alpha / 2, loc=self.intercept_, scale=self.intercept_stderr_), \ _safe_norm_ppf(1 - alpha / 2, loc=self.intercept_, scale=self.intercept_stderr_) else: return np.array([_safe_norm_ppf(alpha / 2, loc=p, scale=err) for p, err in zip(self.intercept_, self.intercept_stderr_)]), \ np.array([_safe_norm_ppf(1 - alpha / 2, loc=p, scale=err) for p, err in zip(self.intercept_, self.intercept_stderr_)]) def predict_interval(self, X, alpha=0.05): """ Gets a confidence interval bounding the prediction. Parameters ---------- X : (n, d) array_like The covariates on which to predict alpha : float, default 0.05 The confidence level. Will calculate the alpha/2-quantile and the (1-alpha/2)-quantile of the parameter distribution as confidence interval Returns ------- prediction_intervals : {tuple ((n,) array, (n,) array), tuple ((n,p) array, (n,p) array)} The lower and upper bounds of the confidence intervals of the predicted mean outcomes """ return np.array([_safe_norm_ppf(alpha / 2, loc=p, scale=err) for p, err in zip(self.predict(X), self.prediction_stderr(X))]), \ np.array([_safe_norm_ppf(1 - alpha / 2, loc=p, scale=err) for p, err in zip(self.predict(X), self.prediction_stderr(X))])
class StatsModelsLinearRegression(_StatsModelsWrapper):
    """
    Class which mimics weighted linear regression from the statsmodels package.

    However, unlike statsmodels WLS, this class also supports sample variances in addition to
    sample weights, which enables more accurate inference when working with summarized data.

    Parameters
    ----------
    fit_intercept : bool, default True
        Whether to fit an intercept in this model
    cov_type : str, default "HC0"
        The covariance approach to use. Supported values are "HC0", "HC1", and "nonrobust".
    enable_federation : bool, default False
        Whether to enable federation (aggregating this model's results with other models in a
        distributed setting). This requires additional memory proportional to the number of
        columns in X to the fourth power.
    """
    def __init__(self, fit_intercept=True, cov_type="HC0", *, enable_federation=False):
        self.cov_type = cov_type
        self.fit_intercept = fit_intercept
        self.enable_federation = enable_federation
    def _check_input(self, X, y, sample_weight, freq_weight, sample_var):
        """Check dimensions and other assertions."""
        X, y, sample_weight, freq_weight, sample_var = check_input_arrays(
            X, y, sample_weight, freq_weight, sample_var, dtype='numeric')
        if X is None:
            X = np.empty((y.shape[0], 0))
        if self.fit_intercept:
            X = add_constant(X, has_constant='add')

        # set default values for None
        if sample_weight is None:
            sample_weight = np.ones(y.shape[0])
        if freq_weight is None:
            freq_weight = np.ones(y.shape[0])
        if sample_var is None:
            sample_var = np.zeros(y.shape)

        # freq_weight must be integer-valued and should be accompanied by sample_var
        if np.any(np.not_equal(np.mod(freq_weight, 1), 0)):
            raise AttributeError("Frequency weights must all be integers for inference to be valid!")
        if sample_var.ndim < 2:
            if np.any(np.equal(freq_weight, 1) & np.not_equal(sample_var, 0)):
                warnings.warn(
                    "Variance was set to non-zero for an observation with freq_weight=1! "
                    "sample_var represents the variance of the original observations that are "
                    "summarized in this sample. Hence, it cannot be non-zero if only "
                    "one observation was summarized. Inference will be invalid!")
            elif np.any(np.not_equal(freq_weight, 1) & np.equal(sample_var, 0)):
                warnings.warn(
                    "Variance was set to zero for an observation with freq_weight>1! "
                    "sample_var represents the variance of the original observations that are "
                    "summarized in this sample. If it's zero, please use sample_weight instead "
                    "to reflect the weight of each individual sample!")
        else:
            if np.any(np.equal(freq_weight, 1) & np.not_equal(np.sum(sample_var, axis=1), 0)):
                warnings.warn(
                    "Variance was set to non-zero for an observation with freq_weight=1! "
                    "sample_var represents the variance of the original observations that are "
                    "summarized in this sample. Hence, it cannot be non-zero if only "
                    "one observation was summarized. Inference will be invalid!")
            elif np.any(np.not_equal(freq_weight, 1) & np.equal(np.sum(sample_var, axis=1), 0)):
                warnings.warn(
                    "Variance was set to zero for an observation with freq_weight>1! "
                    "sample_var represents the variance of the original observations that are "
                    "summarized in this sample. If it's zero, please use sample_weight instead "
                    "to reflect the weight of each individual sample!")

        # check array shapes
        assert (X.shape[0] == y.shape[0] == sample_weight.shape[0] ==
                freq_weight.shape[0] == sample_var.shape[0]), "Input lengths not compatible!"
        if y.ndim >= 2:
            assert (y.ndim == sample_var.ndim and
                    y.shape[1] == sample_var.shape[1]), "Input shapes not compatible: {}, {}!".format(
                y.shape, sample_var.shape)

        # weight X, y and sample_var
        weighted_X = X * np.sqrt(sample_weight).reshape(-1, 1)
        if y.ndim < 2:
            weighted_y = y * np.sqrt(sample_weight)
            sample_var = sample_var * sample_weight
        else:
            weighted_y = y * np.sqrt(sample_weight).reshape(-1, 1)
            sample_var = sample_var * (sample_weight.reshape(-1, 1))
        return weighted_X, weighted_y, freq_weight, sample_var
    def fit(self, X, y, sample_weight=None, freq_weight=None, sample_var=None):
        """
        Fits the model.

        Parameters
        ----------
        X : (N, d) nd array_like
            co-variates
        y : {(N,), (N, p)} nd array_like
            output variable(s)
        sample_weight : (N,) array_like or None
            Individual weights for each sample. If None, it assumes equal weight.
        freq_weight : (N,) array_like of int or None
            Weight for the observation. Observation i is treated as the mean outcome of
            freq_weight[i] independent observations. When ``sample_var`` is not None,
            this should be provided.
        sample_var : {(N,), (N, p)} nd array_like or None
            Variance of the outcome(s) of the original freq_weight[i] observations that were
            used to compute the mean outcome represented by observation i.

        Returns
        -------
        self : StatsModelsLinearRegression
        """
        # TODO: Add other types of covariance estimation (e.g. Newey-West (HAC), HC2, HC3)
        X, y, freq_weight, sample_var = self._check_input(X, y, sample_weight, freq_weight, sample_var)

        WX = X * np.sqrt(freq_weight).reshape(-1, 1)
        if y.ndim < 2:
            self._n_out = 0
            wy = y * np.sqrt(freq_weight)
        else:
            self._n_out = y.shape[1]
            wy = y * np.sqrt(freq_weight).reshape(-1, 1)
        param, _, rank, _ = np.linalg.lstsq(WX, wy, rcond=None)
        n_obs = np.sum(freq_weight)
        self._n_obs = n_obs
        df = param.shape[0]
        if rank < df:
            warnings.warn("Co-variance matrix is underdetermined. Inference will be invalid!")
        if n_obs <= df:
            warnings.warn("Number of observations <= number of parameters. Using biased variance calculation!")
            correction = 1
        else:
            correction = (n_obs / (n_obs - df))

        # For aggregation calculations, always treat wy as a 2D array so that einsum expressions
        # don't need to change; we'll collapse results back down afterwards if necessary
        wy = wy.reshape(-1, 1) if y.ndim < 2 else wy
        sv = sample_var.reshape(-1, 1) if y.ndim < 2 else sample_var

        self.XX = np.matmul(WX.T, WX)
        self.Xy = np.matmul(WX.T, wy)

        if self.enable_federation:
            # for federation, we need to store these additional arrays when using
            # heteroskedasticity-robust inference
            if (self.cov_type in ['HC0', 'HC1']):
                # y dimension is always first in the output when present so that broadcasting works correctly
                self.XXyy = np.einsum('nw,nx,ny,ny->ywx', X, X, wy, wy)
                self.XXXy = np.einsum('nv,nw,nx,ny->yvwx', X, X, WX, wy)
                self.XXXX = np.einsum('nu,nv,nw,nx->uvwx', X, X, WX, WX)
                self.sample_var = np.einsum('nw,nx,ny->ywx', WX, WX, sv)
            elif (self.cov_type is None) or (self.cov_type == 'nonrobust'):
                self.XXyy = np.einsum('ny,ny->y', wy, wy)
                self.XXXy = np.einsum('nx,ny->yx', WX, wy)
                self.XXXX = np.einsum('nw,nx->wx', WX, WX)
                self.sample_var = np.average(sv, weights=freq_weight, axis=0) * n_obs

        sigma_inv = np.linalg.pinv(self.XX)
        var_i = sample_var + (y - np.matmul(X, param))**2
        self._param = param

        if (self.cov_type is None) or (self.cov_type == 'nonrobust'):
            if y.ndim < 2:
                self._var = correction * np.average(var_i, weights=freq_weight) * sigma_inv
            else:
                vars = correction * np.average(var_i, weights=freq_weight, axis=0)
                self._var = [v * sigma_inv for v in vars]
        elif (self.cov_type == 'HC0'):
            if y.ndim < 2:
                weighted_sigma = np.matmul(WX.T, WX * var_i.reshape(-1, 1))
                self._var = np.matmul(sigma_inv, np.matmul(weighted_sigma, sigma_inv))
            else:
                self._var = []
                for j in range(self._n_out):
                    weighted_sigma = np.matmul(WX.T, WX * var_i[:, [j]])
                    self._var.append(np.matmul(sigma_inv, np.matmul(weighted_sigma, sigma_inv)))
        elif (self.cov_type == 'HC1'):
            if y.ndim < 2:
                weighted_sigma = np.matmul(WX.T, WX * var_i.reshape(-1, 1))
                self._var = correction * np.matmul(sigma_inv, np.matmul(weighted_sigma, sigma_inv))
            else:
                self._var = []
                for j in range(self._n_out):
                    weighted_sigma = np.matmul(WX.T, WX * var_i[:, [j]])
                    self._var.append(correction * np.matmul(sigma_inv, np.matmul(weighted_sigma, sigma_inv)))
        else:
            raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1.")

        self._param_var = np.array(self._var)
        return self
    @staticmethod
    def aggregate(models: List[StatsModelsLinearRegression]):
        """
        Aggregate multiple models into one.

        Parameters
        ----------
        models : list of StatsModelsLinearRegression
            The models to aggregate

        Returns
        -------
        agg_model : StatsModelsLinearRegression
            The aggregated model
        """
        if len(models) == 0:
            raise ValueError("Must aggregate at least one model!")

        cov_types = set([model.cov_type for model in models])
        fit_intercepts = set([model.fit_intercept for model in models])
        enable_federation = set([model.enable_federation for model in models])
        _n_outs = set([model._n_out for model in models])

        assert enable_federation == {True}, "All models must have enable_federation=True!"
        assert len(cov_types) == 1, "All models must have the same cov_type!"
        assert len(fit_intercepts) == 1, "All models must have the same fit_intercept!"
        assert len(_n_outs) == 1, "All models must have the same number of outcomes!"

        agg_model = StatsModelsLinearRegression(cov_type=models[0].cov_type, fit_intercept=models[0].fit_intercept)
        agg_model._n_out = models[0]._n_out

        XX = np.sum([model.XX for model in models], axis=0)
        Xy = np.sum([model.Xy for model in models], axis=0)
        XXyy = np.sum([model.XXyy for model in models], axis=0)
        XXXy = np.sum([model.XXXy for model in models], axis=0)
        XXXX = np.sum([model.XXXX for model in models], axis=0)
        sample_var = np.sum([model.sample_var for model in models], axis=0)
        n_obs = np.sum([model._n_obs for model in models], axis=0)

        sigma_inv = np.linalg.pinv(XX)
        param = sigma_inv @ Xy
        df = np.shape(param)[0]
        agg_model._param = param if agg_model._n_out > 0 else param.squeeze(1)

        if n_obs <= df:
            warnings.warn("Number of observations <= number of parameters. Using biased variance calculation!")
            correction = 1
        elif agg_model.cov_type == 'HC0':
            correction = 1
        else:
            # both HC1 and nonrobust use the same correction factor
            correction = (n_obs / (n_obs - df))

        if agg_model.cov_type in ['HC0', 'HC1']:
            weighted_sigma = XXyy - 2 * np.einsum('yvwx,vy->ywx', XXXy, param) + \
                np.einsum('uvwx,uy,vy->ywx', XXXX, param, param) + sample_var
            if agg_model._n_out == 0:
                agg_model._var = correction * np.matmul(sigma_inv, np.matmul(weighted_sigma.squeeze(0), sigma_inv))
            else:
                agg_model._var = [correction * np.matmul(sigma_inv, np.matmul(ws, sigma_inv))
                                  for ws in weighted_sigma]
        else:
            assert agg_model.cov_type == 'nonrobust' or agg_model.cov_type is None
            sigma = XXyy - 2 * np.einsum('yx,xy->y', XXXy, param) + np.einsum('wx,wy,xy->y', XXXX, param, param)
            var_i = (sample_var + sigma) / n_obs
            if agg_model._n_out == 0:
                agg_model._var = correction * var_i * sigma_inv
            else:
                agg_model._var = [correction * var * sigma_inv for var in var_i]

        agg_model._param_var = np.array(agg_model._var)
        (agg_model.XX, agg_model.Xy, agg_model.XXyy, agg_model.XXXy, agg_model.XXXX,
         agg_model.sample_var, agg_model._n_obs) = XX, Xy, XXyy, XXXy, XXXX, sample_var, n_obs

        return agg_model
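# Illustrative sketch only (not part of the module): the federation workflow described in the
# class docstring and in aggregate() above. Two partitions are fit separately with
# enable_federation=True and then combined; the aggregated coefficients and standard errors
# should match a single fit on the pooled data up to numerical precision. All data and
# variable names below are made up for illustration.
#
# rng = np.random.default_rng(123)
# X_all = rng.normal(size=(1000, 3))
# y_all = X_all @ np.array([1.0, -0.5, 2.0]) + rng.normal(size=1000)
#
# m1 = StatsModelsLinearRegression(enable_federation=True).fit(X_all[:500], y_all[:500])
# m2 = StatsModelsLinearRegression(enable_federation=True).fit(X_all[500:], y_all[500:])
# agg = StatsModelsLinearRegression.aggregate([m1, m2])
#
# pooled = StatsModelsLinearRegression().fit(X_all, y_all)
# np.allclose(agg.coef_, pooled.coef_)                  # expected: True
# np.allclose(agg.coef_stderr_, pooled.coef_stderr_)    # expected: True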
class StatsModelsRLM(_StatsModelsWrapper):
    """
    Class which mimics robust linear regression from the statsmodels package.

    Parameters
    ----------
    t : float, default 1.345
        The tuning constant for Huber's t function
    maxiter : int, default 50
        The maximum number of iterations to try
    tol : float, default 1e-08
        The convergence tolerance of the estimate
    fit_intercept : bool, default True
        Whether to fit an intercept in this model
    cov_type : {'H1', 'H2', 'H3'}, default 'H1'
        Indicates how the covariance matrix is estimated. See
        statsmodels.robust.robust_linear_model.RLMResults for more information.
    """
    def __init__(self, t=1.345, maxiter=50, tol=1e-08, fit_intercept=True, cov_type='H1'):
        self.t = t
        self.maxiter = maxiter
        self.tol = tol
        self.cov_type = cov_type
        self.fit_intercept = fit_intercept
    def _check_input(self, X, y):
        """Check dimensions and other assertions."""
        if X is None:
            X = np.empty((y.shape[0], 0))

        assert (X.shape[0] == y.shape[0]), "Input lengths not compatible!"
        return X, y
    def fit(self, X, y):
        """
        Fits the model.

        Parameters
        ----------
        X : (N, d) nd array_like
            co-variates
        y : {(N,), (N, p)} nd array_like
            output variable(s)

        Returns
        -------
        self : StatsModelsRLM
        """
        X, y = self._check_input(X, y)
        if self.fit_intercept:
            X = add_constant(X, has_constant='add')
        self._n_out = 0 if len(y.shape) == 1 else (y.shape[1],)

        def model_gen(y):
            return RLM(endog=y, exog=X,
                       M=statsmodels.robust.norms.HuberT(t=self.t)).fit(cov=self.cov_type,
                                                                        maxiter=self.maxiter,
                                                                        tol=self.tol)
        if y.ndim < 2:
            self.model = model_gen(y)
            self._param = self.model.params
            self._param_var = self.model.cov_params()
        else:
            self.models = [model_gen(y[:, i]) for i in range(y.shape[1])]
            self._param = np.array([mdl.params for mdl in self.models]).T
            self._param_var = np.array([mdl.cov_params() for mdl in self.models])
        return self
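# Illustrative sketch only (not part of the module): because StatsModelsRLM uses statsmodels'
# Huber-loss robust fit, a single grossly corrupted observation is downweighted rather than
# pulling the coefficients the way ordinary least squares would. Data and names are made up.
#
# rng = np.random.default_rng(0)
# X_demo = rng.normal(size=(200, 2))
# y_demo = X_demo @ np.array([2.0, -1.0]) + 0.1 * rng.normal(size=200)
# y_demo[0] += 50.0  # inject one gross outlier
#
# rlm = StatsModelsRLM(fit_intercept=True).fit(X_demo, y_demo)
# rlm.coef_                        # approximately [2., -1.] despite the outlier
# rlm.coef__interval(alpha=0.05)   # normal-approximation interval from the parent wrapper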
class StatsModels2SLS(_StatsModelsWrapper):
    """
    Class that solves the moment equation E[(y - theta*T)*Z] = 0.

    Parameters
    ----------
    cov_type : {'HC0', 'HC1', 'nonrobust', None}, default 'HC0'
        Indicates how the covariance matrix is estimated.
    """

    def __init__(self, cov_type="HC0"):
        self.fit_intercept = False
        self.cov_type = cov_type

    def _check_input(self, Z, T, y, sample_weight):
        """Check dimensions and other assertions."""
        # set default values for None
        if sample_weight is None:
            sample_weight = np.ones(y.shape[0])

        # check array shapes
        assert (T.shape[0] == Z.shape[0] == y.shape[0] == sample_weight.shape[0]), "Input lengths not compatible!"

        # the number of instruments must be at least the number of treatments
        if Z.shape[1] < T.shape[1]:
            raise AssertionError("The number of treatments cannot be larger than the number of instruments!"
                                 " If you are using a treatment featurizer, make sure the number of featurized"
                                 " treatments is less than or equal to the number of instruments. You can either"
                                 " featurize the instrument yourself, or consider using projection=True"
                                 " along with a flexible model_t_xwz.")

        # weight Z, T and y
        weighted_Z = Z * np.sqrt(sample_weight).reshape(-1, 1)
        weighted_T = T * np.sqrt(sample_weight).reshape(-1, 1)
        if y.ndim < 2:
            weighted_y = y * np.sqrt(sample_weight)
        else:
            weighted_y = y * np.sqrt(sample_weight).reshape(-1, 1)
        return weighted_Z, weighted_T, weighted_y

    def fit(self, Z, T, y, sample_weight=None, freq_weight=None, sample_var=None):
        """
        Fits the model.

        Parameters
        ----------
        Z : (N, p) nd array_like
            instrumental variables
        T : (N, p) nd array_like
            treatment variables
        y : {(N,), (N, p)} nd array_like
            output variable(s)
        sample_weight : (N,) array_like or None
            Individual weights for each sample. If None, it assumes equal weight.
        freq_weight : (N,) array_like of int or None
            Weight for the observation. Observation i is treated as the mean outcome of
            freq_weight[i] independent observations. When ``sample_var`` is not None,
            this should be provided.
        sample_var : {(N,), (N, p)} nd array_like or None
            Variance of the outcome(s) of the original freq_weight[i] observations that were
            used to compute the mean outcome represented by observation i.

        Returns
        -------
        self : StatsModels2SLS
        """
        assert freq_weight is None, "freq_weight is not supported yet for this class!"
        assert sample_var is None, "sample_var is not supported yet for this class!"
        Z, T, y = self._check_input(Z, T, y, sample_weight)

        self._n_out = 0 if y.ndim < 2 else y.shape[1]

        # learn the point estimate
        # solve the first-stage linear regression E[T|Z]
        zT_z = np.dot(Z.T, Z)
        zT_t = np.dot(Z.T, T)
        # "that" means T̂, the first-stage prediction of T
        self._thatparams = np.linalg.solve(zT_z, zT_t)
        that = np.dot(Z, self._thatparams)

        # solve the second-stage linear regression E[y|T̂]
        thatT_that = np.dot(that.T, that)
        thatT_y = np.dot(that.T, y)
        param = np.linalg.solve(thatT_that, thatT_y)
        self._param = param

        n_obs = y.shape[0]
        df = len(param) if self._n_out == 0 else param.shape[0]

        if n_obs <= df:
            warnings.warn("Number of observations <= number of parameters. Using biased variance calculation!")
            correction = 1
        else:
            correction = (n_obs / (n_obs - df))

        # learn cov(theta)
        # (T̂.T * T̂)^{-1}
        thatT_that_inv = np.linalg.inv(thatT_that)
        # sigma^2
        var_i = (y - np.dot(T, param))**2

        # reference: http://www.hec.unil.ch/documents/seminars/deep/361.pdf
        if (self.cov_type is None) or (self.cov_type == 'nonrobust'):
            if y.ndim < 2:
                self._var = correction * np.average(var_i) * thatT_that_inv
            else:
                sigma2 = correction * np.average(var_i, axis=0)
                self._var = [s * thatT_that_inv for s in sigma2]
        elif (self.cov_type == 'HC0'):
            if y.ndim < 2:
                weighted_sigma = np.matmul(that.T, that * var_i.reshape(-1, 1))
                self._var = np.matmul(thatT_that_inv, np.matmul(weighted_sigma, thatT_that_inv))
            else:
                self._var = []
                for j in range(self._n_out):
                    weighted_sigma = np.matmul(that.T, that * var_i[:, [j]])
                    self._var.append(np.matmul(thatT_that_inv, np.matmul(weighted_sigma, thatT_that_inv)))
        elif (self.cov_type == 'HC1'):
            if y.ndim < 2:
                weighted_sigma = np.matmul(that.T, that * var_i.reshape(-1, 1))
                self._var = correction * np.matmul(thatT_that_inv, np.matmul(weighted_sigma, thatT_that_inv))
            else:
                self._var = []
                for j in range(self._n_out):
                    weighted_sigma = np.matmul(that.T, that * var_i[:, [j]])
                    self._var.append(correction * np.matmul(thatT_that_inv,
                                                            np.matmul(weighted_sigma, thatT_that_inv)))
        else:
            raise AttributeError("Unsupported cov_type. Must be one of nonrobust, HC0, HC1.")

        self._param_var = np.array(self._var)
        return self
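# Illustrative sketch only (not part of the module): with an instrument Z that shifts the
# treatment T but affects y only through T, the two-stage procedure above should recover the
# structural effect approximately, even though T is confounded. Data and names are made up.
#
# rng = np.random.default_rng(42)
# n = 5000
# Z_demo = rng.normal(size=(n, 1))
# confounder = rng.normal(size=n)
# T_demo = (0.8 * Z_demo[:, 0] + confounder + rng.normal(size=n)).reshape(-1, 1)
# y_demo = 1.5 * T_demo[:, 0] + 2.0 * confounder + rng.normal(size=n)
#
# iv = StatsModels2SLS(cov_type="HC0").fit(Z_demo, T_demo, y_demo)
# iv.coef_                         # approximately [1.5]
# iv.coef__interval(alpha=0.05)    # confidence interval around the effect of T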