Source code for econml.validate.drtester

from typing import Tuple, Union, List

import numpy as np
import pandas as pd
import scipy.stats as st
from sklearn.model_selection import check_cv
from sklearn.model_selection import cross_val_predict, StratifiedKFold, KFold
from statsmodels.api import OLS
from statsmodels.tools import add_constant

from econml.utilities import deprecated

from .results import CalibrationEvaluationResults, BLPEvaluationResults, UpliftEvaluationResults, EvaluationResults
from .utils import calculate_dr_outcomes, calc_uplift


class DRTester:
    """
    Validation tests for CATE models. Includes the best linear predictor (BLP) test as in Chernozhukov et al. (2022),
    the calibration test in Dwivedi et al. (2020), and the QINI coefficient as in Radcliffe (2007).

    **Best Linear Predictor (BLP)**

    Runs ordinary least squares (OLS) of doubly robust (DR) outcomes on DR outcome predictions from the CATE model
    (and a constant). If the CATE model captures true heterogeneity, then the OLS coefficient on the CATE predictions
    should be positive and significantly different from 0.

    **Calibration**

    First, units are binned based on quantiles of the CATE predictions (s(Z)), with quantile cut-points defined
    out of sample. Within each bin (k), the absolute difference between the mean CATE prediction and the mean DR
    outcome is calculated, along with the absolute difference between the mean CATE prediction and the overall ATE.
    These measures are then summed across bins, weighted by the probability that a unit falls in each bin.

    .. math::

        \\mathrm{Cal}_G = \\sum_k \\pi(k) \\left|\\mathbb{E}[s(Z) \\mid k] - \\mathbb{E}[Y^{DR} \\mid k]\\right|

        \\mathrm{Cal}_O = \\sum_k \\pi(k) \\left|\\mathbb{E}[s(Z) \\mid k] - \\mathbb{E}[Y^{DR}]\\right|

    The calibration r-squared is then defined as

    .. math::

        \\mathcal{R}^2_C = 1 - \\frac{\\mathrm{Cal}_G}{\\mathrm{Cal}_O}

    The calibration r-squared is similar to the standard R-squared score in that it can take any value less than or
    equal to 1, with scores closer to 1 indicating a better-calibrated CATE model.

    **Uplift Modeling**

    Units are ordered by predicted CATE values, and a running measure of the average treatment effect in each cohort
    is kept as we progress through ranks. The resulting TOC curve can then be plotted, and its integral used as a
    measure of the true heterogeneity captured by the CATE model; this integral is referred to as the AUTOC (area
    under TOC). The QINI curve is a variant of this curve that also incorporates the treatment probability; its
    integral is referred to as the QINI coefficient. More formally, the TOC and QINI curves are given by the
    following functions:

    .. math::

        \\tau_{TOC}(q) = \\mathrm{Cov}\\left(Y^{DR}(g,p),
            \\frac{\\mathbb{1}\\{\\hat{\\tau}(Z) \\geq \\hat{\\mu}(q)\\}}{\\mathrm{Pr}(\\hat{\\tau}(Z) \\geq \\hat{\\mu}(q))}\\right)

        \\tau_{QINI}(q) = \\mathrm{Cov}\\left(Y^{DR}(g,p), \\mathbb{1}\\{\\hat{\\tau}(Z) \\geq \\hat{\\mu}(q)\\}\\right)

    where :math:`q` is the desired quantile, :math:`\\hat{\\mu}` is the quantile function, and :math:`\\hat{\\tau}`
    is the predicted CATE function. :math:`Y^{DR}(g,p)` refers to the doubly robust outcome difference (relative to
    control) for the given observation. The AUTOC and QINI coefficient are then given by:

    .. math::

        AUTOC = \\int_0^1 \\tau_{TOC}(q) dq

        QINI = \\int_0^1 \\tau_{QINI}(q) dq

    Parameters
    ----------
    model_regression: estimator
        Nuisance model estimator used to fit the outcome to features. Must implement `fit` and `predict` methods.

    model_propensity: estimator
        Nuisance model estimator used to fit the treatment assignment to features. Must implement `fit` and
        `predict_proba` methods.

    cate: estimator
        Fitted conditional average treatment effect (CATE) estimator to be validated.

    cv: int or list, default 5
        Splitter used for cross-validation. Can be either an integer (the number of desired folds) or a list of
        indices corresponding to membership in each fold.

    References
    ----------
    [Chernozhukov2022] V. Chernozhukov et al.
    Generic Machine Learning Inference on Heterogeneous Treatment Effects in Randomized Experiments.
    arXiv preprint arXiv:1712.04802, 2022.
    `<https://arxiv.org/abs/1712.04802>`_

    [Dwivedi2020] R. Dwivedi et al.
    Stable Discovery of Interpretable Subgroups via Calibration in Causal Studies.
    arXiv preprint arXiv:2008.10109, 2020.
    `<https://arxiv.org/abs/2008.10109>`_

    [Radcliffe2007] N. Radcliffe
    Using control groups to target on predicted lift: Building and assessing uplift models.
    Direct Marketing Analytics Journal (2007), pages 14–21.
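    Examples
    --------
    A minimal usage sketch (the gradient boosting nuisance models here are illustrative placeholders; ``est`` stands
    in for any fitted CATE estimator, and the X/D/y arrays are the user's own training and validation data)::

        from sklearn.ensemble import GradientBoostingClassifier, GradientBoostingRegressor

        tester = DRTester(
            model_regression=GradientBoostingRegressor(),
            model_propensity=GradientBoostingClassifier(),
            cate=est
        ).fit_nuisance(Xval, Dval, yval, Xtrain, Dtrain, ytrain)
        results = tester.evaluate_all(Xval, Xtrain)
    """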
    def __init__(
        self,
        *,
        model_regression,
        model_propensity,
        cate,
        cv: Union[int, List] = 5
    ):
        self.model_regression = model_regression
        self.model_propensity = model_propensity
        self.cate = cate
        self.cv = cv
    def get_cv_splitter(self, random_state: int = 123):
        """
        Generates splitter object for cross-validation.

        Checks whether the cv object passed at initialization is a splitting mechanism or a number of folds, and
        returns an appropriately modified object for use in downstream cross-validation.

        Parameters
        ----------
        random_state: int, default 123
            Seed for the splitter

        Returns
        -------
        None, but adds attribute `cv_splitter` containing the constructed splitter object.
        """
        splitter = check_cv(self.cv, [0], classifier=True)
        if splitter != self.cv and isinstance(splitter, (KFold, StratifiedKFold)):
            # Shuffle with a fixed seed so that folds are random but reproducible
            splitter.shuffle = True
            splitter.random_state = random_state
        self.cv_splitter = splitter
    def get_cv_splits(self, vars: np.array, T: np.array):
        """
        Generates splits for cross-validation, given a set of variables and a treatment vector.

        Parameters
        ----------
        vars: list of (n x k) matrices or vectors of length n
            Features used in nuisance models

        T: vector of length n
            Treatment assignment vector. Control status must be the minimum value. It is recommended to have the
            control status be equal to 0, and all other treatments integers starting at 1.

        Returns
        -------
        List of folds of the data, on which to run cross-validation
        """
        if not hasattr(self, 'cv_splitter'):
            self.get_cv_splitter()

        # Reshape any 1-d inputs into columns and stack into a single matrix for splitting
        all_vars = [var if np.ndim(var) == 2 else var.reshape(-1, 1) for var in vars if var is not None]
        to_split = np.hstack(all_vars)
        folds = self.cv_splitter.split(to_split, T)

        return list(folds)
    def fit_nuisance(
        self,
        Xval: np.array,
        Dval: np.array,
        yval: np.array,
        Xtrain: np.array = None,
        Dtrain: np.array = None,
        ytrain: np.array = None,
    ):
        """
        Generates nuisance predictions and calculates doubly robust (DR) outcomes, either by (1) cross-fitting in the
        validation sample, or (2) fitting in the training sample and applying to the validation sample. If Xtrain,
        Dtrain, and ytrain are all not None, then option (2) is implemented; otherwise, option (1) is implemented.
        In order to use the `evaluate_cal` method, Xtrain, Dtrain, and ytrain must all be specified.

        Parameters
        ----------
        Xval: (n_val x k) matrix or vector of length n_val
            Features used in nuisance models for validation sample

        Dval: vector of length n_val
            Treatment assignment of validation sample. Control status must be the minimum value. It is recommended
            to have the control status be equal to 0, and all other treatments integers starting at 1.

        yval: vector of length n_val
            Outcomes for the validation sample

        Xtrain: (n_train x k) matrix or vector of length n_train, optional
            Features used in nuisance models for training sample

        Dtrain: vector of length n_train, optional
            Treatment assignment of training sample. Control status must be the minimum value. It is recommended
            to have the control status be equal to 0, and all other treatments integers starting at 1.

        ytrain: vector of length n_train, optional
            Outcomes for the training sample

        Returns
        -------
        self, with added attributes for the validation treatments (Dval), treatment values (treatments), number of
        treatments excluding control (n_treat), a boolean flag for whether training data was provided (fit_on_train),
        doubly robust outcome values for the validation set (dr_val_), and the DR ATE value (ate_val). If training
        data is provided, also adds attributes for the doubly robust outcomes for the training set (dr_train_) and
        the training treatments (Dtrain)
        """
        self.Dval = Dval

        # Unique treatments (ordered, includes control)
        self.treatments = np.sort(np.unique(Dval))

        # Number of treatments (excluding control)
        self.n_treat = len(self.treatments) - 1

        # Indicator for whether nuisance models are fit on a separate training sample
        self.fit_on_train = (Xtrain is not None) and (Dtrain is not None) and (ytrain is not None)

        if self.fit_on_train:
            # Get DR outcomes in training sample
            reg_preds_train, prop_preds_train = self.fit_nuisance_cv(Xtrain, Dtrain, ytrain)
            self.dr_train_ = calculate_dr_outcomes(Dtrain, ytrain, reg_preds_train, prop_preds_train)

            # Get DR outcomes in validation sample
            reg_preds_val, prop_preds_val = self.fit_nuisance_train(Xtrain, Dtrain, ytrain, Xval)
            self.dr_val_ = calculate_dr_outcomes(Dval, yval, reg_preds_val, prop_preds_val)
        else:
            # Get DR outcomes in validation sample
            reg_preds_val, prop_preds_val = self.fit_nuisance_cv(Xval, Dval, yval)
            self.dr_val_ = calculate_dr_outcomes(Dval, yval, reg_preds_val, prop_preds_val)

        # Calculate ATE in the validation sample
        self.ate_val = self.dr_val_.mean(axis=0)

        return self
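    # For reference: calculate_dr_outcomes (imported from .utils) constructs, for each non-control treatment t, the
    # standard AIPW/doubly robust outcome
    #     Y^DR_t = mu_t(X) - mu_0(X) + 1{D=t} * (Y - mu_t(X)) / p_t(X) - 1{D=0} * (Y - mu_0(X)) / p_0(X)
    # where mu_t and p_t are the fitted outcome and propensity predictions; its mean identifies the ATE of t versus
    # control. See .utils for the exact implementation.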
    def fit_nuisance_train(
        self,
        Xtrain: np.array,
        Dtrain: np.array,
        ytrain: np.array,
        Xval: np.array
    ) -> Tuple[np.array, np.array]:
        """
        Fits nuisance models in the training sample and applies them to generate predictions in the validation
        sample.

        Parameters
        ----------
        Xtrain: (n_train x k) matrix
            Training sample features used to predict both treatment status and outcomes

        Dtrain: array of length n_train
            Training sample treatment assignments. Should have integer values with the lowest value corresponding
            to the control treatment. It is recommended to have the control take value 0 and all other treatments
            be integers starting at 1

        ytrain: array of length n_train
            Outcomes for training sample

        Xval: (n_val x k) matrix
            Validation sample features used to predict both treatment status and outcomes

        Returns
        -------
        2 (n_val x (n_treat + 1)) arrays corresponding to the predicted outcomes under each treatment status and the
        predicted treatment probabilities, respectively. Both are evaluated on the validation set.
        """
        # Fit propensity model on the training sample
        model_propensity_fitted = self.model_propensity.fit(Xtrain, Dtrain)

        # Predict propensity scores on the validation sample
        prop_preds = model_propensity_fitted.predict_proba(Xval)

        # Fit one outcome model per treatment arm (including control) and predict on the validation sample
        n = Xval.shape[0]
        reg_preds = np.zeros((n, self.n_treat + 1))
        for i in range(self.n_treat + 1):
            model_regression_fitted = self.model_regression.fit(
                Xtrain[Dtrain == self.treatments[i]], ytrain[Dtrain == self.treatments[i]])
            reg_preds[:, i] = model_regression_fitted.predict(Xval)

        return reg_preds, prop_preds
    def fit_nuisance_cv(
        self,
        X: np.array,
        D: np.array,
        y: np.array
    ) -> Tuple[np.array, np.array]:
        """
        Generates nuisance function predictions using k-fold cross-validation.

        Parameters
        ----------
        X: (n x k) matrix
            Features used to predict treatment/outcome

        D: array of length n
            Treatment assignments. Should have integer values with the lowest value corresponding to the control
            treatment. It is recommended to have the control take value 0 and all other treatments be integers
            starting at 1

        y: array of length n
            Outcomes

        Returns
        -------
        2 (n x (n_treat + 1)) arrays corresponding to the predicted outcomes under each treatment status and the
        predicted treatment probabilities, respectively.
        """
        splits = self.get_cv_splits([X], D)

        # Predict treatment probabilities out of fold
        prop_preds = cross_val_predict(self.model_propensity, X, D, cv=splits, method='predict_proba')

        # Predict outcomes, following T-learner logic: fit a separate outcome model per treatment arm within each
        # training fold, then predict on the held-out fold, so every unit gets out-of-sample predictions
        N = X.shape[0]
        reg_preds = np.zeros((N, self.n_treat + 1))
        for k in range(self.n_treat + 1):
            for train, test in splits:
                model_regression_fitted = self.model_regression.fit(
                    X[train][D[train] == self.treatments[k]], y[train][D[train] == self.treatments[k]])
                reg_preds[test, k] = model_regression_fitted.predict(X[test])

        return reg_preds, prop_preds
    def get_cate_preds(
        self,
        Xval: np.array,
        Xtrain: np.array = None
    ):
        """
        Generates predictions from the fitted CATE model. If Xtrain is None, then the predictions are generated
        using k-fold cross-validation on the validation set. If Xtrain is specified, then the CATE is assumed to
        have been fitted on the training sample (where the DR outcomes were generated using k-fold CV) and is then
        applied to the validation sample.

        Parameters
        ----------
        Xval: (n_val x k) matrix
            Validation set features used to generate CATE predictions

        Xtrain: (n_train x k) matrix, optional
            Training set features used to fit the CATE model

        Returns
        -------
        None, but adds attribute cate_preds_val_ for predicted CATE values on the validation set and, if training
        data is provided, attribute cate_preds_train_ for predicted CATE values on the training set
        """
        # Predict the effect of each non-control treatment relative to the control (lowest) treatment value,
        # matching the doubly robust outcomes computed in fit_nuisance
        base = self.treatments[0]
        vals = [self.cate.effect(X=Xval, T0=base, T1=t) for t in self.treatments[1:]]
        self.cate_preds_val_ = np.stack(vals).T

        if Xtrain is not None:
            trains = [self.cate.effect(X=Xtrain, T0=base, T1=t) for t in self.treatments[1:]]
            self.cate_preds_train_ = np.stack(trains).T
    def evaluate_cal(
        self,
        Xval: np.array = None,
        Xtrain: np.array = None,
        n_groups: int = 4
    ) -> CalibrationEvaluationResults:
        """
        Implements the calibration test as in [Dwivedi2020]

        Parameters
        ----------
        Xval: (n_val x k) matrix, optional
            Validation sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called

        Xtrain: (n_train x k) matrix, optional
            Training sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called (with Xtrain specified)

        n_groups: int, default 4
            Number of quantile-based groups used to calculate the calibration score.

        Returns
        -------
        CalibrationEvaluationResults object showing the results of the calibration test
        """
        if not hasattr(self, 'dr_train_'):
            raise Exception("Must fit nuisance models on training sample data to use calibration test")

        # If the CATE predictions have not been calculated yet, do so now (requires both Xval and Xtrain)
        if (not hasattr(self, 'cate_preds_train_')) or (not hasattr(self, 'cate_preds_val_')):
            if (Xval is None) or (Xtrain is None):
                raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain')
            self.get_cate_preds(Xval, Xtrain)

        cal_r_squared = np.zeros(self.n_treat)
        plot_data_dict = dict()

        for k in range(self.n_treat):
            # Define quantile-based group cut-points using the training-sample CATE predictions
            cuts = np.quantile(self.cate_preds_train_[:, k], np.linspace(0, 1, n_groups + 1))

            probs = np.zeros(n_groups)
            g_cate = np.zeros(n_groups)
            se_g_cate = np.zeros(n_groups)
            gate = np.zeros(n_groups)
            se_gate = np.zeros(n_groups)
            for i in range(n_groups):
                # Assign units in validation set to groups
                ind = (self.cate_preds_val_[:, k] >= cuts[i]) & (self.cate_preds_val_[:, k] <= cuts[i + 1])

                # Proportion of the validation set in the group
                probs[i] = np.mean(ind)

                # Group average treatment effect (GATE) -- average of DR outcomes in the group
                gate[i] = np.mean(self.dr_val_[ind, k])
                se_gate[i] = np.std(self.dr_val_[ind, k]) / np.sqrt(np.sum(ind))

                # Average of CATE predictions in the group
                g_cate[i] = np.mean(self.cate_preds_val_[ind, k])
                se_g_cate[i] = np.std(self.cate_preds_val_[ind, k]) / np.sqrt(np.sum(ind))

            # Calculate group calibration score
            cal_score_g = np.sum(abs(gate - g_cate) * probs)

            # Calculate overall calibration score
            cal_score_o = np.sum(abs(gate - self.ate_val[k]) * probs)

            # Calculate R-squared calibration score
            cal_r_squared[k] = 1 - (cal_score_g / cal_score_o)

            df_plot = pd.DataFrame({
                'ind': np.array(range(n_groups)),
                'gate': gate,
                'se_gate': se_gate,
                'g_cate': g_cate,
                'se_g_cate': se_g_cate
            })
            plot_data_dict[self.treatments[k + 1]] = df_plot

        self.cal_res = CalibrationEvaluationResults(
            cal_r_squared=cal_r_squared,
            plot_data_dict=plot_data_dict,
            treatments=self.treatments
        )

        return self.cal_res
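    # Reading the result: cal_r_squared values close to 1 indicate a well-calibrated CATE model. Unlike a standard
    # R-squared, the score is unbounded below, so large negative values are possible for badly calibrated models.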
    def evaluate_blp(
        self,
        Xval: np.array = None,
        Xtrain: np.array = None
    ) -> BLPEvaluationResults:
        """
        Implements the best linear predictor (BLP) test as in [Chernozhukov2022]. The `fit_nuisance` method must
        already have been called.

        Parameters
        ----------
        Xval: (n_val x k) matrix, optional
            Validation sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called

        Xtrain: (n_train x k) matrix, optional
            Training sample features for the CATE model. If specified, then the CATE is fitted on the training
            sample and applied to Xval. If specified, then Xtrain, Dtrain, ytrain must have been specified in the
            `fit_nuisance` method (and vice-versa)

        Returns
        -------
        BLPEvaluationResults object showing the results of the BLP test
        """
        if not hasattr(self, 'dr_val_'):
            raise Exception("Must fit nuisances before evaluating")

        # If the CATE predictions have not been calculated yet, do so now (requires at least Xval)
        if not hasattr(self, 'cate_preds_val_'):
            if Xval is None:
                raise Exception('CATE predictions not yet calculated - must provide Xval')
            self.get_cate_preds(Xval, Xtrain)

        if self.n_treat == 1:  # binary treatment
            reg = OLS(self.dr_val_, add_constant(self.cate_preds_val_)).fit()
            params = [reg.params[1]]
            errs = [reg.bse[1]]
            pvals = [reg.pvalues[1]]
        else:  # categorical treatment
            params = []
            errs = []
            pvals = []
            for k in range(self.n_treat):  # run a separate regression for each treatment
                reg = OLS(self.dr_val_[:, k], add_constant(self.cate_preds_val_[:, k])).fit(cov_type='HC1')
                params.append(reg.params[1])
                errs.append(reg.bse[1])
                pvals.append(reg.pvalues[1])

        self.blp_res = BLPEvaluationResults(
            params=params,
            errs=errs,
            pvals=pvals,
            treatments=self.treatments
        )

        return self.blp_res
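    # Reading the result: a slope coefficient on the CATE predictions that is positive and statistically significant
    # is evidence that the CATE model captures true heterogeneity; see [Chernozhukov2022].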
    def evaluate_uplift(
        self,
        Xval: np.array = None,
        Xtrain: np.array = None,
        percentiles: np.array = np.linspace(5, 95, 50),
        metric: str = 'qini',
        n_bootstrap: int = 1000
    ) -> UpliftEvaluationResults:
        """
        Calculates uplift curves and coefficients for the given model, where units are ordered by predicted CATE
        values and a running measure of the average treatment effect in each cohort is kept as we progress through
        ranks. The uplift coefficient is the area under the resulting curve, with a value of 0 corresponding to a
        model whose CATE predictions rank units no better than random. Curve values are calculated on the validation
        set, with the training-set CATE predictions used as input to define the quantile cut-points.

        Parameters
        ----------
        Xval: (n_val x k) matrix, optional
            Validation sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called

        Xtrain: (n_train x k) matrix, optional
            Training sample features for the CATE model. If specified, then the CATE is fitted on the training
            sample and applied to Xval. If specified, then Xtrain, Dtrain, ytrain must have been specified in the
            `fit_nuisance` method (and vice-versa)

        percentiles: one-dimensional array, default ``np.linspace(5, 95, 50)``
            Array of percentiles over which the uplift curve should be constructed. Defaults to 50 evenly spaced
            points between the 5th and 95th percentiles.

        metric: str, default 'qini'
            Which type of uplift curve to evaluate. Must be one of ['toc', 'qini']

        n_bootstrap: int, default 1000
            Number of bootstrap samples to run when calculating uniform confidence bands.

        Returns
        -------
        UpliftEvaluationResults object showing the fitted results
        """
        if not hasattr(self, 'dr_val_'):
            raise Exception("Must fit nuisances before evaluating")

        if (not hasattr(self, 'cate_preds_train_')) or (not hasattr(self, 'cate_preds_val_')):
            if (Xval is None) or (Xtrain is None):
                raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain')
            self.get_cate_preds(Xval, Xtrain)

        curve_data_dict = dict()
        if self.n_treat == 1:
            coeff, err, curve_df = calc_uplift(
                self.cate_preds_train_,
                self.cate_preds_val_,
                self.dr_val_,
                percentiles,
                metric,
                n_bootstrap
            )
            coeffs = [coeff]
            errs = [err]
            curve_data_dict[self.treatments[1]] = curve_df
        else:
            coeffs = []
            errs = []
            for k in range(self.n_treat):
                coeff, err, curve_df = calc_uplift(
                    self.cate_preds_train_[:, k],
                    self.cate_preds_val_[:, k],
                    self.dr_val_[:, k],
                    percentiles,
                    metric,
                    n_bootstrap
                )
                coeffs.append(coeff)
                errs.append(err)
                curve_data_dict[self.treatments[k + 1]] = curve_df

        # One-sided p-value for the null hypothesis that the uplift coefficient is 0
        pvals = [st.norm.sf(abs(q / e)) for q, e in zip(coeffs, errs)]

        self.uplift_res = UpliftEvaluationResults(
            params=coeffs,
            errs=errs,
            pvals=pvals,
            treatments=self.treatments,
            curve_data_dict=curve_data_dict
        )

        return self.uplift_res
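    # Reading the result: an uplift coefficient significantly greater than 0 indicates that targeting units by
    # predicted CATE outperforms random targeting; see the class docstring for the TOC and QINI definitions.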
    def evaluate_all(
        self,
        Xval: np.array = None,
        Xtrain: np.array = None,
        n_groups: int = 4,
        n_bootstrap: int = 1000
    ) -> EvaluationResults:
        """
        Implements the best linear predictor (`evaluate_blp`), calibration (`evaluate_cal`), and uplift curve
        (`evaluate_uplift`) methods.

        Parameters
        ----------
        Xval: (n_val x k) matrix, optional
            Validation sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called

        Xtrain: (n_train x k) matrix, optional
            Training sample features for the CATE model. If not specified, then the `get_cate_preds` method must
            already have been called

        n_groups: int, default 4
            Number of quantile-based groups used to calculate the calibration score.

        n_bootstrap: int, default 1000
            Number of bootstrap samples to run when calculating uniform confidence bands for uplift curves.

        Returns
        -------
        EvaluationResults object summarizing the results of all tests
        """
        if (not hasattr(self, 'cate_preds_train_')) or (not hasattr(self, 'cate_preds_val_')):
            if (Xval is None) or (Xtrain is None):
                raise Exception('CATE predictions not yet calculated - must provide both Xval, Xtrain')
            self.get_cate_preds(Xval, Xtrain)

        blp_res = self.evaluate_blp()
        cal_res = self.evaluate_cal(n_groups=n_groups)
        qini_res = self.evaluate_uplift(metric='qini', n_bootstrap=n_bootstrap)
        toc_res = self.evaluate_uplift(metric='toc', n_bootstrap=n_bootstrap)

        self.res = EvaluationResults(
            blp_res=blp_res,
            cal_res=cal_res,
            qini_res=qini_res,
            toc_res=toc_res
        )

        return self.res
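    # The combined EvaluationResults object simply bundles the four individual results; each can also be obtained
    # on its own from the corresponding evaluate_* method.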
@deprecated("DRtester has been renamed 'DRTester' and the old name has been deprecated and will be removed " "in a future release. Please use 'DRTester' instead.") class DRtester(DRTester): pass