Source code for econml.utilities

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""Utility methods."""

import numpy as np
import pandas as pd
import scipy.sparse
import sparse as sp
import itertools
import inspect
from operator import getitem
from collections import defaultdict, Counter
from sklearn import clone
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.linear_model import LassoCV, MultiTaskLassoCV, Lasso, MultiTaskLasso
from functools import reduce, wraps
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import assert_all_finite
import warnings
from warnings import warn
from sklearn.model_selection import KFold, StratifiedKFold, GroupKFold
from collections.abc import Iterable
from sklearn.utils.multiclass import type_of_target
import numbers
from statsmodels.iolib.table import SimpleTable
from statsmodels.iolib.summary import summary_return
from statsmodels.compat.python import lmap
import copy
from inspect import signature

MAX_RAND_SEED = np.iinfo(np.int32).max


[docs]class IdentityFeatures(TransformerMixin): """Featurizer that just returns the input data."""
[docs] def fit(self, X): """Fit method (does nothing, just returns self).""" return self
[docs] def transform(self, X): """Perform the identity transform, which returns the input unmodified.""" return X
def parse_final_model_params(coef, intercept, d_y, d_t, d_t_in, bias_part_of_coef, fit_cate_intercept): dt = d_t if (d_t_in != d_t) and (d_t[0] == 1): # binary treatment dt = () cate_intercept = None if bias_part_of_coef: cate_coef = coef.reshape(d_y + dt + (-1,))[..., 1:] if fit_cate_intercept: cate_intercept = coef.reshape(d_y + dt + (-1,))[..., 0] else: cate_coef = coef.reshape(d_y + dt + (-1,)) if fit_cate_intercept: cate_intercept = np.reshape(intercept, d_y + dt) if (cate_intercept is not None) and (np.ndim(cate_intercept) == 0): cate_intercept = cate_intercept.item() return cate_coef, cate_intercept def check_high_dimensional(X, T, *, threshold, featurizer=None, discrete_treatment=False, msg=""): # Check if model is sparse enough for this model X, T = check_input_arrays(X, T) if X is None: d_x = 1 elif featurizer is None: d_x = X.shape[1] else: d_x = clone(featurizer, safe=False).fit_transform(X[[0], :]).shape[1] if discrete_treatment: d_t = len(set(T.flatten())) - 1 else: d_t = 1 if np.ndim(T) < 2 else T.shape[1] if d_x * d_t < threshold: warn(msg, UserWarning)
[docs]def inverse_onehot(T): """ Given a one-hot encoding of a value, return a vector reversing the encoding to get numeric treatment indices. Note that we assume that the first column has been removed from the input. Parameters ---------- T : array (shape (n, d_t-1)) The one-hot-encoded array Returns ------- A : vector of int (shape (n,)) The un-encoded 0-based category indices """ assert ndim(T) == 2 # note that by default OneHotEncoder returns float64s, so need to convert to int return (T @ np.arange(1, T.shape[1] + 1)).astype(int)
[docs]def issparse(X): """Determine whether an input is sparse. For the purposes of this function, both `scipy.sparse` matrices and `sparse.SparseArray` types are considered sparse. Parameters ---------- X : array-like The input to check Returns ------- bool Whether the input is sparse """ return scipy.sparse.issparse(X) or isinstance(X, sp.SparseArray)
[docs]def iscoo(X): """Determine whether an input is a `sparse.COO` array. Parameters ---------- X : array-like The input to check Returns ------- bool Whether the input is a `COO` array """ return isinstance(X, sp.COO)
[docs]def tocoo(X): """ Convert an array to a sparse COO array. If the input is already an `sparse.COO` object, this returns the object directly; otherwise it is converted. """ if isinstance(X, sp.COO): return X elif isinstance(X, sp.DOK): return sp.COO(X) elif scipy.sparse.issparse(X): return sp.COO.from_scipy_sparse(X) else: return sp.COO.from_numpy(X)
[docs]def todense(X): """ Convert an array to a dense numpy array. If the input is already a numpy array, this may create a new copy. """ if scipy.sparse.issparse(X): return X.toarray() elif isinstance(X, sp.SparseArray): return X.todense() else: # TODO: any way to avoid creating a copy if the array was already dense? # the call is necessary if the input was something like a list, though return np.array(X)
[docs]def size(X): """Return the number of elements in the array. Parameters ---------- a : array_like Input data Returns ------- int The number of elements of the array """ return X.size if issparse(X) else np.size(X)
[docs]def shape(X): """Return a tuple of array dimensions.""" return X.shape if issparse(X) else np.shape(X)
[docs]def ndim(X): """Return the number of array dimensions.""" return X.ndim if issparse(X) else np.ndim(X)
[docs]def reshape(X, shape): """Return a new array that is a reshaped version of an input array. The output will be sparse iff the input is. Parameters ---------- X : array_like The array to reshape shape : tuple of ints The desired shape of the output array Returns ------- ndarray or SparseArray The reshaped output array """ if scipy.sparse.issparse(X): # scipy sparse arrays don't support reshaping (even for 2D they throw not implemented errors), # so convert to pydata sparse first X = sp.COO.from_scipy_sparse(X) if len(shape) == 2: # in the 2D case, we can convert back to scipy sparse; in other cases we can't return X.reshape(shape).to_scipy_sparse() return X.reshape(shape)
def _apply(op, *XS): """ Apply a function to a sequence of sparse or dense array arguments. If any array is sparse then all arrays are converted to COO before the function is applied; if all of the arrays are scipy sparse arrays, and if the result is 2D, the returned value will be a scipy sparse array as well """ all_scipy_sparse = all(scipy.sparse.issparse(X) for X in XS) if any(issparse(X) for X in XS): XS = tuple(tocoo(X) for X in XS) result = op(*XS) if all_scipy_sparse and len(shape(result)) == 2: # both inputs were scipy and we can safely convert back to scipy because it's 2D return result.to_scipy_sparse() return result
[docs]def tensordot(X1, X2, axes): """ Compute tensor dot product along specified axes for arrays >= 1-D. Parameters ---------- X1, X2 : array_like, len(shape) >= 1 Tensors to "dot" axes : int or (2,) array_like integer_like If an int N, sum over the last N axes of `X1` and the first N axes of `X2` in order. The sizes of the corresponding axes must match (2,) array_like Or, a list of axes to be summed over, first sequence applying to `X1`, second to `X2`. Both elements array_like must be of the same length. """ def td(X1, X2): return sp.tensordot(X1, X2, axes) if iscoo(X1) else np.tensordot(X1, X2, axes) return _apply(td, X1, X2)
[docs]def cross_product(*XS): """ Compute the cross product of features. Parameters ---------- X1 : n x d1 matrix First matrix of n samples of d1 features (or an n-element vector, which will be treated as an n x 1 matrix) X2 : n x d2 matrix Second matrix of n samples of d2 features (or an n-element vector, which will be treated as an n x 1 matrix) Returns ------- A : n x (d1*d2*...) matrix Matrix of n samples of d1*d2*... cross product features, arranged in form such that each row t of X12 contains: [X1[t,0]*X2[t,0]*..., ..., X1[t,d1-1]*X2[t,0]*..., X1[t,0]*X2[t,1]*..., ..., X1[t,d1-1]*X2[t,1]*..., ...] """ for X in XS: assert 2 >= ndim(X) >= 1 n = shape(XS[0])[0] for X in XS: assert n == shape(X)[0] def cross(XS): k = len(XS) XS = [reshape(XS[i], (n,) + (1,) * (k - i - 1) + (-1,) + (1,) * i) for i in range(k)] return reshape(reduce(np.multiply, XS), (n, -1)) return _apply(cross, XS)
[docs]def stack(XS, axis=0): """ Join a sequence of arrays along a new axis. The axis parameter specifies the index of the new axis in the dimensions of the result. For example, if axis=0 it will be the first dimension and if axis=-1 it will be the last dimension. Parameters ---------- arrays : sequence of array_like Each array must have the same shape axis : int, optional The axis in the result array along which the input arrays are stacked Returns ------- ndarray or SparseArray The stacked array, which has one more dimension than the input arrays. It will be sparse if the inputs are. """ def st(*XS): return sp.stack(XS, axis=axis) if iscoo(XS[0]) else np.stack(XS, axis=axis) return _apply(st, *XS)
[docs]def concatenate(XS, axis=0): """ Join a sequence of arrays along an existing axis. Parameters ---------- X1, X2, ... : sequence of array_like The arrays must have the same shape, except in the dimension corresponding to `axis` (the first, by default). axis : int, optional The axis along which the arrays will be joined. Default is 0. Returns ------- ndarray or SparseArray The concatenated array. It will be sparse if the inputs are. """ def conc(*XS): return sp.concatenate(XS, axis=axis) if iscoo(XS[0]) else np.concatenate(XS, axis=axis) return _apply(conc, *XS)
# note: in contrast to np.hstack this only works with arrays of dimension at least 2
[docs]def hstack(XS): """ Stack arrays in sequence horizontally (column wise). This is equivalent to concatenation along the second axis Parameters ---------- XS : sequence of ndarrays The arrays must have the same shape along all but the second axis. Returns ------- ndarray or SparseArray The array formed by stacking the given arrays. It will be sparse if the inputs are. """ # Confusingly, this needs to concatenate, not stack (stack returns an array with an extra dimension) return concatenate(XS, 1)
[docs]def vstack(XS): """ Stack arrays in sequence vertically (row wise). This is equivalent to concatenation along the first axis after 1-D arrays of shape (N,) have been reshaped to (1,N). Parameters ---------- XS : sequence of ndarrays The arrays must have the same shape along all but the first axis. 1-D arrays must have the same length. Returns ------- ndarray or SparseArray The array formed by stacking the given arrays, will be at least 2-D. It will be sparse if the inputs are. """ # Confusingly, this needs to concatenate, not stack (stack returns an array with an extra dimension) return concatenate(XS, 0)
[docs]def transpose(X, axes=None): """ Permute the dimensions of an array. Parameters ---------- X : array_like Input array. axes : list of ints, optional By default, reverse the dimensions, otherwise permute the axes according to the values given Returns ------- p : ndarray or SparseArray `X` with its axes permuted. This will be sparse if `X` is. """ def t(X): if iscoo(X): return X.transpose(axes) else: return np.transpose(X, axes) return _apply(t, X)
[docs]def add_intercept(X): """ Adds an intercept feature to an array by prepending a column of ones. Parameters ---------- X : array-like Input array. Must be 2D. Returns ------- arr : ndarray `X` with a column of ones prepended """ return hstack([np.ones((X.shape[0], 1)), X])
[docs]def reshape_Y_T(Y, T): """ Reshapes Y and T when Y.ndim = 2 and/or T.ndim = 1. Parameters ---------- Y : array_like, shape (n, ) or (n, 1) Outcome for the treatment policy. Must be a vector or single-column matrix. T : array_like, shape (n, ) or (n, d_t) Treatment policy. Returns ------- Y : array_like, shape (n, ) Flattened outcome for the treatment policy. T : array_like, shape (n, 1) or (n, d_t) Reshaped treatment policy. """ assert(len(Y) == len(T)) assert(Y.ndim <= 2) if Y.ndim == 2: assert(Y.shape[1] == 1) Y = Y.flatten() if T.ndim == 1: T = T.reshape(-1, 1) return Y, T
[docs]def check_inputs(Y, T, X, W=None, multi_output_T=True, multi_output_Y=True): """ Input validation for CATE estimators. Checks Y, T, X, W for consistent length, enforces X, W 2d. Standard input checks are only applied to all inputs, such as checking that an input does not have np.nan or np.inf targets. Converts regular Python lists to numpy arrays. Parameters ---------- Y : array_like, shape (n, ) or (n, d_y) Outcome for the treatment policy. T : array_like, shape (n, ) or (n, d_t) Treatment policy. X : array-like, shape (n, d_x) Feature vector that captures heterogeneity. W : array-like, shape (n, d_w) or None (default=None) High-dimensional controls. multi_output_T : bool Whether to allow more than one treatment. multi_output_Y: bool Whether to allow more than one outcome. Returns ------- Y : array_like, shape (n, ) or (n, d_y) Converted and validated Y. T : array_like, shape (n, ) or (n, d_t) Converted and validated T. X : array-like, shape (n, d_x) Converted and validated X. W : array-like, shape (n, d_w) or None (default=None) Converted and validated W. """ X, T = check_X_y(X, T, multi_output=multi_output_T, y_numeric=True) _, Y = check_X_y(X, Y, multi_output=multi_output_Y, y_numeric=True) if W is not None: W, _ = check_X_y(W, Y, multi_output=multi_output_Y, y_numeric=True) return Y, T, X, W
[docs]def check_input_arrays(*args, validate_len=True, force_all_finite=True): """Cast input sequences into numpy arrays. Only inputs that are sequence-like will be converted, all other inputs will be left as is. When `validate_len` is True, the sequences will be checked for equal length. Parameters ---------- args : scalar or array_like Inputs to be checked. validate_len : bool (default=True) Whether to check if the input arrays have the same length. force_all_finite : bool (default=True) Whether to allow inf and nan in input arrays. Returns ------- args: array-like List of inputs where sequence-like objects have been cast to numpy arrays. """ n = None args = list(args) for i, arg in enumerate(args): if np.ndim(arg) > 0: new_arg = check_array(arg, dtype=None, ensure_2d=False, accept_sparse=True, force_all_finite=force_all_finite) if not force_all_finite: # For when checking input values is disabled try: assert_all_finite(new_arg) except ValueError: warnings.warn("Input contains NaN, infinity or a value too large for dtype('float64') " "but input check is disabled. Check the inputs before proceeding.") if validate_len: m = new_arg.shape[0] if n is None: n = m else: assert (m == n), "Input arrays have incompatible lengths: {} and {}".format(n, m) args[i] = new_arg return args
[docs]def get_input_columns(X, prefix="X"): """Extracts column names from dataframe-like input object. Currently supports column name extraction from pandas DataFrame and Series objects. Parameters ---------- X : array_like or None Input array with column names to be extracted. prefix: string or None If input array doesn't have column names, a default using the naming scheme "{prefix}{column number}" will be returned. Returns ------- cols: array-like or None List of columns corresponding to the dataframe-like object. None if the input array is not in the supported types. """ if X is None: return None if np.ndim(X) == 0: raise ValueError( f"Expected array-like object for imput with prefix {prefix} but got '{X}' object instead.") # Type to column extraction function type_to_func = { pd.DataFrame: lambda x: x.columns.tolist(), pd.Series: lambda x: [x.name] } if type(X) in type_to_func: return type_to_func[type(X)](X) len_X = 1 if np.ndim(X) == 1 else np.asarray(X).shape[1] return [f"{prefix}{i}" for i in range(len_X)]
def get_feature_names_or_default(featurizer, feature_names): if hasattr(featurizer, 'get_feature_names'): # Get number of arguments, some sklearn featurizer don't accept feature_names arg_no = len(inspect.getfullargspec(featurizer.get_feature_names).args) if arg_no == 1: return featurizer.get_feature_names() elif arg_no == 2: return featurizer.get_feature_names(feature_names) # Featurizer doesn't have 'get_feature_names' or has atypical 'get_feature_names' try: # Get feature names using featurizer dummy_X = np.ones((1, len(feature_names))) return get_input_columns(featurizer.transform(dummy_X), prefix="feat(X)") except Exception: # All attempts at retrieving transformed feature names have failed # Delegate handling to downstream logic return None
[docs]def check_models(models, n): """ Input validation for metalearner models. Check whether the input models satisfy the criteria below. Parameters ---------- models : estimator or a list/tuple of estimators n : int Number of models needed Returns ---------- models : a list/tuple of estimators """ if isinstance(models, (tuple, list)): if n != len(models): raise ValueError("The number of estimators doesn't equal to the number of treatments. " "Please provide either a tuple/list of estimators " "with same number of treatments or an unified estimator.") elif hasattr(models, 'fit'): models = [clone(models, safe=False) for i in range(n)] else: raise ValueError( "models must be either a tuple/list of estimators with same number of treatments " "or an unified estimator.") return models
[docs]def broadcast_unit_treatments(X, d_t): """ Generate `d_t` unit treatments for each row of `X`. Parameters ---------- d_t: int Number of treatments X : array Features Returns ------- X, T : (array, array) The updated `X` array (with each row repeated `d_t` times), and the generated `T` array """ d_x = shape(X)[0] eye = np.eye(d_t) # tile T and repeat X along axis 0 (so that the duplicated rows of X remain consecutive) T = np.tile(eye, (d_x, 1)) Xs = np.repeat(X, d_t, axis=0) return Xs, T
[docs]def reshape_treatmentwise_effects(A, d_t, d_y): """ Given an effects matrix ordered first by treatment, transform it to be ordered by outcome. Parameters ---------- A : array The array of effects, of size n*d_y*d_t d_t : tuple of int Either () if T was a vector, or a 1-tuple of the number of columns of T if it was an array d_y : tuple of int Either () if Y was a vector, or a 1-tuple of the number of columns of Y if it was an array Returns ------- A : array (shape (m, d_y, d_t)) The transformed array. Note that singleton dimensions will be dropped for any inputs which were vectors, as in the specification of `BaseCateEstimator.marginal_effect`. """ A = reshape(A, (-1,) + d_t + d_y) if d_t and d_y: return transpose(A, (0, 2, 1)) # need to return as m by d_y by d_t matrix else: return A
[docs]def einsum_sparse(subscripts, *arrs): """ Evaluate the Einstein summation convention on the operands. Using the Einstein summation convention, many common multi-dimensional array operations can be represented in a simple fashion. This function provides a way to compute such summations. Parameters ---------- subscripts : str Specifies the subscripts for summation. Unlike `np.eisnum` elipses are not supported and the output must be explicitly included arrs : list of COO arrays These are the sparse arrays for the operation. Returns ------- SparseArray The sparse array calculated based on the Einstein summation convention. """ inputs, outputs = subscripts.split('->') inputs = inputs.split(',') outputInds = set(outputs) allInds = set.union(*[set(i) for i in inputs]) # same number of input definitions as arrays assert len(inputs) == len(arrs) # input definitions have same number of dimensions as each array assert all(arr.ndim == len(input) for (arr, input) in zip(arrs, inputs)) # all result indices are unique assert len(outputInds) == len(outputs) # all result indices must match at least one input index assert outputInds <= allInds # map indices to all array, axis pairs for that index indMap = {c: [(n, i) for n in range(len(inputs)) for (i, x) in enumerate(inputs[n]) if x == c] for c in allInds} for c in indMap: # each index has the same cardinality wherever it appears assert len({arrs[n].shape[i] for (n, i) in indMap[c]}) == 1 # State: list of (set of letters, list of (corresponding indices, value)) # Algo: while list contains more than one entry # take two entries # sort both lists by intersection of their indices # merge compatible entries (where intersection of indices is equal - in the resulting list, # take the union of indices and the product of values), stepping through each list linearly # TODO: might be faster to break into connected components first # e.g. for "ab,d,bc->ad", the two components "ab,bc" and "d" are independent, # so compute their content separately, then take cartesian product # this would save a few pointless sorts by empty tuples # TODO: Consider investigating other performance ideas for these cases # where the dense method beat the sparse method (usually sparse is faster) # e,facd,c->cfed # sparse: 0.0335489 # dense: 0.011465999999999997 # gbd,da,egb->da # sparse: 0.0791625 # dense: 0.007319099999999995 # dcc,d,faedb,c->abe # sparse: 1.2868097 # dense: 0.44605229999999985 def merge(x1, x2): (s1, l1), (s2, l2) = x1, x2 keys = {c for c in s1 if c in s2} # intersection of strings outS = ''.join(set(s1 + s2)) # union of strings outMap = [(True, s1.index(c)) if c in s1 else (False, s2.index(c)) for c in outS] def keyGetter(s): inds = [s.index(c) for c in keys] return lambda p: tuple(p[0][ind] for ind in inds) kg1 = keyGetter(s1) kg2 = keyGetter(s2) l1.sort(key=kg1) l2.sort(key=kg2) i1 = i2 = 0 outL = [] while i1 < len(l1) and i2 < len(l2): k1, k2 = kg1(l1[i1]), kg2(l2[i2]) if k1 < k2: i1 += 1 elif k2 < k1: i2 += 1 else: j1, j2 = i1, i2 while j1 < len(l1) and kg1(l1[j1]) == k1: j1 += 1 while j2 < len(l2) and kg2(l2[j2]) == k2: j2 += 1 for c1, d1 in l1[i1:j1]: for c2, d2 in l2[i2:j2]: outL.append((tuple(c1[charIdx] if inFirst else c2[charIdx] for inFirst, charIdx in outMap), d1 * d2)) i1 = j1 i2 = j2 return outS, outL # when indices are repeated within an array, pre-filter the coordinates and data def filter_inds(coords, data, n): counts = Counter(inputs[n]) repeated = [(c, counts[c]) for c in counts if counts[c] > 1] if len(repeated) > 0: mask = np.full(len(data), True) for (k, v) in repeated: inds = [i for i in range(len(inputs[n])) if inputs[n][i] == k] for i in range(1, v): mask &= (coords[:, inds[0]] == coords[:, inds[i]]) if not all(mask): return coords[mask, :], data[mask] return coords, data xs = [(s, list(zip(c, d))) for n, (s, arr) in enumerate(zip(inputs, arrs)) for c, d in [filter_inds(arr.coords.T, arr.data, n)]] # TODO: would using einsum's paths to optimize the order of merging help? while len(xs) > 1: xs.append(merge(xs.pop(), xs.pop())) results = defaultdict(int) for (s, l) in xs: coordMap = [s.index(c) for c in outputs] for (c, d) in l: results[tuple(c[i] for i in coordMap)] += d return sp.COO(np.array(list(results.keys())).T if results else np.empty((len(outputs), 0)), np.array(list(results.values())), [arrs[indMap[c][0][0]].shape[indMap[c][0][1]] for c in outputs])
[docs]def fit_with_groups(model, X, y, groups=None, **kwargs): """ Fit a model while correctly handling grouping if necessary. This enables us to perform an inner-loop cross-validation of a model which handles grouping correctly, which is not easy using typical sklearn models. For example, GridSearchCV and RandomSearchCV both support passing 'groups' to fit, but other CV-related estimators (such as those derived from LinearModelCV, including LassoCV), do not support passing groups to fit which meanst that GroupKFold cannot be used as the cv instance when using these types, because the required 'groups' argument will never be passed to the GroupKFold's split method. See also https://github.com/scikit-learn/scikit-learn/issues/12052 The (hacky) workaround that is used here is to explicitly set the 'cv' attribute (if there is one) to the exact set of rows and not to use GroupKFold even with the sklearn classes that could support it; this should work with classes derived from BaseSearchCV, LinearModelCV, and CalibratedClassifierCV. Parameters ---------- model : estimator The model to fit X : array-like The features to fit against y : array-like The target to fit against groups : array-like, optional The set of groupings that should be kept together when splitting rows for cross-validation kwargs : dict Any other named arguments to pass to the model's fit """ if groups is not None: # assume that we should perform nested cross-validation if and only if # the model has a 'cv' attribute; this is a somewhat brittle assumption... if hasattr(model, 'cv'): old_cv = model.cv # logic copied from check_cv cv = 5 if old_cv is None else old_cv if isinstance(cv, numbers.Integral): cv = GroupKFold(cv) # otherwise we will assume the user already set the cv attribute to something # compatible with splitting with a 'groups' argument # now we have to compute the folds explicitly because some classifiers (like LassoCV) # don't use the groups when calling split internally splits = list(cv.split(X, y, groups=groups)) try: model.cv = splits return model.fit(X, y, **kwargs) finally: model.cv = old_cv return model.fit(X, y, **kwargs)
[docs]def filter_none_kwargs(**kwargs): """ Filters out any keyword arguments that are None. This is useful when specific optional keyword arguments might not be universally supported, so that stripping them out when they are not set enables more uses to succeed. Parameters ---------- kwargs: dict The keyword arguments to filter Returns ------- filtered_kwargs: dict The input dictionary, but with all entries having value None removed """ return {key: value for key, value in kwargs.items() if value is not None}
[docs]class WeightedModelWrapper: """Helper class for assiging weights to models without this option. Parameters ---------- model_instance : estimator Model that requires weights. sample_type : string, optional (default=`weighted`) Method for adding weights to the model. `weighted` for linear regression models where the weights can be incorporated in the matrix multiplication, `sampled` for other models. `sampled` samples the training set according to the normalized weights and creates a dataset larger than the original. """ def __init__(self, model_instance, sample_type="weighted"): self.model_instance = model_instance if sample_type == "weighted": self.data_transform = self._weighted_inputs else: warnings.warn("The model provided does not support sample weights. " "Manual weighted sampling may icrease the variance in the results.", UserWarning) self.data_transform = self._sampled_inputs
[docs] def fit(self, X, y, sample_weight=None): """Fit underlying model instance with weighted inputs. Parameters ---------- X : array-like, shape (n_samples, n_features) Training data. y : array-like, shape (n_samples, n_outcomes) Target values. Returns ------- self: an instance of the underlying estimator. """ if sample_weight is not None: X, y = self.data_transform(X, y, sample_weight) return self.model_instance.fit(X, y)
[docs] def predict(self, X): """Predict using the linear model. Parameters ---------- X : array-like or sparse matrix, shape (n_samples, n_features) Samples. Returns ------- C : array, shape (n_samples, n_outcomes) Returns predicted values. """ return self.model_instance.predict(X)
def _weighted_inputs(self, X, y, sample_weight): X, y = check_X_y(X, y, y_numeric=True, multi_output=True) normalized_weights = sample_weight * X.shape[0] / np.sum(sample_weight) sqrt_weights = np.sqrt(normalized_weights) weighted_X = sqrt_weights.reshape(-1, 1) * X weighted_y = sqrt_weights.reshape(-1, 1) * y if y.ndim > 1 else sqrt_weights * y return weighted_X, weighted_y def _sampled_inputs(self, X, y, sample_weight): # Normalize weights normalized_weights = sample_weight / np.sum(sample_weight) data_length = int(min(1 / np.min(normalized_weights[normalized_weights > 0]), 10) * X.shape[0]) data_indices = np.random.choice(X.shape[0], size=data_length, p=normalized_weights) return X[data_indices], y[data_indices]
[docs]class MultiModelWrapper: """Helper class for training different models for each treatment. Parameters ---------- model_list : array-like, shape (n_T, ) List of models to be trained separately for each treatment group. """ def __init__(self, model_list=[]): self.model_list = model_list self.n_T = len(model_list)
[docs] def fit(self, Xt, y, sample_weight=None): """Fit underlying list of models with weighted inputs. Parameters ---------- X : array-like, shape (n_samples, n_features + n_treatments) Training data. The last n_T columns should be a one-hot encoding of the treatment assignment. y : array-like, shape (n_samples, ) Target values. Returns ------- self: an instance of the class """ X = Xt[:, :-self.n_T] t = Xt[:, -self.n_T:] if sample_weight is None: for i in range(self.n_T): mask = (t[:, i] == 1) self.model_list[i].fit(X[mask], y[mask]) else: for i in range(self.n_T): mask = (t[:, i] == 1) self.model_list[i].fit(X[mask], y[mask], sample_weight[mask]) return self
[docs] def predict(self, Xt): """Predict using the linear model. Parameters ---------- X : array-like, shape (n_samples, n_features + n_treatments) Samples. The last n_T columns should be a one-hot encoding of the treatment assignment. Returns ------- C : array, shape (n_samples, ) Returns predicted values. """ X = Xt[:, :-self.n_T] t = Xt[:, -self.n_T:] predictions = [self.model_list[np.nonzero(t[i])[0][0]].predict(X[[i]]) for i in range(len(X))] return np.concatenate(predictions)
def _safe_norm_ppf(q, loc=0, scale=1): if hasattr(loc, "__len__"): prelim = loc.copy() if np.any(scale > 0): prelim[scale > 0] = scipy.stats.norm.ppf(q, loc=loc[scale > 0], scale=scale[scale > 0]) elif scale > 0: prelim = scipy.stats.norm.ppf(q, loc=loc, scale=scale) else: prelim = loc return prelim
[docs]class Summary: # This class is mainly derived from statsmodels.iolib.summary.Summary """ Result summary Construction does not take any parameters. Tables and text can be added with the `add_` methods. Attributes ---------- tables : list of tables Contains the list of SimpleTable instances, horizontally concatenated tables are not saved separately. extra_txt : str extra lines that are added to the text output, used for warnings and explanations. """ def __init__(self): self.tables = [] self.extra_txt = None def __str__(self): return self.as_text() def __repr__(self): return str(type(self)) + '\n"""\n' + self.__str__() + '\n"""' def _repr_html_(self): '''Display as HTML in IPython notebook.''' return self.as_html() def add_table(self, res, header, index, title): table = SimpleTable(res, header, index, title) self.tables.append(table)
[docs] def add_extra_txt(self, etext): '''add additional text that will be added at the end in text format Parameters ---------- etext : list[str] string with lines that are added to the text output. ''' self.extra_txt = '\n'.join(etext)
[docs] def as_text(self): '''return tables as string Returns ------- txt : str summary tables and extra text as one string ''' txt = summary_return(self.tables, return_fmt='text') if self.extra_txt is not None: txt = txt + '\n\n' + self.extra_txt return txt
[docs] def as_latex(self): '''return tables as string Returns ------- latex : str summary tables and extra text as string of Latex Notes ----- This currently merges tables with different number of columns. It is recommended to use `as_latex_tabular` directly on the individual tables. ''' latex = summary_return(self.tables, return_fmt='latex') if self.extra_txt is not None: latex = latex + '\n\n' + self.extra_txt.replace('\n', ' \\newline\n ') return latex
[docs] def as_csv(self): '''return tables as string Returns ------- csv : str concatenated summary tables in comma delimited format ''' csv = summary_return(self.tables, return_fmt='csv') if self.extra_txt is not None: csv = csv + '\n\n' + self.extra_txt return csv
[docs] def as_html(self): '''return tables as string Returns ------- html : str concatenated summary tables in HTML format ''' html = summary_return(self.tables, return_fmt='html') if self.extra_txt is not None: html = html + '<br/><br/>' + self.extra_txt.replace('\n', '<br/>') return html
[docs]class SeparateModel: """ Splits the data based on the last feature and trains a separate model for each subsample. At predict time, it uses the last feature to choose which model to use to predict. """ def __init__(self, *models): self.models = [clone(model) for model in models] def fit(self, XZ, T): for (i, m) in enumerate(self.models): inds = (XZ[:, -1] == i) m.fit(XZ[inds, :-1], T[inds]) return self def predict(self, XZ): t_pred = np.zeros(XZ.shape[0]) for (i, m) in enumerate(self.models): inds = (XZ[:, -1] == i) if np.any(inds): t_pred[inds] = m.predict(XZ[inds, :-1]) return t_pred @property def coef_(self): return np.concatenate((model.coef_ for model in self.models))
[docs]def deprecated(message, category=FutureWarning): """ Enables decorating a method or class to providing a warning when it is used. Parameters ---------- message: string The deprecation message to use category: optional :class:`type`, default :class:`FutureWarning` The warning category to use """ def decorator(to_wrap): # if we're decorating a class, just update the __init__ method, # so that the result is still a class instead of a wrapper method if isinstance(to_wrap, type): old_init = to_wrap.__init__ @wraps(to_wrap.__init__) def new_init(*args, **kwargs): warn(message, category, stacklevel=2) old_init(*args, **kwargs) to_wrap.__init__ = new_init return to_wrap else: @wraps(to_wrap) def m(*args, **kwargs): warn(message, category, stacklevel=2) return to_wrap(*args, **kwargs) return m return decorator
def _deprecate_positional(message, bad_args, category=FutureWarning): """ Enables decorating a method to provide a warning when certain arguments are used positionally. Parameters ---------- message: string The deprecation message to use bad_args : list of string The positional arguments that will be keyword-only in the future category: optional :class:`type`, default :class:`FutureWarning` The warning category to use """ def decorator(to_wrap): @wraps(to_wrap) def m(*args, **kwargs): # want to enforce that each bad_arg was either in kwargs, # or else it was in neither and is just taking its default value bound = signature(m).bind(*args, **kwargs) wrong_args = False for arg in bad_args: if arg not in kwargs and arg in bound.arguments: wrong_args = True if wrong_args: warn(message, category, stacklevel=2) return to_wrap(*args, **kwargs) return m return decorator
[docs]class MissingModule: """ Placeholder to stand in for a module that couldn't be imported, delaying ImportErrors until use. Parameters ---------- msg:string The message to display when an attempt to access a module memeber is made exn:ImportError The original ImportError to pass as the source of the exception """ def __init__(self, msg, exn): self.msg = msg self.exn = exn # Any access should throw def __getattr__(self, _): raise ImportError(self.msg) from self.exn # As a convenience, also throw on calls to allow MissingModule to be used in lieu of specific imports def __call__(self, *args, **kwargs): raise ImportError(self.msg) from self.exn
[docs]def transpose_dictionary(d): """ Transpose a dictionary of dictionaries, bringing the keys from the second level to the top and vice versa Parameters ---------- d: dict The dictionary to transpose; the values of this dictionary should all themselves be dictionaries Returns ------- output: dict The output dictionary with first- and second-level keys swapped """ output = defaultdict(dict) for key1, value in d.items(): for key2, val in value.items(): output[key2][key1] = val return output
[docs]def reshape_arrays_2dim(length, *args): """ Reshape the input arrays as two dimensional. If None, will be reshaped as (n, 0). Parameters ---------- length: scalar Number of samples args: arrays Inputs to be reshaped Returns ------- new_args: arrays Output of reshaped arrays """ new_args = [] for arg in args: if arg is None: new_args.append(np.array([]).reshape(length, 0)) elif arg.ndim == 1: new_args.append(arg.reshape((-1, 1))) else: new_args.append(arg) return new_args
class _RegressionWrapper: """ A simple wrapper that makes a binary classifier behave like a regressor. Essentially .fit, calls the fit method of the classifier and .predict calls the .predict_proba method of the classifier and returns the probability of label 1. """ def __init__(self, clf): """ Parameters ---------- clf : the classifier model """ self._clf = clf def fit(self, X, y, **kwargs): """ Parameters ---------- X : features y : one-hot-encoding of binary label, with drop='first' """ if len(y.shape) > 1 and y.shape[1] > 1: y = y @ np.arange(1, y.shape[1] + 1) self._clf.fit(X, y, **kwargs) return self def predict(self, X): """ Parameters ---------- X : features """ return self._clf.predict_proba(X)[:, 1:]
[docs]@deprecated("This class will be removed from a future version of this package; " "please use econml.sklearn_extensions.linear_model.WeightedLassoCV instead.") class LassoCVWrapper: """Helper class to wrap either LassoCV or MultiTaskLassoCV depending on the shape of the target.""" def __init__(self, *args, **kwargs): self.args = args self.kwargs = kwargs def fit(self, X, Y): assert shape(X)[0] == shape(Y)[0] assert ndim(Y) <= 2 self.needs_unravel = False if ndim(Y) == 2 and shape(Y)[1] > 1: self.model = MultiTaskLassoCV(*self.args, **self.kwargs) else: if ndim(Y) == 2 and shape(Y)[1] == 1: Y = np.ravel(Y) self.needs_unravel = True self.model = LassoCV(*self.args, **self.kwargs) self.model.fit(X, Y) return self def predict(self, X): predictions = self.model.predict(X) return reshape(predictions, (-1, 1)) if self.needs_unravel else predictions