# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.
"""Module for assessing causal feature importance."""
import warnings
from collections import OrderedDict, namedtuple
import joblib
import lightgbm as lgb
import numpy as np
from numpy import iterable
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier, RandomForestRegressor
from sklearn.linear_model import Lasso, LassoCV, LogisticRegression, LogisticRegressionCV
from sklearn.pipeline import make_pipeline, Pipeline
from sklearn.preprocessing import OneHotEncoder, PolynomialFeatures, StandardScaler
from sklearn.tree import _tree
from sklearn.utils.validation import column_or_1d
from ...cate_interpreter import SingleTreeCateInterpreter, SingleTreePolicyInterpreter
from ...dml import LinearDML, CausalForestDML
from ...inference import NormalInferenceResults
from ...sklearn_extensions.linear_model import WeightedLasso
from ...sklearn_extensions.model_selection import GridSearchCVList
from ...utilities import _RegressionWrapper, get_feature_names_or_default, inverse_onehot, one_hot_encoder
# TODO: this utility is documented but internal; reimplement?
from sklearn.utils import _safe_indexing
# TODO: this utility is even less public...
from packaging.version import parse
import sklearn
if parse(sklearn.__version__) < parse("1.5"):
from sklearn.utils import _get_column_indices
else:
from sklearn.utils._indexing import _get_column_indices
class _CausalInsightsConstants:
RawFeatureNameKey = 'raw_name'
EngineeredNameKey = 'name'
CategoricalColumnKey = 'cat'
TypeKey = 'type'
PointEstimateKey = 'point'
StandardErrorKey = 'stderr'
ZStatKey = 'zstat'
ConfidenceIntervalLowerKey = 'ci_lower'
ConfidenceIntervalUpperKey = 'ci_upper'
PValueKey = 'p_value'
Version = 'version'
CausalComputationTypeKey = 'causal_computation_type'
ConfoundingIntervalKey = 'confounding_interval'
ViewKey = 'view'
InitArgsKey = 'init_args'
RowData = 'row_data' # NOTE: RowData is mutually exclusive with the other data columns
ALL = [RawFeatureNameKey,
EngineeredNameKey,
CategoricalColumnKey,
TypeKey,
PointEstimateKey,
StandardErrorKey,
ZStatKey,
ConfidenceIntervalLowerKey,
ConfidenceIntervalUpperKey,
PValueKey,
Version,
CausalComputationTypeKey,
ConfoundingIntervalKey,
ViewKey,
InitArgsKey,
RowData]
def _get_default_shared_insights_output():
"""
Dictionary elements shared among all analyses.
In case of breaking changes to this dictionary output, the major version of this
    dictionary should be updated. In case of a non-breaking change to this dictionary, the minor
version should be updated.
"""
return {
_CausalInsightsConstants.RawFeatureNameKey: [],
_CausalInsightsConstants.EngineeredNameKey: [],
_CausalInsightsConstants.CategoricalColumnKey: [],
_CausalInsightsConstants.TypeKey: [],
_CausalInsightsConstants.Version: '1.0',
_CausalInsightsConstants.CausalComputationTypeKey: "simple",
_CausalInsightsConstants.ConfoundingIntervalKey: None,
_CausalInsightsConstants.InitArgsKey: {}
}
def _get_default_specific_insights(view):
# keys should be mutually exclusive with shared keys, so that the dictionaries can be cleanly merged
return {
_CausalInsightsConstants.PointEstimateKey: [],
_CausalInsightsConstants.StandardErrorKey: [],
_CausalInsightsConstants.ZStatKey: [],
_CausalInsightsConstants.ConfidenceIntervalLowerKey: [],
_CausalInsightsConstants.ConfidenceIntervalUpperKey: [],
_CausalInsightsConstants.PValueKey: [],
_CausalInsightsConstants.ViewKey: view
}
def _get_metadata_causal_insights_keys():
return [_CausalInsightsConstants.Version,
_CausalInsightsConstants.CausalComputationTypeKey,
_CausalInsightsConstants.ConfoundingIntervalKey,
_CausalInsightsConstants.ViewKey]
def _get_column_causal_insights_keys():
return [_CausalInsightsConstants.RawFeatureNameKey,
_CausalInsightsConstants.EngineeredNameKey,
_CausalInsightsConstants.CategoricalColumnKey,
_CausalInsightsConstants.TypeKey]
def _get_data_causal_insights_keys():
return [_CausalInsightsConstants.PointEstimateKey,
_CausalInsightsConstants.StandardErrorKey,
_CausalInsightsConstants.ZStatKey,
_CausalInsightsConstants.ConfidenceIntervalLowerKey,
_CausalInsightsConstants.ConfidenceIntervalUpperKey,
_CausalInsightsConstants.PValueKey]
def _first_stage_reg(X, y, *, automl=True, random_state=None, verbose=0):
if automl:
model = GridSearchCVList([LassoCV(random_state=random_state),
RandomForestRegressor(
n_estimators=100, random_state=random_state, min_samples_leaf=10),
lgb.LGBMRegressor(num_leaves=32, random_state=random_state)],
param_grid_list=[{},
{'min_weight_fraction_leaf':
[.001, .01, .1]},
{'learning_rate': [0.1, 0.3], 'max_depth': [3, 5]}],
cv=3,
scoring='r2',
verbose=verbose)
best_est = model.fit(X, y).best_estimator_
if isinstance(best_est, LassoCV):
return Lasso(alpha=best_est.alpha_, random_state=random_state)
else:
return best_est
else:
model = LassoCV(cv=5, random_state=random_state).fit(X, y)
return Lasso(alpha=model.alpha_, random_state=random_state)
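# A minimal sketch of how the helper above is expected to be used (the data and names here are
# illustrative, not part of the module): the returned estimator is unfitted and is meant to be
# cloned and cross-fit inside a DML estimator rather than used directly.
#   X_demo = np.random.normal(size=(200, 5))
#   y_demo = np.random.normal(size=200)
#   model_y = _first_stage_reg(X_demo, y_demo, automl=False)  # a Lasso with a CV-chosen alpha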
def _first_stage_clf(X, y, *, make_regressor=False, automl=True, min_count=None, random_state=None, verbose=0):
# use same Cs as would be used by default by LogisticRegressionCV
cs = np.logspace(-4, 4, 10)
if min_count is None:
min_count = _CAT_LIMIT # we have at least this many instances
if automl:
# NOTE: we don't use LogisticRegressionCV inside the grid search because of the nested stratification
# which could affect how many times each distinct Y value needs to be present in the data
model = GridSearchCVList([LogisticRegression(max_iter=1000,
random_state=random_state),
RandomForestClassifier(n_estimators=100, min_samples_leaf=10,
random_state=random_state),
lgb.LGBMClassifier(num_leaves=32, random_state=random_state)],
param_grid_list=[{'C': cs},
{'max_depth': [3, None],
'min_weight_fraction_leaf': [.001, .01, .1]},
{'learning_rate': [0.1, 0.3], 'max_depth': [3, 5]}],
cv=min(3, min_count),
scoring='neg_log_loss',
verbose=verbose)
est = model.fit(X, y).best_estimator_
else:
model = LogisticRegressionCV(
cv=min(5, min_count), max_iter=1000, Cs=cs, random_state=random_state).fit(X, y)
est = LogisticRegression(C=model.C_[0], max_iter=1000, random_state=random_state)
if make_regressor:
return _RegressionWrapper(est)
else:
return est
def _final_stage(*, random_state=None, verbose=0):
return GridSearchCVList([WeightedLasso(random_state=random_state),
RandomForestRegressor(n_estimators=100, random_state=random_state, verbose=verbose)],
param_grid_list=[{'alpha': [.001, .01, .1, 1, 10]},
{'max_depth': [3, 5],
'min_samples_leaf': [10, 50]}],
cv=3,
scoring='neg_mean_squared_error',
verbose=verbose)
# simplification of sklearn's ColumnTransformer that encodes categoricals and passes through selected other columns
# but also supports get_feature_names with expected signature
class _ColumnTransformer(TransformerMixin):
def __init__(self, categorical, passthrough):
self.categorical = categorical
self.passthrough = passthrough
def fit(self, X):
cat_cols = _safe_indexing(X, self.categorical, axis=1)
if cat_cols.shape[1] > 0:
self.has_cats = True
# NOTE: set handle_unknown to 'ignore' so that we don't throw at runtime if given a novel value
self.one_hot_encoder = one_hot_encoder(handle_unknown='ignore').fit(cat_cols)
else:
self.has_cats = False
self.d_x = X.shape[1]
return self
def transform(self, X):
rest = _safe_indexing(X, self.passthrough, axis=1)
if self.has_cats:
cats = self.one_hot_encoder.transform(_safe_indexing(X, self.categorical, axis=1))
# NOTE: we rely on the passthrough columns coming first in the concatenated X;W
# when we pipeline scaling with our first stage models later, so the order here is important
return np.hstack((rest, cats))
else:
return rest
# TODO: remove once older sklearn support is no longer needed
def get_feature_names(self, names=None):
return self.get_feature_names_out(names)
def get_feature_names_out(self, names=None):
if names is None:
names = [f"x{i}" for i in range(self.d_x)]
rest = _safe_indexing(names, self.passthrough, axis=0)
if self.has_cats:
cats = get_feature_names_or_default(self.one_hot_encoder, _safe_indexing(names, self.categorical, axis=0))
return np.concatenate((rest, cats))
else:
return rest
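# Illustrative sketch of the transformer above (the dataframe and column indices are hypothetical):
#   df = pd.DataFrame({'age': [20., 30.], 'income': [1., 2.], 'city': ['a', 'b']})
#   ct = _ColumnTransformer(categorical=[2], passthrough=[0, 1]).fit(df)
#   ct.transform(df)                                      # passthrough columns first, then one-hot 'city'
#   ct.get_feature_names_out(['age', 'income', 'city'])   # names like ['age', 'income', 'city_a', 'city_b']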
# Wrapper to make sure that we get a deep copy of the contents instead of clone returning an untrained copy
class _Wrapper:
def __init__(self, item):
self.item = item
class _FrozenTransformer(TransformerMixin, BaseEstimator):
def __init__(self, wrapper):
self.wrapper = wrapper
def fit(self, X, y):
return self
def transform(self, X):
return self.wrapper.item.transform(X)
def _freeze(transformer):
return _FrozenTransformer(_Wrapper(transformer))
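# Illustrative sketch (assumes `scaler` is an already-fit StandardScaler): wrapping it with _freeze
# lets it live inside a Pipeline that sklearn will clone without losing the fitted state, since
# cloning deep-copies the plain _Wrapper (and the fitted scaler inside it) rather than returning an
# untrained copy as it would for an ordinary estimator parameter.
#   frozen = _freeze(scaler)
#   pipe = Pipeline([('scale', frozen), ('model', Lasso())])
#   clone(pipe)   # the cloned pipeline still applies the original fitted scaling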
# Convert python objects to (possibly nested) types that can easily be represented as literals
def _sanitize(obj):
if obj is None or isinstance(obj, (bool, int, str, float)):
return obj
elif isinstance(obj, dict):
return {_sanitize(key): _sanitize(obj[key]) for key in obj}
else:
try:
return [_sanitize(item) for item in obj]
except Exception:
raise ValueError(f"Could not sanitize input {obj}")
# Convert SingleTreeInterpreter to a python dictionary
def _tree_interpreter_to_dict(interp, features, leaf_data=lambda t, n: {}):
tree = interp.tree_model_.tree_
node_dict = interp.node_dict_
def recurse(node_id):
if tree.children_left[node_id] == _tree.TREE_LEAF:
return {'leaf': True, 'n_samples': tree.n_node_samples[node_id], **leaf_data(tree, node_id, node_dict)}
else:
return {'leaf': False, 'feature': features[tree.feature[node_id]], 'threshold': tree.threshold[node_id],
'left': recurse(tree.children_left[node_id]),
'right': recurse(tree.children_right[node_id])}
return recurse(0)
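# The result is a nested dictionary mirroring the tree structure, e.g. (values illustrative):
#   {'leaf': False, 'feature': 'age', 'threshold': 41.5,
#    'left':  {'leaf': True, 'n_samples': 120, ...},   # plus whatever leaf_data adds at each leaf
#    'right': {'leaf': True, 'n_samples': 80, ...}}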
class _PolicyOutput:
"""
A type encapsulating various information related to a learned policy.
Attributes
----------
    tree_dictionary: dict
        The policy tree represented as a dictionary
    policy_value: float
        The average value of applying the recommended policy (over using the control)
    always_treat: dict of str to float
        A dictionary mapping each non-control treatment to the value of always treating with it (over control)
    control_name: str
        The name of the control treatment
"""
def __init__(self, tree_dictionary, policy_value, always_treat, control_name):
self.tree_dictionary = tree_dictionary
self.policy_value = policy_value
self.always_treat = always_treat
self.control_name = control_name
# named tuple type for storing results inside CausalAnalysis class;
# must be lifted to module level to enable pickling
_result = namedtuple("_result", field_names=[
"feature_index", "feature_name", "feature_baseline", "feature_levels", "hinds",
"X_transformer", "W_transformer", "estimator", "global_inference", "treatment_value"])
def _process_feature(name, feat_ind, verbose, categorical_inds, categories, heterogeneity_inds, min_counts, y, X,
nuisance_models, h_model, random_state, model_y, cv, mc_iters):
try:
if verbose > 0:
print(f"CausalAnalysis: Feature {name}")
discrete_treatment = feat_ind in categorical_inds
if discrete_treatment:
cats = categories[categorical_inds.index(feat_ind)]
else:
cats = 'auto' # just leave the setting at the default otherwise
# the transformation logic here is somewhat tricky; we always need to encode the categorical columns,
# whether they end up in X or in W. However, for the continuous columns, we want to scale them all
# when running the first stage models, but don't want to scale the X columns when running the final model,
# since then our coefficients will have odd units and our trees will also have decisions using those units.
#
# we achieve this by pipelining the X scaling with the Y and T models (with fixed scaling, not refitting)
hinds = heterogeneity_inds[feat_ind]
WX_transformer = ColumnTransformer([('encode', one_hot_encoder(drop='first'),
[ind for ind in categorical_inds
if ind != feat_ind]),
('drop', 'drop', feat_ind)],
remainder=StandardScaler())
W_transformer = ColumnTransformer([('encode', one_hot_encoder(drop='first'),
[ind for ind in categorical_inds
if ind != feat_ind and ind not in hinds]),
('drop', 'drop', hinds),
('drop_feat', 'drop', feat_ind)],
remainder=StandardScaler())
X_cont_inds = [ind for ind in hinds
if ind != feat_ind and ind not in categorical_inds]
# Use _ColumnTransformer instead of ColumnTransformer so we can get feature names
X_transformer = _ColumnTransformer([ind for ind in categorical_inds
if ind != feat_ind and ind in hinds],
X_cont_inds)
# Controls are all other columns of X
WX = WX_transformer.fit_transform(X)
# can't use X[:, feat_ind] when X is a DataFrame
T = _safe_indexing(X, feat_ind, axis=1)
# TODO: we can't currently handle unseen values of the feature column when getting the effect;
# we might want to modify OrthoLearner (and other discrete treatment classes)
# so that the user can opt-in to allowing unseen treatment values
# (and return NaN or something in that case)
W = W_transformer.fit_transform(X)
X_xf = X_transformer.fit_transform(X)
# HACK: this is slightly ugly because we rely on the fact that DML passes [X;W] to the first stage models
# and so we can just peel the first columns off of that combined array for rescaling in the pipeline
        # TODO: consider adding an API to DML that allows for better understanding of how the nuisance inputs are
# built, such as model_y_feature_names, model_t_feature_names, model_y_transformer, etc., so that this
# becomes a valid approach to handling this
X_scaler = ColumnTransformer([('scale', StandardScaler(),
list(range(len(X_cont_inds))))],
remainder='passthrough').fit(np.hstack([X_xf, W])).named_transformers_['scale']
X_scaler_fixed = ColumnTransformer([('scale', _freeze(X_scaler),
list(range(len(X_cont_inds))))],
remainder='passthrough')
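        # For example (purely illustrative), with two continuous heterogeneity columns the combined
        # first-stage input that DML builds looks like
        #   [x_cont_0, x_cont_1, x_onehot..., w...]
        # so the frozen scaler above rescales exactly the first len(X_cont_inds) columns and leaves
        # the already-encoded X columns and the W columns untouched.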
if W.shape[1] == 0:
# array checking routines don't accept 0-width arrays
W = None
if X_xf.shape[1] == 0:
X_xf = None
if verbose > 0:
print("CausalAnalysis: performing model selection on T model")
# perform model selection
model_t = (_first_stage_clf(WX, T, automl=nuisance_models == 'automl',
min_count=min_counts.get(feat_ind, None),
random_state=random_state, verbose=verbose)
if discrete_treatment else _first_stage_reg(WX, T, automl=nuisance_models == 'automl',
random_state=random_state,
verbose=verbose))
pipelined_model_t = Pipeline([('scale', X_scaler_fixed),
('model', model_t)])
pipelined_model_y = Pipeline([('scale', X_scaler_fixed),
('model', model_y)])
if X_xf is None and h_model == 'forest':
warnings.warn(f"Using a linear model instead of a forest model for feature '{name}' "
"because forests don't support models with no heterogeneity indices")
h_model = 'linear'
if h_model == 'linear':
est = LinearDML(model_y=pipelined_model_y,
model_t=pipelined_model_t,
discrete_treatment=discrete_treatment,
fit_cate_intercept=True,
categories=cats,
random_state=random_state,
cv=cv,
mc_iters=mc_iters)
elif h_model == 'forest':
est = CausalForestDML(model_y=pipelined_model_y,
model_t=pipelined_model_t,
discrete_treatment=discrete_treatment,
n_estimators=4000,
min_var_leaf_on_val=True,
categories=cats,
random_state=random_state,
verbose=verbose,
cv=cv,
mc_iters=mc_iters)
if verbose > 0:
print("CausalAnalysis: tuning forest")
est.tune(y, T, X=X_xf, W=W)
if verbose > 0:
print("CausalAnalysis: training causal model")
est.fit(y, T, X=X_xf, W=W, cache_values=True)
# Prefer ate__inference to const_marginal_ate_inference(X) because it is doubly-robust and not conservative
if h_model == 'forest' and discrete_treatment:
global_inference = est.ate__inference()
else:
# convert to NormalInferenceResults for consistency
inf = est.const_marginal_ate_inference(X=X_xf)
global_inference = NormalInferenceResults(d_t=inf.d_t, d_y=inf.d_y,
pred=inf.mean_point,
pred_stderr=inf.stderr_mean,
mean_pred_stderr=None,
inf_type='ate')
# Set the dictionary values shared between local and global summaries
if discrete_treatment:
cats = est.transformer.categories_[0]
baseline = cats[est.transformer.drop_idx_[0]]
cats = cats[np.setdiff1d(np.arange(len(cats)),
est.transformer.drop_idx_[0])]
d_t = len(cats)
insights = {
_CausalInsightsConstants.TypeKey: ['cat'] * d_t,
_CausalInsightsConstants.RawFeatureNameKey: [name] * d_t,
_CausalInsightsConstants.CategoricalColumnKey: cats.tolist(),
_CausalInsightsConstants.EngineeredNameKey: [
f"{name} (base={baseline}): {c}" for c in cats]
}
treatment_value = 1
else:
d_t = 1
cats = ["num"]
baseline = None
insights = {
_CausalInsightsConstants.TypeKey: ["num"],
_CausalInsightsConstants.RawFeatureNameKey: [name],
_CausalInsightsConstants.CategoricalColumnKey: [name],
_CausalInsightsConstants.EngineeredNameKey: [name]
}
# calculate a "typical" treatment value, using the mean of the absolute value of non-zero treatments
treatment_value = np.mean(np.abs(T[T != 0]))
result = _result(feature_index=feat_ind,
feature_name=name,
feature_baseline=baseline,
feature_levels=cats,
hinds=hinds,
X_transformer=X_transformer,
W_transformer=W_transformer,
estimator=est,
global_inference=global_inference,
treatment_value=treatment_value)
return insights, result
except Exception as e:
warnings.warn(f"Exception caught when training model for feature {name}: {e}")
return e
# Unless we're opting into minimal cross-fitting, this is the minimum number of instances of each category
# required to fit a discrete DML model
_CAT_LIMIT = 10
# TODO: Add other nuisance model options, such as {'azure_automl', 'forests', 'boosting'} that will use particular
# sub-cases of models or also integrate with azure autoML. (post-MVP)
# TODO: Add other heterogeneity model options, such as {'automl'} for performing
# model selection for the causal effect, or {'sparse_linear'} for using a debiased lasso. (post-MVP)
# TODO: Enable multi-class classification (post-MVP)
class CausalAnalysis:
"""
Note: this class is experimental and the API may evolve over our next few releases.
Gets causal importance of features.
Parameters
----------
feature_inds: array_like of int, str, or bool
The features for which to estimate causal effects, expressed as either column indices,
column names, or boolean flags indicating which columns to pick
categorical: array_like of int, str, or bool
The features which are categorical in nature, expressed as either column indices,
column names, or boolean flags indicating which columns to pick
    heterogeneity_inds: array_like of int, str, or bool, or None, or list of (array_like or None), default None
        If a 1d array, then whenever estimating a heterogeneous (local) treatment effect
        model, only the features in this array will be used for heterogeneity. If a 2d
        array then its first dimension should be len(feature_inds) and whenever estimating
        a local causal effect for target feature feature_inds[i], only features in
        heterogeneity_inds[i] will be used for heterogeneity. If heterogeneity_inds[i]=None, then all features
        are used for heterogeneity when estimating the local causal effect for feature_inds[i], and likewise if
        heterogeneity_inds[i]=[] then no features will be used for heterogeneity. If heterogeneity_inds=None
        then all features are used for heterogeneity for all target features, and if heterogeneity_inds=[]
        then no features will be used for any of them.
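        For example (illustrative), with feature_inds=[0, 1, 2], passing heterogeneity_inds=[[1], None, []]
        uses only column 1 for heterogeneity of feature 0, all columns for feature 1, and a constant
        (non-heterogeneous) effect for feature 2.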
feature_names: list of str, optional
The names for all of the features in the data. Not necessary if the input will be a dataframe.
        If None and the input is a plain numpy array, generated feature names will be ['x0', 'x1', ...].
upper_bound_on_cat_expansion: int, default 5
The maximum number of categorical values allowed, because they are expanded via one-hot encoding. If a
feature has more than this many values, then a causal effect model is not fitted for that target feature
and a warning flag is raised. The remainder of the models are fitted.
classification: bool, default False
Whether this is a classification (as opposed to regression) task
nuisance_models: one of {'linear', 'automl'}, default 'linear'
The model class to use for nuisance estimation. Separate nuisance models are trained to predict the
outcome and also each individual feature column from all of the other columns in the dataset as a prerequisite
step before computing the actual causal effect for that feature column. If 'linear', then
:class:`~sklearn.linear_model.LassoCV` (for regression) or :class:`~sklearn.linear_model.LogisticRegressionCV`
(for classification) is used for these models. If 'automl', then model selection picks the best-performing
among several different model classes for each model being trained using k-fold cross-validation, which
requires additional computation.
heterogeneity_model: one of {'linear', 'forest'}, default 'linear'
        The type of model to use for the final heterogeneous treatment effect model. 'linear' means that
        the estimated treatment effect for a column will be a linear function of the heterogeneity features for that
column, while 'forest' means that a forest model will be trained to compute the effect from those heterogeneity
features instead.
categories: 'auto' or list of ('auto' or list of values), default 'auto'
What categories to use for the categorical columns. If 'auto', then the categories will be inferred for
all categorical columns; otherwise this argument should have as many entries as there are categorical columns,
and each entry should be either 'auto' to infer the values for that column or the list of values for the
column. If explicit values are provided, the first value is treated as the "control" value for that column
against which other values are compared.
n_jobs: int, default -1
Degree of parallelism to use when training models via joblib.Parallel
verbose : int, default 0
Controls the verbosity when fitting and predicting.
cv: int, cross-validation generator or an iterable, default 5
Determines the strategy for cross-fitting used when training causal models for each feature.
Possible inputs for cv are:
- integer, to specify the number of folds.
- :term:`CV splitter`
- An iterable yielding (train, test) splits as arrays of indices.
For integer inputs, if the treatment is discrete
:class:`~sklearn.model_selection.StratifiedKFold` is used, else,
:class:`~sklearn.model_selection.KFold` is used
(with a random shuffle in either case).
mc_iters: int, default 3
The number of times to rerun the first stage models to reduce the variance of the causal model nuisances.
skip_cat_limit_checks: bool, default False
By default, categorical features need to have several instances of each category in order for a model to be
fit robustly. Setting this to True will skip these checks (although at least 2 instances will always be
required for linear heterogeneity models, and 4 for forest heterogeneity models even in that case).
    random_state : int, RandomState instance, or None, default None
        Controls the randomness of the estimators trained by this class. Pass an int to obtain
        reproducible results across multiple calls to fit.
Attributes
----------
nuisance_models_: str
The nuisance models setting used for the most recent call to fit
heterogeneity_model: str
The heterogeneity model setting used for the most recent call to fit
feature_names_: list of str
The list of feature names from the data in the most recent call to fit
trained_feature_indices_: list of int
The list of feature indices where models were trained successfully
untrained_feature_indices_: list of tuple of (int, str or Exception)
        The list of indices that were requested but not able to be trained successfully,
along with either a reason or caught Exception for each
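
    Examples
    --------
    A minimal, illustrative sketch (the column names and data below are hypothetical)::

        import numpy as np, pandas as pd
        X = pd.DataFrame({'x0': np.random.normal(size=500),
                          'x1': np.random.normal(size=500),
                          'cat': np.random.choice(['a', 'b'], size=500)})
        y = np.random.normal(size=500)
        ca = CausalAnalysis(feature_inds=['x0', 'cat'], categorical=['cat']).fit(X, y)
        glob = ca.global_causal_effect(alpha=0.05)   # one row per (feature, level)
        loc = ca.local_causal_effect(X[:10])         # per-sample effects for the first ten rows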
"""
    def __init__(self, feature_inds, categorical, heterogeneity_inds=None, feature_names=None, classification=False,
upper_bound_on_cat_expansion=5, nuisance_models='linear', heterogeneity_model='linear', *,
categories='auto', n_jobs=-1, verbose=0, cv=5, mc_iters=3, skip_cat_limit_checks=False,
random_state=None):
self.feature_inds = feature_inds
self.categorical = categorical
self.heterogeneity_inds = heterogeneity_inds
self.feature_names = feature_names
self.classification = classification
self.upper_bound_on_cat_expansion = upper_bound_on_cat_expansion
self.nuisance_models = nuisance_models
self.heterogeneity_model = heterogeneity_model
self.categories = categories
self.n_jobs = n_jobs
self.verbose = verbose
self.cv = cv
self.mc_iters = mc_iters
self.skip_cat_limit_checks = skip_cat_limit_checks
self.random_state = random_state
    def fit(self, X, y, warm_start=False):
"""
Fits global and local causal effect models for each feature in feature_inds on the data
Parameters
----------
X : array_like
Feature data
y : array_like of shape (n,) or (n,1)
            Outcome. If classification=True, then y should take exactly two values; otherwise an error is
            raised, since only binary classification is currently implemented.
            TODO: enable multi-class classification for y (post-MVP)
warm_start : bool, default False
If False, train models for each feature in `feature_inds`.
If True, train only models for features in `feature_inds` that had not already been trained by
the previous call to `fit`, and for which neither the corresponding heterogeneity_inds, nor the
automl flag have changed. If heterogeneity_inds have changed, then the final stage model of these features
            will be refit. If the automl flag has changed, then the whole model is refit, despite the warm start flag.
"""
# Validate inputs
assert self.nuisance_models in ['automl', 'linear'], (
"The only supported nuisance models are 'linear' and 'automl', "
f"but was given {self.nuisance_models}")
assert self.heterogeneity_model in ['linear', 'forest'], (
"The only supported heterogeneity models are 'linear' and 'forest' but received "
f"{self.heterogeneity_model}")
assert np.ndim(X) == 2, f"X must be a 2-dimensional array, but here had shape {np.shape(X)}"
assert iterable(self.feature_inds), f"feature_inds should be array_like, but got {self.feature_inds}"
assert iterable(self.categorical), f"categorical should be array_like, but got {self.categorical}"
assert self.heterogeneity_inds is None or iterable(self.heterogeneity_inds), (
f"heterogeneity_inds should be None or array_like, but got {self.heterogeneity_inds}")
assert self.feature_names is None or iterable(self.feature_names), (
f"feature_names should be None or array_like, but got {self.feature_names}")
assert self.categories == 'auto' or iterable(self.categories), (
f"categories should be 'auto' or array_like, but got {self.categories}")
# TODO: check compatibility of X and Y lengths
if warm_start:
if not hasattr(self, "_results"):
# no previous fit, cancel warm start
warm_start = False
elif self._d_x != X.shape[1]:
raise ValueError(
f"Can't warm start: previous X had {self._d_x} columns, new X has {X.shape[1]} columns")
# work with numeric feature indices, so that we can easily compare with categorical ones
train_inds = _get_column_indices(X, self.feature_inds)
if len(train_inds) == 0:
raise ValueError(
"No features specified. At least one feature index must be specified so that a model can be trained.")
heterogeneity_inds = self.heterogeneity_inds
if heterogeneity_inds is None:
heterogeneity_inds = [None for ind in train_inds]
# if heterogeneity_inds is 1D, repeat it
if heterogeneity_inds == [] or isinstance(heterogeneity_inds[0], (int, str, bool)):
heterogeneity_inds = [heterogeneity_inds for _ in train_inds]
# heterogeneity inds should be a 2D list of length same as train_inds
elif heterogeneity_inds is not None and len(heterogeneity_inds) != len(train_inds):
raise ValueError("Heterogeneity indexes should have the same number of entries, but here "
f" there were {len(heterogeneity_inds)} heterogeneity entries but "
f" {len(train_inds)} feature indices.")
# replace None elements of heterogeneity_inds and ensure indices are numeric
heterogeneity_inds = {ind: list(range(X.shape[1])) if hinds is None else _get_column_indices(X, hinds)
for ind, hinds in zip(train_inds, heterogeneity_inds)}
if warm_start:
train_y_model = False
if self.nuisance_models != self.nuisance_models_:
warnings.warn("warm_start will be ignored since the nuisance models have changed "
f"from {self.nuisance_models_} to {self.nuisance_models} since the previous call to fit")
warm_start = False
train_y_model = True
if self.heterogeneity_model != self.heterogeneity_model_:
warnings.warn("warm_start will be ignored since the heterogeneity model has changed "
f"from {self.heterogeneity_model_} to {self.heterogeneity_model} "
"since the previous call to fit")
warm_start = False
# TODO: bail out also if categorical columns, classification, random_state changed?
else:
train_y_model = True
# TODO: should we also train a new model_y under any circumstances when warm_start is True?
if warm_start:
new_inds = [ind for ind in train_inds if (ind not in self._cache or
heterogeneity_inds[ind] != self._cache[ind][1].hinds)]
else:
new_inds = list(train_inds)
self._cache = {} # store mapping from feature to insights, results
# train the Y model
if train_y_model:
# perform model selection for the Y model using all X, not on a per-column basis
allX = ColumnTransformer([('encode',
one_hot_encoder(drop='first'),
self.categorical)],
remainder=StandardScaler()).fit_transform(X)
if self.verbose > 0:
print("CausalAnalysis: performing model selection on overall Y model")
if self.classification:
self._model_y = _first_stage_clf(allX, y, automl=self.nuisance_models == 'automl',
make_regressor=True,
random_state=self.random_state, verbose=self.verbose)
else:
self._model_y = _first_stage_reg(allX, y, automl=self.nuisance_models == 'automl',
random_state=self.random_state, verbose=self.verbose)
if self.classification:
# now that we've trained the classifier and wrapped it, ensure that y is transformed to
# work with the regression wrapper
# we use column_or_1d to treat pd.Series and pd.DataFrame objects the same way as arrays
y = column_or_1d(y).reshape(-1, 1)
# note that this needs to happen after wrapping to generalize to the multi-class case,
# since otherwise we'll have too many columns to be able to train a classifier
y = one_hot_encoder(drop='first').fit_transform(y)
assert y.ndim == 1 or y.shape[1] == 1, ("Multiclass classification isn't supported" if self.classification
else "Only a single outcome is supported")
self._vec_y = y.ndim == 1
self._d_x = X.shape[1]
# start with empty results and default shared insights
self._results = []
self._shared = _get_default_shared_insights_output()
self._shared[_CausalInsightsConstants.InitArgsKey] = {
'feature_inds': _sanitize(self.feature_inds),
'categorical': _sanitize(self.categorical),
'heterogeneity_inds': _sanitize(self.heterogeneity_inds),
'feature_names': _sanitize(self.feature_names),
'classification': _sanitize(self.classification),
'upper_bound_on_cat_expansion': _sanitize(self.upper_bound_on_cat_expansion),
'nuisance_models': _sanitize(self.nuisance_models),
'heterogeneity_model': _sanitize(self.heterogeneity_model),
'categories': _sanitize(self.categories),
'n_jobs': _sanitize(self.n_jobs),
'verbose': _sanitize(self.verbose),
'random_state': _sanitize(self.random_state)
}
# convert categorical indicators to numeric indices
categorical_inds = _get_column_indices(X, self.categorical)
categories = self.categories
if categories == 'auto':
categories = ['auto' for _ in categorical_inds]
else:
assert len(categories) == len(categorical_inds), (
"If categories is not 'auto', it must contain one entry per categorical column. Instead, categories"
f"has length {len(categories)} while there are {len(categorical_inds)} categorical columns.")
# check for indices over the categorical expansion bound
invalid_inds = getattr(self, 'untrained_feature_indices_', [])
# assume we'll be able to train former failures this time; we'll add them back if not
invalid_inds = [(ind, reason) for (ind, reason) in invalid_inds if ind not in new_inds]
self._has_column_names = True
if self.feature_names is None:
if hasattr(X, "iloc"):
feature_names = X.columns
else:
self._has_column_names = False
feature_names = [f"x{i}" for i in range(X.shape[1])]
else:
feature_names = self.feature_names
self.feature_names_ = feature_names
min_counts = {}
for ind in new_inds:
column_text = self._format_col(ind)
if ind in categorical_inds:
cats, counts = np.unique(_safe_indexing(X, ind, axis=1), return_counts=True)
min_ind = np.argmin(counts)
n_cat = len(cats)
if n_cat > self.upper_bound_on_cat_expansion:
warnings.warn(f"{column_text} has more than {self.upper_bound_on_cat_expansion} "
f"values (found {n_cat}) so no heterogeneity model will be fit for it; "
"increase 'upper_bound_on_cat_expansion' to change this behavior.")
# can't remove in place while iterating over new_inds, so store in separate list
invalid_inds.append((ind, 'upper_bound_on_cat_expansion'))
elif counts[min_ind] < _CAT_LIMIT:
if self.skip_cat_limit_checks and (counts[min_ind] >= 5 or
(counts[min_ind] >= 2 and
self.heterogeneity_model != 'forest')):
# train the model, but warn
warnings.warn(f"{column_text}'s value {cats[min_ind]} has only {counts[min_ind]} instances in "
f"the training dataset, which is less than the lower limit ({_CAT_LIMIT}). "
"A model will still be fit because 'skip_cat_limit_checks' is True, "
"but this model may not be robust.")
min_counts[ind] = counts[min_ind]
elif counts[min_ind] < 2 or (counts[min_ind] < 5 and self.heterogeneity_model == 'forest'):
# no model can be trained in this case since we need more folds
warnings.warn(f"{column_text}'s value {cats[min_ind]} has only {counts[min_ind]} instances in "
"the training dataset, but linear heterogeneity models need at least 2 and "
"forest heterogeneity models need at least 5 instances, so no model will be fit "
"for this column")
invalid_inds.append((ind, 'cat_limit'))
else:
# don't train a model, but suggest workaround since there are enough instances of least
# populated class
warnings.warn(f"{column_text}'s value {cats[min_ind]} has only {counts[min_ind]} instances in "
f"the training dataset, which is less than the lower limit ({_CAT_LIMIT}), "
"so no heterogeneity model will be fit for it. This check can be turned off by "
"setting 'skip_cat_limit_checks' to True, but that may result in an inaccurate "
"model for this feature.")
invalid_inds.append((ind, 'cat_limit'))
for (ind, _) in invalid_inds:
new_inds.remove(ind)
# also remove from train_inds so we don't try to access the result later
train_inds.remove(ind)
if len(train_inds) == 0:
raise ValueError("No features remain; increase the upper_bound_on_cat_expansion and ensure that there "
"are several instances of each categorical value so that at least "
"one feature model can be trained.")
# extract subset of names matching new columns
new_feat_names = _safe_indexing(feature_names, new_inds)
cache_updates = dict(zip(new_inds,
joblib.Parallel(
n_jobs=self.n_jobs,
verbose=self.verbose
)(joblib.delayed(_process_feature)(
feat_name, feat_ind,
self.verbose, categorical_inds, categories, heterogeneity_inds, min_counts, y, X,
self.nuisance_models, self.heterogeneity_model, self.random_state, self._model_y,
self.cv, self.mc_iters)
for feat_name, feat_ind in zip(new_feat_names, new_inds))))
# track indices where an exception was thrown, since we can't remove from dictionary while iterating
inds_to_remove = []
for ind, value in cache_updates.items():
if isinstance(value, Exception):
# don't want to cache this failed result
inds_to_remove.append(ind)
train_inds.remove(ind)
invalid_inds.append((ind, value))
for ind in inds_to_remove:
del cache_updates[ind]
self._cache.update(cache_updates)
for ind in train_inds:
dict_update, result = self._cache[ind]
self._results.append(result)
for k in dict_update:
self._shared[k] += dict_update[k]
invalid_inds.sort()
self.untrained_feature_indices_ = invalid_inds
self.trained_feature_indices_ = train_inds
self.nuisance_models_ = self.nuisance_models
self.heterogeneity_model_ = self.heterogeneity_model
return self
def _format_col(self, ind):
if self._has_column_names:
return f"Column {ind} ({self.feature_names_[ind]})"
else:
return f"Column {ind}"
# properties to return from effect InferenceResults
@staticmethod
def _point_props(alpha):
return [(_CausalInsightsConstants.PointEstimateKey, 'point_estimate'),
(_CausalInsightsConstants.StandardErrorKey, 'stderr'),
(_CausalInsightsConstants.ZStatKey, 'zstat'),
(_CausalInsightsConstants.PValueKey, 'pvalue'),
(_CausalInsightsConstants.ConfidenceIntervalLowerKey, lambda inf: inf.conf_int(alpha=alpha)[0]),
(_CausalInsightsConstants.ConfidenceIntervalUpperKey, lambda inf: inf.conf_int(alpha=alpha)[1])]
# properties to return from PopulationSummaryResults
@staticmethod
def _summary_props(alpha):
return [(_CausalInsightsConstants.PointEstimateKey, 'mean_point'),
(_CausalInsightsConstants.StandardErrorKey, 'stderr_mean'),
(_CausalInsightsConstants.ZStatKey, 'zstat'),
(_CausalInsightsConstants.PValueKey, 'pvalue'),
(_CausalInsightsConstants.ConfidenceIntervalLowerKey, lambda inf: inf.conf_int_mean(alpha=alpha)[0]),
(_CausalInsightsConstants.ConfidenceIntervalUpperKey, lambda inf: inf.conf_int_mean(alpha=alpha)[1])]
# Converts strings to property lookups or method calls as a convenience so that the
# _point_props and _summary_props above can be applied to an inference object
@staticmethod
def _make_accessor(attr):
if isinstance(attr, str):
s = attr
def attr(o):
val = getattr(o, s)
if callable(val):
return val()
else:
return val
return attr
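    # For instance (illustrative), _make_accessor('stderr')(inf) returns inf.stderr, calling it first
    # if it happens to be a method rather than a plain attribute, while a lambda passes through as-is.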
# Create a summary combining all results into a single output; this is used
# by the various causal_effect and causal_effect_dict methods to generate either a dataframe
# or a dictionary, respectively, based on the summary function passed into this method
def _summarize(self, *, summary, get_inference, props, expand_arr, drop_sample):
assert hasattr(self, "_results"), "This object has not been fit, so cannot get results"
# ensure array has shape (m,y,t)
def ensure_proper_dims(arr):
if expand_arr:
# population summary is missing sample dimension; add it for consistency
arr = np.expand_dims(arr, 0)
if self._vec_y:
# outcome dimension is missing; add it for consistency
arr = np.expand_dims(arr, axis=1)
assert 2 <= arr.ndim <= 3
# add singleton treatment dimension if missing
return arr if arr.ndim == 3 else np.expand_dims(arr, axis=2)
# store set of inference results so we don't need to recompute per-attribute below in summary/coalesce
infs = [get_inference(res) for res in self._results]
# each attr has dimension (m,y) or (m,y,t)
def coalesce(attr):
"""Join together the arrays for each feature"""
attr = self._make_accessor(attr)
# concatenate along treatment dimension
arr = np.concatenate([ensure_proper_dims(attr(inf))
for inf in infs], axis=2)
# for dictionary representation, want to remove unneeded sample dimension
# in cohort and global results
if drop_sample:
arr = np.squeeze(arr, 0)
return arr
return summary([(key, coalesce(val)) for key, val in props])
def _pandas_summary(self, get_inference, *, props, n,
expand_arr=False, keep_all_levels=False):
"""
Summarizes results into a dataframe.
Parameters
----------
get_inference : lambda
Method to get the relevant inference results from each result object
props : list of (string, str or lambda)
Set of column names and ways to get the corresponding values from the inference object
n : int
The number of samples in the dataset
expand_arr : bool, default False
Whether to add a synthetic sample dimension to the result arrays when performing internal computations
keep_all_levels : bool, default False
Whether to keep all levels, even when they don't take on more than one value;
Note that regardless of this argument the "sample" level will only be present if expand_arr is False
"""
def make_dataframe(props):
to_include = OrderedDict([(key, value.reshape(-1))
for key, value in props])
# TODO: enrich outcome logic for multi-class classification when that is supported
index = pd.MultiIndex.from_tuples([(i, outcome, res.feature_name, f"{lvl}v{res.feature_baseline}"
if res.feature_baseline is not None
else lvl)
for i in range(n)
for outcome in ["y0"]
for res in self._results
for lvl in res.feature_levels],
names=["sample", "outcome", "feature", "feature_value"])
if expand_arr:
# There is no actual sample level in this data
index = index.droplevel("sample")
if not keep_all_levels:
for lvl in index.levels:
if len(lvl) == 1:
if not isinstance(index, pd.MultiIndex):
# can't drop only level
index = pd.Index([self._results[0].feature_name], name="feature")
else:
index = index.droplevel(lvl.name)
return pd.DataFrame(to_include, index=index)
return self._summarize(summary=make_dataframe,
get_inference=get_inference,
props=props,
expand_arr=expand_arr,
drop_sample=False) # dropping the sample dimension is handled above instead
def _dict_summary(self, get_inference, *, n, props, kind, drop_sample=False, expand_arr=False, row_wise=False):
"""
Summarizes results into a dictionary.
Parameters
----------
get_inference : lambda
Method to get the relevant inference results from each result object
n : int
The number of samples in the dataset
props : list of (string, str or lambda)
Set of column names and ways to get the corresponding values from the inference object
kind : str
The kind of inference results to get (e.g. 'global', 'local', or 'cohort')
drop_sample : bool, default False
Whether to drop the sample dimension from each array
expand_arr : bool, default False
Whether to add an initial sample dimension to the result arrays
row_wise : bool, default False
Whether to return a list of dictionaries (one dictionary per row) instead of
a dictionary of lists (one list per column)
"""
def make_dict(props):
# should be serialization-ready and contain no numpy arrays
res = _get_default_specific_insights(kind)
shared = self._shared
if row_wise:
row_data = {}
# remove entries belonging to row data, since we're including them in the list of nested dictionaries
for k in _get_data_causal_insights_keys():
del res[k]
shared = shared.copy() # copy so that we can modify without affecting shared state
# TODO: Note that there's no column metadata for the sample number - should there be?
for k in _get_column_causal_insights_keys():
# need to replicate the column info for each sample, then remove from the shared data
row_data[k] = shared[k] * n
del shared[k]
                # NOTE: the flattened order has the output dimension before the feature dimension
# which may need to be revisited once we support multiclass
row_data.update([(key, value.flatten()) for key, value in props])
# get the length of the list corresponding to the first dictionary key
# `list(row_data)` gets the keys as a list, since `row_data.keys()` can't be indexed into
n_rows = len(row_data[list(row_data)[0]])
res[_CausalInsightsConstants.RowData] = [{key: row_data[key][i]
for key in row_data} for i in range(n_rows)]
else:
res.update([(key, value.tolist()) for key, value in props])
return {**shared, **res}
return self._summarize(summary=make_dict,
get_inference=get_inference,
props=props,
expand_arr=expand_arr,
drop_sample=drop_sample)
    def global_causal_effect(self, *, alpha=0.05, keep_all_levels=False):
"""
Get the global causal effect for each feature as a pandas DataFrame.
Parameters
----------
alpha : float, default 0.05
The confidence level of the confidence interval
keep_all_levels : bool, default False
Whether to keep all levels of the output dataframe ('outcome', 'feature', and 'feature_level')
even if there was only a single value for that level; by default single-valued levels are dropped.
Returns
-------
global_effects : DataFrame
DataFrame with the following structure:
:Columns: ['point', 'stderr', 'zstat', 'pvalue', 'ci_lower', 'ci_upper']
:Index: ['feature', 'feature_value']
:Rows: For each feature that is numerical, we have an entry with index ['{feature_name}', 'num'], where
'num' is literally the string 'num' and feature_name is the input feature name.
For each feature that is categorical, we have an entry with index ['{feature_name}',
'{cat}v{base}'] where cat is the category value and base is the category used as baseline.
If all features are numerical then the feature_value index is dropped in the dataframe, but not
in the serialized dict.
"""
# a global inference indicates the effect of that one feature on the outcome
return self._pandas_summary(lambda res: res.global_inference, props=self._point_props(alpha),
n=1, expand_arr=True, keep_all_levels=keep_all_levels)
def _global_causal_effect_dict(self, *, alpha=0.05, row_wise=False):
"""
        Gets the global causal effect for each feature as a dictionary.
Dictionary entries for predictions, etc. will be nested lists of shape (d_y, sum(d_t))
Only for serialization purposes to upload to AzureML
"""
return self._dict_summary(lambda res: res.global_inference, props=self._point_props(alpha),
kind='global', n=1, row_wise=row_wise, drop_sample=True, expand_arr=True)
def _cohort_effect_inference(self, Xtest):
assert np.ndim(Xtest) == 2 and np.shape(Xtest)[1] == self._d_x, (
"Shape of Xtest must be compatible with shape of X, "
f"but got shape {np.shape(Xtest)} instead of (n, {self._d_x})"
)
def inference_from_result(result):
est = result.estimator
X = result.X_transformer.transform(Xtest)
if X.shape[1] == 0:
X = None
return est.const_marginal_ate_inference(X=X)
return inference_from_result
    def cohort_causal_effect(self, Xtest, *, alpha=0.05, keep_all_levels=False):
"""
Gets the average causal effects for a particular cohort defined by a population of X's.
Parameters
----------
Xtest : array_like
The cohort samples for which to return the average causal effects within cohort
alpha : float, default 0.05
The confidence level of the confidence interval
keep_all_levels : bool, default False
Whether to keep all levels of the output dataframe ('outcome', 'feature', and 'feature_level')
even if there was only a single value for that level; by default single-valued levels are dropped.
Returns
-------
cohort_effects : DataFrame
DataFrame with the following structure:
:Columns: ['point', 'stderr', 'zstat', 'pvalue', 'ci_lower', 'ci_upper']
:Index: ['feature', 'feature_value']
:Rows: For each feature that is numerical, we have an entry with index ['{feature_name}', 'num'], where
'num' is literally the string 'num' and feature_name is the input feature name.
For each feature that is categorical, we have an entry with index ['{feature_name}', '{cat}v{base}']
where cat is the category value and base is the category used as baseline.
If all features are numerical then the feature_value index is dropped in the dataframe, but not
in the serialized dict.
"""
return self._pandas_summary(self._cohort_effect_inference(Xtest),
props=self._summary_props(alpha), n=1,
expand_arr=True, keep_all_levels=keep_all_levels)
def _cohort_causal_effect_dict(self, Xtest, *, alpha=0.05, row_wise=False):
"""
        Gets the cohort causal effects for each feature as a dictionary.
Dictionary entries for predictions, etc. will be nested lists of shape (d_y, sum(d_t))
Only for serialization purposes to upload to AzureML
"""
return self._dict_summary(self._cohort_effect_inference(Xtest), props=self._summary_props(alpha),
kind='cohort', n=1, row_wise=row_wise, expand_arr=True, drop_sample=True)
def _local_effect_inference(self, Xtest):
assert np.ndim(Xtest) == 2 and np.shape(Xtest)[1] == self._d_x, (
"Shape of Xtest must be compatible with shape of X, "
f"but got shape {np.shape(Xtest)} instead of (n, {self._d_x})"
)
def inference_from_result(result):
est = result.estimator
X = result.X_transformer.transform(Xtest)
if X.shape[1] == 0:
X = None
eff = est.const_marginal_effect_inference(X=X)
if X is None:
# need to reshape the output to match the input
eff = eff._expand_outputs(Xtest.shape[0])
return eff
return inference_from_result
    def local_causal_effect(self, Xtest, *, alpha=0.05, keep_all_levels=False):
"""
Gets the local causal effect for each feature as a pandas DataFrame.
Parameters
----------
Xtest : array_like
The samples for which to return the causal effects
alpha : float, default 0.05
The confidence level of the confidence interval
keep_all_levels : bool, default False
Whether to keep all levels of the output dataframe ('sample', 'outcome', 'feature', and 'feature_level')
even if there was only a single value for that level; by default single-valued levels are dropped.
Returns
-------
        local_effects : DataFrame
DataFrame with the following structure:
:Columns: ['point', 'stderr', 'zstat', 'pvalue', 'ci_lower', 'ci_upper']
:Index: ['sample', 'feature', 'feature_value']
:Rows: For each feature that is numeric, we have an entry with index
['{sampleid}', '{feature_name}', 'num'],
where 'num' is literally the string 'num' and feature_name is the input feature name and sampleid is
the index of the sample in Xtest.
For each feature that is categorical, we have an entry with index
            ['{sampleid}', '{feature_name}', '{cat}v{base}']
where cat is the category value and base is the category used as baseline.
If all features are numerical then the feature_value index is dropped in the dataframe, but not
in the serialized dict.
"""
return self._pandas_summary(self._local_effect_inference(Xtest),
props=self._point_props(alpha), n=Xtest.shape[0], keep_all_levels=keep_all_levels)
def _local_causal_effect_dict(self, Xtest, *, alpha=0.05, row_wise=False):
"""
        Gets the local feature importance as a dictionary.
Dictionary entries for predictions, etc. will be nested lists of shape (n_rows, d_y, sum(d_t))
Only for serialization purposes to upload to AzureML
"""
return self._dict_summary(self._local_effect_inference(Xtest), props=self._point_props(alpha),
kind='local', n=Xtest.shape[0], row_wise=row_wise)
def _safe_result_index(self, X, feature_index):
assert hasattr(self, "_results"), "This instance has not yet been fitted"
assert np.ndim(X) == 2 and np.shape(X)[1] == self._d_x, (
"Shape of X must be compatible with shape of the fitted X, "
f"but got shape {np.shape(X)} instead of (n, {self._d_x})"
)
(numeric_index,) = _get_column_indices(X, [feature_index])
bad_inds = dict(self.untrained_feature_indices_)
if numeric_index in bad_inds:
error = bad_inds[numeric_index]
col_text = self._format_col(numeric_index)
if error == 'cat_limit':
msg = f"{col_text} had a value with fewer than {_CAT_LIMIT} occurences, so no model was fit for it"
elif error == 'upper_bound_on_cat_expansion':
msg = (f"{col_text} had more distinct values than the setting of 'upper_bound_on_cat_expansion', "
"so no model was fit for it")
else:
msg = (f"{col_text} generated the following error during fitting, "
f"so no model was fit for it:\n{str(error)}")
raise ValueError(msg)
if numeric_index not in self.trained_feature_indices_:
raise ValueError(f"{self._format_col(numeric_index)} was not passed as a feature index "
"so no model was fit for it")
results = [res for res in self._results
if res.feature_index == numeric_index]
assert len(results) == 1
(result,) = results
return result
def _whatif_inference(self, X, Xnew, feature_index, y):
assert not self.classification, "What-if analysis cannot be applied to classification tasks"
assert np.shape(X)[0] == np.shape(Xnew)[0] == np.shape(y)[0], (
"X, Xnew, and y must have the same length, but have shapes "
f"{np.shape(X)}, {np.shape(Xnew)}, and {np.shape(y)}"
)
assert np.size(feature_index) == 1, f"Only one feature index may be changed, but got {np.size(feature_index)}"
T0 = _safe_indexing(X, feature_index, axis=1)
T1 = Xnew
result = self._safe_result_index(X, feature_index)
X = result.X_transformer.transform(X)
if X.shape[1] == 0:
X = None
inf = result.estimator.effect_inference(X=X, T0=T0, T1=T1)
# we want to offset the inference object by the baseline estimate of y
inf.translate(y)
return inf
    def whatif(self, X, Xnew, feature_index, y, *, alpha=0.05):
"""
Get counterfactual predictions when feature_index is changed to Xnew from its observational counterpart.
Note that this only applies to regression use cases; for classification what-if analysis is not supported.
Parameters
----------
X: array_like
Features
Xnew: array_like
New values of a single column of X
feature_index: int or str
The index of the feature being varied to Xnew, either as a numeric index or
the string name if the input is a dataframe
y: array_like
Observed labels or outcome of a predictive model for baseline y values
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
Returns
-------
y_new: DataFrame
The predicted outputs that would have been observed under the counterfactual features
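
        Examples
        --------
        A hypothetical sketch, assuming ``ca`` is a fitted CausalAnalysis over a dataframe ``df``
        with a numeric column 'price' and observed outcomes ``y``::

            y_counterfactual = ca.whatif(df, df['price'] * 1.1, 'price', y)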
"""
return self._whatif_inference(X, Xnew, feature_index, y).summary_frame(alpha=alpha)
def _whatif_dict(self, X, Xnew, feature_index, y, *, alpha=0.05, row_wise=False):
"""
Get counterfactual predictions when feature_index is changed to Xnew from its observational counterpart.
Note that this only applies to regression use cases; for classification what-if analysis is not supported.
Parameters
----------
X: array_like
Features
Xnew: array_like
New values of a single column of X
feature_index: int or str
The index of the feature being varied to Xnew, either as a numeric index or
the string name if the input is a dataframe
y: array_like
Observed labels or outcome of a predictive model for baseline y values
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
row_wise : bool, default False
Whether to return a list of dictionaries (one dictionary per row) instead of
a dictionary of lists (one list per column)
Returns
-------
dict : dict
The counterfactual predictions, as a dictionary
"""
inf = self._whatif_inference(X, Xnew, feature_index, y)
props = self._point_props(alpha=alpha)
res = _get_default_specific_insights('whatif')
if row_wise:
row_data = {}
# remove entries belonging to row data, since we're including them in the list of nested dictionaries
for k in _get_data_causal_insights_keys():
del res[k]
row_data.update([(key, self._make_accessor(attr)(inf).flatten()) for key, attr in props])
# get the length of the list corresponding to the first dictionary key
# `list(row_data)` gets the keys as a list, since `row_data.keys()` can't be indexed into
n_rows = len(row_data[list(row_data)[0]])
res[_CausalInsightsConstants.RowData] = [{key: row_data[key][i]
for key in row_data} for i in range(n_rows)]
else:
res.update([(key, self._make_accessor(attr)(inf).tolist()) for key, attr in props])
return res
def _tree(self, is_policy, Xtest, feature_index, *, treatment_costs=0,
max_depth=3, min_samples_leaf=2, min_impurity_decrease=1e-4,
include_model_uncertainty=False, alpha=0.05):
result = self._safe_result_index(Xtest, feature_index)
Xtest = result.X_transformer.transform(Xtest)
if Xtest.shape[1] == 0:
Xtest = None
if result.feature_baseline is None:
treatment_names = ['decrease', 'increase']
else:
treatment_names = [f"{result.feature_baseline}"] + \
[f"{lvl}" for lvl in result.feature_levels]
TreeType = SingleTreePolicyInterpreter if is_policy else SingleTreeCateInterpreter
intrp = TreeType(include_model_uncertainty=include_model_uncertainty,
uncertainty_level=alpha,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
min_impurity_decrease=min_impurity_decrease,
random_state=self.random_state)
if is_policy:
intrp.interpret(result.estimator, Xtest,
sample_treatment_costs=treatment_costs)
if result.feature_baseline is None: # continuous treatment, so apply a treatment level 10% of typical
treatment_level = result.treatment_value * 0.1
# NOTE: this calculation is correct only if treatment costs are marginal costs,
# because then scaling the difference between treatment value and treatment costs is the
# same as scaling the treatment value and subtracting the scaled treatment cost.
#
                # Note also that unlike the standard outputs of the SingleTreePolicyInterpreter, for
# continuous treatments, the policy value should include the benefit of decreasing treatments
# (rather than just not treating at all)
#
# We can get the total by seeing that if we restrict attention to units where we would treat,
# 2 * policy_value - always_treat
# includes exactly their contribution because policy_value and always_treat both include it
# and likewise restricting attention to the units where we want to decrease treatment,
                #     2 * policy_value - always_treat
# also computes the *benefit* of decreasing treatment, because their contribution to policy_value
# is zero and the contribution to always_treat is negative
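                # Illustrative arithmetic (numbers hypothetical): if half the units should be treated
                # more, each gaining 3, and half should be treated less, each gaining 2 from the
                # decrease (so always treating costs them 2), then policy_value = .5*3 + .5*0 = 1.5
                # and always_treat = .5*3 - .5*2 = 0.5, so 2 * policy_value - always_treat = 2.5,
                # which matches the average benefit .5*3 + .5*2 of the recommended increases and decreases.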
treatment_total = (2 * intrp.policy_value_ - intrp.always_treat_value_.item()) * treatment_level
always_totals = intrp.always_treat_value_ * treatment_level
else:
treatment_total = intrp.policy_value_
always_totals = intrp.always_treat_value_
policy_values = treatment_total, always_totals
else: # no policy values for CATE trees
intrp.interpret(result.estimator, Xtest)
policy_values = None
return intrp, result.X_transformer.get_feature_names_out(self.feature_names_), treatment_names, policy_values
# TODO: it seems like it would be better to just return the tree itself rather than plot it;
# however, the tree can't store the feature and treatment names we compute here...
def plot_policy_tree(self, Xtest, feature_index, *, treatment_costs=0,
max_depth=3, min_samples_leaf=2, min_value_increase=1e-4, include_model_uncertainty=False,
alpha=0.05):
"""
Plot a recommended policy tree using matplotlib.
Parameters
----------
Xtest : array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
treatment_costs: array_like, default 0
Cost of treatment, as a scalar value or per-sample. For continuous features this is the marginal cost per
unit of treatment; for discrete features, this is the difference in cost between each of the non-default
values and the default value (i.e., if non-scalar the array should have shape (n,d_t-1))
max_depth : int, default 3
maximum depth of the tree
min_samples_leaf : int, default 2
minimum number of samples on each leaf
min_value_increase : float, default 1e-4
The minimum increase in policy value that a split must achieve in order to be created
include_model_uncertainty : bool, default False
Whether to include confidence interval information when building the simplified tree approximation of the CATE model.
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
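Examples
--------
A minimal sketch with hypothetical names: assuming ``ca`` is an already-fitted instance of this
analysis class and ``X_test`` contains the same columns used during fitting

>>> ca.plot_policy_tree(X_test, feature_index=0, treatment_costs=0.5, max_depth=2)  # doctest: +SKIP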
"""
intrp, feature_names, treatment_names, _ = self._tree(True, Xtest, feature_index,
treatment_costs=treatment_costs,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
min_impurity_decrease=min_value_increase,
include_model_uncertainty=include_model_uncertainty,
alpha=alpha)
return intrp.plot(feature_names=feature_names, treatment_names=treatment_names)
def _policy_tree_output(self, Xtest, feature_index, *, treatment_costs=0,
max_depth=3, min_samples_leaf=2, min_value_increase=1e-4, alpha=0.05):
"""
Get a tuple of policy outputs.
The first item in the tuple is the recommended policy tree expressed as a dictionary.
The second item is the per-unit average value of applying the learned policy; for a continuous feature
this is the gain from increasing the treatment by 10% of the typical amount for units where the
treatment should be increased, and from decreasing it by 10% of the typical amount for the rest.
The third item is the value of always treating: a list with one entry per non-control treatment for
discrete features, or a single entry for continuous features (again using a 10%-of-typical increase).
Parameters
----------
Xtest : array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
treatment_costs: array_like, default 0
Cost of treatment, as a scalar value or per-sample. For continuous features this is the marginal cost per
unit of treatment; for discrete features, this is the difference in cost between each of the non-default
values and the default value (i.e., if non-scalar the array should have shape (n,d_t-1))
max_depth : int, default 3
maximum depth of the tree
min_samples_leaf : int, default 2
minimum number of samples on each leaf
min_value_increase : float, default 1e-4
The minimum increase in policy value that a split must achieve in order to be created
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
Returns
-------
output : _PolicyOutput
"""
(intrp, feature_names, treatment_names,
(policy_val, always_trt)) = self._tree(True, Xtest, feature_index,
treatment_costs=treatment_costs,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
min_impurity_decrease=min_value_increase,
alpha=alpha)
def policy_data(tree, node_id, node_dict):
return {'treatment': treatment_names[np.argmax(tree.value[node_id])]}
return _PolicyOutput(_tree_interpreter_to_dict(intrp, feature_names, policy_data),
policy_val,
{treatment_names[i + 1]: val
for (i, val) in enumerate(always_trt.tolist())},
treatment_names[0])
# TODO: it seems like it would be better to just return the tree itself rather than plot it;
# however, the tree can't store the feature and treatment names we compute here...
def plot_heterogeneity_tree(self, Xtest, feature_index, *,
max_depth=3, min_samples_leaf=2, min_impurity_decrease=1e-4,
include_model_uncertainty=False,
alpha=0.05):
"""
Plot an effect heterogeneity tree using matplotlib.
Parameters
----------
Xtest : array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
max_depth : int, default 3
maximum depth of the tree
min_samples_leaf : int, default 2
minimum number of samples on each leaf
min_impurity_decrease : float, default 1e-4
The minimum decrease in the impurity/uniformity of the causal effect that a split must
achieve in order to be created
include_model_uncertainty : bool, default False
Whether to include confidence interval information when building the simplified tree approximation of the CATE model.
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
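Examples
--------
A minimal sketch with hypothetical names: assuming ``ca`` is an already-fitted instance of this
analysis class and ``X_test`` contains the same columns used during fitting

>>> ca.plot_heterogeneity_tree(X_test, feature_index='age', max_depth=2)  # doctest: +SKIP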
"""
intrp, feature_names, treatment_names, _ = self._tree(False, Xtest, feature_index,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
min_impurity_decrease=min_impurity_decrease,
include_model_uncertainty=include_model_uncertainty,
alpha=alpha)
return intrp.plot(feature_names=feature_names,
treatment_names=treatment_names)
def _heterogeneity_tree_output(self, Xtest, feature_index, *,
max_depth=3, min_samples_leaf=2, min_impurity_decrease=1e-4,
include_model_uncertainty=False, alpha=0.05):
"""
Get an effect heterogeneity tree expressed as a dictionary.
Parameters
----------
Xtest : array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
max_depth : int, default 3
maximum depth of the tree
min_samples_leaf : int, default 2
minimum number of samples on each leaf
min_impurity_decrease : float, default 1e-4
The minimum decrease in the impurity/uniformity of the causal effect that a split must
achieve in order to be created
include_model_uncertainty : bool, default False
Whether to include confidence interval information when building the simplified tree approximation of the CATE model.
alpha : float in [0, 1], default 0.05
Confidence level of the confidence intervals displayed in the leaf nodes.
A (1-alpha)*100% confidence interval is displayed.
"""
intrp, feature_names, _, _ = self._tree(False, Xtest, feature_index,
max_depth=max_depth,
min_samples_leaf=min_samples_leaf,
min_impurity_decrease=min_impurity_decrease,
include_model_uncertainty=include_model_uncertainty,
alpha=alpha)
def hetero_data(tree, node_id, node_dict):
if include_model_uncertainty:
return {'effect': _sanitize(tree.value[node_id]),
'ci': _sanitize(node_dict[node_id]['ci'])}
else:
return {'effect': _sanitize(tree.value[node_id])}
return _tree_interpreter_to_dict(intrp, feature_names, hetero_data)
def individualized_policy(self, Xtest, feature_index, *, n_rows=None, treatment_costs=0, alpha=0.05):
"""
Get individualized treatment policy based on the learned model for a feature, sorted by the predicted effect.
Parameters
----------
Xtest: array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
n_rows: int, optional
How many rows to return (all rows by default)
treatment_costs: array_like, default 0
Cost of treatment, as a scalar value or per-sample. For continuous features this is the marginal cost per
unit of treatment; for discrete features, this is the difference in cost between each of the non-default
values and the default value (i.e., if non-scalar the array should have shape (n,d_t-1))
alpha: float in [0, 1], default 0.05
Confidence level of the confidence intervals.
A (1-alpha)*100% confidence interval is returned.
Returns
-------
output: DataFrame
Dataframe containing recommended treatment, effect, confidence interval, sorted by effect
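Examples
--------
A minimal sketch with hypothetical names: assuming ``ca`` is an already-fitted instance of this
analysis class, the result has one row per sample in ``X_test``, sorted by effect, with the
'Treatment', 'Effect of treatment', bound, and 'Current treatment' columns

>>> policy_df = ca.individualized_policy(X_test, feature_index=0, n_rows=10)  # doctest: +SKIP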
"""
result = self._safe_result_index(Xtest, feature_index)
# get dataframe with all but selected column
orig_df = pd.DataFrame(Xtest, columns=self.feature_names_).rename(
columns={self.feature_names_[result.feature_index]: 'Current treatment'})
Xtest = result.X_transformer.transform(Xtest)
if Xtest.shape[1] == 0:
x_rows = Xtest.shape[0]
Xtest = None
if result.feature_baseline is None:
# apply 10% of a typical treatment for this feature
effect = result.estimator.effect_inference(Xtest, T1=result.treatment_value * 0.1)
else:
effect = result.estimator.const_marginal_effect_inference(Xtest)
if Xtest is None: # we got a scalar effect although our original X may have had more rows
effect = effect._expand_outputs(x_rows)
multi_y = (not self._vec_y) or self.classification
if multi_y and result.feature_baseline is not None and np.ndim(treatment_costs) == 2:
# we've got treatment costs of shape (n, d_t-1) so we need to add a y dimension to broadcast safely
treatment_costs = np.expand_dims(treatment_costs, 1)
effect.translate(-treatment_costs)
est = effect.point_estimate
est_lb = effect.conf_int(alpha)[0]
est_ub = effect.conf_int(alpha)[1]
if multi_y: # y was an array, not a vector
est = np.squeeze(est, 1)
est_lb = np.squeeze(est_lb, 1)
est_ub = np.squeeze(est_ub, 1)
if result.feature_baseline is None:
rec = np.empty(est.shape[0], dtype=object)
rec[est > 0] = "increase"
rec[est <= 0] = "decrease"
# set the effect bounds; where we recommend increasing treatment these agree with
# the estimates; where we recommend decreasing, we negate and swap the interval endpoints
eff_lb, eff_ub = est_lb, est_ub
eff_lb[est <= 0], eff_ub[est <= 0] = -eff_ub[est <= 0], -eff_lb[est <= 0]
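# e.g. (illustrative) an estimate of -2 with interval (-3, -1) becomes a "decrease"
# recommendation with effect 2 and interval (1, 3)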
# the effect is now always positive since we decrease treatment when negative
eff = np.abs(est)
else:
# for discrete treatment, stack a zero result in front for control
zeros = np.zeros((est.shape[0], 1))
all_effs = np.hstack([zeros, est])
eff_ind = np.argmax(all_effs, axis=1)
treatment_arr = np.array([result.feature_baseline] + [lvl for lvl in result.feature_levels], dtype=object)
rec = treatment_arr[eff_ind]
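# e.g. (illustrative) with two non-control levels and per-row effects [+2, -1], all_effs is
# [0, +2, -1], so eff_ind is 1 and rec is the first non-control level in treatment_arr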
# we need to call effect_inference to get the correct CI between the two treatment options
effect = result.estimator.effect_inference(Xtest, T0=orig_df['Current treatment'], T1=rec)
# we now need to construct the delta in the cost between the two treatments and translate the effect
current_treatment = orig_df['Current treatment'].values
if isinstance(current_treatment, pd.core.arrays.categorical.Categorical):
current_treatment = current_treatment.to_numpy()
if np.ndim(treatment_costs) >= 2:
# remove the third dimension that may have been added above
if multi_y: # y was an array, not a vector
treatment_costs = np.squeeze(treatment_costs, 1)
assert treatment_costs.shape[1] == len(treatment_arr) - 1, ("If treatment costs are an array, "
" they must be of shape (n, d_t-1),"
" where n is the number of samples"
" and d_t the number of treatment"
" categories.")
all_costs = np.hstack([zeros, treatment_costs])
# find cost of current treatment: the equality creates a 2d boolean array with a single True
# per row, at the position of the current treatment. Then we take the corresponding cost.
current_cost = all_costs[current_treatment.reshape(-1, 1) == treatment_arr.reshape(1, -1)]
target_cost = np.take_along_axis(all_costs, eff_ind.reshape(-1, 1), 1).reshape(-1)
else:
assert isinstance(treatment_costs, (int, float)), ("Treatment costs should be either a float or "
"a 2d array of shape (n, d_t-1).")
all_costs = np.array([0] + [treatment_costs] * (len(treatment_arr) - 1))
# construct index of current treatment
current_ind = (current_treatment.reshape(-1, 1) ==
treatment_arr.reshape(1, -1)) @ np.arange(len(treatment_arr))
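# e.g. (illustrative) with treatment_arr = ['a', 'b', 'c'] and a current treatment of 'b',
# the equality gives [False, True, False], and [0, 1, 0] @ [0, 1, 2] = 1, the index of 'b'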
current_cost = all_costs[current_ind]
target_cost = all_costs[eff_ind]
delta_cost = current_cost - target_cost
# add second dimension if needed for broadcasting during translation of effect
if multi_y:
delta_cost = np.expand_dims(delta_cost, 1)
effect.translate(delta_cost)
eff = effect.point_estimate
eff_lb, eff_ub = effect.conf_int(alpha)
if multi_y: # y was an array, not a vector
eff = np.squeeze(eff, 1)
eff_lb = np.squeeze(eff_lb, 1)
eff_ub = np.squeeze(eff_ub, 1)
df = pd.DataFrame({'Treatment': rec,
'Effect of treatment': eff,
'Effect of treatment lower bound': eff_lb,
'Effect of treatment upper bound': eff_ub},
index=orig_df.index)
return df.join(orig_df).sort_values('Effect of treatment',
ascending=False).head(n_rows)
def _individualized_policy_dict(self, Xtest, feature_index, *, n_rows=None, treatment_costs=0, alpha=0.05):
"""
Get individualized treatment policy based on the learned model for a feature, sorted by the predicted effect.
Parameters
----------
Xtest: array_like
Features
feature_index: int or str
Index of the feature to be considered as treatment
n_rows: int, optional
How many rows to return (all rows by default)
treatment_costs: array_like, default 0
Cost of treatment, as a scalar value or per-sample
alpha: float in [0, 1], default 0.05
Confidence level of the confidence intervals.
A (1-alpha)*100% confidence interval is returned.
Returns
-------
output: dictionary
dictionary containing treatment policy, effects, and other columns
"""
return self.individualized_policy(Xtest, feature_index,
n_rows=n_rows,
treatment_costs=treatment_costs,
alpha=alpha).to_dict('list')
def typical_treatment_value(self, feature_index):
"""
Get the typical treatment value used for the specified feature.
Parameters
----------
feature_index: int or str
The index of the feature to be considered as treatment
Returns
-------
treatment_value : float
The treatment value considered 'typical' for this feature
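Examples
--------
A minimal sketch with hypothetical names: assuming ``ca`` is an already-fitted instance of this
analysis class, the feature may be given by numeric index or, if the data had column names, by name

>>> t_typ = ca.typical_treatment_value('age')  # doctest: +SKIP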
"""
result = [res for res in self._results if res.feature_index == feature_index]
if len(result) == 0:
if self._has_column_names:
result = [res for res in self._results if res.feature_name == feature_index]
assert len(result) == 1, f"Could not find feature with index/name {feature_index}"
return result[0].treatment_value
else:
raise ValueError(f"No feature with index {feature_index}")
return result[0].treatment_value