Source code for econml._cate_estimator

# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.

"""Base classes for all CATE estimators."""

import abc
import inspect
import numpy as np
from functools import wraps
from copy import deepcopy
from warnings import warn
from .inference import BootstrapInference
from .utilities import (tensordot, ndim, reshape, shape, parse_final_model_params, get_feature_names_or_default,
                        inverse_onehot, Summary, get_input_columns, check_input_arrays, jacify_featurizer)
from .inference import StatsModelsInference, StatsModelsInferenceDiscrete, LinearModelFinalInference, \
    LinearModelFinalInferenceDiscrete, NormalInferenceResults, GenericSingleTreatmentModelFinalInference, \
    GenericModelFinalInferenceDiscrete
from ._shap import _shap_explain_cme, _shap_explain_joint_linear_model_cate
from .dowhy import DoWhyWrapper


[docs]class BaseCateEstimator(metaclass=abc.ABCMeta): """Base class for all CATE estimators in this package.""" def _get_inference_options(self): """ Produce a dictionary mapping string names to :class:`.Inference` types. This is used by the :meth:`fit` method when a str is passed rather than an :class:`.Inference` type. """ return {'bootstrap': BootstrapInference} def _get_inference(self, inference): options = self._get_inference_options() if isinstance(inference, str): if inference in options: inference = options[inference]() else: raise ValueError("Inference option '%s' not recognized; valid values are %s" % (inference, [*options])) # since inference objects can be stateful, we must copy it before fitting; # otherwise this sequence wouldn't work: # est1.fit(..., inference=inf) # est2.fit(..., inference=inf) # est1.effect_interval(...) # because inf now stores state from fitting est2 return deepcopy(inference) def _set_input_names(self, Y, T, X, set_flag=False): """Set input column names if inputs have column metadata.""" self._input_names = { "feature_names": get_input_columns(X, prefix="X"), "output_names": get_input_columns(Y, prefix="Y"), "treatment_names": get_input_columns(T, prefix="T") } if set_flag: # This flag is true when names are set in a child class instead # If names are set in a child class, add an attribute reflecting that self._input_names_set = True def _strata(self, Y, T, *args, **kwargs): """ Get an array of values representing strata that should be preserved by bootstrapping. For example, if treatment is discrete, then each bootstrapped estimator needs to be given at least one instance with each treatment type. For estimators like DRIV, then the same is true of the combination of treatment and instrument. The arguments to this method will match those to fit. Returns ------- strata : array or None A vector with the same number of rows as the inputs, where the unique values represent the strata that need to be preserved by bootstrapping, or None if no preservation is necessary. """ return None def _prefit(self, Y, T, *args, **kwargs): self._d_y = np.shape(Y)[1:] self._d_t = np.shape(T)[1:] # This works only if X is passed as a kwarg # We plan to enforce X as kwarg only in future releases if not hasattr(self, "_input_names_set") or not self._input_names_set: # This checks if names have been set in a child class # If names were set in a child class, don't do it again X = kwargs.get('X') self._set_input_names(Y, T, X) def _postfit(self, Y, T, *args, **kwargs): # Wraps-up fit by setting attributes, cleaning up, etc. pass
[docs] @abc.abstractmethod def fit(self, *args, inference=None, **kwargs): """ Estimate the counterfactual model from data, i.e. estimates functions :math:`\\tau(X, T0, T1)`, :math:`\\partial \\tau(T, X)`. Note that the signature of this method may vary in subclasses (e.g. classes that don't support instruments will not allow a `Z` argument) Parameters ---------- Y: (n, d_y) matrix or vector of length n Outcomes for each sample T: (n, d_t) matrix or vector of length n Treatments for each sample X: (n, d_x) matrix, optional Features for each sample W: (n, d_w) matrix, optional Controls for each sample Z: (n, d_z) matrix, optional Instruments for each sample inference: str or :class:`.Inference` instance, optional Method for performing inference. All estimators support ``'bootstrap'`` (or an instance of :class:`.BootstrapInference`), some support other methods as well. Returns ------- self """ raise NotImplementedError("Abstract method")
def _wrap_fit(m): @wraps(m) def call(self, Y, T, *args, inference=None, **kwargs): inference = self._get_inference(inference) self._prefit(Y, T, *args, **kwargs) if inference is not None: inference.prefit(self, Y, T, *args, **kwargs) # call the wrapped fit method m(self, Y, T, *args, **kwargs) self._postfit(Y, T, *args, **kwargs) if inference is not None: # NOTE: we call inference fit *after* calling the main fit method inference.fit(self, Y, T, *args, **kwargs) self._inference = inference return self return call
[docs] @abc.abstractmethod def effect(self, X=None, *, T0, T1): """ Calculate the heterogeneous treatment effect :math:`\\tau(X, T0, T1)`. The effect is calculated between the two treatment points conditional on a vector of features on a set of m test samples :math:`\\{T0_i, T1_i, X_i\\}`. Parameters ---------- T0: (m, d_t) matrix or vector of length m Base treatments for each sample T1: (m, d_t) matrix or vector of length m Target treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- τ: (m, d_y) matrix Heterogeneous treatment effects on each outcome for each sample Note that when Y is a vector rather than a 2-dimensional array, the corresponding singleton dimension will be collapsed (so this method will return a vector) """ raise NotImplementedError("Abstract method")
[docs] @abc.abstractmethod def marginal_effect(self, T, X=None): """ Calculate the heterogeneous marginal effect :math:`\\partial\\tau(T, X)`. The marginal effect is calculated around a base treatment point conditional on a vector of features on a set of m test samples :math:`\\{T_i, X_i\\}`. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- grad_tau: (m, d_y, d_t) array Heterogeneous marginal effects on each outcome for each sample Note that when Y or T is a vector rather than a 2-dimensional array, the corresponding singleton dimensions in the output will be collapsed (e.g. if both are vectors, then the output of this method will also be a vector) """ raise NotImplementedError("Abstract method")
[docs] def ate(self, X=None, *, T0, T1): """ Calculate the average treatment effect :math:`E_X[\\tau(X, T0, T1)]`. The effect is calculated between the two treatment points and is averaged over the population of X variables. Parameters ---------- T0: (m, d_t) matrix or vector of length m Base treatments for each sample T1: (m, d_t) matrix or vector of length m Target treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- τ: float or (d_y,) array Average treatment effects on each outcome Note that when Y is a vector rather than a 2-dimensional array, the result will be a scalar """ return np.mean(self.effect(X=X, T0=T0, T1=T1), axis=0)
[docs] def cate_feature_names(self, feature_names=None): """Public interface for getting feature names. To be overriden by estimators that apply transformations the input features. Parameters ---------- feature_names: list of str of length X.shape[1] or None The names of the input features. If None and X is a dataframe, it defaults to the column names from the dataframe. Returns ------- out_feature_names: list of str or None Returns feature names. """ if feature_names is not None: return feature_names if hasattr(self, "_input_names"): return self._input_names["feature_names"] return None
[docs] def cate_output_names(self, output_names=None): """ Public interface for getting output names. To be overriden by estimators that apply transformations the outputs. Parameters ---------- output_names: list of str of length Y.shape[1] or None The names of the outcomes. If None and the Y passed to fit was a dataframe, it defaults to the column names from the dataframe. Returns ------- output_names: list of str Returns output names. """ if output_names is not None: return output_names if hasattr(self, "_input_names"): return self._input_names["output_names"] return None
[docs] def cate_treatment_names(self, treatment_names=None): """ Public interface for getting treatment names. To be overriden by estimators that apply transformations the treatments. Parameters ---------- treatment_names: list of str of length T.shape[1] or None The names of the treatments. If None and the T passed to fit was a dataframe, it defaults to the column names from the dataframe. Returns ------- treatment_names: list of str Returns treatment names. """ if treatment_names is not None: return treatment_names if hasattr(self, "_input_names"): return self._input_names["treatment_names"] return None
[docs] def marginal_ate(self, T, X=None): """ Calculate the average marginal effect :math:`E_{T, X}[\\partial\\tau(T, X)]`. The marginal effect is calculated around a base treatment point and averaged over the population of X. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- grad_tau: (d_y, d_t) array Average marginal effects on each outcome Note that when Y or T is a vector rather than a 2-dimensional array, the corresponding singleton dimensions in the output will be collapsed (e.g. if both are vectors, then the output of this method will be a scalar) """ return np.mean(self.marginal_effect(T, X=X), axis=0)
def _expand_treatments(self, X=None, *Ts): """ Given a set of features and treatments, return possibly modified features and treatments. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample, or None Ts: sequence of (m, d_t) matrices Base treatments for each sample Returns ------- output : tuple (X',T0',T1',...) """ return (X,) + Ts def _use_inference_method(self, name, *args, **kwargs): if self._inference is not None: return getattr(self._inference, name)(*args, **kwargs) else: raise AttributeError("Can't call '%s' because 'inference' is None" % name) def _defer_to_inference(m): @wraps(m) def call(self, *args, **kwargs): # apply defaults before calling inference method bound_args = inspect.signature(m).bind(self, *args, **kwargs) bound_args.apply_defaults() args = bound_args.args[1:] # remove self kwargs = bound_args.kwargs return self._use_inference_method(m.__name__, *args, **kwargs) return call
[docs] @_defer_to_inference def effect_interval(self, X=None, *, T0=0, T1=1, alpha=0.05): """ Confidence intervals for the quantities :math:`\\tau(X, T0, T1)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample T0: (m, d_t) matrix or vector of length m, default 0 Base treatments for each sample T1: (m, d_t) matrix or vector of length m, default 1 Target treatments for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`effect(X, T0, T1)<effect>`, type of :meth:`effect(X, T0, T1))<effect>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def effect_inference(self, X=None, *, T0=0, T1=1): """ Inference results for the quantities :math:`\\tau(X, T0, T1)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample T0: (m, d_t) matrix or vector of length m, default 0 Base treatments for each sample T1: (m, d_t) matrix or vector of length m, default 1 Target treatments for each sample Returns ------- InferenceResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def marginal_effect_interval(self, T, X=None, *, alpha=0.05): """ Confidence intervals for the quantities :math:`\\partial \\tau(T, X)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`marginal_effect(T, X)<marginal_effect>`, \ type of :meth:`marginal_effect(T, X)<marginal_effect>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def marginal_effect_inference(self, T, X=None): """ Inference results for the quantities :math:`\\partial \\tau(T, X)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- InferenceResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def ate_interval(self, X=None, *, T0, T1, alpha=0.05): """ Confidence intervals for the quantity :math:`E_X[\\tau(X, T0, T1)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample T0: (m, d_t) matrix or vector of length m, default 0 Base treatments for each sample T1: (m, d_t) matrix or vector of length m, default 1 Target treatments for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`ate(X, T0, T1)<ate>`, type of :meth:`ate(X, T0, T1))<ate>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def ate_inference(self, X=None, *, T0, T1): """ Inference results for the quantity :math:`E_X[\\tau(X, T0, T1)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample T0: (m, d_t) matrix or vector of length m, default 0 Base treatments for each sample T1: (m, d_t) matrix or vector of length m, default 1 Target treatments for each sample Returns ------- PopulationSummaryResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def marginal_ate_interval(self, T, X=None, *, alpha=0.05): """ Confidence intervals for the quantities :math:`E_{T,X}[\\partial \\tau(T, X)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`marginal_ate(T, X)<marginal_ate>`, \ type of :meth:`marginal_ate(T, X)<marginal_ate>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @_defer_to_inference def marginal_ate_inference(self, T, X=None): """ Inference results for the quantities :math:`E_{T,X}[\\partial \\tau(T, X)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- PopulationSummaryResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """ raise NotImplementedError("Defer to inference")
@property def dowhy(self): """ Get an instance of :class:`.DoWhyWrapper` to allow other functionalities from dowhy package. (e.g. causal graph, refutation test, etc.) Returns ------- DoWhyWrapper: instance An instance of :class:`.DoWhyWrapper` """ return DoWhyWrapper(self)
[docs]class LinearCateEstimator(BaseCateEstimator): """ Base class for all CATE estimators in this package where the outcome is linear given some user-defined treatment featurization. """ _original_treatment_featurizer = None
[docs] @abc.abstractmethod def const_marginal_effect(self, X=None): """ Calculate the constant marginal CATE :math:`\\theta(·)`. The marginal effect is conditional on a vector of features on a set of m test samples X[i]. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample. Returns ------- theta: (m, d_y, d_f_t) matrix or (d_y, d_f_t) matrix if X is None where d_f_t is \ the dimension of the featurized treatment. If treatment_featurizer is None, d_f_t = d_t. Constant marginal CATE of each featurized treatment on each outcome for each sample X[i]. Note that when Y or featurized-T (or T if treatment_featurizer is None) is a vector rather than a 2-dimensional array, the corresponding singleton dimensions in the output will be collapsed (e.g. if both are vectors, then the output of this method will also be a vector) """ raise NotImplementedError("Abstract method")
[docs] def effect(self, X=None, *, T0, T1): """ Calculate the heterogeneous treatment effect :math:`\\tau(X, T0, T1)`. The effect is calculated between the two treatment points conditional on a vector of features on a set of m test samples :math:`\\{T0_i, T1_i, X_i\\}`. Since this class assumes a linear effect, only the difference between T0ᵢ and T1ᵢ matters for this computation. Parameters ---------- T0: (m, d_t) matrix Base treatments for each sample T1: (m, d_t) matrix Target treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- effect: (m, d_y) matrix (or length m vector if Y was a vector) Heterogeneous treatment effects on each outcome for each sample. Note that when Y is a vector rather than a 2-dimensional array, the corresponding singleton dimension will be collapsed (so this method will return a vector) """ X, T0, T1 = self._expand_treatments(X, T0, T1) # TODO: what if input is sparse? - there's no equivalent to einsum, # but tensordot can't be applied to this problem because we don't sum over m eff = self.const_marginal_effect(X) # if X is None then the shape of const_marginal_effect will be wrong because the number # of rows of T was not taken into account if X is None: eff = np.repeat(eff, shape(T0)[0], axis=0) m = shape(eff)[0] dT = T1 - T0 einsum_str = 'myt,mt->my' if ndim(dT) == 1: einsum_str = einsum_str.replace('t', '') if ndim(eff) == ndim(dT): # y is a vector, rather than a 2D array einsum_str = einsum_str.replace('y', '') return np.einsum(einsum_str, eff, dT)
[docs] def marginal_effect(self, T, X=None): """ Calculate the heterogeneous marginal effect :math:`\\partial\\tau(T, X)`. The marginal effect is calculated around a base treatment point conditional on a vector of features on a set of m test samples :math:`\\{T_i, X_i\\}`. If treatment_featurizer is None, the base treatment is ignored in this calculation and the result is equivalent to const_marginal_effect. Parameters ---------- T: (m, d_t) matrix Base treatments for each sample X: (m, d_x) matrix, optional Features for each sample Returns ------- grad_tau: (m, d_y, d_t) array Heterogeneous marginal effects on each outcome for each sample Note that when Y or T is a vector rather than a 2-dimensional array, the corresponding singleton dimensions in the output will be collapsed (e.g. if both are vectors, then the output of this method will also be a vector) """ X, T = self._expand_treatments(X, T, transform=False) eff = self.const_marginal_effect(X) if X is None: eff = np.repeat(eff, shape(T)[0], axis=0) if self._original_treatment_featurizer: feat_T = self.transformer.transform(T) jac_T = self.transformer.jac(T) einsum_str = 'myf, mtf->myt' if ndim(T) == 1: einsum_str = einsum_str.replace('t', '') if ndim(feat_T) == 1: einsum_str = einsum_str.replace('f', '') if (ndim(eff) == ndim(feat_T)): einsum_str = einsum_str.replace('y', '') return np.einsum(einsum_str, eff, jac_T) else: return eff
[docs] def marginal_effect_interval(self, T, X=None, *, alpha=0.05): if self._original_treatment_featurizer: return self._use_inference_method('marginal_effect_interval', T, X, alpha=alpha) else: X, T = self._expand_treatments(X, T) effs = self.const_marginal_effect_interval(X=X, alpha=alpha) if X is None: # need to repeat by the number of rows of T to ensure the right shape effs = tuple(np.repeat(eff, shape(T)[0], axis=0) for eff in effs) return effs
marginal_effect_interval.__doc__ = BaseCateEstimator.marginal_effect_interval.__doc__
[docs] def marginal_effect_inference(self, T, X=None): if self._original_treatment_featurizer: return self._use_inference_method('marginal_effect_inference', T, X) else: X, T = self._expand_treatments(X, T) cme_inf = self.const_marginal_effect_inference(X=X) if X is None: cme_inf = cme_inf._expand_outputs(shape(T)[0]) return cme_inf
marginal_effect_inference.__doc__ = BaseCateEstimator.marginal_effect_inference.__doc__
[docs] @BaseCateEstimator._defer_to_inference def const_marginal_effect_interval(self, X=None, *, alpha=0.05): """ Confidence intervals for the quantities :math:`\\theta(X)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`const_marginal_effect(X)<const_marginal_effect>` ,\ type of :meth:`const_marginal_effect(X)<const_marginal_effect>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def const_marginal_effect_inference(self, X=None): """ Inference results for the quantities :math:`\\theta(X)` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample Returns ------- InferenceResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """ raise NotImplementedError("Defer to inference")
[docs] def const_marginal_ate(self, X=None): """ Calculate the average constant marginal CATE :math:`E_X[\\theta(X)]`. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample. Returns ------- theta: (d_y, d_f_t) matrix where d_f_t is the dimension of the featurized treatment. \ If treatment_featurizer is None, d_f_t = d_t. Average constant marginal CATE of each treatment on each outcome. Note that when Y or featurized-T (or T if treatment_featurizer is None) is a vector rather than a 2-dimensional array, the corresponding singleton dimensions in the output will be collapsed (e.g. if both are vectors, then the output of this method will also be a scalar) """ return np.mean(self.const_marginal_effect(X=X), axis=0)
[docs] @BaseCateEstimator._defer_to_inference def const_marginal_ate_interval(self, X=None, *, alpha=0.05): """ Confidence intervals for the quantities :math:`E_X[\\theta(X)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper : tuple(type of :meth:`const_marginal_ate(X)<const_marginal_ate>` ,\ type of :meth:`const_marginal_ate(X)<const_marginal_ate>` ) The lower and the upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def const_marginal_ate_inference(self, X=None): """ Inference results for the quantities :math:`E_X[\\theta(X)]` produced by the model. Available only when ``inference`` is not ``None``, when calling the fit method. Parameters ---------- X: (m, d_x) matrix, optional Features for each sample Returns ------- PopulationSummaryResults: object The inference results instance contains prediction and prediction standard error and can on demand calculate confidence interval, z statistic and p value. It can also output a dataframe summary of these inference results. """
[docs] def marginal_ate(self, T, X=None): return np.mean(self.marginal_effect(T, X=X), axis=0)
marginal_ate.__doc__ = BaseCateEstimator.marginal_ate.__doc__
[docs] @BaseCateEstimator._defer_to_inference def marginal_ate_interval(self, T, X=None, *, alpha=0.05): raise NotImplementedError("Defer to inference")
marginal_ate_interval.__doc__ = BaseCateEstimator.marginal_ate_interval.__doc__
[docs] @BaseCateEstimator._defer_to_inference def marginal_ate_inference(self, T, X=None): raise NotImplementedError("Defer to inference")
marginal_ate_inference.__doc__ = BaseCateEstimator.marginal_ate_inference.__doc__
[docs] def shap_values(self, X, *, feature_names=None, treatment_names=None, output_names=None, background_samples=100): """ Shap value for the final stage models (const_marginal_effect) Parameters ---------- X: (m, d_x) matrix Features for each sample. Should be in the same shape of fitted X in final stage. feature_names: list of str of length X.shape[1], optional The names of input features. treatment_names: list, optional The name of featurized treatment. In discrete treatment scenario, the name should not include the name of the baseline treatment (i.e. the control treatment, which by default is the alphabetically smaller) output_names: list, optional The name of the outcome. background_samples: int , default 100 How many samples to use to compute the baseline effect. If None then all samples are used. Returns ------- shap_outs: nested dictionary of Explanation object A nested dictionary by using each output name (e.g. 'Y0', 'Y1', ... when `output_names=None`) and each treatment name (e.g. 'T0', 'T1', ... when `treatment_names=None`) as key and the shap_values explanation object as value. If the input data at fit time also contain metadata, (e.g. are pandas DataFrames), then the column metatdata for the treatments, outcomes and features are used instead of the above defaults (unless the user overrides with explicitly passing the corresponding names). """ return _shap_explain_cme(self.const_marginal_effect, X, self._d_t, self._d_y, feature_names=feature_names, treatment_names=treatment_names, output_names=output_names, input_names=self._input_names, background_samples=background_samples)
[docs]class TreatmentExpansionMixin(BaseCateEstimator): """ Mixin which automatically handles promotions of scalar treatments to the appropriate shape, as well as treatment featurization for discrete treatments and user-specified treatment transformers """ transformer = None _original_treatment_featurizer = None def _prefit(self, Y, T, *args, **kwargs): super()._prefit(Y, T, *args, **kwargs) # need to store the *original* dimensions of T so that we can expand scalar inputs to match; # subclasses should overwrite self._d_t with post-transformed dimensions of T for generating treatments self._d_t_in = self._d_t def _postfit(self, Y, T, *args, **kwargs): super()._postfit(Y, T, *args, **kwargs) if self.transformer: self._set_transformed_treatment_names() def _expand_treatments(self, X=None, *Ts, transform=True): X, *Ts = check_input_arrays(X, *Ts) n_rows = 1 if X is None else shape(X)[0] outTs = [] for T in Ts: if (ndim(T) == 0) and self._d_t_in and self._d_t_in[0] > 1: warn("A scalar was specified but there are multiple treatments; " "the same value will be used for each treatment. Consider specifying" "all treatments, or using the const_marginal_effect method.") if ndim(T) == 0: T = np.full((n_rows,) + self._d_t_in, T) if self.transformer and transform: if not self._original_treatment_featurizer: T = T.reshape(-1, 1) T = self.transformer.transform(T) outTs.append(T) return (X,) + tuple(outTs) def _set_transformed_treatment_names(self): """ Extracts treatment names from sklearn transformers. Or, if transformer does not have a get_feature_names method, sets default treatment names. """ if hasattr(self, "_input_names"): ret = get_feature_names_or_default(self.transformer, self._input_names["treatment_names"], prefix='T') self._input_names["treatment_names"] = list(ret) if ret is not None else ret
[docs] def cate_treatment_names(self, treatment_names=None): """ Get treatment names. If the treatment is discrete or featurized, it will return expanded treatment names. Parameters ---------- treatment_names: list of str of length T.shape[1], optional The names of the treatments. If None and the T passed to fit was a dataframe, it defaults to the column names from the dataframe. Returns ------- out_treatment_names: list of str Returns (possibly expanded) treatment names. """ if treatment_names is not None: if self.transformer: ret = get_feature_names_or_default(self.transformer, treatment_names) return list(ret) if ret is not None else None return treatment_names # Treatment names is None, default to BaseCateEstimator return super().cate_treatment_names()
# override effect to set defaults, which works with the new definition of _expand_treatments
[docs] def effect(self, X=None, *, T0=0, T1=1): # NOTE: don't explicitly expand treatments here, because it's done in the super call return super().effect(X, T0=T0, T1=T1)
effect.__doc__ = BaseCateEstimator.effect.__doc__
[docs] def ate(self, X=None, *, T0=0, T1=1): return super().ate(X=X, T0=T0, T1=T1)
ate.__doc__ = BaseCateEstimator.ate.__doc__
[docs] def ate_interval(self, X=None, *, T0=0, T1=1, alpha=0.05): return super().ate_interval(X=X, T0=T0, T1=T1, alpha=alpha)
ate_interval.__doc__ = BaseCateEstimator.ate_interval.__doc__
[docs] def ate_inference(self, X=None, *, T0=0, T1=1): return super().ate_inference(X=X, T0=T0, T1=T1)
ate_inference.__doc__ = BaseCateEstimator.ate_inference.__doc__
[docs]class LinearModelFinalCateEstimatorMixin(BaseCateEstimator): """ Base class for models where the final stage is a linear model. Such an estimator must implement a :attr:`model_final_` attribute that points to the fitted final :class:`.StatsModelsLinearRegression` object that represents the fitted CATE model. Also must implement :attr:`featurizer_` that points to the fitted featurizer and :attr:`bias_part_of_coef` that designates if the intercept is the first element of the :attr:`model_final_` coefficient. Attributes ---------- bias_part_of_coef: bool Whether the CATE model's intercept is contained in the final model's ``coef_`` rather than as a separate ``intercept_`` """ featurizer = None def _get_inference_options(self): options = super()._get_inference_options() options.update(auto=LinearModelFinalInference) return options @property def bias_part_of_coef(self): return False @property def coef_(self): """ The coefficients in the linear model of the constant marginal treatment effect. Returns ------- coef: (n_x,) or (n_t, n_x) or (n_y, n_t, n_x) array_like Where n_x is the number of features that enter the final model (either the dimension of X or the dimension of featurizer.fit_transform(X) if the CATE estimator has a featurizer.), n_t is the number of treatments, n_y is the number of outcomes. Dimensions are omitted if the original input was a vector and not a 2D array. For binary treatment the n_t dimension is also omitted. """ return parse_final_model_params(self.model_final_.coef_, self.model_final_.intercept_, self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef, self.fit_cate_intercept_)[0] @property def intercept_(self): """ The intercept in the linear model of the constant marginal treatment effect. Returns ------- intercept: float or (n_y,) or (n_y, n_t) array_like Where n_t is the number of treatments, n_y is the number of outcomes. Dimensions are omitted if the original input was a vector and not a 2D array. For binary treatment the n_t dimension is also omitted. """ if not self.fit_cate_intercept_: raise AttributeError("No intercept was fitted!") return parse_final_model_params(self.model_final_.coef_, self.model_final_.intercept_, self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef, self.fit_cate_intercept_)[1]
[docs] @BaseCateEstimator._defer_to_inference def coef__interval(self, *, alpha=0.05): """ The coefficients in the linear model of the constant marginal treatment effect. Parameters ---------- alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lb, ub: tuple(type of :meth:`coef_()<coef_>`, type of :meth:`coef_()<coef_>`) The lower and upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def coef__inference(self): """ The inference of coefficients in the linear model of the constant marginal treatment effect. Returns ------- InferenceResults: object The inference of the coefficients in the final linear model """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def intercept__interval(self, *, alpha=0.05): """ The intercept in the linear model of the constant marginal treatment effect. Parameters ---------- alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper: tuple(type of :meth:`intercept_()<intercept_>`, type of :meth:`intercept_()<intercept_>`) The lower and upper bounds of the confidence interval. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def intercept__inference(self): """ The inference of intercept in the linear model of the constant marginal treatment effect. Returns ------- InferenceResults: object The inference of the intercept in the final linear model """ raise NotImplementedError("Defer to inference")
[docs] def summary(self, alpha=0.05, value=0, decimals=3, feature_names=None, treatment_names=None, output_names=None): """ The summary of coefficient and intercept in the linear model of the constant marginal treatment effect. Parameters ---------- alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. value: float, default 0 The mean value of the metric you'd like to test under null hypothesis. decimals: int, default 3 Number of decimal places to round each column to. feature_names: list of str, optional The input of the feature names treatment_names: list of str, optional The names of the treatments output_names: list of str, optional The names of the outputs Returns ------- smry : Summary instance this holds the summary tables and text, which can be printed or converted to various output formats. """ # Get input names treatment_names = self.cate_treatment_names(treatment_names) output_names = self.cate_output_names(output_names) # Summary smry = Summary() extra_txt = ["<sub>A linear parametric conditional average treatment effect (CATE) model was fitted:"] if self._original_treatment_featurizer: extra_txt.append("$Y = \\Theta(X)\\cdot \\psi(T) + g(X, W) + \\epsilon$") extra_txt.append("where $\\psi(T)$ is the output of the `treatment_featurizer") extra_txt.append( "and for every outcome $i$ and featurized treatment $j$ the CATE $\\Theta_{ij}(X)$ has the form:") else: extra_txt.append("$Y = \\Theta(X)\\cdot T + g(X, W) + \\epsilon$") extra_txt.append( "where for every outcome $i$ and treatment $j$ the CATE $\\Theta_{ij}(X)$ has the form:") if self.featurizer: extra_txt.append("$\\Theta_{ij}(X) = \\phi(X)' coef_{ij} + cate\\_intercept_{ij}$") extra_txt.append("where $\\phi(X)$ is the output of the `featurizer`") else: extra_txt.append("$\\Theta_{ij}(X) = X' coef_{ij} + cate\\_intercept_{ij}$") extra_txt.append("Coefficient Results table portrays the $coef_{ij}$ parameter vector for " "each outcome $i$ and treatment $j$. " "Intercept Results table portrays the $cate\\_intercept_{ij}$ parameter.</sub>") smry.add_extra_txt(extra_txt) d_t = self._d_t[0] if self._d_t else 1 d_y = self._d_y[0] if self._d_y else 1 try: coef_table = self.coef__inference().summary_frame(alpha=alpha, value=value, decimals=decimals, feature_names=feature_names, treatment_names=treatment_names, output_names=output_names) coef_array = coef_table.values coef_headers = coef_table.columns.tolist() n_level = coef_table.index.nlevels if n_level > 1: coef_stubs = ["|".join(ind_value) for ind_value in coef_table.index.values] else: coef_stubs = coef_table.index.tolist() coef_title = 'Coefficient Results' smry.add_table(coef_array, coef_headers, coef_stubs, coef_title) except Exception as e: print("Coefficient Results: ", str(e)) try: intercept_table = self.intercept__inference().summary_frame(alpha=alpha, value=value, decimals=decimals, feature_names=None, treatment_names=treatment_names, output_names=output_names) intercept_array = intercept_table.values intercept_headers = intercept_table.columns.tolist() n_level = intercept_table.index.nlevels if n_level > 1: intercept_stubs = ["|".join(ind_value) for ind_value in intercept_table.index.values] else: intercept_stubs = intercept_table.index.tolist() intercept_title = 'CATE Intercept Results' smry.add_table(intercept_array, intercept_headers, intercept_stubs, intercept_title) except Exception as e: print("CATE Intercept Results: ", str(e)) if len(smry.tables) > 0: return smry
[docs] def shap_values(self, X, *, feature_names=None, treatment_names=None, output_names=None, background_samples=100): if hasattr(self, "featurizer_") and self.featurizer_ is not None: X = self.featurizer_.transform(X) feature_names = self.cate_feature_names(feature_names) return _shap_explain_joint_linear_model_cate(self.model_final_, X, self._d_t, self._d_y, self.bias_part_of_coef, feature_names=feature_names, treatment_names=treatment_names, output_names=output_names, input_names=self._input_names, background_samples=background_samples)
shap_values.__doc__ = LinearCateEstimator.shap_values.__doc__
[docs]class StatsModelsCateEstimatorMixin(LinearModelFinalCateEstimatorMixin): """ Mixin class that offers `inference='statsmodels'` options to the CATE estimator that inherits it. Such an estimator must implement a :attr:`model_final_` attribute that points to the fitted final :class:`.StatsModelsLinearRegression` object that represents the fitted CATE model. Also must implement :attr:`featurizer_` that points to the fitted featurizer and :attr:`bias_part_of_coef` that designates if the intercept is the first element of the :attr:`model_final_` coefficient. """ def _get_inference_options(self): # add statsmodels to parent's options options = super()._get_inference_options() options.update(statsmodels=StatsModelsInference) options.update(auto=StatsModelsInference) return options
[docs]class DebiasedLassoCateEstimatorMixin(LinearModelFinalCateEstimatorMixin): """Mixin for cate models where the final stage is a debiased lasso model.""" def _get_inference_options(self): # add debiasedlasso to parent's options options = super()._get_inference_options() options.update(debiasedlasso=LinearModelFinalInference) options.update(auto=LinearModelFinalInference) return options
[docs]class ForestModelFinalCateEstimatorMixin(BaseCateEstimator): def _get_inference_options(self): # add blb to parent's options options = super()._get_inference_options() options.update(blb=GenericSingleTreatmentModelFinalInference) options.update(auto=GenericSingleTreatmentModelFinalInference) return options @property def feature_importances_(self): return self.model_final_.feature_importances_
[docs]class LinearModelFinalCateEstimatorDiscreteMixin(BaseCateEstimator): # TODO Share some logic with non-discrete version """ Base class for models where the final stage is a linear model. Subclasses must expose a ``fitted_models_final`` attribute returning an array of the fitted models for each non-control treatment """ def _get_inference_options(self): options = super()._get_inference_options() options.update(auto=LinearModelFinalInferenceDiscrete) return options
[docs] def coef_(self, T): """ The coefficients in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. Returns ------- coef: (n_x,) or (n_y, n_x) array_like Where n_x is the number of features that enter the final model (either the dimension of X or the dimension of featurizer.fit_transform(X) if the CATE estimator has a featurizer.) """ _, T = self._expand_treatments(None, T) ind = inverse_onehot(T).item() - 1 assert ind >= 0, "No model was fitted for the control" return self.fitted_models_final[ind].coef_
[docs] def intercept_(self, T): """ The intercept in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. Returns ------- intercept: float or (n_y,) array_like """ if not self.fit_cate_intercept_: raise AttributeError("No intercept was fitted!") _, T = self._expand_treatments(None, T) ind = inverse_onehot(T).item() - 1 assert ind >= 0, "No model was fitted for the control" return self.fitted_models_final[ind].intercept_.reshape(self._d_y)
[docs] @BaseCateEstimator._defer_to_inference def coef__interval(self, T, *, alpha=0.05): """ The confidence interval for the coefficients in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper: tuple(type of :meth:`coef_(T)<coef_>`, type of :meth:`coef_(T)<coef_>`) The lower and upper bounds of the confidence interval for each quantity. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def coef__inference(self, T): """ The inference for the coefficients in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. Returns ------- InferenceResults: object The inference of the coefficients in the final linear model """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def intercept__interval(self, T, *, alpha=0.05): """ The intercept in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. Returns ------- lower, upper: tuple(type of :meth:`intercept_(T)<intercept_>`, type of :meth:`intercept_(T)<intercept_>`) The lower and upper bounds of the confidence interval. """ raise NotImplementedError("Defer to inference")
[docs] @BaseCateEstimator._defer_to_inference def intercept__inference(self, T): """ The inference of the intercept in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- T: alphanumeric The input treatment for which we want the coefficients. Returns ------- InferenceResults: object The inference of the intercept in the final linear model """ raise NotImplementedError("Defer to inference")
[docs] def summary(self, T, *, alpha=0.05, value=0, decimals=3, feature_names=None, treatment_names=None, output_names=None): """ The summary of coefficient and intercept in the linear model of the constant marginal treatment effect associated with treatment T. Parameters ---------- alpha: float in [0, 1], default 0.05 The overall level of confidence of the reported interval. The alpha/2, 1-alpha/2 confidence interval is reported. value: float, default 0 The mean value of the metric you'd like to test under null hypothesis. decimals: int, default 3 Number of decimal places to round each column to. feature_names: list of str, optional The input of the feature names treatment_names: list of str, optional The names of the treatments output_names: list of str, optional The names of the outputs Returns ------- smry : Summary instance this holds the summary tables and text, which can be printed or converted to various output formats. """ # Get input names treatment_names = self.cate_treatment_names(treatment_names) output_names = self.cate_output_names(output_names) # Note: we do not transform feature names since that is done within summary_frame # Summary smry = Summary() smry.add_extra_txt(["<sub>A linear parametric conditional average treatment effect (CATE) model was fitted:", "$Y = \\Theta(X)\\cdot T + g(X, W) + \\epsilon$", "where $T$ is the one-hot-encoding of the discrete treatment and " "for every outcome $i$ and treatment $j$ the CATE $\\Theta_{ij}(X)$ has the form:", "$\\Theta_{ij}(X) = \\phi(X)' coef_{ij} + cate\\_intercept_{ij}$", "where $\\phi(X)$ is the output of the `featurizer` or $X$ if `featurizer`=None. " "Coefficient Results table portrays the $coef_{ij}$ parameter vector for " "each outcome $i$ and the designated treatment $j$ passed to summary. " "Intercept Results table portrays the $cate\\_intercept_{ij}$ parameter.</sub>"]) try: coef_table = self.coef__inference(T).summary_frame( alpha=alpha, value=value, decimals=decimals, feature_names=feature_names, output_names=output_names) coef_array = coef_table.values coef_headers = coef_table.columns.tolist() coef_stubs = coef_table.index.tolist() coef_title = 'Coefficient Results' smry.add_table(coef_array, coef_headers, coef_stubs, coef_title) except Exception as e: print("Coefficient Results: ", e) try: intercept_table = self.intercept__inference(T).summary_frame( alpha=alpha, value=value, decimals=decimals, feature_names=None, output_names=output_names) intercept_array = intercept_table.values intercept_headers = intercept_table.columns.tolist() intercept_stubs = intercept_table.index.tolist() intercept_title = 'CATE Intercept Results' smry.add_table(intercept_array, intercept_headers, intercept_stubs, intercept_title) except Exception as e: print("CATE Intercept Results: ", e) if len(smry.tables) > 0: return smry
[docs]class StatsModelsCateEstimatorDiscreteMixin(LinearModelFinalCateEstimatorDiscreteMixin): """ Mixin class that offers `inference='statsmodels'` options to the CATE estimator that inherits it. Such an estimator must implement a :attr:`model_final_` attribute that points to a :class:`.StatsModelsLinearRegression` object that is cloned to fit each discrete treatment target CATE model and a :attr:`fitted_models_final` attribute that returns the list of fitted final models that represent the CATE for each categorical treatment. """ def _get_inference_options(self): # add statsmodels to parent's options options = super()._get_inference_options() options.update(statsmodels=StatsModelsInferenceDiscrete) options.update(auto=StatsModelsInferenceDiscrete) return options
[docs]class DebiasedLassoCateEstimatorDiscreteMixin(LinearModelFinalCateEstimatorDiscreteMixin): """Mixin for cate models where the final stage is a debiased lasso model.""" def _get_inference_options(self): # add statsmodels to parent's options options = super()._get_inference_options() options.update(debiasedlasso=LinearModelFinalInferenceDiscrete) options.update(auto=LinearModelFinalInferenceDiscrete) return options
[docs]class ForestModelFinalCateEstimatorDiscreteMixin(BaseCateEstimator): def _get_inference_options(self): # add blb to parent's options options = super()._get_inference_options() options.update(blb=GenericModelFinalInferenceDiscrete) options.update(auto=GenericModelFinalInferenceDiscrete) return options def feature_importances_(self, T): _, T = self._expand_treatments(None, T) ind = inverse_onehot(T).item() - 1 assert ind >= 0, "No model was fitted for the control" return self.fitted_models_final[ind].feature_importances_