Source code for econml.panel.dml._dml

# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.

import abc
import numpy as np
from warnings import warn
from sklearn.base import clone
from sklearn.model_selection import GroupKFold
from scipy.stats import norm
from sklearn.linear_model import (ElasticNetCV, LassoCV, LogisticRegressionCV)
from ...sklearn_extensions.linear_model import (StatsModelsLinearRegression, WeightedLassoCVWrapper)
from ...sklearn_extensions.model_selection import ModelSelector, WeightedStratifiedKFold
from ...dml.dml import _make_first_stage_selector, _FinalWrapper
from ..._cate_estimator import TreatmentExpansionMixin, LinearModelFinalCateEstimatorMixin
from ..._ortho_learner import _OrthoLearner
from ...utilities import (_deprecate_positional, add_intercept,
                          broadcast_unit_treatments, check_high_dimensional,
                          cross_product, deprecated,
                          hstack, inverse_onehot, ndim, reshape,
                          reshape_treatmentwise_effects, shape, transpose,
                          get_feature_names_or_default, check_input_arrays,
                          filter_none_kwargs)


def _get_groups_period_filter(groups, n_periods):
    group_counts = {}
    group_period_filter = {i: [] for i in range(n_periods)}
    for i, g in enumerate(groups):
        if g not in group_counts:
            group_counts[g] = 0
        group_period_filter[group_counts[g]].append(i)
        group_counts[g] += 1
    return group_period_filter
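
# Example: for groups [3, 3, 4, 4] with n_periods=2, each group's rows appear in
# period order, so period 0 maps to rows [0, 2] and period 1 maps to rows [1, 3]:
#
#     _get_groups_period_filter([3, 3, 4, 4], 2)
#     # -> {0: [0, 2], 1: [1, 3]}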


class _DynamicModelNuisanceSelector(ModelSelector):
    """
    Nuisance model that fits model_y and model_t at fit time; at predict time it
    computes the Y and T residuals from the fitted models and returns them
    as the two nuisance parameters.
    """

    def __init__(self, model_y, model_t, n_periods):
        self._model_y = model_y
        self._model_t = model_t
        self.n_periods = n_periods

    def train(self, is_selecting, folds, Y, T, X=None, W=None, sample_weight=None, groups=None):
        """Fit a series of nuisance models for each period or period pairs."""
        assert Y.shape[0] % self.n_periods == 0, \
            "Length of training data should be an integer multiple of time periods."
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        if not hasattr(self, '_model_y_trained'):  # create the per-period y and t models
            self._model_y_trained = {t: clone(self._model_y, safe=False)
                                     for t in np.arange(self.n_periods)}
            self._model_t_trained = {j: {t: clone(self._model_t, safe=False)
                                         for t in np.arange(j + 1)}
                                     for j in np.arange(self.n_periods)}
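            # _model_y_trained[t] predicts the final period's outcome from period-t data;
            # _model_t_trained[j][t] predicts the period-j treatment from period-t data (for t <= j)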

        # we have to filter the folds because they contain the indices in the original data not
        # the indices in the period-filtered data

        def _translate_inds(t, inds):
            # translate the indices in a fold to the indices in the period-filtered data
            # if groups was [3,3,4,4,5,5,6,6,1,1,2,2,0,0] (the group ids can be in any order, but the
            # time periods for each group should be contiguous), and we had [10,11,0,1] as the indices in a fold
            # (so the fold is taking the entries corresponding to groups 2 and 3)
            # then group_period_filter(0) is [0,2,4,6,8,10,12] and gpf(1) is [1,3,5,7,9,11,13]
            # so for period 0, the fold should be [10,0] => [5,0] (the indices that return 10 and 0 in the t=0 data)
            # and for period 1, the fold should be [11,1] => [5,0] again (the indices that return 11,1 in the t=1 data)

            # filter to the indices for the time period
            inds = inds[np.isin(inds, period_filters[t])]

            # now find their index in the period-filtered data, which is always sorted
            return np.searchsorted(period_filters[t], inds)

        if folds is not None:
            translated_folds = []
            for (train, test) in folds:
                translated_folds.append((_translate_inds(0, train), _translate_inds(0, test)))
                # sanity check that the folds are the same no matter the time period
                for t in range(1, self.n_periods):
                    assert np.array_equal(_translate_inds(t, train), _translate_inds(0, train))
                    assert np.array_equal(_translate_inds(t, test), _translate_inds(0, test))
        else:
            translated_folds = None

        for t in np.arange(self.n_periods):
            self._model_y_trained[t].train(
                is_selecting, translated_folds,
                self._index_or_None(X, period_filters[t]),
                self._index_or_None(
                    W, period_filters[t]),
                Y[period_filters[self.n_periods - 1]])
            for j in np.arange(t, self.n_periods):
                self._model_t_trained[j][t].train(
                    is_selecting, translated_folds,
                    self._index_or_None(X, period_filters[t]),
                    self._index_or_None(W, period_filters[t]),
                    T[period_filters[j]])
        return self

    def predict(self, Y, T, X=None, W=None, sample_weight=None, groups=None):
        """Calculate nuisances for each period or period pairs.

        Returns
        -------
        Y_res : (n, d_y) matrix or vector of length n
            Y residuals for each period in panel format.
            This shape is required for _OrthoLearner's crossfitting.
        T_res : (n, d_t, n_periods) matrix
            T residuals for pairs of periods (t, j), where the data is in panel format for t
            and in index form for j. For example, the residuals for (t, j) can be retrieved via
            T_res[np.arange(n) % n_periods == t, ..., j]. For t < j, the entries of this
            matrix are np.nan.
            This shape is required for _OrthoLearner's crossfitting.
        """
        assert Y.shape[0] % self.n_periods == 0, \
            "Length of training data should be an integer multiple of time periods."
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        Y_res = np.full(Y.shape, np.nan)
        T_res = np.full(T.shape + (self.n_periods, ), np.nan)
        shape_formatter = self._get_shape_formatter(X, W)
        for t in np.arange(self.n_periods):
            Y_slice = Y[period_filters[self.n_periods - 1]]
            Y_pred = self._model_y_trained[t].predict(
                self._index_or_None(X, period_filters[t]),
                self._index_or_None(W, period_filters[t]))
            Y_res[period_filters[t]] = Y_slice\
                - shape_formatter(Y_slice, Y_pred)
            for j in np.arange(t, self.n_periods):
                T_slice = T[period_filters[j]]
                T_pred = self._model_t_trained[j][t].predict(
                    self._index_or_None(X, period_filters[t]),
                    self._index_or_None(W, period_filters[t]))
                T_res[period_filters[j], ..., t] = T_slice\
                    - shape_formatter(T_slice, T_pred)
        return Y_res, T_res

    def score(self, Y, T, X=None, W=None, sample_weight=None, groups=None):
        assert Y.shape[0] % self.n_periods == 0, \
            "Length of training data should be an integer multiple of time periods."
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        if hasattr(self._model_y, 'score'):
            Y_score = np.full((self.n_periods, ), np.nan)
            for t in np.arange(self.n_periods):
                Y_score[t] = self._model_y_trained[t].score(
                    self._index_or_None(X, period_filters[t]),
                    self._index_or_None(W, period_filters[t]),
                    Y[period_filters[self.n_periods - 1]])
        else:
            Y_score = None
        if hasattr(self._model_t, 'score'):
            T_score = np.full((self.n_periods, self.n_periods), np.nan)
            for t in np.arange(self.n_periods):
                for j in np.arange(t, self.n_periods):
                    T_score[j][t] = self._model_t_trained[j][t].score(
                        self._index_or_None(X, period_filters[t]),
                        self._index_or_None(W, period_filters[t]),
                        T[period_filters[j]])
        else:
            T_score = None
        return Y_score, T_score

    def _get_shape_formatter(self, X, W):
        if (X is None) and (W is None):
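            # with no X or W the first-stage model predicts a single constant vector,
            # so tile that prediction across all rows before reshaping to match x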
            return lambda x, x_pred: np.tile(x_pred.reshape(1, -1), (x.shape[0], 1)).reshape(x.shape)
        return lambda x, x_pred: x_pred.reshape(x.shape)

    def _index_or_None(self, X, filter_idx):
        return None if X is None else X[filter_idx]


class _DynamicModelFinal:
    """
    Final model that, at fit time, fits a residual-on-residual regression with a heterogeneous
    coefficient that depends on X, i.e.

        .. math ::
            Y - E[Y | X, W] = \\theta(X) \\cdot (T - E[T | X, W]) + \\epsilon

    and at predict time returns :math:`\\theta(X)`. The score method returns the MSE of this
    final residual-on-residual regression.
    Assumes the final model is parametric with no intercept.
    """
    # TODO: update docs

    def __init__(self, model_final, n_periods):
        self._model_final = model_final
        self.n_periods = n_periods
        self._model_final_trained = {k: clone(self._model_final, safe=False) for k in np.arange(n_periods)}

    def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None, sample_var=None, groups=None):
        # NOTE: sample weight, sample var are not passed in
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        Y_res, T_res = nuisances
        self._d_y = Y.shape[1:]
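        # Fit backwards in time: the last period's effect is a plain residual-on-residual
        # regression, and each earlier period's outcome residual is first adjusted by
        # subtracting the already-estimated effects of all later periods.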
        for t in np.arange(self.n_periods - 1, -1, -1):
            Y_adj = Y_res[period_filters[t]].copy()
            if t < self.n_periods - 1:
                Y_adj -= np.sum(
                    [self._model_final_trained[j].predict_with_res(
                        X[period_filters[0]] if X is not None else None,
                        T_res[period_filters[j], ..., t]
                    ) for j in np.arange(t + 1, self.n_periods)], axis=0)
            self._model_final_trained[t].fit(
                X[period_filters[0]] if X is not None else None, T[period_filters[t]],
                T_res[period_filters[t], ..., t], Y_adj)

        return self

    def predict(self, X=None):
        """
        Return shape: m x dy x (p*dt)
        """
        d_t_tuple = self._model_final_trained[0]._d_t
        d_t = d_t_tuple[0] if d_t_tuple else 1
        x_dy_shape = (X.shape[0] if X is not None else 1, ) + \
            self._model_final_trained[0]._d_y
        preds = np.zeros(
            x_dy_shape +
            (self.n_periods * d_t, )
        )
        for t in range(self.n_periods):
            preds[..., t * d_t: (t + 1) * d_t] = \
                self._model_final_trained[t].predict(X).reshape(x_dy_shape + (d_t, ))
        return preds

    def score(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None, sample_var=None, groups=None):
        assert Y.shape[0] % self.n_periods == 0, \
            "Length of training data should be an integer multiple of time periods."
        Y_res, T_res = nuisances
        scores = np.full((self.n_periods, ), np.nan)
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        for t in np.arange(self.n_periods - 1, -1, -1):
            Y_adj = Y_res[period_filters[t]].copy()
            if t < self.n_periods - 1:
                Y_adj -= np.sum(
                    [self._model_final_trained[j].predict_with_res(
                        X[period_filters[0]] if X is not None else None,
                        T_res[period_filters[j], ..., t]
                    ) for j in np.arange(t + 1, self.n_periods)], axis=0)
            Y_adj_pred = self._model_final_trained[t].predict_with_res(
                X[period_filters[0]] if X is not None else None,
                T_res[period_filters[t], ..., t])
            if sample_weight is not None:
                scores[t] = np.mean(np.average((Y_adj - Y_adj_pred)**2, weights=sample_weight, axis=0))
            else:
                scores[t] = np.mean((Y_adj - Y_adj_pred) ** 2)
        return scores


class _LinearDynamicModelFinal(_DynamicModelFinal):
    """Wrapper for the DynamicModelFinal with StatsModelsLinearRegression final model.

    The final model is a linear model with (d_t*n_periods) coefficients.
    This model is defined after the coefficients and covariance are calculated.
    """

    def __init__(self, model_final, n_periods):
        super().__init__(model_final, n_periods)
        self.model_final_ = StatsModelsLinearRegression(fit_intercept=False)

    def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None, sample_var=None, groups=None):
        super().fit(Y, T, X=X, W=W, Z=Z, nuisances=nuisances,
                    sample_weight=sample_weight, sample_var=sample_var, groups=groups)
        # Compose final model
        cov = self._get_cov(nuisances, X, groups)
        coef = self._get_coef_()
        self.model_final_._n_out = self._d_y[0] if self._d_y else 0
        self.model_final_._param_var = cov / (Y.shape[0] / self.n_periods)
        self.model_final_._param = coef.T if self.model_final_._n_out else coef

    def _get_coef_(self):
        period_coefs = np.array([self._model_final_trained[t]._model.coef_ for t in range(self.n_periods)])
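        # Flatten the per-period coefficients in period-major order so the layout matches
        # the (n_periods * d_t) treatment expansion used by predict; with multiple outcomes,
        # produce one flattened row per outcome.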
        if self._d_y:
            return np.array([
                np.array([period_coefs[k, i, :] for k in range(self.n_periods)]).flatten()
                for i in range(self._d_y[0])
            ])
        return period_coefs.flatten()

    def _get_cov(self, nuisances, X, groups):
        if self._d_y:
            return np.array(
                [self._fit_single_output_cov((nuisances[0][:, i], nuisances[1]), X, i, groups)
                 for i in range(self._d_y[0])]
            )
        return self._fit_single_output_cov(nuisances, X, -1, groups)

    def _fit_single_output_cov(self, nuisances, X, y_index, groups):
        """ Calculates the covariance (n_periods*n_treatments)
            x (n_periods*n_treatments) matrix for a single outcome.
        """
        Y_res, T_res = nuisances
        # Calculate auxiliary quantities
        period_filters = _get_groups_period_filter(groups, self.n_periods)
        # X ⨂ T_res
        XT_res = np.array([
            [
                self._model_final_trained[0]._combine(
                    X[period_filters[0]] if X is not None else None,
                    T_res[period_filters[t], ..., j],
                    fitting=False
                )
                for j in range(self.n_periods)
            ]
            for t in range(self.n_periods)
        ])
        d_xt = XT_res.shape[-1]
        # sum(model_final.predict(X, T_res))
        Y_diff = np.array([
            np.sum([
                self._model_final_trained[j].predict_with_res(
                    X[period_filters[0]] if X is not None else None,
                    T_res[period_filters[j], ..., t]
                ) for j in np.arange(t, self.n_periods)],
                axis=0
            )
            for t in np.arange(self.n_periods)
        ])
        J = np.zeros((self.n_periods * d_xt,
                      self.n_periods * d_xt))
        Sigma = np.zeros((self.n_periods * d_xt,
                          self.n_periods * d_xt))
        for t in np.arange(self.n_periods):
            res_epsilon_t = (Y_res[period_filters[t]] -
                             (Y_diff[t][:, y_index] if y_index >= 0 else Y_diff[t])
                             ).reshape(-1, 1, 1)
            resT_t = XT_res[t][t]
            for j in np.arange(self.n_periods):
                # Calculating the (t, j) block entry (of size n_treatments x n_treatments) of matrix Sigma
                res_epsilon_j = (Y_res[period_filters[j]] -
                                 (Y_diff[j][:, y_index] if y_index >= 0 else Y_diff[j])
                                 ).reshape(-1, 1, 1)
                resT_j = XT_res[j][j]
                cov_resT_tj = resT_t.reshape(-1, d_xt, 1) @ resT_j.reshape(-1, 1, d_xt)
                sigma_tj = np.mean((res_epsilon_t * res_epsilon_j) * cov_resT_tj, axis=0)
                Sigma[t * d_xt:(t + 1) * d_xt,
                      j * d_xt:(j + 1) * d_xt] = sigma_tj
                if j >= t:
                    # Calculating the (t, j) block entry (of size n_treatments x n_treatments) of matrix J
                    m_tj = np.mean(
                        XT_res[j][t].reshape(-1, d_xt, 1) @ resT_t.reshape(-1, 1, d_xt),
                        axis=0)
                    J[t * d_xt:(t + 1) * d_xt,
                      j * d_xt:(j + 1) * d_xt] = m_tj
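        # Sandwich covariance: J^{-1} Sigma J^{-T}, with J the Jacobian of the moment
        # conditions and Sigma the covariance of the moment (score) vector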
        return np.linalg.inv(J) @ Sigma @ np.linalg.inv(J).T


class _DynamicFinalWrapper(_FinalWrapper):

    def predict_with_res(self, X, T_res):
        fts = self._combine(X, T_res, fitting=False)
        prediction = self._model.predict(fts)
        if self._intercept is not None:
            prediction -= self._intercept
        return reshape(prediction, (prediction.shape[0],) + self._d_y)


class DynamicDML(LinearModelFinalCateEstimatorMixin, _OrthoLearner):
    """CATE estimator for dynamic treatment effect estimation.

    This estimator is an extension of the Double ML approach for treatments assigned sequentially
    over time periods.

    The estimator is a special case of an :class:`_OrthoLearner` estimator, so it follows the two
    stage process, where a set of nuisance functions are estimated in the first stage in a crossfitting
    manner and a final stage estimates the CATE model. See the documentation of
    :class:`._OrthoLearner` for a description of this two stage process.

    Parameters
    ----------
    model_y: estimator, default ``'auto'``
        Determines how to fit the outcome to the features.

        - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models

        - Otherwise, see :ref:`model_selection` for the range of supported options;
          if a single model is specified it should be a classifier if `discrete_outcome` is True
          and a regressor otherwise

    model_t: estimator, default ``'auto'``
        Determines how to fit the treatment to the features.

        - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models

        - Otherwise, see :ref:`model_selection` for the range of supported options;
          if a single model is specified it should be a classifier if `discrete_treatment` is True
          and a regressor otherwise

    featurizer : :term:`transformer`, optional
        Must support fit_transform and transform. Used to create composite features in the final CATE regression.
        It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X).
        If featurizer=None, then CATE is trained on X.

    fit_cate_intercept : bool, default True
        Whether the linear CATE model should have a constant term.

    discrete_outcome: bool, default False
        Whether the outcome should be treated as binary

    discrete_treatment: bool, default ``False``
        Whether the treatment values should be treated as categorical, rather than continuous, quantities

    categories: 'auto' or list, default 'auto'
        The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values).
        The first category will be treated as the control treatment.

    cv: int, cross-validation generator or an iterable, default 2
        Determines the cross-validation splitting strategy.
        Possible inputs for cv are:

        - None, to use the default 2-fold cross-validation,
        - integer, to specify the number of folds.
        - :term:`CV splitter`
        - An iterable yielding (train, test) splits as arrays of indices.
          Iterables should make sure a group belongs to a single split.

        For integer/None inputs, :class:`~sklearn.model_selection.GroupKFold` is used.

        Unless an iterable is used, we call `split(X, T, groups)` to generate the splits.

    mc_iters: int, optional
        The number of times to rerun the first stage models to reduce the variance of the nuisances.

    mc_agg: {'mean', 'median'}, default 'mean'
        How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo
        iterations of cross-fitting.

    random_state : int, RandomState instance, or None, default None
        If int, random_state is the seed used by the random number generator;
        If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator;
        If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used
        by :mod:`np.random<numpy.random>`.

    allow_missing: bool
        Whether to allow missing values in W. If True, will need to supply nuisance_models
        that can handle missing values.

    Examples
    --------
    A simple example with default models:

    .. testcode::
        :hide:

        import numpy as np
        np.set_printoptions(suppress=True)

    .. testcode::

        from econml.panel.dml import DynamicDML

        np.random.seed(123)

        n_panels = 100  # number of panels
        n_periods = 3  # number of time periods per panel
        n = n_panels * n_periods
        groups = np.repeat(a=np.arange(n_panels), repeats=n_periods, axis=0)
        X = np.random.normal(size=(n, 1))
        T = np.random.normal(size=(n, 2))
        y = np.random.normal(size=(n, ))
        est = DynamicDML()
        est.fit(y, T, X=X, W=None, groups=groups, inference="auto")

    >>> est.const_marginal_effect(X[:2])
    array([[-0.363..., -0.049..., -0.044...,  0.042..., -0.202...,  0.023...],
           [-0.128...,  0.424...,  0.050..., -0.203..., -0.115..., -0.135...]])
    >>> est.effect(X[:2], T0=0, T1=1)
    array([-0.594..., -0.107...])
    >>> est.effect(X[:2], T0=np.zeros((2, n_periods*T.shape[1])), T1=np.ones((2, n_periods*T.shape[1])))
    array([-0.594..., -0.107...])
    >>> est.coef_
    array([[ 0.112...],
           [ 0.227...],
           [ 0.045...],
           [-0.118...],
           [ 0.041...],
           [-0.076...]])
    >>> est.coef__interval()
    (array([[-0.060...],
           [-0.008...],
           [-0.120...],
           [-0.392...],
           [-0.120...],
           [-0.257...]]),
     array([[0.286...],
           [0.463...],
           [0.212...],
           [0.156...],
           [0.204...],
           [0.104...]]))
    """
    def __init__(self, *,
                 model_y='auto', model_t='auto',
                 featurizer=None,
                 fit_cate_intercept=True,
                 linear_first_stages="deprecated",
                 discrete_outcome=False,
                 discrete_treatment=False,
                 categories='auto',
                 cv=2,
                 mc_iters=None,
                 mc_agg='mean',
                 random_state=None,
                 allow_missing=False):
        self.fit_cate_intercept = fit_cate_intercept
        if linear_first_stages != "deprecated":
            warn("The linear_first_stages parameter is deprecated and will be removed in a future "
                 "version of EconML", DeprecationWarning)
        self.featurizer = clone(featurizer, safe=False)
        self.model_y = clone(model_y, safe=False)
        self.model_t = clone(model_t, safe=False)
        super().__init__(discrete_outcome=discrete_outcome,
                         discrete_treatment=discrete_treatment,
                         treatment_featurizer=None,
                         discrete_instrument=False,
                         categories=categories,
                         cv=GroupKFold(cv) if isinstance(cv, int) else cv,
                         mc_iters=mc_iters,
                         mc_agg=mc_agg,
                         random_state=random_state,
                         allow_missing=allow_missing)
    def _gen_allowed_missing_vars(self):
        return ['W'] if self.allow_missing else []

    # override only so that we can exclude treatment featurization verbiage in docstring
    def const_marginal_effect(self, X=None):
        """
        Calculate the constant marginal CATE :math:`\\theta(·)`.

        The marginal effect is conditional on a vector of features on a set of m test samples X[i].

        Parameters
        ----------
        X: (m, d_x) matrix, optional
            Features for each sample.

        Returns
        -------
        theta: (m, d_y, d_t) matrix or (d_y, d_t) matrix if X is None
            Constant marginal CATE of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        return super().const_marginal_effect(X=X)
    # override only so that we can exclude treatment featurization verbiage in docstring
    def const_marginal_ate(self, X=None):
        """
        Calculate the average constant marginal CATE :math:`E_X[\\theta(X)]`.

        Parameters
        ----------
        X: (m, d_x) matrix, optional
            Features for each sample.

        Returns
        -------
        theta: (d_y, d_t) matrix
            Average constant marginal CATE of each treatment on each outcome.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        return super().const_marginal_ate(X=X)
    def _gen_featurizer(self):
        return clone(self.featurizer, safe=False)

    def _gen_model_y(self):
        return _make_first_stage_selector(self.model_y, is_discrete=self.discrete_outcome,
                                          random_state=self.random_state)

    def _gen_model_t(self):
        return _make_first_stage_selector(self.model_t, is_discrete=self.discrete_treatment,
                                          random_state=self.random_state)

    def _gen_model_final(self):
        return StatsModelsLinearRegression(fit_intercept=False)

    def _gen_ortho_learner_model_nuisance(self):
        return _DynamicModelNuisanceSelector(
            model_t=self._gen_model_t(),
            model_y=self._gen_model_y(),
            n_periods=self._n_periods)

    def _gen_ortho_learner_model_final(self):
        wrapped_final_model = _DynamicFinalWrapper(
            StatsModelsLinearRegression(fit_intercept=False),
            fit_cate_intercept=self.fit_cate_intercept,
            featurizer=self.featurizer,
            use_weight_trick=False)
        return _LinearDynamicModelFinal(wrapped_final_model, n_periods=self._n_periods)

    def _prefit(self, Y, T, *args, groups=None, only_final=False, **kwargs):
        # we need to set the number of periods before calling super()._prefit, since that will generate
        # the final and nuisance models, which need to have self._n_periods set
        u_periods = np.unique(np.unique(groups, return_counts=True)[1])
        if len(u_periods) > 1:
            raise AttributeError(
                "Imbalanced panel. Method currently expects only panels with equal number of periods. "
                "Pad your data.")
        self._n_periods = u_periods[0]
        super()._prefit(Y, T, *args, **kwargs)

    def _postfit(self, Y, T, *args, **kwargs):
        super()._postfit(Y, T, *args, **kwargs)
        # Set _d_t to the effective number of treatments
        self._d_t = (self._n_periods * self._d_t[0], ) if self._d_t else (self._n_periods, )

    def _strata(self, Y, T, X=None, W=None, Z=None, sample_weight=None, sample_var=None, groups=None,
                cache_values=False, only_final=False, check_input=True):
        # Required for bootstrap inference
        return groups
    def fit(self, Y, T, *, X=None, W=None, sample_weight=None, sample_var=None, groups,
            cache_values=False, inference='auto'):
        """Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`.

        The input data must contain groups with the same size corresponding to the number
        of time periods the treatments were assigned over.

        The data should preferably be in panel format, with groups clustered together.
        If group members do not appear together, the following is assumed:

        * the first instance of a group in the dataset is assumed to correspond to the first period
          of that group
        * the second instance of a group in the dataset is assumed to correspond to the second period
          of that group

        ...etc.

        Only the value of the features X at the first period of each unit are used for heterogeneity.
        The value of X in subsequent periods is used as a time-varying control but not
        for heterogeneity.

        Parameters
        ----------
        Y: (n, d_y) matrix or vector of length n
            Outcomes for each sample (required: n = n_groups * n_periods)
        T: (n, d_t) matrix or vector of length n
            Treatments for each sample (required: n = n_groups * n_periods)
        X: (n, d_x) matrix, optional
            Features for each sample (required: n = n_groups * n_periods). Only first period features
            from each unit are used for heterogeneity, the rest are used as time-varying controls
            together with W
        W: (n, d_w) matrix, optional
            Controls for each sample (required: n = n_groups * n_periods)
        sample_weight: (n,) vector, optional
            Weights for each sample
        sample_var: (n,) vector, optional
            Sample variance for each sample
        groups: (n,) vector, required
            All rows corresponding to the same group will be kept together during splitting.
            If groups is not None, the `cv` argument passed to this class's initializer
            must support a 'groups' argument to its split method.
        cache_values: bool, default False
            Whether to cache inputs and first stage results, which will allow refitting a different
            final model
        inference: str, :class:`.Inference` instance, or None
            Method for performing inference. This estimator supports 'bootstrap'
            (or an instance of :class:`.BootstrapInference`) and 'auto'
            (or an instance of :class:`.LinearModelFinalInference`).

        Returns
        -------
        self: DynamicDML instance
        """
        if sample_weight is not None or sample_var is not None:
            warn("This CATE estimator does not yet support sample weights and sample variance. "
                 "These inputs will be ignored during fitting.",
                 UserWarning)
        return super().fit(Y, T, X=X, W=W,
                           sample_weight=None, sample_var=None, groups=groups,
                           cache_values=cache_values,
                           inference=inference)
    def score(self, Y, T, X=None, W=None, sample_weight=None, *, groups):
        """
        Score the fitted CATE model on a new data set.

        Generates nuisance parameters for the new data set based on the fitted residual nuisance
        models created at fit time. It uses the mean prediction of the models fitted by the
        different crossfit folds. Then calculates the MSE of the final residual Y on residual T
        regression.

        If model_final does not have a score method, then it raises an :exc:`.AttributeError`

        Parameters
        ----------
        Y: (n, d_y) matrix or vector of length n
            Outcomes for each sample (required: n = n_groups * n_periods)
        T: (n, d_t) matrix or vector of length n
            Treatments for each sample (required: n = n_groups * n_periods)
        X: (n, d_x) matrix, optional
            Features for each sample (required: n = n_groups * n_periods)
        W: (n, d_w) matrix, optional
            Controls for each sample (required: n = n_groups * n_periods)
        groups: (n,) vector, required
            All rows corresponding to the same group will be kept together during splitting.

        Returns
        -------
        score: float
            The MSE of the final CATE model on the new data.
        """
        if not hasattr(self._ortho_learner_model_final, 'score'):
            raise AttributeError("Final model does not have a score method!")
        Y, T, X, groups = check_input_arrays(Y, T, X, groups)
        W, = check_input_arrays(
            W, force_all_finite='allow-nan' if 'W' in self._gen_allowed_missing_vars() else True)
        self._check_fitted_dims(X)
        X, T = super()._expand_treatments(X, T)
        n_iters = len(self._models_nuisance)
        n_splits = len(self._models_nuisance[0])

        # for each mc iteration
        for i, models_nuisances in enumerate(self._models_nuisance):
            # for each model under cross fit setting
            for j, mdl in enumerate(models_nuisances):
                nuisance_temp = mdl.predict(Y, T, **filter_none_kwargs(X=X, W=W, groups=groups))
                if not isinstance(nuisance_temp, tuple):
                    nuisance_temp = (nuisance_temp,)

                if i == 0 and j == 0:
                    nuisances = [np.zeros((n_iters * n_splits,) + nuis.shape) for nuis in nuisance_temp]

                for it, nuis in enumerate(nuisance_temp):
                    # flat index over the (mc iteration, fold) pairs
                    nuisances[it][i * n_splits + j] = nuis

        for it in range(len(nuisances)):
            nuisances[it] = np.mean(nuisances[it], axis=0)

        return self._ortho_learner_model_final.score(Y, T, nuisances=nuisances,
                                                     **filter_none_kwargs(X=X, W=W,
                                                                          sample_weight=sample_weight,
                                                                          groups=groups))
    def cate_treatment_names(self, treatment_names=None):
        """
        Get treatment names for each time period.

        If the treatment is discrete, it will return expanded treatment names.

        Parameters
        ----------
        treatment_names: list of str of length T.shape[1] or None
            The names of the treatments. If None and the T passed to fit was a dataframe,
            it defaults to the column names from the dataframe.

        Returns
        -------
        out_treatment_names: list of str
            Returns (possibly expanded) treatment names.
        """
        slice_treatment_names = super().cate_treatment_names(treatment_names)
        treatment_names_out = []
        for k in range(self._n_periods):
            treatment_names_out += [f"({t})$_{k}$" for t in slice_treatment_names]
        return treatment_names_out
    def cate_feature_names(self, feature_names=None):
        """
        Get the output feature names.

        Parameters
        ----------
        feature_names: list of str of length X.shape[1] or None
            The names of the input features. If None and X is a dataframe, it defaults to the
            column names from the dataframe.

        Returns
        -------
        out_feature_names: list of str or None
            The names of the output features :math:`\\phi(X)`, i.e. the features with respect to which
            the final constant marginal CATE model is linear. It is the names of the features that are
            associated with each entry of the :meth:`coef_` parameter. Not available when the featurizer
            is not None and does not have a method: `get_feature_names(feature_names)`. Otherwise None
            is returned.
        """
        if self._d_x is None:
            # Handles the corner case when X=None but featurizer might be not None
            return None
        if feature_names is None:
            feature_names = self._input_names["feature_names"]
        if self.original_featurizer is None:
            return feature_names
        return get_feature_names_or_default(self.original_featurizer, feature_names)
    def _expand_treatments(self, X, *Ts, transform=True):
        # Expand treatments for each time period
        outTs = []
        base_expand_treatments = super()._expand_treatments
        for T in Ts:
            if ndim(T) == 0:
                one_T = base_expand_treatments(X, T, transform=transform)[1]
                one_T = one_T.reshape(-1, 1) if ndim(one_T) == 1 else one_T
                T = np.tile(one_T, (1, self._n_periods, ))
            else:
                assert (T.shape[1] == self._n_periods if self.transformer
                        else T.shape[1] == self._d_t[0]), \
                    f"Expected a list of time period * d_t, instead got a treatment array of shape {T.shape}."
                if self.transformer:
                    T = np.hstack([
                        base_expand_treatments(X, T[:, [t]], transform=transform)[1]
                        for t in range(self._n_periods)
                    ])
            outTs.append(T)
        return (X,) + tuple(outTs)

    @property
    def bias_part_of_coef(self):
        return self.ortho_learner_model_final_._model_final._fit_cate_intercept

    @property
    def fit_cate_intercept_(self):
        return self.ortho_learner_model_final_._model_final._fit_cate_intercept

    @property
    def original_featurizer(self):
        # NOTE: important to use the _ortho_learner_model_final_ attribute instead of the
        # attribute so that the trained featurizer will be passed through
        return self.ortho_learner_model_final_._model_final_trained[0]._original_featurizer

    @property
    def featurizer_(self):
        # NOTE: this is used by the inference methods and has to be the overall featurizer; intended
        # for internal use by the library
        return self.ortho_learner_model_final_._model_final_trained[0]._featurizer

    @property
    def model_final_(self):
        # NOTE: this is used by the inference methods and is more for internal use to the library.
        # We need to use the _ortho_learner's copy to retain the information from fitting
        return self.ortho_learner_model_final_.model_final_

    @property
    def model_final(self):
        return self._gen_model_final()

    @model_final.setter
    def model_final(self, model):
        if model is not None:
            raise ValueError("Parameter `model_final` cannot be altered for this estimator!")

    @property
    def models_y(self):
        return [[mdl._model_y for mdl in mdls] for mdls in super().models_nuisance_]

    @property
    def models_t(self):
        return [[mdl._model_t for mdl in mdls] for mdls in super().models_nuisance_]

    @property
    def nuisance_scores_y(self):
        return self.nuisance_scores_[0]

    @property
    def nuisance_scores_t(self):
        return self.nuisance_scores_[1]

    @property
    def residuals_(self):
        """
        A tuple (y_res, T_res, X, W) of the residuals from the first stage estimation,
        along with the associated X and W. Samples are not guaranteed to be in the same
        order as the input order.
        """
        if not hasattr(self, '_cached_values'):
            raise AttributeError("Estimator is not fitted yet!")
        if self._cached_values is None:
            raise AttributeError("`fit` was called with `cache_values=False`. "
                                 "Set to `True` to enable residual storage.")
        Y_res, T_res = self._cached_values.nuisances
        return Y_res, T_res, self._cached_values.X, self._cached_values.W