# Source code for econml.inference._inference

# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.

import abc
from collections import OrderedDict
from warnings import warn

import numpy as np
import pandas as pd
import scipy
from scipy.stats import norm
from statsmodels.iolib.table import SimpleTable

from ._bootstrap import BootstrapEstimator
from ..sklearn_extensions.linear_model import StatsModelsLinearRegression
from ..utilities import (Summary, _safe_norm_ppf, broadcast_unit_treatments,
                         cross_product, inverse_onehot, ndim,
                         parse_final_model_params, jacify_featurizer,
                         reshape_treatmentwise_effects, shape, filter_none_kwargs)

"""Options for performing inference in estimators."""


class Inference(metaclass=abc.ABCMeta):
    def prefit(self, estimator, *args, **kwargs):
        """Performs any necessary logic before the estimator's fit has been called."""
        pass

    @abc.abstractmethod
    def fit(self, estimator, *args, **kwargs):
        """
        Fits the inference model.

        This is called after the estimator's fit.
        """
        raise NotImplementedError("Abstract method")

    def ate_interval(self, X=None, *, T0=0, T1=1, alpha=0.05):
        return self.effect_inference(X=X, T0=T0, T1=T1).population_summary(alpha=alpha).conf_int_mean()

    def ate_inference(self, X=None, *, T0=0, T1=1):
        return self.effect_inference(X=X, T0=T0, T1=T1).population_summary()

    def marginal_ate_interval(self, T, X=None, *, alpha=0.05):
        return self.marginal_effect_inference(T, X=X).population_summary(alpha=alpha).conf_int_mean()

    def marginal_ate_inference(self, T, X=None):
        return self.marginal_effect_inference(T, X=X).population_summary()

    def const_marginal_ate_interval(self, X=None, *, alpha=0.05):
        return self.const_marginal_effect_inference(X=X).population_summary(alpha=alpha).conf_int_mean()

    def const_marginal_ate_inference(self, X=None):
        return self.const_marginal_effect_inference(X=X).population_summary()
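
# The `*_ate_*` helpers above all reduce to `*_effect_inference(...).population_summary()`; e.g.,
# given a fitted CATE estimator `est` that supports inference (a sketch, not part of the module):
#
#     inf = est.effect_inference(X, T0=0, T1=1)     # returns an InferenceResults subclass
#     summ = inf.population_summary(alpha=0.05)     # PopulationSummaryResults (defined below)
#     lb, ub = summ.conf_int_mean()                 # same bounds as est.ate_interval(X, alpha=0.05)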


class BootstrapInference(Inference):
    """
    Inference instance to perform bootstrapping.

    This class can be used for inference with any CATE estimator.

    Parameters
    ----------
    n_bootstrap_samples : int, default 100
        How many draws to perform.

    n_jobs: int, default -1
        The maximum number of concurrently running jobs, as in joblib.Parallel.

    verbose: int, default 0
        Verbosity level

    bootstrap_type: 'percentile', 'pivot', or 'normal', default 'pivot'
        Bootstrap method used to compute results.
        'percentile' will result in using the empirical CDF of the replicated computations of the statistics.
        'pivot' will also use the replicates but create a pivot interval that also relies on the estimate
        over the entire dataset.
        'normal' will instead compute a pivot interval assuming the replicates are normally distributed.
    """

    def __init__(self, n_bootstrap_samples=100, n_jobs=-1, bootstrap_type='pivot', verbose=0):
        self._n_bootstrap_samples = n_bootstrap_samples
        self._n_jobs = n_jobs
        self._bootstrap_type = bootstrap_type
        self._verbose = verbose

    def fit(self, estimator, *args, **kwargs):
        est = BootstrapEstimator(estimator, self._n_bootstrap_samples, self._n_jobs, compute_means=False,
                                 bootstrap_type=self._bootstrap_type, verbose=self._verbose)
        filtered_kwargs = filter_none_kwargs(**kwargs)
        est.fit(*args, **filtered_kwargs)
        self._est = est
        self._d_t = estimator._d_t
        self._d_y = estimator._d_y
        self.d_t = self._d_t[0] if self._d_t else 1
        self.d_y = self._d_y[0] if self._d_y else 1

    def __getattr__(self, name):
        if name.startswith('__'):
            raise AttributeError()

        m = getattr(self._est, name)
        if name.endswith('_interval'):  # convert alpha to lower/upper
            def wrapped(*args, alpha=0.05, **kwargs):
                return m(*args, lower=100 * alpha / 2, upper=100 * (1 - alpha / 2), **kwargs)
            return wrapped
        else:
            return m
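

# A minimal usage sketch (hypothetical data; `LinearDML` stands in for any CATE estimator whose
# `fit` accepts an `inference=` argument):
#
#     import numpy as np
#     from econml.dml import LinearDML
#
#     y = np.random.normal(size=500)
#     T = np.random.binomial(1, 0.5, size=500)
#     X = np.random.normal(size=(500, 3))
#     est = LinearDML(discrete_treatment=True)
#     est.fit(y, T, X=X, inference=BootstrapInference(n_bootstrap_samples=20, n_jobs=1))
#     lb, ub = est.effect_interval(X, alpha=0.05)   # intervals computed from the bootstrap replicates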


class GenericModelFinalInference(Inference):
    """
    Inference based on predict_interval of the model_final model.

    Assumes that estimator class has a model_final method, whose predict(cross_product(X, [0, ..., 1, ..., 0]))
    gives the const_marginal_effect of the treatment at the column with value 1 and which also supports
    prediction_stderr(X).
    """

    def prefit(self, estimator, *args, **kwargs):
        self.model_final = estimator.model_final_
        self.featurizer = estimator.featurizer_ if hasattr(estimator, 'featurizer_') else None

    def fit(self, estimator, *args, **kwargs):
        # once the estimator has been fit, it's kosher to store d_t here
        # (which needs to have been expanded if there's a discrete treatment)
        self._est = estimator
        self._d_t = estimator._d_t
        self._d_y = estimator._d_y
        self.d_t = self._d_t[0] if self._d_t else 1
        self.d_y = self._d_y[0] if self._d_y else 1

    def const_marginal_effect_interval(self, X, *, alpha=0.05):
        return self.const_marginal_effect_inference(X).conf_int(alpha=alpha)

    def const_marginal_effect_inference(self, X):
        if X is None:
            X = np.ones((1, 1))
        elif self.featurizer is not None:
            X = self.featurizer.transform(X)
        X, T = broadcast_unit_treatments(X, self.d_t)
        pred = reshape_treatmentwise_effects(self._predict(cross_product(X, T)), self._d_t, self._d_y)
        pred_stderr = None
        if hasattr(self.model_final, 'prediction_stderr'):
            pred_stderr = reshape_treatmentwise_effects(self._prediction_stderr(cross_product(X, T)),
                                                        self._d_t, self._d_y)
        else:
            warn("Final model doesn't have a `prediction_stderr` method, "
                 "only point estimates will be returned.")
        return NormalInferenceResults(d_t=self.d_t, d_y=self.d_y, pred=pred,
                                      pred_stderr=pred_stderr, mean_pred_stderr=None, inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names(),
                                      treatment_names=self._est.cate_treatment_names())

    def _predict(self, X):
        return self.model_final.predict(X)

    def _prediction_stderr(self, X):
        if not hasattr(self.model_final, 'prediction_stderr'):
            warn("Final model doesn't have a `prediction_stderr` method, "
                 "only point estimates will be returned.")
            return None
        return self.model_final.prediction_stderr(X)
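

# To illustrate the cross-product construction above: `broadcast_unit_treatments` pairs each row of X
# with every unit treatment vector, and `cross_product` forms the interaction features, so a final
# model that is linear in those features returns one constant marginal effect per (sample, treatment).
# A standalone sketch of the same idea (local names, not the library helpers):
#
#     import numpy as np
#     X = np.arange(6.0).reshape(3, 2)                 # 3 samples, 2 features
#     d_t = 2
#     X_rep = np.repeat(X, d_t, axis=0)                # each row repeated once per treatment
#     T_rep = np.tile(np.eye(d_t), (3, 1))             # matching unit treatment vectors
#     XT = (X_rep[:, None, :] * T_rep[:, :, None]).reshape(6, -1)   # row-wise cross features
#     # model_final.predict(XT) then yields the effect of each treatment at each sample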


class GenericSingleTreatmentModelFinalInference(GenericModelFinalInference):
    """
    Inference based on predict_interval of the model_final model.

    Assumes that treatment is single dimensional. Thus, the predict(X) of model_final gives
    the const_marginal_effect(X). The single dimensionality allows us to implement effect_interval(X, T0, T1)
    based on the const_marginal_effect_interval.
    """

    def fit(self, estimator, *args, **kwargs):
        super().fit(estimator, *args, **kwargs)
        if len(self._d_t) > 1 and (self._d_t[0] > 1):
            raise AttributeError("This method only works for single-dimensional continuous treatment "
                                 "or binary categorical treatment")

    def effect_interval(self, X, *, T0, T1, alpha=0.05):
        return self.effect_inference(X, T0=T0, T1=T1).conf_int(alpha=alpha)

    def effect_inference(self, X, *, T0, T1):
        # We can write effect inference as a function of const_marginal_effect_inference
        # for a single treatment
        X, T0, T1 = self._est._expand_treatments(X, T0, T1)
        cme_pred = self.const_marginal_effect_inference(X).point_estimate
        cme_stderr = self.const_marginal_effect_inference(X).stderr
        dT = T1 - T0
        einsum_str = 'myt,mt->my'
        if ndim(dT) == 1:
            einsum_str = einsum_str.replace('t', '')
        if ndim(cme_pred) == ndim(dT):  # y is a vector, rather than a 2D array
            einsum_str = einsum_str.replace('y', '')
        e_pred = np.einsum(einsum_str, cme_pred, dT)
        e_stderr = np.einsum(einsum_str, cme_stderr, np.abs(dT)) if cme_stderr is not None else None
        d_y = self._d_y[0] if self._d_y else 1

        # d_t=None here since we measure the effect across all Ts
        return NormalInferenceResults(d_t=None, d_y=d_y, pred=e_pred,
                                      pred_stderr=e_stderr, mean_pred_stderr=None, inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())

    def marginal_effect_inference(self, T, X):
        X, T = self._est._expand_treatments(X, T, transform=False)
        cme_inf = self.const_marginal_effect_inference(X)
        if not self._est._original_treatment_featurizer:
            return cme_inf

        feat_T = self._est.transformer.transform(T)
        cme_pred = cme_inf.point_estimate
        cme_stderr = cme_inf.stderr
        jac_T = self._est.transformer.jac(T)

        einsum_str = 'myf,mtf->myt'
        if ndim(T) == 1:
            einsum_str = einsum_str.replace('t', '')
        if ndim(feat_T) == 1:
            einsum_str = einsum_str.replace('f', '')
        if ndim(cme_pred) == ndim(feat_T):  # y is a vector, rather than a 2D array
            einsum_str = einsum_str.replace('y', '')
        e_pred = np.einsum(einsum_str, cme_pred, jac_T)
        e_stderr = np.einsum(einsum_str, cme_stderr, np.abs(jac_T)) if cme_stderr is not None else None
        d_y = self._d_y[0] if self._d_y else 1
        d_t = self._d_t[0] if self._d_t else 1
        d_t_orig = T.shape[1:][0] if T.shape[1:] else 1

        return NormalInferenceResults(d_t=d_t_orig, d_y=d_y, pred=e_pred,
                                      pred_stderr=e_stderr, mean_pred_stderr=None, inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())

    def marginal_effect_interval(self, T, X, *, alpha=0.05):
        return self.marginal_effect_inference(T, X).conf_int(alpha=alpha)
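

# The einsum contractions above reduce, for a scalar treatment, to elementwise products: the effect
# is cme(X) * (T1 - T0) and its standard error scales by |T1 - T0|, since se(c*Z) = |c|*se(Z).
# A standalone numeric check (local names only):
#
#     import numpy as np
#     cme = np.array([1.0, 3.0])                  # const marginal effect per sample (y, t collapsed)
#     dT = np.array([2.0, -1.0])                  # T1 - T0 per sample
#     effect = np.einsum('m,m->m', cme, dT)       # array([ 2., -3.])
#     stderr = np.einsum('m,m->m', np.array([0.1, 0.2]), np.abs(dT))   # array([0.2, 0.2])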


class LinearModelFinalInference(GenericModelFinalInference):
    """
    Inference based on predict_interval of the model_final model.

    Assumes that estimator class has a model_final method and that model is linear. Thus, the
    predict(cross_product(X, T1 - T0)) gives the effect(X, T0, T1). This allows us to implement
    effect_interval(X, T0, T1) based on the predict_interval of model_final.
    """

    def fit(self, estimator, *args, **kwargs):
        # once the estimator has been fit
        super().fit(estimator, *args, **kwargs)
        self._d_t_in = estimator._d_t_in
        self.bias_part_of_coef = estimator.bias_part_of_coef
        self.fit_cate_intercept = estimator.fit_cate_intercept

    # replacing _predict of super to fend against misuse, when the user has used a final linear model with
    # an intercept even when bias is part of coef.
    def _predict(self, X):
        intercept = 0
        if self.bias_part_of_coef:
            intercept = self.model_final.predict(np.zeros((1, X.shape[1])))
            if np.any(np.abs(intercept) > 0):
                warn("The final model has a nonzero intercept for at least one outcome; "
                     "it will be subtracted, but consider fitting a model without an intercept if possible. "
                     "Standard errors will also be slightly incorrect if the final model used fits an intercept "
                     "as they will be including the variance of the intercept parameter estimate.",
                     UserWarning)
        return self.model_final.predict(X) - intercept

    def effect_interval(self, X, *, T0, T1, alpha=0.05):
        return self.effect_inference(X, T0=T0, T1=T1).conf_int(alpha=alpha)

    def effect_inference(self, X, *, T0, T1):
        # We can write effect inference as a function of prediction and prediction standard error of
        # the final method for linear models
        X, T0, T1 = self._est._expand_treatments(X, T0, T1)
        if X is None:
            X = np.ones((T0.shape[0], 1))
        elif self.featurizer is not None:
            X = self.featurizer.transform(X)
        XT = cross_product(X, T1 - T0)
        e_pred = self._predict(XT)
        e_stderr = self._prediction_stderr(XT)
        d_y = self._d_y[0] if self._d_y else 1

        mean_XT = XT.mean(axis=0, keepdims=True)
        mean_pred_stderr = self._prediction_stderr(mean_XT)  # shape[0] will always be 1 here
        # squeeze the first axis
        mean_pred_stderr = np.squeeze(mean_pred_stderr, axis=0) if mean_pred_stderr is not None else None
        # d_t=None here since we measure the effect across all Ts
        return NormalInferenceResults(d_t=None, d_y=d_y, pred=e_pred,
                                      pred_stderr=e_stderr, mean_pred_stderr=mean_pred_stderr,
                                      inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())

    def const_marginal_effect_inference(self, X):
        inf_res = super().const_marginal_effect_inference(X)

        # set the mean_pred_stderr
        if X is None:
            X = np.ones((1, 1))
        elif self.featurizer is not None:
            X = self.featurizer.transform(X)
        X_mean, T_mean = broadcast_unit_treatments(X.mean(axis=0).reshape(1, -1), self.d_t)
        mean_XT = cross_product(X_mean, T_mean)
        mean_pred_stderr = self._prediction_stderr(mean_XT)
        if mean_pred_stderr is not None:
            mean_pred_stderr = reshape_treatmentwise_effects(mean_pred_stderr,
                                                             self._d_t, self._d_y)
            # shape[0] will always be 1 here
            inf_res.mean_pred_stderr = np.squeeze(mean_pred_stderr, axis=0)
        return inf_res

    def marginal_effect_inference(self, T, X):
        X, T = self._est._expand_treatments(X, T, transform=False)
        if not self._est._original_treatment_featurizer:
            return self.const_marginal_effect_inference(X)

        if X is None:
            X = np.ones((T.shape[0], 1))
        elif self.featurizer is not None:
            X = self.featurizer.transform(X)

        feat_T = self._est.transformer.transform(T)
        jac_T = self._est.transformer.jac(T)

        d_t_orig = T.shape[1:]
        d_t_orig = d_t_orig[0] if d_t_orig else 1

        d_y = self._d_y[0] if self._d_y else 1
        d_t = self._d_t[0] if self._d_t else 1

        output_shape = [X.shape[0]]
        if self._d_y:
            output_shape.append(self._d_y[0])
        if T.shape[1:]:
            output_shape.append(T.shape[1])
        me_pred = np.zeros(shape=output_shape)
        me_stderr = np.zeros(shape=output_shape)
        mean_pred_stderr_res = np.zeros(shape=output_shape[1:])
        for i in range(d_t_orig):
            # conditionally index multiple dimensions depending on shapes of T, Y and feat_T
            jac_index = [slice(None)]
            me_index = [slice(None)]
            if self._d_y:
                me_index.append(slice(None))
            if T.shape[1:]:
                jac_index.append(i)
                me_index.append(i)
            if feat_T.shape[1:]:  # if featurized T is not a vector
                jac_index.append(slice(None))

            XT = cross_product(X, jac_T[tuple(jac_index)])
            e_pred = self._predict(XT).reshape(X.shape[:1] + self._d_y)  # enforce output shape
            e_stderr = self._prediction_stderr(XT).reshape(X.shape[:1] + self._d_y)

            mean_XT = XT.mean(axis=0, keepdims=True)
            mean_pred_stderr = self._prediction_stderr(mean_XT)  # shape[0] will always be 1 here
            # squeeze the first axis
            mean_pred_stderr = np.squeeze(mean_pred_stderr, axis=0) if mean_pred_stderr is not None else None
            if mean_pred_stderr is not None:
                mean_pred_stderr_res[tuple(me_index[1:])] = mean_pred_stderr

            me_pred[tuple(me_index)] = e_pred
            me_stderr[tuple(me_index)] = e_stderr

        return NormalInferenceResults(d_t=d_t_orig, d_y=d_y, pred=me_pred,
                                      pred_stderr=me_stderr, mean_pred_stderr=mean_pred_stderr_res,
                                      inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())

    def marginal_effect_interval(self, T, X, *, alpha=0.05):
        return self.marginal_effect_inference(T, X).conf_int(alpha=alpha)

    def coef__interval(self, *, alpha=0.05):
        lo, hi = self.model_final.coef__interval(alpha)
        lo_int, hi_int = self.model_final.intercept__interval(alpha)
        lo = parse_final_model_params(lo, lo_int,
                                      self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                      self.fit_cate_intercept)[0]
        hi = parse_final_model_params(hi, hi_int,
                                      self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                      self.fit_cate_intercept)[0]
        return lo, hi

    def coef__inference(self):
        coef = self.model_final.coef_
        intercept = self.model_final.intercept_
        coef = parse_final_model_params(coef, intercept,
                                        self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                        self.fit_cate_intercept)[0]
        if hasattr(self.model_final, 'coef_stderr_') and hasattr(self.model_final, 'intercept_stderr_'):
            coef_stderr = self.model_final.coef_stderr_
            intercept_stderr = self.model_final.intercept_stderr_
            coef_stderr = parse_final_model_params(coef_stderr, intercept_stderr,
                                                   self._d_y, self._d_t, self._d_t_in,
                                                   self.bias_part_of_coef, self.fit_cate_intercept)[0]
        else:
            warn("Final model doesn't have `coef_stderr_` and `intercept_stderr_` attributes, "
                 "only point estimates will be available.")
            coef_stderr = None

        if coef.size == 0:  # X is None
            raise AttributeError("X is None, please call intercept_inference to learn the constant!")

        fname_transformer = None
        if hasattr(self._est, 'cate_feature_names') and callable(self._est.cate_feature_names):
            fname_transformer = self._est.cate_feature_names

        return NormalInferenceResults(d_t=self.d_t, d_y=self.d_y, pred=coef, pred_stderr=coef_stderr,
                                      mean_pred_stderr=None,
                                      inf_type='coefficient', fname_transformer=fname_transformer,
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names(),
                                      treatment_names=self._est.cate_treatment_names())

    def intercept__interval(self, *, alpha=0.05):
        if not self.fit_cate_intercept:
            raise AttributeError("No intercept was fitted!")
        lo, hi = self.model_final.coef__interval(alpha)
        lo_int, hi_int = self.model_final.intercept__interval(alpha)
        lo = parse_final_model_params(lo, lo_int,
                                      self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                      self.fit_cate_intercept)[1]
        hi = parse_final_model_params(hi, hi_int,
                                      self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                      self.fit_cate_intercept)[1]
        return lo, hi

    def intercept__inference(self):
        if not self.fit_cate_intercept:
            raise AttributeError("No intercept was fitted!")
        coef = self.model_final.coef_
        intercept = self.model_final.intercept_
        intercept = parse_final_model_params(coef, intercept,
                                             self._d_y, self._d_t, self._d_t_in, self.bias_part_of_coef,
                                             self.fit_cate_intercept)[1]
        if hasattr(self.model_final, 'coef_stderr_') and hasattr(self.model_final, 'intercept_stderr_'):
            coef_stderr = self.model_final.coef_stderr_
            intercept_stderr = self.model_final.intercept_stderr_
            intercept_stderr = parse_final_model_params(coef_stderr, intercept_stderr,
                                                        self._d_y, self._d_t, self._d_t_in,
                                                        self.bias_part_of_coef, self.fit_cate_intercept)[1]
        else:
            warn("Final model doesn't have `coef_stderr_` and `intercept_stderr_` attributes, "
                 "only point estimates will be available.")
            intercept_stderr = None

        return NormalInferenceResults(d_t=self.d_t, d_y=self.d_y, pred=intercept,
                                      pred_stderr=intercept_stderr, mean_pred_stderr=None,
                                      inf_type='intercept',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names(),
                                      treatment_names=self._est.cate_treatment_names())
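

# When the final model is linear, the wrapping estimator exposes these methods directly; e.g. with
# LinearDML (a sketch with hypothetical data; `coef__inference` here is assumed to be routed to the
# method above):
#
#     from econml.dml import LinearDML
#     est = LinearDML(fit_cate_intercept=True)
#     est.fit(y, T, X=X)
#     lo, hi = est.coef__interval(alpha=0.05)          # CATE coefficient bounds
#     est.coef__inference().summary_frame()            # estimate, stderr, z, p, CI per coefficient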


class StatsModelsInference(LinearModelFinalInference):
    """Stores statsmodels covariance options.

    This class can be used for inference by the LinearDML.

    Parameters
    ----------
    cov_type : str, default 'HC1'
        The type of covariance estimation method to use.  Supported values are 'nonrobust',
        'HC0', 'HC1'.
    """

    def __init__(self, cov_type='HC1'):
        if cov_type not in ['nonrobust', 'HC0', 'HC1']:
            raise ValueError("Unsupported cov_type; "
                             "must be one of 'nonrobust', "
                             "'HC0', 'HC1'")

        self.cov_type = cov_type

    def prefit(self, estimator, *args, **kwargs):
        super().prefit(estimator, *args, **kwargs)
        assert not (self.model_final.fit_intercept), ("Inference can only be performed on models linear in "
                                                      "their features, but here fit_intercept is True")
        self.model_final.cov_type = self.cov_type
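

# A usage sketch (hypothetical data; LinearDML's default inference is statsmodels-based, so passing
# this explicitly mainly serves to choose the covariance type):
#
#     from econml.dml import LinearDML
#     est = LinearDML()
#     est.fit(y, T, X=X, inference=StatsModelsInference(cov_type='HC0'))
#     lb, ub = est.effect_interval(X, alpha=0.05)      # normal-approximation intervals from OLS stderr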


class GenericModelFinalInferenceDiscrete(Inference):
    """
    Assumes estimator is fitted on categorical treatment and a separate generic model_final is used to
    fit the CATE associated with each treatment. This model_final supports predict_interval. Inference is
    based on predict_interval of the model_final model.
    """

    def prefit(self, estimator, *args, **kwargs):
        self.model_final = estimator.model_final_
        self.featurizer = estimator.featurizer_ if hasattr(estimator, 'featurizer_') else None

    def fit(self, estimator, *args, **kwargs):
        # once the estimator has been fit, it's kosher to store d_t here
        # (which needs to have been expanded if there's a discrete treatment)
        self._est = estimator
        self._d_t = estimator._d_t
        self._d_y = estimator._d_y
        self.fitted_models_final = estimator.fitted_models_final
        self.d_t = self._d_t[0] if self._d_t else 1
        self.d_y = self._d_y[0] if self._d_y else 1
        if hasattr(estimator, 'fit_cate_intercept'):
            self.fit_cate_intercept = estimator.fit_cate_intercept

    def const_marginal_effect_interval(self, X, *, alpha=0.05):
        return self.const_marginal_effect_inference(X).conf_int(alpha=alpha)

    def const_marginal_effect_inference(self, X):
        if (X is not None) and (self.featurizer is not None):
            X = self.featurizer.transform(X)
        pred = np.moveaxis(np.array([mdl.predict(X).reshape((-1,) + self._d_y)
                                     for mdl in self.fitted_models_final]), 0, -1)
        if hasattr(self.fitted_models_final[0], 'prediction_stderr'):
            # send treatment to the end, pull bounds to the front
            pred_stderr = np.moveaxis(np.array([mdl.prediction_stderr(X).reshape((-1,) + self._d_y)
                                                for mdl in self.fitted_models_final]), 0, -1)
        else:
            warn("Final model doesn't have a `prediction_stderr` method. "
                 "Only point estimates will be available.")
            pred_stderr = None
        return NormalInferenceResults(d_t=self.d_t, d_y=self.d_y, pred=pred, pred_stderr=pred_stderr,
                                      mean_pred_stderr=None,
                                      inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names(),
                                      treatment_names=self._est.cate_treatment_names())

    def effect_interval(self, X, *, T0, T1, alpha=0.05):
        return self.effect_inference(X, T0=T0, T1=T1).conf_int(alpha=alpha)

    def effect_inference(self, X, *, T0, T1):
        X, T0, T1 = self._est._expand_treatments(X, T0, T1)
        if np.any(np.any(T0 > 0, axis=1)) or np.any(np.all(T1 == 0, axis=1)):
            raise AttributeError("Can only calculate inference of effects between a non-baseline treatment "
                                 "and the baseline treatment!")
        ind = inverse_onehot(T1)
        pred = self.const_marginal_effect_inference(X).point_estimate
        pred = np.concatenate([np.zeros(pred.shape[0:-1] + (1,)), pred], -1)
        pred_stderr = self.const_marginal_effect_inference(X).stderr
        if pred_stderr is not None:
            pred_stderr = np.concatenate([np.zeros(pred_stderr.shape[0:-1] + (1,)), pred_stderr], -1)
        if X is None:  # Then const_marginal_effect_interval will return a single row
            pred = np.repeat(pred, T0.shape[0], axis=0)
            pred_stderr = np.repeat(pred_stderr, T0.shape[0], axis=0) if pred_stderr is not None else None
        pred = pred[np.arange(T0.shape[0]), ..., ind]
        pred_stderr = pred_stderr[np.arange(T0.shape[0]), ..., ind] if pred_stderr is not None else None

        # d_t=None here since we measure the effect across all Ts
        return NormalInferenceResults(d_t=None, d_y=self.d_y, pred=pred, pred_stderr=pred_stderr,
                                      mean_pred_stderr=None,
                                      inf_type='effect',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())
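

# The one-hot decoding above (`inverse_onehot`) maps each encoded treatment row back to a 1-based
# treatment index, with 0 reserved for the baseline; a standalone sketch of the equivalent computation:
#
#     import numpy as np
#     T1 = np.array([[1, 0], [0, 1], [0, 0]])         # one-hot over the non-baseline treatments
#     ind = T1 @ np.arange(1, T1.shape[1] + 1)        # array([1, 2, 0]); 0 would mean baseline
#     # each index then selects the corresponding fitted final model (index - 1)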


class LinearModelFinalInferenceDiscrete(GenericModelFinalInferenceDiscrete):
    """
    Inference method for estimators with categorical treatments, where a linear-in-X model is used
    for the CATE associated with each treatment. Implements the coef__interval and intercept__interval
    based on the corresponding methods of the underlying model_final estimator.
    """

    def const_marginal_effect_inference(self, X):
        res_inf = super().const_marginal_effect_inference(X)

        # set the mean_pred_stderr
        if (X is not None) and (self.featurizer is not None):
            X = self.featurizer.transform(X)

        if hasattr(self.fitted_models_final[0], 'prediction_stderr'):
            mean_X = X.mean(axis=0).reshape(1, -1) if X is not None else None
            mean_pred_stderr = np.moveaxis(np.array([mdl.prediction_stderr(mean_X).reshape((-1,) + self._d_y)
                                                     for mdl in self.fitted_models_final]),
                                           0, -1)  # shape[0] will always be 1 here
            res_inf.mean_pred_stderr = np.squeeze(mean_pred_stderr, axis=0)
        return res_inf

    def effect_inference(self, X, *, T0, T1):
        res_inf = super().effect_inference(X, T0=T0, T1=T1)

        # replace the mean_pred_stderr if T0 and T1 are constants or constant vectors
        _, _, T1 = self._est._expand_treatments(X, T0, T1)
        ind = inverse_onehot(T1)
        if len(set(ind)) == 1:
            unique_ind = ind[0] - 1
            mean_pred_stderr = self.const_marginal_effect_inference(X).mean_pred_stderr[..., unique_ind]
            res_inf.mean_pred_stderr = mean_pred_stderr
        return res_inf

    def coef__interval(self, T, *, alpha=0.05):
        _, T = self._est._expand_treatments(None, T)
        ind = inverse_onehot(T).item() - 1
        assert ind >= 0, "No model was fitted for the control"
        return self.fitted_models_final[ind].coef__interval(alpha)

    def coef__inference(self, T):
        _, T = self._est._expand_treatments(None, T)
        ind = inverse_onehot(T).item() - 1
        assert ind >= 0, "No model was fitted for the control"
        coef = self.fitted_models_final[ind].coef_
        if hasattr(self.fitted_models_final[ind], 'coef_stderr_'):
            coef_stderr = self.fitted_models_final[ind].coef_stderr_
        else:
            warn("Final model doesn't have a `coef_stderr_` attribute. "
                 "Only point estimates will be available.")
            coef_stderr = None
        if coef.size == 0:  # X is None
            raise AttributeError("X is None, please call intercept_inference to learn the constant!")
        fname_transformer = None
        if hasattr(self._est, 'cate_feature_names') and callable(self._est.cate_feature_names):
            fname_transformer = self._est.cate_feature_names

        # d_t=None here since we measure the effect across all Ts
        return NormalInferenceResults(d_t=None, d_y=self.d_y, pred=coef, pred_stderr=coef_stderr,
                                      mean_pred_stderr=None,
                                      inf_type='coefficient', fname_transformer=fname_transformer,
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())

    def intercept__interval(self, T, *, alpha=0.05):
        if not self.fit_cate_intercept:
            raise AttributeError("No intercept was fitted!")
        _, T = self._est._expand_treatments(None, T)
        ind = inverse_onehot(T).item() - 1
        assert ind >= 0, "No model was fitted for the control"
        return self.fitted_models_final[ind].intercept__interval(alpha)

    def intercept__inference(self, T):
        if not self.fit_cate_intercept:
            raise AttributeError("No intercept was fitted!")
        _, T = self._est._expand_treatments(None, T)
        ind = inverse_onehot(T).item() - 1
        assert ind >= 0, "No model was fitted for the control"
        if hasattr(self.fitted_models_final[ind], 'intercept_stderr_'):
            intercept_stderr = self.fitted_models_final[ind].intercept_stderr_
        else:
            warn("Final model doesn't have an `intercept_stderr_` attribute. "
                 "Only point estimates will be available.")
            intercept_stderr = None

        # d_t=None here since we measure the effect across all Ts
        return NormalInferenceResults(d_t=None, d_y=self.d_y, pred=self.fitted_models_final[ind].intercept_,
                                      pred_stderr=intercept_stderr, mean_pred_stderr=None,
                                      inf_type='intercept',
                                      feature_names=self._est.cate_feature_names(),
                                      output_names=self._est.cate_output_names())


class StatsModelsInferenceDiscrete(LinearModelFinalInferenceDiscrete):
    """
    Special case where final model is a StatsModelsLinearRegression.

    Parameters
    ----------
    cov_type : str, default 'HC1'
        The type of covariance estimation method to use.  Supported values are 'nonrobust',
        'HC0', 'HC1'.
    """

    def __init__(self, cov_type='HC1'):
        if cov_type not in ['nonrobust', 'HC0', 'HC1']:
            raise ValueError("Unsupported cov_type; "
                             "must be one of 'nonrobust', "
                             "'HC0', 'HC1'")

        self.cov_type = cov_type

    def prefit(self, estimator, *args, **kwargs):
        super().prefit(estimator, *args, **kwargs)
        # need to set the fit args before the estimator is fit
        self.model_final.cov_type = self.cov_type


class InferenceResults(metaclass=abc.ABCMeta):
    """
    Results class for inferences.

    Parameters
    ----------
    d_t: int or None
        Number of treatments
    d_y: int
        Number of outputs
    pred : array_like, shape (m, d_y, d_t) or (m, d_y)
        The prediction of the metric for each sample X[i].
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
        (e.g. if both are vectors, then the input of this argument will also be a vector)
    inf_type: str
        The type of inference result.
        It could be either 'effect', 'coefficient' or 'intercept'.
    fname_transformer: None or predefined function
        The transform function to get the corresponding feature names from featurizer
    """

    def __init__(self, d_t, d_y, pred, inf_type, fname_transformer=None,
                 feature_names=None, output_names=None, treatment_names=None):
        self.d_t = d_t
        # For effect summaries, d_t is None, but the result arrays behave as if d_t=1
        self._d_t = d_t or 1
        self.d_y = d_y
        self.pred = np.copy(pred) if pred is not None and not np.isscalar(pred) else pred
        self.inf_type = inf_type
        self.fname_transformer = fname_transformer
        self.feature_names = feature_names
        self.output_names = output_names
        self.treatment_names = treatment_names

    @property
    def point_estimate(self):
        """
        Get the point estimate of each treatment on each outcome for each sample X[i].

        Returns
        -------
        prediction : array_like, shape (m, d_y, d_t) or (m, d_y)
            The point estimate of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        return self.pred

    @property
    @abc.abstractmethod
    def stderr(self):
        """
        Get the standard error of the metric of each treatment on each outcome for each sample X[i].

        Returns
        -------
        stderr : array_like, shape (m, d_y, d_t) or (m, d_y)
            The standard error of the metric of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        raise NotImplementedError("Abstract method")

    @property
    def var(self):
        """
        Get the variance of the metric of each treatment on each outcome for each sample X[i].

        Returns
        -------
        var : array_like, shape (m, d_y, d_t) or (m, d_y)
            The variance of the metric of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        if self.stderr is not None:
            return self.stderr**2
        return None

    @abc.abstractmethod
    def conf_int(self, alpha=0.05):
        """
        Get the confidence interval of the metric of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        lower, upper: tuple of array, shape (m, d_y, d_t) or (m, d_y)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        raise NotImplementedError("Abstract method")

    @abc.abstractmethod
    def pvalue(self, value=0):
        """
        Get the p value of the z test of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        pvalue : array_like, shape (m, d_y, d_t) or (m, d_y)
            The p value of the z test of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        raise NotImplementedError("Abstract method")

    def zstat(self, value=0):
        """
        Get the z statistic of the metric of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        zstat : array_like, shape (m, d_y, d_t) or (m, d_y)
            The z statistic of the metric of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        if self.stderr is None:
            raise AttributeError("Only point estimates are available!")
        return (self.point_estimate - value) / self.stderr

    def summary_frame(self, alpha=0.05, value=0, decimals=3,
                      feature_names=None, output_names=None, treatment_names=None):
        """
        Output the dataframe for all the inferences above.

        Parameters
        ----------
        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.
        decimals: int, default 3
            Number of decimal places to round each column to.
        feature_names: list of str, optional
            The names of the features X
        output_names: list of str, optional
            The names of the outputs
        treatment_names: list of str, optional
            The names of the treatments

        Returns
        -------
        output: DataFrame
            The output dataframe includes point estimate, standard error, z score, p value and
            confidence intervals of the estimated metric of each treatment on each outcome
            for each sample X[i]
        """
        treatment_names = self.treatment_names if treatment_names is None else treatment_names
        output_names = self.output_names if output_names is None else output_names
        to_include = OrderedDict()
        to_include['point_estimate'] = self._reshape_array(self.point_estimate)
        # get the length of X when it's effect, or the length of coefficient/intercept
        # when it's coefficient/intercept;
        # to_include['point_estimate'] is a flattened vector with length d_t*d_y*nx
        nx = to_include['point_estimate'].shape[0] // self._d_t // self.d_y
        if self.stderr is not None:
            ci_mean = self.conf_int(alpha=alpha)
            to_include['stderr'] = self._reshape_array(self.stderr)
            to_include['zstat'] = self._reshape_array(self.zstat(value))
            to_include['pvalue'] = self._reshape_array(self.pvalue(value))
            to_include['ci_lower'] = self._reshape_array(ci_mean[0])
            to_include['ci_upper'] = self._reshape_array(ci_mean[1])
        if output_names is None:
            output_names = ['Y' + str(i) for i in range(self.d_y)]
        assert len(output_names) == self.d_y, "Incompatible length of output names"
        if treatment_names is None:
            treatment_names = ['T' + str(i) for i in range(self._d_t)]
        names = ['X', 'Y', 'T']
        if self.d_t:
            assert len(treatment_names) == self._d_t, "Incompatible length of treatment names"
            index = pd.MultiIndex.from_product([range(nx), output_names, treatment_names], names=names)
        else:
            index = pd.MultiIndex.from_product([range(nx), output_names, [treatment_names[0]]], names=names)
        res = pd.DataFrame(to_include, index=index).round(decimals)

        if self.inf_type == 'coefficient':
            if feature_names is not None:
                if self.fname_transformer is not None:
                    feature_names = self.fname_transformer(feature_names)
            else:
                feature_names = self.feature_names
            if feature_names is not None:
                ind = feature_names
            else:
                ind = ['X' + str(i) for i in range(nx)]
            res.index = res.index.set_levels(ind, level="X")
        elif self.inf_type == 'intercept':
            res.index = res.index.set_levels(['cate_intercept'], level="X")
        elif self.inf_type == 'ate':
            res.index = res.index.set_levels(['ATE'], level="X")
        elif self.inf_type == 'att':
            res.index = res.index.set_levels(['ATT'], level="X")
        if self._d_t == 1:
            res.index = res.index.droplevel("T")
        if self.d_y == 1:
            res.index = res.index.droplevel("Y")
        return res

    def population_summary(self, alpha=0.05, value=0, decimals=3, tol=0.001,
                           output_names=None, treatment_names=None):
        """
        Output the object of population summary results.

        Parameters
        ----------
        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.
        decimals: int, default 3
            Number of decimal places to round each column to.
        tol: float, default 0.001
            The stopping criterion. The iterations will stop when the outcome is less than ``tol``
        output_names: list of str, optional
            The names of the outputs
        treatment_names: list of str, optional
            The names of the treatments

        Returns
        -------
        PopulationSummaryResults: object
            The population summary results instance contains the different summary analysis of point estimate
            for sample X on each treatment and outcome.
        """
        treatment_names = self.treatment_names if treatment_names is None else treatment_names
        output_names = self.output_names if output_names is None else output_names
        if self.inf_type == 'effect':
            return PopulationSummaryResults(pred=self.point_estimate, pred_stderr=self.stderr,
                                            mean_pred_stderr=None,
                                            d_t=self.d_t, d_y=self.d_y,
                                            alpha=alpha, value=value, decimals=decimals, tol=tol,
                                            output_names=output_names, treatment_names=treatment_names)
        else:
            raise AttributeError(self.inf_type + " inference doesn't support population_summary function!")

    def _reshape_array(self, arr):
        if np.isscalar(arr):
            arr = np.array([arr])
        if self.inf_type == 'coefficient':
            arr = np.moveaxis(arr, -1, 0)
        arr = arr.flatten()
        return arr

    @abc.abstractmethod
    def _expand_outputs(self, n_rows):
        """
        Expand the inference results from 1 row to n_rows identical rows.

        This is used internally when we move from constant effects when X is None
        to a marginal effect of a different dimension.

        Parameters
        ----------
        n_rows: positive int
            The number of rows to expand to

        Returns
        -------
        results: InferenceResults
            The expanded results
        """
        raise NotImplementedError("Abstract method")

    def translate(self, offset):
        """
        Update the results in place by translating by an offset.

        Parameters
        ----------
        offset: array_like
            The offset by which to translate these results
        """
        # Use broadcast to ensure that the shape of pred isn't being changed due to
        # broadcasting in the other direction
        offset = np.broadcast_to(np.asarray(offset), np.shape(self.pred))
        self.pred = self.pred + offset

    @abc.abstractmethod
    def scale(self, factor):
        """
        Update the results in place by scaling by a factor.

        Parameters
        ----------
        factor: array_like
            The factor by which to scale these results
        """
        # Use broadcast to ensure that the shape of pred isn't being changed due to
        # broadcasting in the other direction
        factor = np.broadcast_to(np.asarray(factor), np.shape(self.pred))
        self.pred = self.pred * factor


class NormalInferenceResults(InferenceResults):
    """
    Results class for inference assuming a normal distribution.

    Parameters
    ----------
    d_t: int or None
        Number of treatments
    d_y: int
        Number of outputs
    pred : array_like, shape (m, d_y, d_t) or (m, d_y)
        The prediction of the metric for each sample X[i].
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
        (e.g. if both are vectors, then the input of this argument will also be a vector)
    pred_stderr : array_like, shape (m, d_y, d_t) or (m, d_y)
        The prediction standard error of the metric for each sample X[i].
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
        (e.g. if both are vectors, then the input of this argument will also be a vector)
    mean_pred_stderr: None or array_like or scalar, shape (d_y, d_t) or (d_y,)
        The standard error of the mean point estimate; this is derived from the coefficient stderr
        when the final stage is a linear model, and is None otherwise. This is the exact standard
        error of the mean, which is not conservative.
    inf_type: str
        The type of inference result.
        It could be either 'effect', 'coefficient' or 'intercept'.
    fname_transformer: None or predefined function
        The transform function to get the corresponding feature names from featurizer
    """

    def __init__(self, d_t, d_y, pred, pred_stderr, mean_pred_stderr, inf_type, fname_transformer=None,
                 feature_names=None, output_names=None, treatment_names=None):
        self.pred_stderr = np.copy(pred_stderr) if pred_stderr is not None and not np.isscalar(
            pred_stderr) else pred_stderr
        self.mean_pred_stderr = mean_pred_stderr
        super().__init__(d_t, d_y, pred, inf_type, fname_transformer, feature_names, output_names,
                         treatment_names)

    @property
    def stderr(self):
        """
        Get the standard error of the metric of each treatment on each outcome for each sample X[i].

        Returns
        -------
        stderr : array_like, shape (m, d_y, d_t) or (m, d_y)
            The standard error of the metric of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        return self.pred_stderr

    def conf_int(self, alpha=0.05):
        """
        Get the confidence interval of the metric of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        lower, upper: tuple of array, shape (m, d_y, d_t) or (m, d_y)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        if self.stderr is None:
            raise AttributeError("Only point estimates are available!")
        if np.isscalar(self.point_estimate):
            return (_safe_norm_ppf(alpha / 2, loc=self.point_estimate, scale=self.stderr),
                    _safe_norm_ppf(1 - alpha / 2, loc=self.point_estimate, scale=self.stderr))
        else:
            return np.array([_safe_norm_ppf(alpha / 2, loc=p, scale=err)
                             for p, err in zip(self.point_estimate, self.stderr)]), \
                np.array([_safe_norm_ppf(1 - alpha / 2, loc=p, scale=err)
                          for p, err in zip(self.point_estimate, self.stderr)])

    def pvalue(self, value=0):
        """
        Get the p value of the z test of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        pvalue : array_like, shape (m, d_y, d_t) or (m, d_y)
            The p value of the z test of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        return norm.sf(np.abs(self.zstat(value)), loc=0, scale=1) * 2

    def population_summary(self, alpha=0.05, value=0, decimals=3, tol=0.001, output_names=None,
                           treatment_names=None):
        pop_summ = super().population_summary(alpha=alpha, value=value, decimals=decimals,
                                              tol=tol, output_names=output_names,
                                              treatment_names=treatment_names)
        pop_summ.mean_pred_stderr = self.mean_pred_stderr
        return pop_summ
    population_summary.__doc__ = InferenceResults.population_summary.__doc__

    def _expand_outputs(self, n_rows):
        assert shape(self.pred)[0] == shape(self.pred_stderr)[0] == 1
        pred = np.repeat(self.pred, n_rows, axis=0)
        pred_stderr = np.repeat(self.pred_stderr, n_rows, axis=0) if self.pred_stderr is not None else None
        return NormalInferenceResults(self.d_t, self.d_y, pred, pred_stderr, self.mean_pred_stderr,
                                      self.inf_type, self.fname_transformer, self.feature_names,
                                      self.output_names, self.treatment_names)

    def scale(self, factor):
        # scale preds
        super().scale(factor)
        # scale std errs
        factor = np.broadcast_to(np.asarray(factor), np.shape(self.pred))
        if self.pred_stderr is not None:
            self.pred_stderr = self.pred_stderr * np.abs(factor)
        if self.mean_pred_stderr is not None:
            self.mean_pred_stderr = self.mean_pred_stderr * np.abs(factor)
    scale.__doc__ = InferenceResults.scale.__doc__
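

# A standalone sketch of using this results class directly (synthetic numbers; the estimators
# above normally construct it for you):
#
#     import numpy as np
#     res = NormalInferenceResults(d_t=1, d_y=1,
#                                  pred=np.array([1.0, 2.0]), pred_stderr=np.array([0.5, 0.5]),
#                                  mean_pred_stderr=None, inf_type='effect')
#     lb, ub = res.conf_int(alpha=0.05)    # normal-approximation bounds, e.g. 1.0 +/- 1.96 * 0.5
#     res.pvalue(value=0)                  # two-sided z-test against 0
#     res.summary_frame()                  # one row per sample with estimate, stderr, z, p, CI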


class EmpiricalInferenceResults(InferenceResults):
    """
    Results class for inference with an empirical set of samples.

    Parameters
    ----------
    pred : array_like, shape (m, d_y, d_t) or (m, d_y)
        the point estimates of the metric using the full sample
    pred_dist : array_like, shape (b, m, d_y, d_t) or (b, m, d_y)
        the raw predictions of the metric sampled b times.
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
    d_t: int or None
        Number of treatments
    d_y: int
        Number of outputs
    inf_type: str
        The type of inference result.
        It could be either 'effect', 'coefficient' or 'intercept'.
    fname_transformer: None or predefined function
        The transform function to get the corresponding feature names from featurizer
    """

    def __init__(self, d_t, d_y, pred, pred_dist, inf_type, fname_transformer=None,
                 feature_names=None, output_names=None, treatment_names=None):
        self.pred_dist = np.copy(pred_dist) if pred_dist is not None and not np.isscalar(pred_dist) else pred_dist
        super().__init__(d_t, d_y, pred, inf_type, fname_transformer, feature_names, output_names,
                         treatment_names)

    @property
    def stderr(self):
        """
        Get the standard error of the metric of each treatment on each outcome for each sample X[i].

        Returns
        -------
        stderr : array_like, shape (m, d_y, d_t) or (m, d_y)
            The standard error of the metric of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        return np.std(self.pred_dist, axis=0)

    def conf_int(self, alpha=0.05):
        """
        Get the confidence interval of the metric of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        alpha: float in [0, 1], default 0.05
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        lower, upper: tuple of array, shape (m, d_y, d_t) or (m, d_y)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        lower = alpha / 2
        upper = 1 - alpha / 2
        return (np.percentile(self.pred_dist, lower * 100, axis=0),
                np.percentile(self.pred_dist, upper * 100, axis=0))

    def pvalue(self, value=0):
        """
        Get the p value of each treatment on each outcome for each sample X[i].

        Parameters
        ----------
        value: float, default 0
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        pvalue : array_like, shape (m, d_y, d_t) or (m, d_y)
            The p value of each treatment on each outcome for each sample X[i].
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        pvalues = np.minimum((self.pred_dist <= value).sum(axis=0),
                             (self.pred_dist >= value).sum(axis=0)) / self.pred_dist.shape[0]
        # in the degenerate case where every point in the distribution is equal to the value tested,
        # return nan
        return np.where(np.all(self.pred_dist == value, axis=0), np.nan, pvalues)

    def _expand_outputs(self, n_rows):
        assert shape(self.pred)[0] == shape(self.pred_dist)[1] == 1
        pred = np.repeat(self.pred, n_rows, axis=0)
        pred_dist = np.repeat(self.pred_dist, n_rows, axis=1)
        return EmpiricalInferenceResults(self.d_t, self.d_y, pred, pred_dist, self.inf_type,
                                         self.fname_transformer, self.feature_names,
                                         self.output_names, self.treatment_names)

    def translate(self, other):
        # offset preds
        super().translate(other)
        # offset the distribution, too
        other = np.broadcast_to(np.asarray(other), np.shape(self.pred_dist))
        self.pred_dist = self.pred_dist + other
    translate.__doc__ = InferenceResults.translate.__doc__

    def scale(self, factor):
        # scale preds
        super().scale(factor)
        # scale the distribution, too
        factor = np.broadcast_to(np.asarray(factor), np.shape(self.pred_dist))
        self.pred_dist = self.pred_dist * factor
    scale.__doc__ = InferenceResults.scale.__doc__
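

# A standalone sketch of the empirical quantities above (synthetic bootstrap draws):
#
#     import numpy as np
#     pred_dist = np.random.normal(loc=1.0, scale=0.2, size=(1000, 5))   # b=1000 draws, m=5 samples
#     lower = np.percentile(pred_dist, 2.5, axis=0)                      # as in conf_int(alpha=0.05)
#     upper = np.percentile(pred_dist, 97.5, axis=0)
#     # the smaller tail mass around 0, as computed in `pvalue` above:
#     p = np.minimum((pred_dist <= 0).mean(axis=0), (pred_dist >= 0).mean(axis=0))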


class PopulationSummaryResults:
    """
    Population summary results class for inferences.

    Parameters
    ----------
    d_t: int or None
        Number of treatments
    d_y: int
        Number of outputs
    pred : array_like, shape (m, d_y, d_t) or (m, d_y)
        The prediction of the metric for each sample X[i].
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
        (e.g. if both are vectors, then the input of this argument will also be a vector)
    pred_stderr : array_like, shape (m, d_y, d_t) or (m, d_y)
        The prediction standard error of the metric for each sample X[i].
        Note that when Y or T is a vector rather than a 2-dimensional array,
        the corresponding singleton dimensions should be collapsed
        (e.g. if both are vectors, then the input of this argument will also be a vector)
    mean_pred_stderr: None or array_like or scalar, shape (d_y, d_t) or (d_y,)
        The standard error of the mean point estimate; this is derived from the coefficient stderr
        when the final stage is a linear model, and is None otherwise. This is the exact standard
        error of the mean, which is not conservative.
    alpha: float in [0, 1], default 0.05
        The overall level of confidence of the reported interval.
        The alpha/2, 1-alpha/2 confidence interval is reported.
    value: float, default 0
        The mean value of the metric you'd like to test under null hypothesis.
    decimals: int, default 3
        Number of decimal places to round each column to.
    tol: float, default 0.001
        The stopping criterion. The iterations will stop when the outcome is less than ``tol``
    output_names: list of str, optional
        The names of the outputs
    treatment_names: list of str, optional
        The names of the treatments
    """

    def __init__(self, pred, pred_stderr, mean_pred_stderr, d_t, d_y, alpha=0.05, value=0, decimals=3,
                 tol=0.001, output_names=None, treatment_names=None):
        self.pred = pred
        self.pred_stderr = pred_stderr
        self.mean_pred_stderr = mean_pred_stderr
        self.d_t = d_t
        # For effect summaries, d_t is None, but the result arrays behave as if d_t=1
        self._d_t = d_t or 1
        self.d_y = d_y
        self.alpha = alpha
        self.value = value
        self.decimals = decimals
        self.tol = tol
        self.output_names = output_names
        self.treatment_names = treatment_names

    def __str__(self):
        return self._print().as_text()

    def _repr_html_(self):
        """Display as HTML in IPython notebook."""
        return self._print().as_html()

    @property
    def mean_point(self):
        """
        Get the mean of the point estimate of each treatment on each outcome for sample X.

        Returns
        -------
        mean_point : array_like, shape (d_y, d_t)
            The point estimate of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        return np.mean(self.pred, axis=0)

    @property
    def stderr_mean(self):
        """
        Get the standard error of the mean point estimate of each treatment on each outcome for sample X.

        The output is a conservative upper bound.

        Returns
        -------
        stderr_mean : array_like, shape (d_y, d_t)
            The standard error of the mean point estimate of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        if self.mean_pred_stderr is not None:
            return self.mean_pred_stderr
        elif self.pred_stderr is None:
            raise AttributeError("Only point estimates are available!")
        return np.sqrt(np.mean(self.pred_stderr**2, axis=0))

    def zstat(self, *, value=None):
        """
        Get the z statistic of the mean point estimate of each treatment on each outcome for sample X.

        Parameters
        ----------
        value: float, optional
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        zstat : array_like, shape (d_y, d_t)
            The z statistic of the mean point estimate of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        value = self.value if value is None else value
        zstat = (self.mean_point - value) / self.stderr_mean
        return zstat

    def pvalue(self, *, value=None):
        """
        Get the p value of the z test of each treatment on each outcome for sample X.

        Parameters
        ----------
        value: float, optional
            The mean value of the metric you'd like to test under null hypothesis.

        Returns
        -------
        pvalue : array_like, shape (d_y, d_t)
            The p value of the z test of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        value = self.value if value is None else value
        pvalue = norm.sf(np.abs(self.zstat(value=value)), loc=0, scale=1) * 2
        return pvalue

    def conf_int_mean(self, *, alpha=None):
        """
        Get the confidence interval of the mean point estimate of each treatment on each outcome for sample X.

        Parameters
        ----------
        alpha: float in [0, 1], optional
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        lower, upper: tuple of array, shape (d_y, d_t)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        alpha = self.alpha if alpha is None else alpha
        mean_point = self.mean_point
        stderr_mean = self.stderr_mean
        if np.isscalar(mean_point):
            return (_safe_norm_ppf(alpha / 2, loc=mean_point, scale=stderr_mean),
                    _safe_norm_ppf(1 - alpha / 2, loc=mean_point, scale=stderr_mean))
        else:
            return np.array([_safe_norm_ppf(alpha / 2, loc=p, scale=err)
                             for p, err in zip(mean_point, stderr_mean)]), \
                np.array([_safe_norm_ppf(1 - alpha / 2, loc=p, scale=err)
                          for p, err in zip(mean_point, stderr_mean)])

    @property
    def std_point(self):
        """
        Get the standard deviation of the point estimate of each treatment on each outcome for sample X.

        Returns
        -------
        std_point : array_like, shape (d_y, d_t)
            The standard deviation of the point estimate of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        return np.std(self.pred, axis=0)

    def percentile_point(self, *, alpha=None):
        """
        Get the confidence interval of the point estimate of each treatment on each outcome for sample X.

        Parameters
        ----------
        alpha: float in [0, 1], optional
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.

        Returns
        -------
        lower, upper: tuple of array, shape (d_y, d_t)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        alpha = self.alpha if alpha is None else alpha
        lower_percentile_point = np.percentile(self.pred, (alpha / 2) * 100, axis=0)
        upper_percentile_point = np.percentile(self.pred, (1 - alpha / 2) * 100, axis=0)
        return lower_percentile_point, upper_percentile_point

    def conf_int_point(self, *, alpha=None, tol=None):
        """
        Get the confidence interval of the point estimate of each treatment on each outcome for sample X.

        Parameters
        ----------
        alpha: float in [0, 1], optional
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.
        tol: float, optional
            The stopping criterion. The iterations will stop when the outcome is less than ``tol``

        Returns
        -------
        lower, upper: tuple of array, shape (d_y, d_t)
            The lower and the upper bounds of the confidence interval for each quantity.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will also be a vector)
        """
        if self.pred_stderr is None:
            raise AttributeError("Only point estimates are available!")
        alpha = self.alpha if alpha is None else alpha
        tol = self.tol if tol is None else tol
        lower_ci_point = np.array([self._mixture_ppf(alpha / 2, self.pred, self.pred_stderr, tol)])
        upper_ci_point = np.array([self._mixture_ppf(1 - alpha / 2, self.pred, self.pred_stderr, tol)])
        return lower_ci_point, upper_ci_point

    @property
    def stderr_point(self):
        """
        Get the standard error of the point estimate of each treatment on each outcome for sample X.

        Returns
        -------
        stderr_point : array_like, shape (d_y, d_t)
            The standard error of the point estimate of each treatment on each outcome for sample X.
            Note that when Y or T is a vector rather than a 2-dimensional array,
            the corresponding singleton dimensions in the output will be collapsed
            (e.g. if both are vectors, then the output of this method will be a scalar)
        """
        return np.sqrt(self.stderr_mean**2 + self.std_point**2)

    def summary(self, alpha=None, value=None, decimals=None, tol=None, output_names=None, treatment_names=None):
        """
        Output the summary inferences above.

        Parameters
        ----------
        alpha: float in [0, 1], optional
            The overall level of confidence of the reported interval.
            The alpha/2, 1-alpha/2 confidence interval is reported.
        value: float, optional
            The mean value of the metric you'd like to test under null hypothesis.
        decimals: int, optional
            Number of decimal places to round each column to.
        tol: float, optional
            The stopping criterion. The iterations will stop when the outcome is less than ``tol``
        output_names: list of str, optional
            The names of the outputs
        treatment_names: list of str, optional
            The names of the treatments

        Returns
        -------
        smry : Summary instance
            This holds the summary tables and text, which can be printed or converted
            to various output formats.
        """
        return self._print(alpha=alpha, value=value, decimals=decimals, tol=tol,
                           output_names=output_names, treatment_names=treatment_names)

    def _print(self, *, alpha=None, value=None, decimals=None, tol=None, output_names=None, treatment_names=None):
        """
        Print the summary.

        Helper function to be used by both `summary` and `__repr__`, in the former case with passed
        attributes, in the latter case with None inputs, hence using the `__init__` params.
        """
        alpha = self.alpha if alpha is None else alpha
        value = self.value if value is None else value
        decimals = self.decimals if decimals is None else decimals
        tol = self.tol if tol is None else tol
        treatment_names = self.treatment_names if treatment_names is None else treatment_names
        output_names = self.output_names if output_names is None else output_names

        # 1. Uncertainty of Mean Point Estimate
        res1 = self._format_res(self.mean_point, decimals)
        if self.pred_stderr is not None:
            res1 = np.hstack((res1,
                              self._format_res(self.stderr_mean, decimals),
                              self._format_res(self.zstat(value=value), decimals),
                              self._format_res(self.pvalue(value=value), decimals),
                              self._format_res(self.conf_int_mean(alpha=alpha)[0], decimals),
                              self._format_res(self.conf_int_mean(alpha=alpha)[1], decimals)))
        if treatment_names is None:
            treatment_names = ['T' + str(i) for i in range(self._d_t)]
        if output_names is None:
            output_names = ['Y' + str(i) for i in range(self.d_y)]
        myheaders1 = ['mean_point', 'stderr_mean', 'zstat', 'pvalue', 'ci_mean_lower', 'ci_mean_upper']
        mystubs = self._get_stub_names(self.d_y, self._d_t, treatment_names, output_names)
        title1 = "Uncertainty of Mean Point Estimate"

        # 2. Distribution of Point Estimate
        res2 = np.hstack((self._format_res(self.std_point, decimals),
                          self._format_res(self.percentile_point(alpha=alpha)[0], decimals),
                          self._format_res(self.percentile_point(alpha=alpha)[1], decimals)))
        myheaders2 = ['std_point', 'pct_point_lower', 'pct_point_upper']
        title2 = "Distribution of Point Estimate"

        smry = Summary()
        smry.add_table(res1, myheaders1, mystubs, title1)
        if self.pred_stderr is not None and self.mean_pred_stderr is None:
            text1 = "Note: The stderr_mean is a conservative upper bound."
            smry.add_extra_txt([text1])
        smry.add_table(res2, myheaders2, mystubs, title2)

        if self.pred_stderr is not None:
            # 3. Total Variance of Point Estimate
            res3 = np.hstack((self._format_res(self.stderr_point, self.decimals),
                              self._format_res(self.conf_int_point(alpha=alpha, tol=tol)[0],
                                               self.decimals),
                              self._format_res(self.conf_int_point(alpha=alpha, tol=tol)[1],
                                               self.decimals)))
            myheaders3 = ['stderr_point', 'ci_point_lower', 'ci_point_upper']
            title3 = "Total Variance of Point Estimate"
            smry.add_table(res3, myheaders3, mystubs, title3)
        return smry

    def _mixture_ppf(self, alpha, mean, stderr, tol):
        """Get the confidence interval of a mixture of gaussian distributions."""
        # if stderr is zero, ppf will return nans and the loop below would never terminate
        # so bail out early; note that it might be possible to correct the algorithm for
        # this scenario, but since scipy's cdf returns nan whenever scale is zero it won't
        # be clean
        if (np.any(stderr == 0)):
            return np.full(shape(mean)[1:], np.nan)
        mix_ppf = scipy.stats.norm.ppf(alpha, loc=mean, scale=stderr)
        lower = np.min(mix_ppf, axis=0)
        upper = np.max(mix_ppf, axis=0)
        while True:
            cur = (lower + upper) / 2
            cur_mean = np.mean(scipy.stats.norm.cdf(cur, loc=mean, scale=stderr), axis=0)
            if np.isscalar(cur):
                if np.abs(cur_mean - alpha) < tol or (cur == lower):
                    return cur
                elif cur_mean < alpha:
                    lower = cur
                else:
                    upper = cur
            else:
                if np.all((np.abs(cur_mean - alpha) < tol) | (cur == lower)):
                    return cur
                lower[cur_mean < alpha] = cur[cur_mean < alpha]
                upper[cur_mean > alpha] = cur[cur_mean > alpha]

    def _format_res(self, res, decimals):
        arr = np.array([[res]]) if np.isscalar(res) else res.reshape(-1, 1)
        arr = np.round(arr, decimals)
        return arr

    def _get_stub_names(self, d_y, d_t, treatment_names, output_names):
        if d_y > 1:
            if d_t > 1:
                stubs = [oname + "|" + tname for oname in output_names for tname in treatment_names]
            else:
                stubs = output_names
        else:
            if d_t > 1:
                stubs = treatment_names
            else:
                stubs = []
        return stubs
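

# `_mixture_ppf` above inverts the CDF of an equal-weight mixture of normals by bisection; a
# standalone illustration of the same idea (synthetic numbers, local names only):
#
#     import numpy as np
#     from scipy.stats import norm
#     mean, stderr = np.array([0.0, 1.0]), np.array([0.5, 0.5])
#     mix_cdf = lambda x: norm.cdf(x, loc=mean, scale=stderr).mean()
#     lo, hi = norm.ppf(0.025, mean, stderr).min(), norm.ppf(0.025, mean, stderr).max()
#     while hi - lo > 1e-6:                       # bisect until mix_cdf(mid) ~= 0.025
#         mid = (lo + hi) / 2
#         lo, hi = (mid, hi) if mix_cdf(mid) < 0.025 else (lo, mid)
#     # `lo` is now approximately the 2.5th percentile of the pointwise mixture,
#     # which is what conf_int_point reports for each point in X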