Source code for econml.iv.dr._dr

# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.

"""Doubly Robust IV for Heterogeneous Treatment Effects.

An Doubly Robust machine learning approach to estimation of heterogeneous
treatment effect with an endogenous treatment and an instrument.

Implements the DRIV algorithm for estimating CATE with IVs from the paper:

Machine Learning Estimation of Heterogeneous Treatment Effects with Instruments
Vasilis Syrgkanis, Victor Lei, Miruna Oprescu, Maggie Hei, Keith Battocchi, Greg Lewis
https://arxiv.org/abs/1905.10176
"""

import numpy as np
from sklearn.base import clone
from sklearn.linear_model import LinearRegression, LogisticRegressionCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
from sklearn.dummy import DummyClassifier


from ..._ortho_learner import _OrthoLearner
from ..._cate_estimator import (StatsModelsCateEstimatorMixin, DebiasedLassoCateEstimatorMixin,
                                ForestModelFinalCateEstimatorMixin, GenericSingleTreatmentModelFinalInference,
                                LinearCateEstimator)
from ...inference import StatsModelsInference
from ...sklearn_extensions.linear_model import StatsModelsLinearRegression, DebiasedLasso, WeightedLassoCVWrapper
from ...sklearn_extensions.model_selection import ModelSelector, SingleModelSelector, WeightedStratifiedKFold
from ...utilities import (_deprecate_positional, add_intercept, filter_none_kwargs,
                          inverse_onehot, get_feature_names_or_default, check_high_dimensional, check_input_arrays)
from ...grf import RegressionForest
from ...dml.dml import _make_first_stage_selector, _FinalWrapper
from ...iv.dml import NonParamDMLIV
from ..._shap import _shap_explain_model_cate


def _combine(W, Z, n_samples):
    if Z is not None:  # Z will not be None
        Z = Z.reshape(n_samples, -1)
        return Z if W is None else np.hstack([W, Z])
    return None if W is None else W


class _BaseDRIVNuisanceSelector(ModelSelector):
    def __init__(self, *, prel_model_effect, model_y_xw, model_t_xw, model_z,
                 projection,
                 discrete_treatment, discrete_instrument):
        self._prel_model_effect = prel_model_effect
        self._model_y_xw = model_y_xw
        self._model_t_xw = model_t_xw
        self._projection = projection
        self._discrete_treatment = discrete_treatment
        self._discrete_instrument = discrete_instrument
        if self._projection:
            self._model_t_xwz = model_z
        else:
            self._model_z_xw = model_z

    def train(self, is_selecting, folds, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        # T and Z only allow single continuous or binary, keep the shape of (n,) for continuous and (n,1) for binary
        T = T.ravel() if not self._discrete_treatment else T
        Z = Z.ravel() if not self._discrete_instrument else Z

        self._model_y_xw.train(is_selecting, folds, X=X, W=W, Target=Y, sample_weight=sample_weight, groups=groups)
        self._model_t_xw.train(is_selecting, folds, X=X, W=W, Target=T, sample_weight=sample_weight, groups=groups)

        if self._projection:
            WZ = _combine(W, Z, Y.shape[0])
            self._model_t_xwz.train(is_selecting, folds, X=X, W=WZ, Target=T,
                                    sample_weight=sample_weight, groups=groups)
        else:
            self._model_z_xw.train(is_selecting, folds, X=X, W=W, Target=Z, sample_weight=sample_weight, groups=groups)

        # TODO: prel_model_effect could allow sample_var and freq_weight?
        if self._discrete_instrument:
            Z = inverse_onehot(Z)
        if self._discrete_treatment:
            T = inverse_onehot(T)
        self._prel_model_effect.fit(Y, T, Z=Z, X=X,
                                    W=W, sample_weight=sample_weight, groups=groups)
        return self

    def score(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        # T and Z only allow single continuous or binary, keep the shape of (n,) for continuous and (n,1) for binary
        T = T.ravel() if not self._discrete_treatment else T
        Z = Z.ravel() if not self._discrete_instrument else Z

        if hasattr(self._model_y_xw, 'score'):
            y_xw_score = self._model_y_xw.score(X=X, W=W, Target=Y, sample_weight=sample_weight)
        else:
            y_xw_score = None
        if hasattr(self._model_t_xw, 'score'):
            t_xw_score = self._model_t_xw.score(X=X, W=W, Target=T, sample_weight=sample_weight)
        else:
            t_xw_score = None

        if hasattr(self._prel_model_effect, 'score'):
            # we need to undo the one-hot encoding for calling effect,
            # since it expects raw values
            raw_T = inverse_onehot(T) if self._discrete_treatment else T
            raw_Z = inverse_onehot(Z) if self._discrete_instrument else Z
            effect_score = self._prel_model_effect.score(Y, raw_T,
                                                         Z=raw_Z, X=X, W=W, sample_weight=sample_weight)
        else:
            effect_score = None

        if self._projection:
            if hasattr(self._model_t_xwz, 'score'):
                WZ = _combine(W, Z, Y.shape[0])
                t_xwz_score = self._model_t_xwz.score(X=X, W=WZ, Target=T, sample_weight=sample_weight)
            else:
                t_xwz_score = None

            return y_xw_score, t_xw_score, t_xwz_score, effect_score

        else:
            if hasattr(self._model_z_xw, 'score'):
                z_xw_score = self._model_z_xw.score(X=X, W=W, Target=Z, sample_weight=sample_weight)
            else:
                z_xw_score = None

            return y_xw_score, t_xw_score, z_xw_score, effect_score

    def predict(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        Y_pred = self._model_y_xw.predict(X, W)
        T_pred = self._model_t_xw.predict(X, W)
        prel_theta = self._prel_model_effect.effect(X)

        if X is None:
            prel_theta = np.tile(prel_theta.reshape(1, -1), (Y.shape[0], 1))
            if W is None:
                Y_pred = np.tile(Y_pred.reshape(1, -1), (Y.shape[0], 1))
                T_pred = np.tile(T_pred.reshape(1, -1), (Y.shape[0], 1))

        # for convenience, reshape Z,T to a vector since they are either binary or single dimensional continuous
        T = T.reshape(T.shape[0],)
        Z = Z.reshape(Z.shape[0],)
        # reshape the predictions
        Y_pred = Y_pred.reshape(Y.shape)
        T_pred = T_pred.reshape(T.shape)

        Y_res = Y - Y_pred
        T_res = T - T_pred

        if self._projection:
            # concat W and Z
            WZ = _combine(W, Z, Y.shape[0])
            T_proj = self._model_t_xwz.predict(X, WZ).reshape(T.shape)
            Z_res = T_proj - T_pred
        else:
            Z_pred = self._model_z_xw.predict(X, W)
            if X is None and W is None:
                Z_pred = np.tile(Z_pred.reshape(1, -1), (Z.shape[0], 1))
            Z_pred = Z_pred.reshape(Z.shape)
            Z_res = Z - Z_pred

        # check nuisances outcome shape
        # Y_res could be a vector or 1-dimensional 2d-array
        assert T_res.ndim == 1, "Nuisance outcome should be vector!"
        assert Z_res.ndim == 1, "Nuisance outcome should be vector!"

        return prel_theta, Y_res, T_res, Z_res

    @property
    def needs_fit(self):
        return (self._model_y_xw.needs_fit or self._model_t_xw.needs_fit or
                (self._projection and self._model_t_xwz.needs_fit) or
                (not self._projection and self._model_z_xw.needs_fit))


class _BaseDRIVNuisanceCovarianceSelector(ModelSelector):
    def __init__(self, *, model_tz_xw,
                 projection, fit_cov_directly,
                 discrete_treatment, discrete_instrument):
        self._model_tz_xw = model_tz_xw
        self._projection = projection
        self._fit_cov_directly = fit_cov_directly
        self._discrete_treatment = discrete_treatment
        self._discrete_instrument = discrete_instrument

    def _get_target(self, T_res, Z_res, T, Z):
        T = T.ravel() if not self._discrete_treatment else T
        Z = Z.ravel() if not self._discrete_instrument else Z
        if self._projection:
            if self._fit_cov_directly:
                # We're projecting, so we're treating E[T|X,Z] as the instrument (ignoring W for simplicity)
                # Then beta(X) = E[T̃ (E[T|X,Z]-E[E[T|X,Z]|X)|X] and we can apply the tower rule several times to get
                #              = E[(E[T|X,Z]-E[T|X])^2|X]
                # and also     = E[(E[T|X,Z]-T)^2|X]
                # so we can compute it either from (T_proj-T_pred)^2 or from (T_proj-T)^2
                # The first of these is just Z_res^2
                target = Z_res**2
            else:
                # fit on T*T_proj, covariance will be computed by E[T_res * T_proj] = E[T*T_proj] - E[T]^2
                # return shape (n,)
                T_pred = T - T_res.reshape(T.shape)
                T_proj = T_pred + Z_res.reshape(T.shape)
                target = (T * T_proj).reshape(T.shape[0],)
        else:
            if self._fit_cov_directly:
                # we will fit on the covariance (T_res*Z_res) directly
                target_shape = Z_res.shape if Z_res.ndim > 1 else T_res.shape
                target = T_res.reshape(target_shape) * Z_res.reshape(target_shape)
            else:
                # fit on TZ, covariance will be computed by E[T_res * Z_res] = TZ_pred - T_pred * Z_pred
                if self._discrete_treatment:
                    if self._discrete_instrument:
                        # target will be discrete and will be inversed from FirstStageWrapper, shape (n,1)
                        target = T * Z
                    else:
                        # shape (n,)
                        target = inverse_onehot(T) * Z
                else:
                    if self._discrete_instrument:
                        # shape (n,)
                        target = T * inverse_onehot(Z)
                    else:
                        # shape(n,)
                        target = T * Z
        return target

    def train(self, is_selecting, folds,
              prel_theta, Y_res, T_res, Z_res,
              Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        # T and Z only allow single continuous or binary, keep the shape of (n,) for continuous and (n,1) for binary
        target = self._get_target(T_res, Z_res, T, Z)
        self._model_tz_xw.train(is_selecting, folds, X=X, W=W, Target=target,
                                sample_weight=sample_weight, groups=groups)

        return self

    def score(self, prel_theta, Y_res, T_res, Z_res, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        # T and Z only allow single continuous or binary, keep the shape of (n,) for continuous and (n,1) for binary
        if hasattr(self._model_tz_xw, 'score'):
            target = self._get_target(T_res, Z_res, T, Z)
            tz_xw_score = self._model_tz_xw.score(X=X, W=W, Target=target, sample_weight=sample_weight)
        else:
            tz_xw_score = None

        return (tz_xw_score,)

    def predict(self, prel_theta, Y_res, T_res, Z_res, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None):
        TZ_pred = self._model_tz_xw.predict(X, W)

        if X is None and W is None:
            TZ_pred = np.tile(TZ_pred.reshape(1, -1), (Y.shape[0], 1))

        # for convenience, reshape Z,T to a vector since they are either binary or single dimensional continuous
        T = T.reshape(T.shape[0],)
        Z = Z.reshape(Z.shape[0],)

        # reshape the predictions
        TZ_pred = TZ_pred.reshape(T.shape)

        if self._projection:
            if self._fit_cov_directly:
                cov = TZ_pred
            else:
                T_pred = T - T_res
                cov = TZ_pred - T_pred**2
            # in the projection case, this is a variance and should always be non-negative
            cov = np.maximum(cov, 0)
        else:
            if self._fit_cov_directly:
                cov = TZ_pred
            else:
                T_pred = T - T_res
                Z_pred = Z - Z_res
                cov = TZ_pred - T_pred * Z_pred

        # check nuisances outcome shape
        assert cov.ndim == 1, "Nuisance outcome should be vector!"

        return (cov,)

    @property
    def needs_fit(self):
        return self._model_tz_xw.needs_fit


class _BaseDRIVModelFinal:
    def __init__(self, model_final, featurizer, fit_cate_intercept, cov_clip, opt_reweighted):
        self._model_final = clone(model_final, safe=False)
        self._original_featurizer = clone(featurizer, safe=False)
        self._fit_cate_intercept = fit_cate_intercept
        self._cov_clip = cov_clip
        self._opt_reweighted = opt_reweighted

        if self._fit_cate_intercept:
            add_intercept_trans = FunctionTransformer(add_intercept,
                                                      validate=True)
            if featurizer:
                self._featurizer = Pipeline([('featurize', self._original_featurizer),
                                             ('add_intercept', add_intercept_trans)])
            else:
                self._featurizer = add_intercept_trans
        else:
            self._featurizer = self._original_featurizer

    def _effect_estimate(self, nuisances):
        # all could be reshaped to vector since Y, T, Z are all single dimensional.
        prel_theta, res_y, res_t, res_z, cov = [nuisance.reshape(nuisances[0].shape[0]) for nuisance in nuisances]

        # Estimate final model of theta(X) by minimizing the square loss:
        # (prel_theta(X) + (Y_res - prel_theta(X) * T_res) * Z_res / cov[T,Z | X] - theta(X))^2
        # We clip the covariance so that it is bounded away from zero, so as to reduce variance
        # at the expense of some small bias. For points with very small covariance we revert
        # to the model-based preliminary estimate and do not add the correction term.
        cov_sign = np.sign(cov)
        cov_sign[cov_sign == 0] = 1
        clipped_cov = cov_sign * np.clip(np.abs(cov),
                                         self._cov_clip, np.inf)
        return prel_theta + (res_y - prel_theta * res_t) * res_z / clipped_cov, clipped_cov, res_z

    def _transform_X(self, X, n=1, fitting=True):
        if X is not None:
            if self._featurizer is not None:
                F = self._featurizer.fit_transform(X) if fitting else self._featurizer.transform(X)
            else:
                F = X
        else:
            if not self._fit_cate_intercept:
                raise AttributeError("Cannot have X=None and also not allow for a CATE intercept!")
            F = np.ones((n, 1))
        return F

    def fit(self, Y, T, X=None, W=None, Z=None, nuisances=None,
            sample_weight=None, freq_weight=None, sample_var=None, groups=None):
        self.d_y = Y.shape[1:]
        self.d_t = T.shape[1:]
        theta_dr, clipped_cov, res_z = self._effect_estimate(nuisances)

        X = self._transform_X(X, n=theta_dr.shape[0])
        if self._opt_reweighted and (sample_weight is not None):
            sample_weight = sample_weight * clipped_cov.ravel()**2
        elif self._opt_reweighted:
            sample_weight = clipped_cov.ravel()**2
        target_var = sample_var * (res_z**2 / clipped_cov**2) if sample_var is not None else None
        self._model_final.fit(X, theta_dr, **filter_none_kwargs(sample_weight=sample_weight,
                                                                freq_weight=freq_weight, sample_var=target_var))
        return self

    def predict(self, X=None):
        X = self._transform_X(X, fitting=False)
        return self._model_final.predict(X).reshape((-1,) + self.d_y + self.d_t)

    def score(self, Y, T, X=None, W=None, Z=None, nuisances=None, sample_weight=None, groups=None):
        theta_dr, clipped_cov, _ = self._effect_estimate(nuisances)

        X = self._transform_X(X, fitting=False)

        if self._opt_reweighted and (sample_weight is not None):
            sample_weight = sample_weight * clipped_cov.ravel()**2
        elif self._opt_reweighted:
            sample_weight = clipped_cov.ravel()**2

        return np.average((theta_dr.ravel() - self._model_final.predict(X).ravel())**2,
                          weights=sample_weight, axis=0)


class _BaseDRIV(_OrthoLearner):
    # A helper class that access all the internal fitted objects of a DRIV Cate Estimator.
    # Used by both DRIV and IntentToTreatDRIV.
    def __init__(self, *,
                 model_final,
                 featurizer=None,
                 fit_cate_intercept=False,
                 cov_clip=1e-3,
                 opt_reweighted=False,
                 discrete_outcome=False,
                 discrete_instrument=False,
                 discrete_treatment=False,
                 treatment_featurizer=None,
                 categories='auto',
                 cv=2,
                 mc_iters=None,
                 mc_agg='mean',
                 random_state=None,
                 allow_missing=False,
                 use_ray=False,
                 ray_remote_func_options=None):
        self.model_final = clone(model_final, safe=False)
        self.featurizer = clone(featurizer, safe=False)
        self.fit_cate_intercept = fit_cate_intercept
        self.cov_clip = cov_clip
        self.opt_reweighted = opt_reweighted
        super().__init__(discrete_outcome=discrete_outcome,
                         discrete_instrument=discrete_instrument,
                         discrete_treatment=discrete_treatment,
                         treatment_featurizer=treatment_featurizer,
                         categories=categories,
                         cv=cv,
                         mc_iters=mc_iters,
                         mc_agg=mc_agg,
                         random_state=random_state,
                         allow_missing=allow_missing,
                         use_ray=use_ray,
                         ray_remote_func_options=ray_remote_func_options)

    def _gen_allowed_missing_vars(self):
        return ['W'] if self.allow_missing else []

    # Maggie: I think that would be the case?
    def _get_inference_options(self):
        options = super()._get_inference_options()
        options.update(auto=GenericSingleTreatmentModelFinalInference)
        return options

    def _gen_featurizer(self):
        return clone(self.featurizer, safe=False)

    def _gen_model_final(self):
        return clone(self.model_final, safe=False)

    def _gen_ortho_learner_model_final(self):
        return _BaseDRIVModelFinal(self._gen_model_final(), self._gen_featurizer(), self.fit_cate_intercept,
                                   self.cov_clip, self.opt_reweighted)

    def _check_inputs(self, Y, T, Z, X, W):
        Y1, T1, Z1, = check_input_arrays(Y, T, Z)
        if len(Y1.shape) > 1 and Y1.shape[1] > 1:
            raise AssertionError("DRIV only supports single dimensional outcome")
        if len(T1.shape) > 1 and T1.shape[1] > 1:
            if self.discrete_treatment:
                raise AttributeError("DRIV only supports binary treatments")
            elif self.treatment_featurizer:  # defer possible failure to downstream logic
                pass
            else:
                raise AttributeError("DRIV only supports single-dimensional continuous treatments")
        if len(Z1.shape) > 1 and Z1.shape[1] > 1:
            if self.discrete_instrument:
                raise AttributeError("DRIV only supports binary instruments")
            else:
                raise AttributeError("DRIV only supports single-dimensional continuous instruments")
        return Y, T, Z, X, W

    def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None,
            cache_values=False, inference="auto"):
        """
        Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`.

        Parameters
        ----------
        Y: (n,) vector of length n
            Outcomes for each sample
        T: (n,) vector of length n
            Treatments for each sample
        Z: (n, d_z) matrix
            Instruments for each sample
        X:(n, d_x) matrix, optional
            Features for each sample
        W:(n, d_w) matrix, optional
            Controls for each sample
        sample_weight : (n,) array_like, optional
            Individual weights for each sample. If None, it assumes equal weight.
        freq_weight: (n,) array_like of int, optional
            Weight for the observation. Observation i is treated as the mean
            outcome of freq_weight[i] independent observations.
            When ``sample_var`` is not None, this should be provided.
        sample_var : (n,) nd array_like, optional
            Variance of the outcome(s) of the original freq_weight[i] observations that were used to
            compute the mean outcome represented by observation i.
        groups: (n,) vector, optional
            All rows corresponding to the same group will be kept together during splitting.
            If groups is not None, the `cv` argument passed to this class's initializer
            must support a 'groups' argument to its split method.
        cache_values: bool, default False
            Whether to cache inputs and first stage results, which will allow refitting a different final model
        inference: str, :class:`.Inference` instance, or None
            Method for performing inference.  This estimator supports 'bootstrap'
            (or an instance of :class:`.BootstrapInference`) and 'auto'
            (or an instance of :class:`.GenericSingleTreatmentModelFinalInference`)

        Returns
        -------
        self
        """
        Y, T, Z, X, W = self._check_inputs(Y, T, Z, X, W)
        # Replacing fit from _OrthoLearner, to reorder arguments and improve the docstring
        return super().fit(Y, T, X=X, W=W, Z=Z,
                           sample_weight=sample_weight, freq_weight=freq_weight, sample_var=sample_var, groups=groups,
                           cache_values=cache_values, inference=inference)

    def refit_final(self, *, inference='auto'):
        return super().refit_final(inference=inference)
    refit_final.__doc__ = _OrthoLearner.refit_final.__doc__

    def score(self, Y, T, Z, X=None, W=None, sample_weight=None):
        """
        Score the fitted CATE model on a new data set. Generates nuisance parameters
        for the new data set based on the fitted residual nuisance models created at fit time.
        It uses the mean prediction of the models fitted by the different crossfit folds.
        Then calculates the MSE of the final residual Y on residual T regression.

        If model_final does not have a score method, then it raises an :exc:`.AttributeError`

        Parameters
        ----------
        Y: (n, d_y) matrix or vector of length n
            Outcomes for each sample
        T: (n, d_t) matrix or vector of length n
            Treatments for each sample
        Z: (n, d_z) matrix
            Instruments for each sample
        X:(n, d_x) matrix, optional
            Features for each sample
        W:(n, d_w) matrix, optional
            Controls for each sample
        sample_weight:(n,) vector, optional
            Weights for each samples


        Returns
        -------
        score: float
            The MSE of the final CATE model on the new data.
        """
        # Replacing score from _OrthoLearner, to enforce Z to be required and improve the docstring
        return super().score(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight)

    @property
    def featurizer_(self):
        """
        Get the fitted featurizer.

        Returns
        -------
        featurizer: object of type(`featurizer`)
            An instance of the fitted featurizer that was used to preprocess X in the final CATE model training.
            Available only when featurizer is not None and X is not None.
        """
        return self.ortho_learner_model_final_._featurizer

    @property
    def original_featurizer(self):
        # NOTE: important to use the ortho_learner_model_final_ attribute instead of the
        #       attribute so that the trained featurizer will be passed through
        return self.ortho_learner_model_final_._original_featurizer

    def cate_feature_names(self, feature_names=None):
        """
        Get the output feature names.

        Parameters
        ----------
        feature_names: list of str of length X.shape[1] or None
            The names of the input features. If None and X is a dataframe, it defaults to the column names
            from the dataframe.

        Returns
        -------
        out_feature_names: list of str or None
            The names of the output features :math:`\\phi(X)`, i.e. the features with respect to which the
            final CATE model for each treatment is linear. It is the names of the features that are associated
            with each entry of the :meth:`coef_` parameter. Available only when the featurizer is not None and has
            a method: `get_feature_names(feature_names)`. Otherwise None is returned.
        """
        if self._d_x is None:
            # Handles the corner case when X=None but featurizer might be not None
            return None
        if feature_names is None:
            feature_names = self._input_names["feature_names"]
        if self.original_featurizer is None:
            return feature_names
        return get_feature_names_or_default(self.original_featurizer, feature_names)

    @property
    def model_final_(self):
        # NOTE This is used by the inference methods and is more for internal use to the library
        return self.ortho_learner_model_final_._model_final

    @property
    def model_cate(self):
        """
        Get the fitted final CATE model.

        Returns
        -------
        model_cate: object of type(model_final)
            An instance of the model_final object that was fitted after calling fit which corresponds
            to the constant marginal CATE model.
        """
        return self.ortho_learner_model_final_._model_final

    def shap_values(self, X, *, feature_names=None, treatment_names=None, output_names=None, background_samples=100):
        return _shap_explain_model_cate(self.const_marginal_effect, self.model_cate, X, self._d_t, self._d_y,
                                        featurizer=self.featurizer_,
                                        feature_names=feature_names,
                                        treatment_names=treatment_names,
                                        output_names=output_names,
                                        input_names=self._input_names,
                                        background_samples=background_samples)
    shap_values.__doc__ = LinearCateEstimator.shap_values.__doc__

    @property
    def residuals_(self):
        """
        A tuple (prel_theta, Y_res, T_res, Z_res, cov, X, W, Z), of the residuals from the first stage estimation
        along with the associated X, W and Z. Samples are not guaranteed to be in the same
        order as the input order.
        """
        if not hasattr(self, '_cached_values'):
            raise AttributeError("Estimator is not fitted yet!")
        if self._cached_values is None:
            raise AttributeError("`fit` was called with `cache_values=False`. "
                                 "Set to `True` to enable residual storage.")
        prel_theta, Y_res, T_res, Z_res, cov = self._cached_values.nuisances
        return (prel_theta, Y_res, T_res, Z_res, cov, self._cached_values.X, self._cached_values.W,
                self._cached_values.Z)


class _DRIV(_BaseDRIV):
    """
    Private Base class for the DRIV algorithm.
    """

    def __init__(self, *,
                 model_y_xw="auto",
                 model_t_xw="auto",
                 model_z_xw="auto",
                 model_t_xwz="auto",
                 model_tz_xw="auto",
                 fit_cov_directly=True,
                 prel_model_effect,
                 model_final,
                 projection=False,
                 featurizer=None,
                 fit_cate_intercept=False,
                 cov_clip=1e-3,
                 opt_reweighted=False,
                 discrete_outcome=False,
                 discrete_instrument=False,
                 discrete_treatment=False,
                 treatment_featurizer=None,
                 categories='auto',
                 cv=2,
                 mc_iters=None,
                 mc_agg='mean',
                 random_state=None,
                 allow_missing=False,
                 use_ray=False,
                 ray_remote_func_options=None
                 ):
        self.model_y_xw = clone(model_y_xw, safe=False)
        self.model_t_xw = clone(model_t_xw, safe=False)
        self.model_t_xwz = clone(model_t_xwz, safe=False)
        self.model_z_xw = clone(model_z_xw, safe=False)
        self.model_tz_xw = clone(model_tz_xw, safe=False)
        self.prel_model_effect = clone(prel_model_effect, safe=False)
        self.projection = projection
        self.fit_cov_directly = fit_cov_directly
        super().__init__(model_final=model_final,
                         featurizer=featurizer,
                         fit_cate_intercept=fit_cate_intercept,
                         cov_clip=cov_clip,
                         opt_reweighted=opt_reweighted,
                         discrete_outcome=discrete_outcome,
                         discrete_instrument=discrete_instrument,
                         discrete_treatment=discrete_treatment,
                         treatment_featurizer=treatment_featurizer,
                         categories=categories,
                         cv=cv,
                         mc_iters=mc_iters,
                         mc_agg=mc_agg,
                         random_state=random_state,
                         allow_missing=allow_missing,
                         use_ray=use_ray,
                         ray_remote_func_options=ray_remote_func_options)

    def _gen_prel_model_effect(self):
        return clone(self.prel_model_effect, safe=False)

    def _gen_ortho_learner_model_nuisance(self):
        model_y_xw = _make_first_stage_selector(self.model_y_xw, self.discrete_outcome, self.random_state)
        model_t_xw = _make_first_stage_selector(self.model_t_xw, self.discrete_treatment, self.random_state)

        if self.projection:
            # this is a regression model since the instrument E[T|X,W,Z] is always continuous
            model_tz_xw = _make_first_stage_selector(self.model_tz_xw,
                                                     is_discrete=False,
                                                     random_state=self.random_state)

            # we're using E[T|X,W,Z] as the instrument
            model_z = _make_first_stage_selector(self.model_t_xwz,
                                                 is_discrete=self.discrete_treatment,
                                                 random_state=self.random_state)

        else:
            model_tz_xw = _make_first_stage_selector(self.model_tz_xw,
                                                     is_discrete=(self.discrete_treatment and
                                                                  self.discrete_instrument and
                                                                  not self.fit_cov_directly),
                                                     random_state=self.random_state)

            model_z = _make_first_stage_selector(self.model_z_xw,
                                                 is_discrete=self.discrete_instrument,
                                                 random_state=self.random_state)

        return [_BaseDRIVNuisanceSelector(prel_model_effect=self._gen_prel_model_effect(),
                                          model_y_xw=model_y_xw,
                                          model_t_xw=model_t_xw,
                                          model_z=model_z,
                                          projection=self.projection,
                                          discrete_treatment=self.discrete_treatment,
                                          discrete_instrument=self.discrete_instrument),
                _BaseDRIVNuisanceCovarianceSelector(model_tz_xw=model_tz_xw,
                                                    projection=self.projection,
                                                    fit_cov_directly=self.fit_cov_directly,
                                                    discrete_treatment=self.discrete_treatment,
                                                    discrete_instrument=self.discrete_instrument)]


[docs]class DRIV(_DRIV): """ The DRIV algorithm for estimating CATE with IVs. It is the parent of the public classes {LinearDRIV, SparseLinearDRIV,ForestDRIV} Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xw: estimator, default ``'auto'`` Determines how to fit the treatment to the features and controls (:math:`\\E[T | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_z_xw: estimator, default ``'auto'`` Determines how to fit the instrument to the features and controls (:math:`\\E[Z | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_instrument` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_tz_xw: estimator, default ``'auto'`` Determines how to fit the covariance to the features and controls (:math:`\\E[T*Z | X, W]` or :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` depending on `fit_cov_directly`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise fit_cov_directly : bool, default True Whether to fit :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` instead of :math:`\\E[T*Z | X, W]`. Otherwise, we compute :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` from :math:`\\E[T*Z | X, W] - \\E[T | X, W] \\E[Z | X, W]`. flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. model_final : estimator, optional a final model for the CATE and projections. If None, then flexible_model_effect is also used as a final model prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. projection: bool, default False If True, we fit a slight variant of DRIV where we use E[T|X, W, Z] as the instrument as opposed to Z, model_z_xw will be disabled; If False, model_t_xwz will be disabled. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. fit_cate_intercept : bool, default False Whether the linear CATE model should have a constant term. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then model_final.fit must accept sample_weight as a kw argument. If True then assumes the model_final is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_final space, biased to give more weight on parts of the feature space where the instrument is strong. discrete_outcome: bool, default False Whether the outcome should be treated as binary discrete_instrument: bool, default False Whether the instrument values should be treated as categorical, rather than continuous, quantities discrete_treatment: bool, default False Whether the treatment values should be treated as categorical, rather than continuous, quantities treatment_featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite treatment in the final CATE regression. The final CATE will be trained on the outcome of featurizer.fit_transform(T). If featurizer=None, then CATE is trained on T. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. cv: int, cross-validation generator or an iterable, default 2 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import DRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = DRIV(discrete_treatment=True, discrete_instrument=True) est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-4.15079..., 5.99291..., -2.86514...]) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xw="auto", model_z_xw="auto", model_t_xwz="auto", model_tz_xw="auto", fit_cov_directly=True, flexible_model_effect="auto", model_final=None, prel_cate_approach="driv", prel_cv=1, prel_opt_reweighted=True, projection=False, featurizer=None, fit_cate_intercept=False, cov_clip=1e-3, opt_reweighted=False, discrete_outcome=False, discrete_instrument=False, discrete_treatment=False, treatment_featurizer=None, categories='auto', cv=2, mc_iters=None, mc_agg='mean', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None ): if flexible_model_effect == "auto": self.flexible_model_effect = StatsModelsLinearRegression(fit_intercept=False) else: self.flexible_model_effect = clone(flexible_model_effect, safe=False) self.prel_cate_approach = prel_cate_approach self.prel_cv = prel_cv self.prel_opt_reweighted = prel_opt_reweighted super().__init__(model_y_xw=model_y_xw, model_t_xw=model_t_xw, model_z_xw=model_z_xw, model_t_xwz=model_t_xwz, model_tz_xw=model_tz_xw, fit_cov_directly=fit_cov_directly, prel_model_effect=self.prel_cate_approach, model_final=model_final, projection=projection, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, cov_clip=cov_clip, opt_reweighted=opt_reweighted, discrete_outcome=discrete_outcome, discrete_instrument=discrete_instrument, discrete_treatment=discrete_treatment, treatment_featurizer=treatment_featurizer, categories=categories, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options)
def _gen_model_final(self): if self.model_final is None: return clone(self.flexible_model_effect, safe=False) return clone(self.model_final, safe=False) def _gen_prel_model_effect(self): if self.prel_cate_approach == "driv": return _DRIV(model_y_xw=clone(self.model_y_xw, safe=False), model_t_xw=clone(self.model_t_xw, safe=False), model_z_xw=clone(self.model_z_xw, safe=False), model_t_xwz=clone(self.model_t_xwz, safe=False), model_tz_xw=clone(self.model_tz_xw, safe=False), prel_model_effect=_DummyCATE(), model_final=clone(self.flexible_model_effect, safe=False), projection=self.projection, fit_cov_directly=self.fit_cov_directly, featurizer=self._gen_featurizer(), fit_cate_intercept=self.fit_cate_intercept, cov_clip=self.cov_clip, opt_reweighted=self.prel_opt_reweighted, discrete_instrument=self.discrete_instrument, discrete_treatment=self.discrete_treatment, discrete_outcome=self.discrete_outcome, categories=self.categories, cv=self.prel_cv, mc_iters=self.mc_iters, mc_agg=self.mc_agg, random_state=self.random_state, allow_missing=self.allow_missing) elif self.prel_cate_approach == "dmliv": return NonParamDMLIV(model_y_xw=clone(self.model_y_xw, safe=False), model_t_xw=clone(self.model_t_xw, safe=False), model_t_xwz=clone(self.model_t_xwz, safe=False), model_final=clone(self.flexible_model_effect, safe=False), discrete_instrument=self.discrete_instrument, discrete_treatment=self.discrete_treatment, discrete_outcome=self.discrete_outcome, featurizer=self._gen_featurizer(), categories=self.categories, cv=self.prel_cv, mc_iters=self.mc_iters, mc_agg=self.mc_agg, random_state=self.random_state, allow_missing=self.allow_missing) else: raise ValueError( "We only support 'dmliv' or 'driv' preliminary model effect, " f"but received '{self.prel_cate_approach}'!")
[docs] def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None, cache_values=False, inference="auto"): """ Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`. Parameters ---------- Y: (n,) vector of length n Outcomes for each sample T: (n,) vector of length n Treatments for each sample Z: (n, d_z) matrix Instruments for each sample X:(n, d_x) matrix, optional Features for each sample W:(n, d_w) matrix, optional Controls for each sample sample_weight : (n,) array_like, optional Individual weights for each sample. If None, it assumes equal weight. freq_weight: (n,) array_like of int, optional Weight for the observation. Observation i is treated as the mean outcome of freq_weight[i] independent observations. When ``sample_var`` is not None, this should be provided. sample_var : (n,) nd array_like, optional Variance of the outcome(s) of the original freq_weight[i] observations that were used to compute the mean outcome represented by observation i. groups: (n,) vector, optional All rows corresponding to the same group will be kept together during splitting. If groups is not None, the `cv` argument passed to this class's initializer must support a 'groups' argument to its split method. cache_values: bool, default False Whether to cache inputs and first stage results, which will allow refitting a different final model inference: str, :class:`.Inference` instance, or None Method for performing inference. This estimator supports 'bootstrap' (or an instance of :class:`.BootstrapInference`) and 'auto' (or an instance of :class:`.GenericSingleTreatmentModelFinalInference`) Returns ------- self """ if self.projection: assert self.model_z_xw == "auto", ("In the case of projection=True, model_z_xw will not be fitted, " "please keep it as default!") if self.prel_cate_approach == "driv" and not self.projection: assert self.model_t_xwz == "auto", ("In the case of projection=False and prel_cate_approach='driv', " "model_t_xwz will not be fitted, " "please keep it as default!") return super().fit(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight, freq_weight=freq_weight, sample_var=sample_var, groups=groups, cache_values=cache_values, inference=inference)
@property def models_y_xw(self): """ Get the fitted models for :math:`\\E[Y | X]`. Returns ------- models_y_xw: nested list of objects of type(`model_y_xw`) A nested list of instances of the `model_y_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._model_y_xw.best_model._model for mdl in mdls[0]] for mdls in super().models_nuisance_] @property def models_t_xw(self): """ Get the fitted models for :math:`\\E[T | X]`. Returns ------- models_t_xw: nested list of objects of type(`model_t_xw`) A nested list of instances of the `model_t_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._model_t_xw.best_model._model for mdl in mdls[0]] for mdls in super().models_nuisance_] @property def models_z_xw(self): """ Get the fitted models for :math:`\\E[Z | X]`. Returns ------- models_z_xw: nested list of objects of type(`model_z_xw`) A nested list of instances of the `model_z_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ if self.projection: raise AttributeError("Projection model is fitted for instrument! Use models_t_xwz.") return [[mdl._model_z_xw.best_model._model for mdl in mdls[0]] for mdls in super().models_nuisance_] @property def models_t_xwz(self): """ Get the fitted models for :math:`\\E[Z | X]`. Returns ------- models_z_xw: nested list of objects of type(`model_z_xw`) A nested list of instances of the `model_z_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ if not self.projection: raise AttributeError("Direct model is fitted for instrument! Use models_z_xw.") return [[mdl._model_t_xwz.best_model._model for mdl in mdls[0]] for mdls in super().models_nuisance_] @property def models_tz_xw(self): """ Get the fitted models for :math:`\\E[T*Z | X]`. Returns ------- models_tz_xw: nested list of objects of type(`model_tz_xw`) A nested list of instances of the `model_tz_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._model_tz_xw.best_model._model for mdl in mdls[1]] for mdls in super().models_nuisance_] @property def models_prel_model_effect(self): """ Get the fitted preliminary CATE estimator. Returns ------- prel_model_effect: nested list of objects of type(`prel_model_effect`) A nested list of instances of the `prel_model_effect` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._prel_model_effect for mdl in mdls[0]] for mdls in super().models_nuisance_] @property def nuisance_scores_y_xw(self): """ Get the scores for y_xw model on the out-of-sample training data """ return self.nuisance_scores_[0] @property def nuisance_scores_t_xw(self): """ Get the scores for t_xw model on the out-of-sample training data """ return self.nuisance_scores_[1] @property def nuisance_scores_z_xw(self): """ Get the scores for z_xw model on the out-of-sample training data """ if self.projection: raise AttributeError("Projection model is fitted for instrument! Use nuisance_scores_t_xwz.") return self.nuisance_scores_[2] @property def nuisance_scores_t_xwz(self): """ Get the scores for z_xw model on the out-of-sample training data """ if not self.projection: raise AttributeError("Direct model is fitted for instrument! Use nuisance_scores_z_xw.") return self.nuisance_scores_[2] @property def nuisance_scores_prel_model_effect(self): """ Get the scores for prel_model_effect model on the out-of-sample training data """ return self.nuisance_scores_[3] @property def nuisance_scores_tz_xw(self): """ Get the scores for tz_xw model on the out-of-sample training data """ return self.nuisance_scores_[4]
[docs]class LinearDRIV(StatsModelsCateEstimatorMixin, DRIV): """ Special case of the :class:`.DRIV` where the final stage is a Linear Regression. In this case, inference can be performed via the StatsModels Inference approach and its asymptotic normal characterization of the estimated parameters. This is computationally faster than bootstrap inference. Leave the default ``inference='auto'`` unchanged, or explicitly set ``inference='statsmodels'`` at fit time to enable inference via asymptotic normality. Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xw: estimator, default ``'auto'`` Determines how to fit the treatment to the features and controls (:math:`\\E[T | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_z_xw: estimator, default ``'auto'`` Determines how to fit the instrument to the features and controls (:math:`\\E[Z | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_instrument` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_tz_xw: estimator, default ``'auto'`` Determines how to fit the covariance to the features and controls (:math:`\\E[T*Z | X, W]` or :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` depending on `fit_cov_directly`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise fit_cov_directly : bool, default True Whether to fit :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` instead of :math:`\\E[T*Z | X, W]`. Otherwise, we compute :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` from :math:`\\E[T*Z | X, W] - \\E[T | X, W] \\E[Z | X, W]`. flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. projection: bool, default False If True, we fit a slight variant of DRIV where we use E[T|X, W, Z] as the instrument as opposed to Z, model_z_xw will be disabled; If False, model_t_xwz will be disabled. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. fit_cate_intercept : bool, default True Whether the linear CATE model should have a constant term. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then model_final.fit must accept sample_weight as a kw argument. If True then assumes the model_final is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_final space, biased to give more weight on parts of the feature space where the instrument is strong. discrete_outcome: bool, default False Whether the outcome should be treated as binary discrete_instrument: bool, default False Whether the instrument values should be treated as categorical, rather than continuous, quantities discrete_treatment: bool, default False Whether the treatment values should be treated as categorical, rather than continuous, quantities treatment_featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite treatment in the final CATE regression. The final CATE will be trained on the outcome of featurizer.fit_transform(T). If featurizer=None, then CATE is trained on T. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. cv: int, cross-validation generator or an iterable, default 2 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import LinearDRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = LinearDRIV(discrete_treatment=True, discrete_instrument=True) est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-4.27803..., 5.84999..., -2.98296...]) >>> est.effect_interval(X[:3]) (array([-7.16141..., 1.71887..., -5.41441...]), array([-1.39465..., 9.98110..., -0.55151...])) >>> est.coef_ array([ 4.65225..., 0.93347..., 0.23315..., 0.22843..., -0.42850...]) >>> est.coef__interval() (array([ 3.40045..., -0.19165..., -0.95122..., -0.88662..., -1.56024...]), array([5.90404..., 2.05861..., 1.41753..., 1.34349..., 0.70324...])) >>> est.intercept_ -0.12823... >>> est.intercept__interval() (-1.27155..., 1.01508...) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xw="auto", model_z_xw="auto", model_t_xwz="auto", model_tz_xw="auto", fit_cov_directly=True, flexible_model_effect="auto", prel_cate_approach="driv", prel_cv=1, prel_opt_reweighted=True, projection=False, featurizer=None, fit_cate_intercept=True, cov_clip=1e-3, opt_reweighted=False, discrete_outcome=False, discrete_instrument=False, discrete_treatment=False, treatment_featurizer=None, categories='auto', cv=2, mc_iters=None, mc_agg='mean', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None ): super().__init__(model_y_xw=model_y_xw, model_t_xw=model_t_xw, model_z_xw=model_z_xw, model_t_xwz=model_t_xwz, model_tz_xw=model_tz_xw, fit_cov_directly=fit_cov_directly, flexible_model_effect=flexible_model_effect, model_final=None, prel_cate_approach=prel_cate_approach, prel_cv=prel_cv, prel_opt_reweighted=prel_opt_reweighted, projection=projection, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, cov_clip=cov_clip, opt_reweighted=opt_reweighted, discrete_outcome=discrete_outcome, discrete_instrument=discrete_instrument, discrete_treatment=discrete_treatment, treatment_featurizer=treatment_featurizer, categories=categories, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options)
def _gen_model_final(self): return StatsModelsLinearRegression(fit_intercept=False)
[docs] def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None, cache_values=False, inference='auto'): """ Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`. Parameters ---------- Y: (n,) vector of length n Outcomes for each sample T: (n,) vector of length n Treatments for each sample Z: (n, d_z) matrix Instruments for each sample X:(n, d_x) matrix, optional Features for each sample W:(n, d_w) matrix, optional Controls for each sample sample_weight : (n,) array_like, optional Individual weights for each sample. If None, it assumes equal weight. freq_weight: (n,) array_like of int, optional Weight for the observation. Observation i is treated as the mean outcome of freq_weight[i] independent observations. When ``sample_var`` is not None, this should be provided. sample_var : (n,) nd array_like, optional Variance of the outcome(s) of the original freq_weight[i] observations that were used to compute the mean outcome represented by observation i. groups: (n,) vector, optional All rows corresponding to the same group will be kept together during splitting. If groups is not None, the `cv` argument passed to this class's initializer must support a 'groups' argument to its split method. cache_values: bool, default False Whether to cache inputs and first stage results, which will allow refitting a different final model inference: str, :class:`.Inference` instance, or None Method for performing inference. This estimator supports ``'bootstrap'`` (or an instance of :class:`.BootstrapInference`) and ``'statsmodels'`` (or an instance of :class:`.StatsModelsInferenceDiscrete`). Returns ------- self """ # Replacing fit from _OrthoLearner, to reorder arguments and improve the docstring return super().fit(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight, freq_weight=freq_weight, sample_var=sample_var, groups=groups, cache_values=cache_values, inference=inference)
@property def fit_cate_intercept_(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def bias_part_of_coef(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def model_final(self): return self._gen_model_final() @model_final.setter def model_final(self, model): if model is not None: raise ValueError("Parameter `model_final` cannot be altered for this estimator!")
[docs]class SparseLinearDRIV(DebiasedLassoCateEstimatorMixin, DRIV): """ Special case of the :class:`.DRIV` where the final stage is a Debiased Lasso Regression. In this case, inference can be performed via the debiased lasso approach and its asymptotic normal characterization of the estimated parameters. This is computationally faster than bootstrap inference. Leave the default ``inference='auto'`` unchanged, or explicitly set ``inference='debiasedlasso'`` at fit time to enable inference via asymptotic normality. Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xw: estimator, default ``'auto'`` Determines how to fit the treatment to the features and controls (:math:`\\E[T | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_z_xw: estimator, default ``'auto'`` Determines how to fit the instrument to the features and controls (:math:`\\E[Z | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_instrument` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_tz_xw: estimator, default ``'auto'`` Determines how to fit the covariance to the features and controls (:math:`\\E[T*Z | X, W]` or :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` depending on `fit_cov_directly`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise fit_cov_directly : bool, default True Whether to fit :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` instead of :math:`\\E[T*Z | X, W]`. Otherwise, we compute :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` from :math:`\\E[T*Z | X, W] - \\E[T | X, W] \\E[Z | X, W]`. flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. projection: bool, default False If True, we fit a slight variant of DRIV where we use E[T|X, W, Z] as the instrument as opposed to Z, model_z_xw will be disabled; If False, model_t_xwz will be disabled. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. fit_cate_intercept : bool, default True Whether the linear CATE model should have a constant term. alpha: str | float, optional., default 'auto'. CATE L1 regularization applied through the debiased lasso in the final model. 'auto' corresponds to a CV form of the :class:`DebiasedLasso`. n_alphas : int, default 100 How many alphas to try if alpha='auto' alpha_cov : str | float, default 'auto' The regularization alpha that is used when constructing the pseudo inverse of the covariance matrix Theta used to for correcting the final state lasso coefficient in the debiased lasso. Each such regression corresponds to the regression of one feature on the remainder of the features. n_alphas_cov : int, default 10 How many alpha_cov to try if alpha_cov='auto'. max_iter : int, default 1000 The maximum number of iterations in the Debiased Lasso tol : float, default 1e-4 The tolerance for the optimization: if the updates are smaller than ``tol``, the optimization code checks the dual gap for optimality and continues until it is smaller than ``tol``. n_jobs : int or None, optional The number of jobs to run in parallel for both `fit` and `predict`. ``None`` means 1 unless in a :func:`joblib.parallel_backend` context. ``-1`` means using all processors. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then model_final.fit must accept sample_weight as a kw argument. If True then assumes the model_final is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_final space, biased to give more weight on parts of the feature space where the instrument is strong. discrete_outcome: bool, default False Whether the outcome should be treated as binary discrete_instrument: bool, default False Whether the instrument values should be treated as categorical, rather than continuous, quantities discrete_treatment: bool, default False Whether the treatment values should be treated as categorical, rather than continuous, quantities treatment_featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite treatment in the final CATE regression. The final CATE will be trained on the outcome of featurizer.fit_transform(T). If featurizer=None, then CATE is trained on T. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. cv: int, cross-validation generator or an iterable, default 2 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import SparseLinearDRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = SparseLinearDRIV(discrete_treatment=True, discrete_instrument=True) est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-4.23929..., 5.89223..., -3.01208...]) >>> est.effect_interval(X[:3]) (array([-6.99789..., 1.96351..., -5.41963...]), array([-1.48069..., 9.82096..., -0.60454...])) >>> est.coef_ array([ 4.65819..., 0.94689..., 0.18314..., 0.23012..., -0.40375...]) >>> est.coef__interval() (array([ 3.51647..., -0.20839..., -0.99568..., -0.89394..., -1.58518...]), array([5.79991..., 2.10218... , 1.36197..., 1.35420... , 0.77767...])) >>> est.intercept_ -0.06539... >>> est.intercept__interval() (-1.20716..., 1.07637...) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xw="auto", model_z_xw="auto", model_t_xwz="auto", model_tz_xw="auto", fit_cov_directly=True, flexible_model_effect="auto", prel_cate_approach="driv", prel_cv=1, prel_opt_reweighted=True, projection=False, featurizer=None, fit_cate_intercept=True, alpha='auto', n_alphas=100, alpha_cov='auto', n_alphas_cov=10, max_iter=1000, tol=1e-4, n_jobs=None, cov_clip=1e-3, opt_reweighted=False, discrete_outcome=False, discrete_instrument=False, discrete_treatment=False, treatment_featurizer=None, categories='auto', cv=2, mc_iters=None, mc_agg='mean', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None): self.alpha = alpha self.n_alphas = n_alphas self.alpha_cov = alpha_cov self.n_alphas_cov = n_alphas_cov self.max_iter = max_iter self.tol = tol self.n_jobs = n_jobs super().__init__(model_y_xw=model_y_xw, model_t_xw=model_t_xw, model_z_xw=model_z_xw, model_t_xwz=model_t_xwz, model_tz_xw=model_tz_xw, fit_cov_directly=fit_cov_directly, flexible_model_effect=flexible_model_effect, model_final=None, prel_cate_approach=prel_cate_approach, prel_cv=prel_cv, prel_opt_reweighted=prel_opt_reweighted, projection=projection, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, cov_clip=cov_clip, opt_reweighted=opt_reweighted, discrete_outcome=discrete_outcome, discrete_instrument=discrete_instrument, discrete_treatment=discrete_treatment, treatment_featurizer=treatment_featurizer, categories=categories, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options )
def _gen_model_final(self): return DebiasedLasso(alpha=self.alpha, n_alphas=self.n_alphas, alpha_cov=self.alpha_cov, n_alphas_cov=self.n_alphas_cov, fit_intercept=False, max_iter=self.max_iter, tol=self.tol, n_jobs=self.n_jobs, random_state=self.random_state)
[docs] def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, groups=None, cache_values=False, inference='auto'): """ Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`. Parameters ---------- Y: (n,) vector of length n Outcomes for each sample T: (n,) vector of length n Treatments for each sample Z: (n, d_z) matrix Instruments for each sample X:(n, d_x) matrix, optional Features for each sample W:(n, d_w) matrix, optional Controls for each sample sample_weight : (n,) array_like, optional Individual weights for each sample. If None, it assumes equal weight. groups: (n,) vector, optional All rows corresponding to the same group will be kept together during splitting. If groups is not None, the `cv` argument passed to this class's initializer must support a 'groups' argument to its split method. cache_values: bool, default False Whether to cache inputs and first stage results, which will allow refitting a different final model inference: str, :class:`.Inference` instance, or None Method for performing inference. This estimator supports ``'bootstrap'`` (or an instance of :class:`.BootstrapInference`) and ``'debiasedlasso'`` (or an instance of :class:`.LinearModelInferenceDiscrete`). Returns ------- self """ # TODO: support freq_weight and sample_var in debiased lasso # Replacing fit from _OrthoLearner, to reorder arguments and improve the docstring check_high_dimensional(X, T, threshold=5, featurizer=self.featurizer, discrete_treatment=self.discrete_treatment, msg="The number of features in the final model (< 5) is too small for a sparse model. " "We recommend using the LinearDRLearner for this low-dimensional setting.") return super().fit(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight, groups=groups, cache_values=cache_values, inference=inference)
@property def fit_cate_intercept_(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def bias_part_of_coef(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def model_final(self): return self._gen_model_final() @model_final.setter def model_final(self, model): if model is not None: raise ValueError("Parameter `model_final` cannot be altered for this estimator!")
[docs]class ForestDRIV(ForestModelFinalCateEstimatorMixin, DRIV): """ Instance of DRIV with a :class:`~econml.grf.RegressionForest` as a final model, so as to enable non-parametric inference. Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xw: estimator, default ``'auto'`` Determines how to fit the treatment to the features and controls (:math:`\\E[T | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_z_xw: estimator, default ``'auto'`` Determines how to fit the instrument to the features and controls (:math:`\\E[Z | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_instrument` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise model_tz_xw: estimator, default ``'auto'`` Determines how to fit the covariance to the features and controls (:math:`\\E[T*Z | X, W]` or :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` depending on `fit_cov_directly`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise fit_cov_directly : bool, default True Whether to fit :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` instead of :math:`\\E[T*Z | X, W]`. Otherwise, we compute :math:`\\E[\\tilde{T}*\\tilde{Z} | X, W]` from :math:`\\E[T*Z | X, W] - \\E[T | X, W] \\E[Z | X, W]`. flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. projection: bool, default False If True, we fit a slight variant of DRIV where we use E[T|X, W, Z] as the instrument as opposed to Z, model_z_xw will be disabled; If False, model_t_xwz will be disabled. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. n_estimators : int, default 100 The total number of trees in the forest. The forest consists of a forest of sqrt(n_estimators) sub-forests, where each sub-forest contains sqrt(n_estimators) trees. max_depth : int or None, optional The maximum depth of the tree. If None, then nodes are expanded until all leaves are pure or until all leaves contain less than min_samples_split samples. min_samples_split : int, float, default 2 The minimum number of splitting samples required to split an internal node. - If int, then consider `min_samples_split` as the minimum number. - If float, then `min_samples_split` is a fraction and `ceil(min_samples_split * n_samples)` are the minimum number of samples for each split. min_samples_leaf : int, float, default 1 The minimum number of samples required to be at a leaf node. A split point at any depth will only be considered if it leaves at least ``min_samples_leaf`` splitting samples in each of the left and right branches. This may have the effect of smoothing the model, especially in regression. After construction the tree is also pruned so that there are at least min_samples_leaf estimation samples on each leaf. - If int, then consider `min_samples_leaf` as the minimum number. - If float, then `min_samples_leaf` is a fraction and `ceil(min_samples_leaf * n_samples)` are the minimum number of samples for each node. min_weight_fraction_leaf : float, default 0. The minimum weighted fraction of the sum total of weights (of all splitting samples) required to be at a leaf node. Samples have equal weight when sample_weight is not provided. After construction the tree is pruned so that the fraction of the sum total weight of the estimation samples contained in each leaf node is at least min_weight_fraction_leaf max_features : int, float, str, or None, default "auto" The number of features to consider when looking for the best split: - If int, then consider `max_features` features at each split. - If float, then `max_features` is a fraction and `int(max_features * n_features)` features are considered at each split. - If "auto", then `max_features=n_features`. - If "sqrt", then `max_features=sqrt(n_features)`. - If "log2", then `max_features=log2(n_features)`. - If None, then `max_features=n_features`. Note: the search for a split does not stop until at least one valid partition of the node samples is found, even if it requires to effectively inspect more than ``max_features`` features. min_impurity_decrease : float, default 0. A node will be split if this split induces a decrease of the impurity greater than or equal to this value. The weighted impurity decrease equation is the following:: N_t / N * (impurity - N_t_R / N_t * right_impurity - N_t_L / N_t * left_impurity) where ``N`` is the total number of split samples, ``N_t`` is the number of split samples at the current node, ``N_t_L`` is the number of split samples in the left child, and ``N_t_R`` is the number of split samples in the right child. ``N``, ``N_t``, ``N_t_R`` and ``N_t_L`` all refer to the weighted sum, if ``sample_weight`` is passed. max_samples : int or float in (0, .5], default .45, The number of samples to use for each subsample that is used to train each tree: - If int, then train each tree on `max_samples` samples, sampled without replacement from all the samples - If float, then train each tree on ceil(`max_samples` * `n_samples`), sampled without replacement from all the samples. min_balancedness_tol: float in [0, .5], default .45 How imbalanced a split we can tolerate. This enforces that each split leaves at least (.5 - min_balancedness_tol) fraction of samples on each side of the split; or fraction of the total weight of samples, when sample_weight is not None. Default value, ensures that at least 5% of the parent node weight falls in each side of the split. Set it to 0.0 for no balancedness and to .5 for perfectly balanced splits. For the formal inference theory to be valid, this has to be any positive constant bounded away from zero. honest : bool, default True Whether to use honest trees, i.e. half of the samples are used for creating the tree structure and the other half for the estimation at the leafs. If False, then all samples are used for both parts. subforest_size : int, default 4, The number of trees in each sub-forest that is used in the bootstrap-of-little-bags calculation. The parameter `n_estimators` must be divisible by `subforest_size`. Should typically be a small constant. n_jobs : int or None, default -1 The number of jobs to run in parallel for both `fit` and `predict`. ``None`` means 1 unless in a :func:`joblib.parallel_backend` context. ``-1`` means using all processors. See :term:`Glossary <n_jobs>` for more details. verbose : int, default 0 Controls the verbosity when fitting and predicting. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then model_final.fit must accept sample_weight as a kw argument. If True then assumes the model_final is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_final space, biased to give more weight on parts of the feature space where the instrument is strong. discrete_outcome: bool, default False Whether the outcome should be treated as binary discrete_instrument: bool, default False Whether the instrument values should be treated as categorical, rather than continuous, quantities discrete_treatment: bool, default False Whether the treatment values should be treated as categorical, rather than continuous, quantities treatment_featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite treatment in the final CATE regression. The final CATE will be trained on the outcome of featurizer.fit_transform(T). If featurizer=None, then CATE is trained on T. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. cv: int, cross-validation generator or an iterable, default 2 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import ForestDRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = ForestDRIV(discrete_treatment=True, discrete_instrument=True, random_state=42) est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-2.11667..., 6.31903..., -3.65700...]) >>> est.effect_interval(X[:3]) (array([-5.53359..., 2.40420..., -7.14977...]), array([ 1.30025..., 10.23385..., -0.16424...])) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xw="auto", model_z_xw="auto", model_t_xwz="auto", model_tz_xw="auto", fit_cov_directly=True, flexible_model_effect="auto", prel_cate_approach="driv", prel_cv=1, prel_opt_reweighted=True, projection=False, featurizer=None, n_estimators=1000, max_depth=None, min_samples_split=5, min_samples_leaf=5, min_weight_fraction_leaf=0., max_features="auto", min_impurity_decrease=0., max_samples=.45, min_balancedness_tol=.45, honest=True, subforest_size=4, n_jobs=-1, verbose=0, cov_clip=1e-3, opt_reweighted=False, discrete_outcome=False, discrete_instrument=False, discrete_treatment=False, treatment_featurizer=None, categories='auto', cv=2, mc_iters=None, mc_agg='mean', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None): self.n_estimators = n_estimators self.max_depth = max_depth self.min_samples_split = min_samples_split self.min_samples_leaf = min_samples_leaf self.min_weight_fraction_leaf = min_weight_fraction_leaf self.max_features = max_features self.min_impurity_decrease = min_impurity_decrease self.max_samples = max_samples self.min_balancedness_tol = min_balancedness_tol self.honest = honest self.subforest_size = subforest_size self.n_jobs = n_jobs self.verbose = verbose super().__init__(model_y_xw=model_y_xw, model_t_xw=model_t_xw, model_z_xw=model_z_xw, model_t_xwz=model_t_xwz, model_tz_xw=model_tz_xw, fit_cov_directly=fit_cov_directly, flexible_model_effect=flexible_model_effect, model_final=None, prel_cate_approach=prel_cate_approach, prel_cv=prel_cv, prel_opt_reweighted=prel_opt_reweighted, projection=projection, featurizer=featurizer, fit_cate_intercept=False, cov_clip=cov_clip, opt_reweighted=opt_reweighted, discrete_outcome=discrete_outcome, discrete_instrument=discrete_instrument, discrete_treatment=discrete_treatment, treatment_featurizer=treatment_featurizer, categories=categories, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options)
def _gen_model_final(self): return RegressionForest(n_estimators=self.n_estimators, max_depth=self.max_depth, min_samples_split=self.min_samples_split, min_samples_leaf=self.min_samples_leaf, min_weight_fraction_leaf=self.min_weight_fraction_leaf, max_features=self.max_features, min_impurity_decrease=self.min_impurity_decrease, max_samples=self.max_samples, min_balancedness_tol=self.min_balancedness_tol, honest=self.honest, inference=True, subforest_size=self.subforest_size, n_jobs=self.n_jobs, random_state=self.random_state, verbose=self.verbose, warm_start=False)
[docs] def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, groups=None, cache_values=False, inference='auto'): """ Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`. Parameters ---------- Y: (n,) vector of length n Outcomes for each sample T: (n,) vector of length n Treatments for each sample Z: (n, d_z) matrix Instruments for each sample X:(n, d_x) matrix, optional Features for each sample W:(n, d_w) matrix, optional Controls for each sample sample_weight : (n,) array_like, optional Individual weights for each sample. If None, it assumes equal weight. groups: (n,) vector, optional All rows corresponding to the same group will be kept together during splitting. If groups is not None, the `cv` argument passed to this class's initializer must support a 'groups' argument to its split method. cache_values: bool, default False Whether to cache inputs and first stage results, which will allow refitting a different final model inference: str, `Inference` instance, or None Method for performing inference. This estimator supports 'bootstrap' (or an instance of :class:`.BootstrapInference`) and 'blb' (for Bootstrap-of-Little-Bags based inference) Returns ------- self """ if X is None: raise ValueError("This estimator does not support X=None!") # Replacing fit from _OrthoLearner, to reorder arguments and improve the docstring return super().fit(Y, T, X=X, W=W, Z=Z, sample_weight=sample_weight, groups=groups, cache_values=cache_values, inference=inference)
@property def model_final(self): return self._gen_model_final() @model_final.setter def model_final(self, model): if model is not None: raise ValueError("Parameter `model_final` cannot be altered for this estimator!")
class _IntentToTreatDRIVNuisanceSelector(ModelSelector): def __init__(self, model_y_xw: SingleModelSelector, model_t_xwz: SingleModelSelector, dummy_z: SingleModelSelector, prel_model_effect): self._model_y_xw = model_y_xw self._model_t_xwz = model_t_xwz self._dummy_z = dummy_z self._prel_model_effect = prel_model_effect def train(self, is_selecting, folds, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None): self._model_y_xw.train(is_selecting, folds, X=X, W=W, Target=Y, sample_weight=sample_weight, groups=groups) # concat W and Z WZ = _combine(W, Z, Y.shape[0]) self._model_t_xwz.train(is_selecting, folds, X=X, W=WZ, Target=T, sample_weight=sample_weight, groups=groups) self._dummy_z.train(is_selecting, folds, X=X, W=W, Target=Z, sample_weight=sample_weight, groups=groups) # we need to undo the one-hot encoding for calling effect, # since it expects raw values self._prel_model_effect.fit(Y, inverse_onehot(T), Z=inverse_onehot(Z), X=X, W=W, sample_weight=sample_weight, groups=groups) return self def score(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None): if hasattr(self._model_y_xw, 'score'): Y_X_score = self._model_y_xw.score(X=X, W=W, Target=Y, sample_weight=sample_weight) else: Y_X_score = None if hasattr(self._model_t_xwz, 'score'): # concat W and Z WZ = _combine(W, Z, Y.shape[0]) T_XZ_score = self._model_t_xwz.score(X=X, W=WZ, Target=T, sample_weight=sample_weight) else: T_XZ_score = None if hasattr(self._prel_model_effect, 'score'): # we need to undo the one-hot encoding for calling effect, # since it expects raw values effect_score = self._prel_model_effect.score(Y, inverse_onehot(T), inverse_onehot(Z), X=X, W=W, sample_weight=sample_weight) else: effect_score = None return Y_X_score, T_XZ_score, effect_score def predict(self, Y, T, X=None, W=None, Z=None, sample_weight=None, groups=None): Y_pred = self._model_y_xw.predict(X, W) T_pred_zero = self._model_t_xwz.predict(X, _combine(W, np.zeros(Z.shape), Y.shape[0])) T_pred_one = self._model_t_xwz.predict(X, _combine(W, np.ones(Z.shape), Y.shape[0])) Z_pred = self._dummy_z.predict(X, W) prel_theta = self._prel_model_effect.effect(X) if X is None: # In this case predict above returns a single row prel_theta = np.tile(prel_theta.reshape(1, -1), (T.shape[0], 1)) if W is None: Y_pred = np.tile(Y_pred.reshape(1, -1), (Y.shape[0], 1)) Z_pred = np.tile(Z_pred.reshape(1, -1), (Z.shape[0], 1)) # reshape the predictions Y_pred = Y_pred.reshape(Y.shape) Z_pred = Z_pred.reshape(Z.shape) T_pred_one = T_pred_one.reshape(T.shape) T_pred_zero = T_pred_zero.reshape(T.shape) # T_res, Z_res, beta expect shape to be (n,1) beta = Z_pred * (1 - Z_pred) * (T_pred_one - T_pred_zero) T_pred = T_pred_one * Z_pred + T_pred_zero * (1 - Z_pred) Y_res = Y - Y_pred T_res = T - T_pred Z_res = Z - Z_pred return prel_theta, Y_res, T_res, Z_res, beta @property def needs_fit(self): return self._model_y_xw.needs_fit or self._model_t_xwz.needs_fit or self._dummy_z.needs_fit class _DummyClassifier: """ A dummy classifier that always returns the prior ratio Parameters ---------- ratio: float The ratio of treatment samples """ def __init__(self, *, ratio): self.ratio = ratio def fit(self, X, y, **kwargs): return self def predict_proba(self, X): n = X.shape[0] return np.hstack((np.tile(1 - self.ratio, (n, 1)), np.tile(self.ratio, (n, 1)))) class _IntentToTreatDRIV(_BaseDRIV): """ Base class for the DRIV algorithm for the intent-to-treat A/B test setting """ def __init__(self, *, model_y_xw="auto", model_t_xwz="auto", prel_model_effect, model_final, z_propensity="auto", featurizer=None, fit_cate_intercept=False, discrete_outcome=False, cov_clip=1e-3, opt_reweighted=False, categories='auto', cv=3, mc_iters=None, mc_agg='mean', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None): self.model_y_xw = clone(model_y_xw, safe=False) self.model_t_xwz = clone(model_t_xwz, safe=False) self.prel_model_effect = clone(prel_model_effect, safe=False) self.z_propensity = z_propensity super().__init__(model_final=model_final, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, discrete_outcome=discrete_outcome, cov_clip=cov_clip, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, discrete_instrument=True, discrete_treatment=True, categories=categories, opt_reweighted=opt_reweighted, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options) def _gen_prel_model_effect(self): return clone(self.prel_model_effect, safe=False) def _gen_ortho_learner_model_nuisance(self): model_y_xw = _make_first_stage_selector(self.model_y_xw, is_discrete=self.discrete_outcome, random_state=self.random_state) model_t_xwz = _make_first_stage_selector(self.model_t_xwz, is_discrete=True, random_state=self.random_state) if self.z_propensity == "auto": dummy_z = DummyClassifier(strategy="prior") elif isinstance(self.z_propensity, float): dummy_z = _DummyClassifier(ratio=self.z_propensity) else: raise ValueError("Only 'auto' or float is allowed!") dummy_z = _make_first_stage_selector(dummy_z, is_discrete=True, random_state=self.random_state) return _IntentToTreatDRIVNuisanceSelector(model_y_xw, model_t_xwz, dummy_z, self._gen_prel_model_effect()) class _DummyCATE: """ A dummy cate effect model that always returns zero effect """ def __init__(self): return def fit(self, y, T, *, Z, X=None, W=None, sample_weight=None, groups=None, **kwargs): return self def effect(self, X): if X is None: return np.zeros(1) return np.zeros(X.shape[0])
[docs]class IntentToTreatDRIV(_IntentToTreatDRIV): """ Implements the DRIV algorithm for the intent-to-treat A/B test setting Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. model_final : estimator, optional a final model for the CATE and projections. If None, then flexible_model_effect is also used as a final model prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. z_propensity: float or "auto", default "auto" The ratio of the A/B test in treatment group. If "auto", we assume that the instrument is fully randomized and independent of any other variables. It's calculated as the proportion of Z=1 in the overall population; If input a ratio, it has to be a float between 0 to 1. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. fit_cate_intercept : bool, default False Whether the linear CATE model should have a constant term. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance cv: int, cross-validation generator or an iterable, default 3 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then final_model_effect.fit must accept sample_weight as a kw argument (WeightWrapper from utilities can be used for any linear model to enable sample_weights). If True then assumes the final_model_effect is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_effect space, biased to give more weight on parts of the feature space where the instrument is strong. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import IntentToTreatDRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = IntentToTreatDRIV() est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-4.52641..., 6.38726..., -2.67055...]) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xwz="auto", prel_cate_approach="driv", flexible_model_effect="auto", model_final=None, prel_cv=1, prel_opt_reweighted=True, z_propensity="auto", featurizer=None, fit_cate_intercept=False, discrete_outcome=False, cov_clip=1e-3, cv=3, mc_iters=None, mc_agg='mean', opt_reweighted=False, categories='auto', random_state=None, allow_missing=False, use_ray=False, ray_remote_func_options=None): # maybe shouldn't expose fit_cate_intercept in this class? if flexible_model_effect == "auto": self.flexible_model_effect = StatsModelsLinearRegression(fit_intercept=False) else: self.flexible_model_effect = clone(flexible_model_effect, safe=False) self.prel_cate_approach = prel_cate_approach self.prel_cv = prel_cv self.prel_opt_reweighted = prel_opt_reweighted super().__init__(model_y_xw=model_y_xw, model_t_xwz=model_t_xwz, prel_model_effect=self.prel_cate_approach, model_final=model_final, z_propensity=z_propensity, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, discrete_outcome=discrete_outcome, cov_clip=cov_clip, opt_reweighted=opt_reweighted, categories=categories, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options)
def _gen_model_final(self): if self.model_final is None: return clone(self.flexible_model_effect, safe=False) return clone(self.model_final, safe=False) def _gen_prel_model_effect(self): if self.prel_cate_approach == "driv": return _IntentToTreatDRIV(model_y_xw=clone(self.model_y_xw, safe=False), model_t_xwz=clone(self.model_t_xwz, safe=False), prel_model_effect=_DummyCATE(), model_final=clone(self.flexible_model_effect, safe=False), featurizer=self._gen_featurizer(), fit_cate_intercept=self.fit_cate_intercept, cov_clip=self.cov_clip, categories=self.categories, opt_reweighted=self.prel_opt_reweighted, cv=self.prel_cv, random_state=self.random_state, allow_missing=self.allow_missing) elif self.prel_cate_approach == "dmliv": return NonParamDMLIV(model_y_xw=clone(self.model_y_xw, safe=False), model_t_xw=clone(self.model_t_xwz, safe=False), model_t_xwz=clone(self.model_t_xwz, safe=False), model_final=clone(self.flexible_model_effect, safe=False), discrete_instrument=True, discrete_treatment=True, featurizer=self._gen_featurizer(), categories=self.categories, cv=self.prel_cv, mc_iters=self.mc_iters, mc_agg=self.mc_agg, random_state=self.random_state, allow_missing=self.allow_missing) else: raise ValueError( "We only support 'dmliv' or 'driv' preliminary model effect, " f"but received '{self.prel_cate_approach}'!") @property def models_y_xw(self): """ Get the fitted models for :math:`\\E[Y | X]`. Returns ------- models_y_xw: nested list of objects of type(`model_y_xw`) A nested list of instances of the `model_y_xw` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._model_y_xw._model for mdl in mdls] for mdls in super().models_nuisance_] @property def models_t_xwz(self): """ Get the fitted models for :math:`\\E[T | X, Z]`. Returns ------- models_t_xwz: nested list of objects of type(`model_t_xwz`) A nested list of instances of the `model_t_xwz` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._model_t_xwz._model for mdl in mdls] for mdls in super().models_nuisance_] @property def models_prel_model_effect(self): """ Get the fitted preliminary CATE estimator. Returns ------- prel_model_effect: nested list of objects of type(`prel_model_effect`) A nested list of instances of the `prel_model_effect` object. Number of sublist equals to number of monte carlo iterations, each element in the sublist corresponds to a crossfitting fold and is the model instance that was fitted for that training fold. """ return [[mdl._prel_model_effect for mdl in mdls] for mdls in super().models_nuisance_] @property def nuisance_scores_y_xw(self): """ Get the scores for y_xw model on the out-of-sample training data """ return self.nuisance_scores_[0] @property def nuisance_scores_t_xwz(self): """ Get the scores for t_xw model on the out-of-sample training data """ return self.nuisance_scores_[1] @property def nuisance_scores_prel_model_effect(self): """ Get the scores for prel_model_effect model on the out-of-sample training data """ return self.nuisance_scores_[2]
[docs]class LinearIntentToTreatDRIV(StatsModelsCateEstimatorMixin, IntentToTreatDRIV): """ Implements the DRIV algorithm for the Linear Intent-to-Treat A/B test setting Parameters ---------- model_y_xw: estimator, default ``'auto'`` Determines how to fit the outcome to the features and controls (:math:`\\E[Y | X, W]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_outcome` is True and a regressor otherwise model_t_xwz: estimator, default ``'auto'`` Determines how to fit the treatment to the features, controls, and instrument (:math:`\\E[T | X, W, Z]`). - If ``'auto'``, the model will be the best-fitting of a set of linear and forest models - Otherwise, see :ref:`model_selection` for the range of supported options; if a single model is specified it should be a classifier if `discrete_treatment` is True and a regressor otherwise flexible_model_effect : estimator or 'auto' (default is 'auto') a flexible model for a preliminary version of the CATE, must accept sample_weight at fit time. If 'auto', :class:`.StatsModelsLinearRegression` will be applied. prel_cate_approach : one of {'driv', 'dmliv'}, default 'driv' model that estimates a preliminary version of the CATE. If 'driv', :class:`._DRIV` will be used. If 'dmliv', :class:`.NonParamDMLIV` will be used prel_cv : int, cross-validation generator or an iterable, default 1 Determines the cross-validation splitting strategy for the preliminary effect model. prel_opt_reweighted : bool, default True Whether to reweight the samples to minimize variance for the preliminary effect model. z_propensity: float or "auto", default "auto" The ratio of the A/B test in treatment group. If "auto", we assume that the instrument is fully randomized and independent of any other variables. It's calculated as the proportion of Z=1 in the overall population; If input a ratio, it has to be a float between 0 to 1. featurizer : :term:`transformer`, optional Must support fit_transform and transform. Used to create composite features in the final CATE regression. It is ignored if X is None. The final CATE will be trained on the outcome of featurizer.fit_transform(X). If featurizer=None, then CATE is trained on X. fit_cate_intercept : bool, default True Whether the linear CATE model should have a constant term. cov_clip : float, default 0.1 clipping of the covariate for regions with low "overlap", to reduce variance cv: int, cross-validation generator or an iterable, default 3 Determines the cross-validation splitting strategy. Possible inputs for cv are: - None, to use the default 3-fold cross-validation, - integer, to specify the number of folds. - :term:`CV splitter` - An iterable yielding (train, test) splits as arrays of indices. For integer/None inputs, if the treatment is discrete :class:`~sklearn.model_selection.StratifiedKFold` is used, else, :class:`~sklearn.model_selection.KFold` is used (with a random shuffle in either case). Unless an iterable is used, we call `split(concat[W, X], T)` to generate the splits. If all W, X are None, then we call `split(ones((T.shape[0], 1)), T)`. mc_iters: int, optional The number of times to rerun the first stage models to reduce the variance of the nuisances. mc_agg: {'mean', 'median'}, default 'mean' How to aggregate the nuisance value for each sample across the `mc_iters` monte carlo iterations of cross-fitting. opt_reweighted : bool, default False Whether to reweight the samples to minimize variance. If True then final_model_effect.fit must accept sample_weight as a kw argument (WeightWrapper from utilities can be used for any linear model to enable sample_weights). If True then assumes the final_model_effect is flexible enough to fit the true CATE model. Otherwise, it method will return a biased projection to the model_effect space, biased to give more weight on parts of the feature space where the instrument is strong. categories: 'auto' or list, default 'auto' The categories to use when encoding discrete treatments (or 'auto' to use the unique sorted values). The first category will be treated as the control treatment. random_state : int, RandomState instance, or None, default None If int, random_state is the seed used by the random number generator; If :class:`~numpy.random.mtrand.RandomState` instance, random_state is the random number generator; If None, the random number generator is the :class:`~numpy.random.mtrand.RandomState` instance used by :mod:`np.random<numpy.random>`. allow_missing: bool Whether to allow missing values in W. If True, will need to supply nuisance models that can handle missing values. enable_federation: bool, default False Whether to enable federation for the final model. This has a memory cost so should be enabled only if this model will be aggregated with other models. use_ray: bool, default False Whether to use Ray to parallelize the cross-validation step. If True, Ray must be installed. ray_remote_func_options : dict, default None Options to pass to the remote function when using Ray. See https://docs.ray.io/en/latest/ray-core/api/doc/ray.remote.html Examples -------- A simple example with the default models: .. testcode:: :hide: import numpy as np import scipy.special np.set_printoptions(suppress=True) .. testcode:: from econml.iv.dr import LinearIntentToTreatDRIV # Define the data generation functions def dgp(n, p, true_fn): X = np.random.normal(0, 1, size=(n, p)) Z = np.random.binomial(1, 0.5, size=(n,)) nu = np.random.uniform(0, 10, size=(n,)) coef_Z = 0.8 C = np.random.binomial( 1, coef_Z * scipy.special.expit(0.4 * X[:, 0] + nu) ) # Compliers when recomended C0 = np.random.binomial( 1, 0.06 * np.ones(X.shape[0]) ) # Non-compliers when not recommended T = C * Z + C0 * (1 - Z) y = true_fn(X) * T + 2 * nu + 5 * (X[:, 3] > 0) + 0.1 * np.random.uniform(0, 1, size=(n,)) return y, T, Z, X def true_heterogeneity_function(X): return 5 * X[:, 0] np.random.seed(123) y, T, Z, X = dgp(1000, 5, true_heterogeneity_function) est = LinearIntentToTreatDRIV() est.fit(Y=y, T=T, Z=Z, X=X) >>> est.effect(X[:3]) array([-4.80489..., 6.10521... , -2.94904...]) >>> est.effect_interval(X[:3]) (array([-9.20176..., -0.47031... , -6.67354...]), array([-0.40802..., 12.68073..., 0.77546...])) >>> est.coef_ array([ 5.52418..., 0.96276..., 0.68158..., -0.16803..., -0.13056...]) >>> est.coef__interval() (array([ 3.61373..., -0.81856..., -1.12589..., -1.90193... , -1.92331...]), array([7.43462..., 2.74409... , 2.48906..., 1.56587..., 1.66218...])) >>> est.intercept_ -0.28940... >>> est.intercept__interval() (-2.07653..., 1.49771...) """
[docs] def __init__(self, *, model_y_xw="auto", model_t_xwz="auto", prel_cate_approach="driv", flexible_model_effect="auto", prel_cv=1, prel_opt_reweighted=True, z_propensity="auto", featurizer=None, fit_cate_intercept=True, discrete_outcome=False, cov_clip=1e-3, cv=3, mc_iters=None, mc_agg='mean', opt_reweighted=False, categories='auto', random_state=None, allow_missing=False, enable_federation=False, use_ray=False, ray_remote_func_options=None): super().__init__(model_y_xw=model_y_xw, model_t_xwz=model_t_xwz, flexible_model_effect=flexible_model_effect, model_final=None, prel_cate_approach=prel_cate_approach, prel_cv=prel_cv, prel_opt_reweighted=prel_opt_reweighted, z_propensity=z_propensity, featurizer=featurizer, fit_cate_intercept=fit_cate_intercept, discrete_outcome=discrete_outcome, cov_clip=cov_clip, cv=cv, mc_iters=mc_iters, mc_agg=mc_agg, opt_reweighted=opt_reweighted, categories=categories, random_state=random_state, allow_missing=allow_missing, use_ray=use_ray, ray_remote_func_options=ray_remote_func_options) self.enable_federation = enable_federation
def _gen_model_final(self): return StatsModelsLinearRegression(fit_intercept=False, enable_federation=self.enable_federation) # override only so that we can update the docstring to indicate support for `StatsModelsInference`
[docs] def fit(self, Y, T, *, Z, X=None, W=None, sample_weight=None, freq_weight=None, sample_var=None, groups=None, cache_values=False, inference='auto'): """ Estimate the counterfactual model from data, i.e. estimates function :math:`\\theta(\\cdot)`. Parameters ---------- Y: (n, d_y) matrix or vector of length n Outcomes for each sample T: (n, d_t) matrix or vector of length n Treatments for each sample Z: (n, d_z) matrix or vector of length n Instruments for each sample X:(n, d_x) matrix, optional Features for each sample W:(n, d_w) matrix, optional Controls for each sample sample_weight : (n,) array_like or None Individual weights for each sample. If None, it assumes equal weight. freq_weight: (n,) array_like of int, optional Weight for the observation. Observation i is treated as the mean outcome of freq_weight[i] independent observations. When ``sample_var`` is not None, this should be provided. sample_var : (n,) nd array_like, optional Variance of the outcome(s) of the original freq_weight[i] observations that were used to compute the mean outcome represented by observation i. groups: (n,) vector, optional All rows corresponding to the same group will be kept together during splitting. If groups is not None, the `cv` argument passed to this class's initializer must support a 'groups' argument to its split method. cache_values: bool, default False Whether to cache inputs and first stage results, which will allow refitting a different final model inference: str,:class:`.Inference` instance, or None Method for performing inference. This estimator supports 'bootstrap' (or an instance of:class:`.BootstrapInference`) and 'statsmodels' (or an instance of :class:`.StatsModelsInference`). Returns ------- self : instance """ # TODO: do correct adjustment for sample_var return super().fit(Y, T, Z=Z, X=X, W=W, sample_weight=sample_weight, freq_weight=freq_weight, sample_var=sample_var, groups=groups, cache_values=cache_values, inference=inference)
@property def fit_cate_intercept_(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def bias_part_of_coef(self): return self.ortho_learner_model_final_._fit_cate_intercept @property def model_final(self): return self._gen_model_final() @model_final.setter def model_final(self, value): if value is not None: raise ValueError("Parameter `model_final` cannot be altered for this estimator.")