# Copyright (c) PyWhy contributors. All rights reserved.
# Licensed under the MIT License.
#
# This code contains snippets of code from
# https://github.com/scikit-learn/scikit-learn/blob/main/sklearn/ensemble/_forest.py
# published under the following license and copyright:
# BSD 3-Clause License
#
# Copyright (c) 2007-2020 The scikit-learn developers.
# All rights reserved.
import numbers
from warnings import catch_warnings, simplefilter, warn
from abc import ABCMeta, abstractmethod
import numpy as np
import threading
from .._ensemble import (BaseEnsemble, _partition_estimators, _get_n_samples_subsample,
_accumulate_prediction, _accumulate_prediction_var, _accumulate_prediction_and_var,
_accumulate_oob_preds)
from ..utilities import check_inputs, cross_product
from ..tree._tree import DTYPE, DOUBLE
from ._base_grftree import GRFTree
from joblib import Parallel, delayed
from scipy.sparse import hstack as sparse_hstack
from sklearn.utils import check_random_state, compute_sample_weight
from sklearn.utils.validation import _check_sample_weight, check_is_fitted
from sklearn.utils import check_X_y
import scipy.stats
from scipy.special import erfc
__all__ = ["BaseGRF"]
MAX_INT = np.iinfo(np.int32).max
# =============================================================================
# Base Generalized Random Forest
# =============================================================================
class BaseGRF(BaseEnsemble, metaclass=ABCMeta):
"""
Base class for Generalized Random Forests for solving linear moment equations of
the form::
E[J * theta(x) - A | X = x] = 0
where J is a (d, d) random matrix, A is a (d, 1) random vector and theta(x)
is a local parameter to be estimated, which might contain both relevant and
nuisance parameters.
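For example (an illustration only; the concrete moment is defined by each derived class), a
conditional average treatment effect moment E[(y - <theta(x), T>) T | X = x] = 0 fits this
form with A = T * y and J = T T', so that theta(x) solves E[T T' | X = x] theta(x) = E[T y | X = x].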
Warning: This class should not be used directly. Use derived classes
instead.
"""
def __init__(self,
n_estimators=100, *,
criterion="mse",
max_depth=None,
min_samples_split=10,
min_samples_leaf=5,
min_weight_fraction_leaf=0.,
min_var_fraction_leaf=None,
min_var_leaf_on_val=False,
max_features="auto",
min_impurity_decrease=0.,
max_samples=.45,
min_balancedness_tol=.45,
honest=True,
inference=True,
fit_intercept=True,
subforest_size=4,
n_jobs=-1,
random_state=None,
verbose=0,
warm_start=False):
super().__init__(
base_estimator=GRFTree(),
n_estimators=n_estimators,
estimator_params=("criterion", "max_depth", "min_samples_split",
"min_samples_leaf", "min_weight_fraction_leaf",
"min_var_leaf", "min_var_leaf_on_val",
"max_features", "min_impurity_decrease", "honest",
"min_balancedness_tol",
"random_state"))
self.criterion = criterion
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.min_samples_leaf = min_samples_leaf
self.min_weight_fraction_leaf = min_weight_fraction_leaf
self.min_var_fraction_leaf = min_var_fraction_leaf
self.min_var_leaf_on_val = min_var_leaf_on_val
self.max_features = max_features
self.min_impurity_decrease = min_impurity_decrease
self.min_balancedness_tol = min_balancedness_tol
self.honest = honest
self.inference = inference
self.fit_intercept = fit_intercept
self.subforest_size = subforest_size
self.n_jobs = n_jobs
self.random_state = random_state
self.verbose = verbose
self.warm_start = warm_start
self.max_samples = max_samples
@abstractmethod
def _get_alpha_and_pointJ(self, X, T, y, **kwargs):
""" This function must be implemented by child class and given input variables
X, T, y and any auxiliary variables passed as keyword only, should be calculating
the point-wise random vector A and the point-wise jacobian random variable J of
the linear moment equation for every sample in the input samples.
Returns
-------
A : array of shape (n_samples, n_outputs)
The A part of the moment equation for each sample
J : array of shape (n_samples, n_outputs * n_outputs)
The J matrix part of the moment equation, flattened in Fortran-contiguous format.
"""
raise NotImplementedError("Abstract method")
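# Illustrative sketch (not part of this class): for the simple regression moment
# E[theta(x) - y | X = x] = 0, a hypothetical subclass could implement this method roughly as
#
#     def _get_alpha_and_pointJ(self, X, T, y):
#         n_samples, n_outputs = y.shape
#         alpha = y
#         # the jacobian of every sample is the identity matrix, flattened per sample
#         pointJ = np.tile(np.eye(n_outputs).flatten(order='F'), (n_samples, 1))
#         return alpha, pointJ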
@abstractmethod
def _get_n_outputs_decomposition(self, X, T, y, **kwargs):
""" This function must be implemented by child class and given input variables
X, T, y and any auxiliary variables passed as keyword only, should return a tuple
(n_outputs, n_relevant_outputs), which determines how many parameters is the moment
estimating and what prefix of these parameters are the relevant ones that we care about.
Returns
-------
n_outputs : int
The number of parameters we are estimating
n_relevant_outputs : int
The length of the prefix of parameters that we care about (remainder are nuisance)
"""
raise NotImplementedError("Abstract method")
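# Continuing the illustrative regression-moment sketch above, the corresponding decomposition
# would simply be `return y.shape[1], y.shape[1]`, i.e. all estimated parameters are relevant.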
def apply(self, X):
"""
Apply trees in the forest to X, return leaf indices.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
X_leaves : ndarray of shape (n_samples, n_estimators)
For each datapoint x in X and for each tree in the forest,
return the index of the leaf x ends up in.
"""
X = self._validate_X_predict(X)
results = Parallel(n_jobs=self.n_jobs, verbose=self.verbose, backend="threading")(
delayed(tree.apply)(X, check_input=False)
for tree in self.estimators_)
return np.array(results).T
def decision_path(self, X):
"""
Return the decision path in the forest.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
indicator : sparse matrix of shape (n_samples, n_nodes)
Return a node indicator matrix where non-zero elements indicate
that the sample goes through the node. The matrix is in CSR
format.
n_nodes_ptr : ndarray of shape (n_estimators + 1,)
The columns from indicator[n_nodes_ptr[i]:n_nodes_ptr[i+1]]
give the indicator value for the i-th estimator.
"""
X = self._validate_X_predict(X)
indicators = Parallel(n_jobs=self.n_jobs, verbose=self.verbose, backend='threading')(
delayed(tree.decision_path)(X, check_input=False)
for tree in self.estimators_)
n_nodes = [0]
n_nodes.extend([i.shape[1] for i in indicators])
n_nodes_ptr = np.array(n_nodes).cumsum()
return sparse_hstack(indicators).tocsr(), n_nodes_ptr
def fit(self, X, T, y, *, sample_weight=None, **kwargs):
"""
Build a forest of trees from the training set (X, T, y) and any other auxiliary variables.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The training input samples. Internally, its dtype will be converted
to ``dtype=np.float64``.
T : array_like of shape (n_samples, n_treatments)
The treatment vector for each sample
y : array_like of shape (n_samples,) or (n_samples, n_outcomes)
The outcome values for each sample.
sample_weight : array_like of shape (n_samples,), default None
Sample weights. If None, then samples are equally weighted. Splits
that would create child nodes with net zero or negative weight are
ignored while searching for a split in each node.
**kwargs : dictionary of array_like items of shape (n_samples, d_var)
Auxiliary random variables that go into the moment function (e.g. instrument, censoring, etc.).
Any of these variables will be passed on as-is to the `_get_alpha_and_pointJ` method
of the child classes.
Returns
-------
self : object
"""
# TODO: support freq_weight and sample_var
y, T, X, _ = check_inputs(y, T, X, W=None, multi_output_T=True, multi_output_Y=True)
if sample_weight is not None:
sample_weight = _check_sample_weight(sample_weight, X, DOUBLE)
# Remap output
n_samples, self.n_features_ = X.shape
y = np.atleast_1d(y)
if y.ndim == 1:
# reshape is necessary to preserve the data contiguity, unlike
# [:, np.newaxis] which does not.
y = np.reshape(y, (-1, 1))
self.n_y_ = y.shape[1]
T = np.atleast_1d(T)
if T.ndim == 1:
# reshape is necessary to preserve the data contiguity, unlike
# [:, np.newaxis] which does not.
T = np.reshape(T, (-1, 1))
alpha, pointJ = self._get_alpha_and_pointJ(X, T, y, **kwargs)
self.n_outputs_, self.n_relevant_outputs_ = self._get_n_outputs_decomposition(X, T, y, **kwargs)
yaug = np.hstack([y, alpha, pointJ])
if getattr(yaug, "dtype", None) != DOUBLE or not yaug.flags.contiguous:
yaug = np.ascontiguousarray(yaug, dtype=DOUBLE)
if getattr(X, "dtype", None) != DTYPE:
X = X.astype(DTYPE)
# Get subsample sample size
n_samples_subsample = _get_n_samples_subsample(
n_samples=n_samples,
max_samples=self.max_samples
)
# Converting `min_var_fraction_leaf` to an absolute `min_var_leaf` that the GRFTree can handle
if self.min_var_fraction_leaf is None:
self.min_var_leaf = None
elif (not isinstance(self.min_var_fraction_leaf, numbers.Real)) or (not (0 < self.min_var_fraction_leaf <= 1)):
msg = "`min_var_fraction_leaf` must be in range (0, 1) but got value {}"
raise ValueError(msg.format(self.min_var_fraction_leaf))
else:
# We calculate the min eigenvalue proxy that each criterion is considering
# on the overall mean jacobian, to determine the absolute level of `min_var_leaf`
jac = np.mean(pointJ, axis=0).reshape((self.n_outputs_, self.n_outputs_))
min_var = np.min(np.abs(np.diag(jac)))
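# For the 'mse' criterion the eigenvalue proxy also considers every pair of outputs: we take
# the square root of the absolute 2x2 sub-determinant of the mean jacobian for each pair and
# include those values in the minimum as well.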
if self.criterion == 'mse':
for i in range(self.n_outputs_):
for j in range(self.n_outputs_):
if j != i:
det = np.sqrt(np.abs(jac[i, i] * jac[j, j] - jac[i, j] * jac[j, i]))
if det < min_var:
min_var = det
self.min_var_leaf = min_var * self.min_var_fraction_leaf
# Check parameters
self._validate_estimator()
random_state = check_random_state(self.random_state)
# We re-initialize the subsample_random_seed_ only if we are not in warm_start mode or
# if this is the first `fit` call of the warm start mode.
if (not self.warm_start) or (not hasattr(self, 'subsample_random_seed_')):
self.subsample_random_seed_ = random_state.randint(MAX_INT)
else:
random_state.randint(MAX_INT) # just advance random_state
subsample_random_state = check_random_state(self.subsample_random_seed_)
if (self.warm_start and hasattr(self, 'inference_') and (self.inference != self.inference_)):
raise ValueError("Parameter inference cannot be altered in between `fit` "
"calls when `warm_start=True`.")
self.inference_ = self.inference
self.warm_start_ = self.warm_start
if not self.warm_start or not hasattr(self, "estimators_"):
# Free allocated memory, if any
self.estimators_ = []
self.slices_ = []
# the below are needed to replicate randomness of subsampling when warm_start=True
self.slices_n_samples_ = []
self.slices_n_samples_subsample_ = []
self.n_samples_ = []
self.n_samples_subsample_ = []
n_more_estimators = self.n_estimators - len(self.estimators_)
if n_more_estimators < 0:
raise ValueError('n_estimators=%d must be larger than or equal to '
'len(estimators_)=%d when warm_start==True'
% (self.n_estimators, len(self.estimators_)))
elif n_more_estimators == 0:
warn("Warm-start fitting without increasing n_estimators does not "
"fit new trees.")
else:
if self.inference:
if not isinstance(self.subforest_size, numbers.Integral):
raise ValueError("Parameter `subforest_size` must be "
"an integer but got value {}.".format(self.subforest_size))
if self.subforest_size < 2:
raise ValueError("Parameter `subforest_size` must be at least 2 if `inference=True`, "
"but got value {}".format(self.subforest_size))
if not (n_more_estimators % self.subforest_size == 0):
raise ValueError("The number of estimators to be constructed must be divisible "
"the `subforest_size` parameter. Asked to build `n_estimators={}` "
"with `subforest_size={}`.".format(n_more_estimators, self.subforest_size))
if n_samples_subsample > n_samples // 2:
if isinstance(self.max_samples, numbers.Integral):
raise ValueError("Parameter `max_samples` must be in [1, n_samples // 2], "
"if `inference=True`. "
"Got values n_samples={}, max_samples={}".format(n_samples, self.max_samples))
else:
raise ValueError("Parameter `max_samples` must be in (0, .5], if `inference=True`. "
"Got value {}".format(self.max_samples))
if self.warm_start and len(self.estimators_) > 0:
# We draw from the random state to get the random state we
# would have got if we hadn't used a warm_start.
random_state.randint(MAX_INT, size=len(self.estimators_))
trees = [self._make_estimator(append=False,
random_state=random_state).init()
for i in range(n_more_estimators)]
if self.inference:
if self.warm_start:
# Advancing subsample_random_state. Assumes each prior fit call has the same number of
# samples at fit time. If not then this would not exactly replicate a single batch execution,
# but would still advance randomness enough so that tree subsamples will be different.
for sl, n_, ns_ in zip(self.slices_, self.slices_n_samples_, self.slices_n_samples_subsample_):
subsample_random_state.choice(n_, n_ // 2, replace=False)
for _ in range(len(sl)):
subsample_random_state.choice(n_ // 2, ns_, replace=False)
# Generating indices a priori before parallelism ended up being orders of magnitude
# faster than how sklearn does it. The reason seems to be that the random samplers
# do not release the GIL.
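# Bootstrap of little bags: trees are grouped into subforests of size `subforest_size`.
# Each group shares a common half-sample of the data, and every tree in the group then
# subsamples `n_samples_subsample` points from that half. This nested structure is what
# later enables the across-group variance estimation used for inference.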
n_groups = n_more_estimators // self.subforest_size
new_slices = np.array_split(np.arange(len(self.estimators_),
len(self.estimators_) + n_more_estimators),
n_groups)
s_inds = []
for sl in new_slices:
half_sample_inds = subsample_random_state.choice(n_samples, n_samples // 2, replace=False)
s_inds.extend([half_sample_inds[subsample_random_state.choice(n_samples // 2,
n_samples_subsample,
replace=False)]
for _ in range(len(sl))])
else:
if self.warm_start:
# Advancing subsample_random_state. Assumes each prior fit call has the same number of
# samples at fit time. If not then this would not exactly replicate a single batch execution,
# but would still advance randomness enough so that tree subsamples will be different.
for _, n_, ns_ in zip(range(len(self.estimators_)), self.n_samples_, self.n_samples_subsample_):
subsample_random_state.choice(n_, ns_, replace=False)
new_slices = []
s_inds = [subsample_random_state.choice(n_samples, n_samples_subsample, replace=False)
for _ in range(n_more_estimators)]
# Parallel loop: we prefer the threading backend as the Cython code
# for fitting the trees is internally releasing the Python GIL
# making threading more efficient than multiprocessing in
# that case. However, for joblib 0.12+ we respect any
# parallel_backend contexts set at a higher level,
# since correctness does not rely on using threads.
trees = Parallel(n_jobs=self.n_jobs, verbose=self.verbose, backend='threading')(
delayed(t.fit)(X[s], yaug[s], self.n_y_, self.n_outputs_, self.n_relevant_outputs_,
sample_weight=sample_weight[s] if sample_weight is not None else None,
check_input=False)
for t, s in zip(trees, s_inds))
# Collect newly grown trees
self.estimators_.extend(trees)
self.n_samples_.extend([n_samples] * len(trees))
self.n_samples_subsample_.extend([n_samples_subsample] * len(trees))
self.slices_.extend(list(new_slices))
self.slices_n_samples_.extend([n_samples] * len(new_slices))
self.slices_n_samples_subsample_.extend([n_samples_subsample] * len(new_slices))
return self
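# Rough usage sketch (illustrative only; `SomeGRF` stands for any concrete subclass that
# implements the two abstract methods above):
#
#     est = SomeGRF(n_estimators=400, inference=True)
#     est.fit(X, T, y)
#     point, lb, ub = est.predict(X, interval=True, alpha=0.05)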
def get_subsample_inds(self,):
""" Re-generate the example same sample indices as those at fit time using same pseudo-randomness.
"""
check_is_fitted(self)
subsample_random_state = check_random_state(self.subsample_random_seed_)
if self.inference_:
s_inds = []
for sl, n_, ns_ in zip(self.slices_, self.slices_n_samples_, self.slices_n_samples_subsample_):
half_sample_inds = subsample_random_state.choice(n_, n_ // 2, replace=False)
s_inds.extend([half_sample_inds[subsample_random_state.choice(n_ // 2, ns_, replace=False)]
for _ in range(len(sl))])
return s_inds
else:
return [subsample_random_state.choice(n_, ns_, replace=False)
for n_, ns_ in zip(self.n_samples_, self.n_samples_subsample_)]
def feature_importances(self, max_depth=4, depth_decay_exponent=2.0):
"""
The feature importances based on the amount of parameter heterogeneity they create.
The higher, the more important the feature.
The importance of a feature is computed as the (normalized) total heterogeneity that the feature
creates. For each tree and for each split at which the feature was chosen, it adds::
parent_weight * (left_weight * right_weight)
* mean((value_left[k] - value_right[k])**2) / parent_weight**2
to the importance of the feature. Each such quantity is also weighted by the depth of the split.
These importances are normalized at the tree level and then averaged across trees.
Parameters
----------
max_depth : int, default 4
Splits of depth larger than `max_depth` are not used in this calculation
depth_decay_exponent: double, default 2.0
The contribution of each split to the total score is re-weighted by ``1 / (1 + depth)**depth_decay_exponent``.
Returns
-------
feature_importances_ : ndarray of shape (n_features,)
Normalized total parameter heterogeneity inducing importance of each feature
"""
check_is_fitted(self)
all_importances = Parallel(n_jobs=self.n_jobs, backend='threading')(
delayed(tree.feature_importances)(
max_depth=max_depth, depth_decay_exponent=depth_decay_exponent)
for tree in self.estimators_ if tree.tree_.node_count > 1)
if not all_importances:
return np.zeros(self.n_features_, dtype=np.float64)
all_importances = np.mean(all_importances,
axis=0, dtype=np.float64)
return all_importances / np.sum(all_importances)
@property
def feature_importances_(self):
return self.feature_importances()
def _validate_X_predict(self, X):
"""
Validate X whenever one tries to predict, apply, or use other prediction methods."""
check_is_fitted(self)
return self.estimators_[0]._validate_X_predict(X, check_input=True)
def predict_tree_average_full(self, X):
""" Return the fitted local parameters for each X, i.e. theta(X). This
method simply returns the average of the parameters estimated by each tree. `predict_full`
should be preferred over `pred_tree_average_full`, as it performs a more stable averaging across
trees.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
theta(X) : array_like of shape (n_samples, n_outputs)
The estimated relevant parameters for each row of X
"""
check_is_fitted(self)
# Check data
X = self._validate_X_predict(X)
# Assign chunk of trees to jobs
n_jobs, _, _ = _partition_estimators(self.n_estimators, self.n_jobs)
# avoid storing the output of every estimator by summing them here
y_hat = np.zeros((X.shape[0], self.n_outputs_), dtype=np.float64)
# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=self.verbose, backend='threading', require="sharedmem")(
delayed(_accumulate_prediction)(e.predict_full, X, [y_hat], lock)
for e in self.estimators_)
y_hat /= len(self.estimators_)
return y_hat
def predict_tree_average(self, X):
""" Return the prefix of relevant fitted local parameters for each X, i.e. theta(X)[1..n_relevant_outputs].
This method simply returns the average of the parameters estimated by each tree. `predict`
should be preferred over `predict_tree_average`, as it performs a more stable averaging across
trees.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
theta(X)[1, .., n_relevant_outputs] : array_like of shape (n_samples, n_relevant_outputs)
The estimated relevant parameters for each row of X
"""
y_hat = self.predict_tree_average_full(X)
if self.n_relevant_outputs_ == self.n_outputs_:
return y_hat
return y_hat[:, :self.n_relevant_outputs_]
def predict_moment_and_var(self, X, parameter, slice=None, parallel=True):
""" Return the value of the conditional expected moment vector at each sample and
for the given parameter estimate for each sample::
M(x; theta(x)) := E[J | X=x] theta(x) - E[A | X=x]
where conditional expectations are estimated based on the forest weights, i.e.::
M_tree(x; theta(x)) := (1/ |leaf(x)|) sum_{val sample i in leaf(x)} w[i] (J[i] theta(x) - A[i])
M(x; theta(x)) = (1/n_trees) sum_{trees} M_tree(x; theta(x))
where w[i] is the sample weight (1.0 if sample_weight is None), as well as the variance of the local
moment vector across trees::
Var(M_tree(x; theta(x))) = (1/n_trees) sum_{trees} M_tree(x; theta(x)) @ M_tree(x; theta(x)).T
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
parameter : array_like of shape (n_samples, n_outputs)
An estimate of the parameter theta(x) for each sample x in X
slice : list of int or None, default None
If not None, then only the trees with index in `slice` will be used to calculate the mean
and the variance.
parallel : bool, default True
Whether the averaging should happen using parallelism or not. Parallelism adds some overhead
but makes it faster with many trees.
Returns
-------
moment : array_like of shape (n_samples, n_outputs)
The estimated conditional moment M(x; theta(x)) for each sample x in X
moment_var : array_like of shape (n_samples, n_outputs, n_outputs)
The variance of the conditional moment Var(M_tree(x; theta(x))) across trees for each sample x
"""
check_is_fitted(self)
# Check data
X = self._validate_X_predict(X)
# Assign chunk of trees to jobs
if slice is None:
slice = np.arange(len(self.estimators_))
moment_hat = np.zeros((X.shape[0], self.n_outputs_), dtype=np.float64)
moment_var_hat = np.zeros((X.shape[0], self.n_outputs_, self.n_outputs_), dtype=np.float64)
lock = threading.Lock()
if parallel:
n_jobs, _, _ = _partition_estimators(len(slice), self.n_jobs)
verbose = self.verbose
# Parallel loop
Parallel(n_jobs=n_jobs, verbose=verbose, backend='threading', require="sharedmem")(
delayed(_accumulate_prediction_and_var)(self.estimators_[t].predict_moment, X,
[moment_hat], [moment_var_hat], lock,
parameter)
for t in slice)
else:
[_accumulate_prediction_and_var(self.estimators_[t].predict_moment, X,
[moment_hat], [moment_var_hat], lock,
parameter)
for t in slice]
moment_hat /= len(slice)
moment_var_hat /= len(slice)
return moment_hat, moment_var_hat
def predict_alpha_and_jac(self, X, slice=None, parallel=True):
""" Return the value of the conditional jacobian E[J | X=x] and the conditional alpha E[A | X=x]
using the forest as kernel weights, i.e.::
alpha(x) = (1/n_trees) sum_{trees} (1/ |leaf(x)|) sum_{val sample i in leaf(x)} w[i] A[i]
jac(x) = (1/n_trees) sum_{trees} (1/ |leaf(x)|) sum_{val sample i in leaf(x)} w[i] J[i]
where w[i] is the sample weight (1.0 if sample_weight is None).
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
slice : list of int or None, default None
If not None, then only the trees with index in `slice` will be used to calculate the mean
and the variance.
parallel : bool, default True
Whether the averaging should happen using parallelism or not. Parallelism adds some overhead
but makes it faster with many trees.
Returns
-------
alpha : array_like of shape (n_samples, n_outputs)
The estimated conditional A, alpha(x) for each sample x in X
jac : array_like of shape (n_samples, n_outputs, n_outputs)
The estimated conditional J, jac(x) for each sample x in X
"""
check_is_fitted(self)
# Check data
X = self._validate_X_predict(X)
# Assign chunk of trees to jobs
if slice is None:
slice = np.arange(len(self.estimators_))
n_jobs = 1
verbose = 0
if parallel:
n_jobs, _, _ = _partition_estimators(len(slice), self.n_jobs)
verbose = self.verbose
alpha_hat = np.zeros((X.shape[0], self.n_outputs_), dtype=np.float64)
jac_hat = np.zeros((X.shape[0], self.n_outputs_**2), dtype=np.float64)
# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=n_jobs, verbose=verbose, backend='threading', require="sharedmem")(
delayed(_accumulate_prediction)(self.estimators_[t].predict_alpha_and_jac, X, [alpha_hat, jac_hat], lock)
for t in slice)
alpha_hat /= len(slice)
jac_hat /= len(slice)
return alpha_hat, jac_hat.reshape((-1, self.n_outputs_, self.n_outputs_))
def _predict_point_and_var(self, X, full=False, point=True, var=False, project=False, projector=None):
""" An internal private method that coordinates all prediction functionality and tries to share
as much computation between different predict methods to avoid re-computation and re-spawining of
parallel executions.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
full : bool, default False
Whether to return the full estimated parameter or only the relevant part
point : bool, default True
Whether to return the point estimate theta(x)
var : bool, default False
Whether to return the co-variance of the point estimate V(theta(x))
project : bool, default False
Whether to project the point estimate using an inner product with a projector, and also
return the variance of the projection
projector : array_like of shape (n_samples, n_outputs)
The projection vector for each sample. The point estimate theta(x) for each sample will
be projected and the inner product <theta(x), projector(x)> returned for each sample x.
The variance information will also be about the inner product, as opposed to the parameter
theta(x).
Returns
-------
point : array_like of shape (n_samples, x)
The point estimate of the parameter theta(x) or its inner product with projector(x) for each
sample x in X.
If `point=False`, this return value is omitted. If `project=True`, then `x=1`. If `project=False`
and `full=True`, then `x=n_outputs`. If `project=False` and `full=False`, then `x=n_relevant_outputs`.
var : array_like of shape (n_samples, x, x) or (n_samples, 1)
The covariance of the parameter theta(x) or its inner product with projector(x) for each sample x in X.
If `var=False`, this return value is omitted. If `project=True`, then return is of shape (n_samples, 1).
If `project=False` and `full=True`, then `x=n_outputs`. If `project=False` and `full=False`,
then `x=n_relevant_outputs`.
"""
alpha, jac = self.predict_alpha_and_jac(X)
invjac = np.linalg.pinv(jac)
parameter = np.einsum('ijk,ik->ij', invjac, alpha)
if var:
if not self.inference:
raise AttributeError("Inference not available. Forest was initiated with `inference=False`.")
slices = self.slices_
n_jobs, _, _ = _partition_estimators(len(slices), self.n_jobs)
moment_bags, moment_var_bags = zip(*Parallel(n_jobs=n_jobs, verbose=self.verbose, backend='threading')(
delayed(self.predict_moment_and_var)(X, parameter, slice=sl, parallel=False) for sl in slices))
moment = np.mean(moment_bags, axis=0)
trans_moment_bags = np.moveaxis(moment_bags, 0, -1)
sq_between = np.einsum('tij,tjk->tik', trans_moment_bags,
np.transpose(trans_moment_bags, (0, 2, 1))) / len(slices)
moment_sq = np.einsum('tij,tjk->tik',
moment.reshape(moment.shape + (1,)),
moment.reshape(moment.shape[:-1] + (1, moment.shape[-1])))
var_between = sq_between - moment_sq
pred_cov = np.einsum('ijk,ikm->ijm', invjac,
np.einsum('ijk,ikm->ijm', var_between, np.transpose(invjac, (0, 2, 1))))
if project:
pred_var = np.einsum('ijk,ikm->ijm', projector.reshape((-1, 1, projector.shape[1])),
np.einsum('ijk,ikm->ijm', pred_cov,
projector.reshape((-1, projector.shape[1], 1))))[:, 0, 0]
else:
pred_var = np.diagonal(pred_cov, axis1=1, axis2=2)
#####################
# Variance correction
#####################
# Subtract the average within bag variance. This ends up being equal to the
# overall (E_{all trees}[moment^2] - E_bags[ E[mean_bag_moment]^2 ]) / sizeof(bag).
# The negative part is just sq_between.
var_total = np.mean(moment_var_bags, axis=0)
correction = (var_total - sq_between) / (len(slices[0]) - 1)
pred_cov_correction = np.einsum('ijk,ikm->ijm', invjac,
np.einsum('ijk,ikm->ijm', correction, np.transpose(invjac, (0, 2, 1))))
if project:
pred_var_correction = np.einsum('ijk,ikm->ijm', projector.reshape((-1, 1, projector.shape[1])),
np.einsum('ijk,ikm->ijm', pred_cov_correction,
projector.reshape((-1, projector.shape[1], 1))))[:, 0, 0]
else:
pred_var_correction = np.diagonal(pred_cov_correction, axis1=1, axis2=2)
# Objective Bayes debiasing for the diagonals, where we know a priori that they are positive.
# For the off-diagonals we have no objective prior, so no correction is applied.
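# Concretely, the corrected diagonal below equals the mean of a normal distribution with
# mean `naive_estimate` and scale `se`, truncated to be non-negative:
# naive_estimate + se * phi(zstat) / Phi(zstat), with zstat = naive_estimate / se.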
naive_estimate = pred_var - pred_var_correction
se = np.maximum(pred_var, pred_var_correction) * np.sqrt(2.0 / len(slices))
zstat = naive_estimate / np.clip(se, 1e-10, np.inf)
numerator = np.exp(- (zstat**2) / 2) / np.sqrt(2.0 * np.pi)
denominator = 0.5 * erfc(-zstat / np.sqrt(2.0))
pred_var_corrected = naive_estimate + se * numerator / denominator
# Finally correcting the pred_cov or pred_var
if project:
pred_var = pred_var_corrected
else:
pred_cov = pred_cov - pred_cov_correction
for t in range(self.n_outputs_):
pred_cov[:, t, t] = pred_var_corrected[:, t]
if project:
if point:
pred = np.sum(parameter * projector, axis=1)
if var:
return pred, pred_var
else:
return pred
else:
return pred_var
else:
n_outputs = self.n_outputs_ if full else self.n_relevant_outputs_
if point and var:
return (parameter[:, :n_outputs],
pred_cov[:, :n_outputs, :n_outputs],)
elif point:
return parameter[:, :n_outputs]
else:
return pred_cov[:, :n_outputs, :n_outputs]
def predict_full(self, X, interval=False, alpha=0.05):
""" Return the fitted local parameters for each x in X, i.e. theta(x).
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
interval : bool, default False
Whether to return a confidence interval too
alpha : float in (0, 1), default 0.05
The confidence level of the confidence interval. Returns a symmetric (alpha/2, 1-alpha/2)
confidence interval.
Returns
-------
theta(x) : array_like of shape (n_samples, n_outputs)
The estimated relevant parameters for each row x of X
lb(x), ub(x) : array_like of shape (n_samples, n_outputs)
The lower and upper end of the confidence interval for each parameter. Return value is omitted if
`interval=False`.
"""
if interval:
point, pred_var = self._predict_point_and_var(X, full=True, point=True, var=True)
lb, ub = np.zeros(point.shape), np.zeros(point.shape)
for t in range(self.n_outputs_):
var = pred_var[:, t, t]
assert np.isclose(var[var < 0], 0, atol=1e-8).all(), f'`pred_var` must be > 0 {var[var < 0]}'
var = np.maximum(var, 1e-32)
pred_dist = scipy.stats.norm(loc=point[:, t], scale=np.sqrt(var))
lb[:, t] = pred_dist.ppf(alpha / 2)
ub[:, t] = pred_dist.ppf(1 - (alpha / 2))
return point, lb, ub
return self._predict_point_and_var(X, full=True, point=True, var=False)
def predict(self, X, interval=False, alpha=0.05):
""" Return the prefix of relevant fitted local parameters for each x in X,
i.e. theta(x)[1..n_relevant_outputs].
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
interval : bool, default False
Whether to return a confidence interval too
alpha : float in (0, 1), default 0.05
The confidence level of the confidence interval. Returns a symmetric (alpha/2, 1-alpha/2)
confidence interval.
Returns
-------
theta(X)[1, .., n_relevant_outputs] : array_like of shape (n_samples, n_relevant_outputs)
The estimated relevant parameters for each row of X
lb(x), ub(x) : array_like of shape (n_samples, n_relevant_outputs)
The lower and upper end of the confidence interval for each parameter. Return value is omitted if
`interval=False`.
"""
if interval:
y_hat, lb, ub = self.predict_full(X, interval=interval, alpha=alpha)
if self.n_relevant_outputs_ == self.n_outputs_:
return y_hat, lb, ub
return (y_hat[:, :self.n_relevant_outputs_],
lb[:, :self.n_relevant_outputs_], ub[:, :self.n_relevant_outputs_])
else:
y_hat = self.predict_full(X, interval=False)
if self.n_relevant_outputs_ == self.n_outputs_:
return y_hat
return y_hat[:, :self.n_relevant_outputs_]
def predict_interval(self, X, alpha=0.05):
""" Return the confidence interval for the relevant fitted local parameters for each x in X,
i.e. theta(x)[1..n_relevant_outputs].
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
alpha : float in (0, 1), default 0.05
The confidence level of the confidence interval. Returns a symmetric (alpha/2, 1-alpha/2)
confidence interval.
Returns
-------
lb(x), ub(x) : array_like of shape (n_samples, n_relevant_outputs)
The lower and upper end of the confidence interval for each parameter.
"""
_, lb, ub = self.predict(X, interval=True, alpha=alpha)
return lb, ub
def predict_and_var(self, X):
""" Return the prefix of relevant fitted local parameters for each x in X,
i.e. theta(x)[1..n_relevant_outputs] and their covariance matrix.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
theta(x)[1, .., n_relevant_outputs] : array_like of shape (n_samples, n_relevant_outputs)
The estimated relevant parameters for each row of X
var(theta(x)) : array_like of shape (n_samples, n_relevant_outputs, n_relevant_outputs)
The covariance of theta(x)[1, .., n_relevant_outputs]
"""
return self._predict_point_and_var(X, full=False, point=True, var=True)
def predict_var(self, X):
""" Return the covariance matrix of the prefix of relevant fitted local parameters
for each x in X.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
var(theta(x)) : array_like of shape (n_samples, n_relevant_outputs, n_relevant_outputs)
The covariance of theta(x)[1, .., n_relevant_outputs]
"""
return self._predict_point_and_var(X, full=False, point=False, var=True)
def prediction_stderr(self, X):
""" Return the standard deviation of each coordinate of the prefix of relevant fitted local parameters
for each x in X.
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
Returns
-------
std(theta(x)) : array_like of shape (n_samples, n_relevant_outputs)
The standard deviation of each theta(x)[i] for i in {1, .., n_relevant_outputs}
"""
return np.sqrt(np.diagonal(self.predict_var(X), axis1=1, axis2=2))
def _check_projector(self, X, projector):
""" validate the projector parameter
"""
X, projector = check_X_y(X, projector, multi_output=True, y_numeric=True)
if projector.ndim == 1:
projector = projector.reshape((-1, 1))
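# Pad the projector with zeros for the nuisance parameters, so that the inner product in the
# internal full-parameter space only involves the relevant outputs.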
if self.n_outputs_ > self.n_relevant_outputs_:
projector = np.hstack([projector,
np.zeros((projector.shape[0], self.n_outputs_ - self.n_relevant_outputs_))])
return X, projector
def predict_projection_and_var(self, X, projector):
""" Return the inner product of the prefix of relevant fitted local parameters for each x in X,
i.e. theta(x)[1..n_relevant_outputs], with a projector vector projector(x), i.e.::
mu(x) := <theta(x)[1..n_relevant_outputs], projector(x)>
as well as the variance of mu(x).
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
projector : array_like of shape (n_samples, n_relevant_outputs)
The projector vector for each sample x in X
Returns
-------
mu(x) : array_like of shape (n_samples, 1)
The estimated inner product of the relevant parameters with the projector for each row x of X
var(mu(x)) : array_like of shape (n_samples, 1)
The variance of the estimated inner product
"""
X, projector = self._check_projector(X, projector)
return self._predict_point_and_var(X, full=False, point=True, var=True,
project=True, projector=projector)
def predict_projection(self, X, projector):
""" Return the inner product of the prefix of relevant fitted local parameters for each x in X,
i.e. theta(x)[1..n_relevant_outputs], with a projector vector projector(x), i.e.::
mu(x) := <theta(x)[1..n_relevant_outputs], projector(x)>
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
projector : array_like of shape (n_samples, n_relevant_outputs)
The projector vector for each sample x in X
Returns
-------
mu(x) : array_like of shape (n_samples, 1)
The estimated inner product of the relevant parameters with the projector for each row x of X
"""
X, projector = self._check_projector(X, projector)
return self._predict_point_and_var(X, full=False, point=True, var=False,
project=True, projector=projector)
def predict_projection_var(self, X, projector):
""" Return the variance of the inner product of the prefix of relevant fitted local parameters
for each x in X, i.e. theta(x)[1..n_relevant_outputs], with a projector vector projector(x), i.e.::
Var(mu(x)) for mu(x) := <theta(x)[1..n_relevant_outputs], projector(x)>
Parameters
----------
X : array_like of shape (n_samples, n_features)
The input samples. Internally, it will be converted to
``dtype=np.float64``.
projector : array_like of shape (n_samples, n_relevant_outputs)
The projector vector for each sample x in X
Returns
-------
var(mu(x)) : array_like of shape (n_samples, 1)
The variance of the estimated inner product
"""
X, projector = self._check_projector(X, projector)
return self._predict_point_and_var(X, full=False, point=False, var=True,
project=True, projector=projector)
def oob_predict(self, Xtrain):
""" Returns the relevant output predictions for each of the training data points, when
only trees where that data point was not used are incorporated. This method is not
available is the estimator was trained with `warm_start=True`.
Parameters
----------
Xtrain : (n_training_samples, n_features) matrix
Must be the same exact X matrix that was passed to the forest at fit time.
Returns
-------
oob_preds : (n_training_samples, n_relevant_outputs) matrix
The out-of-bag predictions of the relevant output parameters for each of the training points
"""
if self.warm_start_:
raise AttributeError("`oob_predict` is not available when "
"the estimator was fitted with `warm_start=True`")
# avoid storing the output of every estimator by summing them here
alpha_hat = np.zeros((Xtrain.shape[0], self.n_outputs_), dtype=np.float64)
jac_hat = np.zeros((Xtrain.shape[0], self.n_outputs_**2), dtype=np.float64)
counts = np.zeros((Xtrain.shape[0],), dtype=np.intp)
subsample_inds = self.get_subsample_inds()
# Parallel loop
lock = threading.Lock()
Parallel(n_jobs=self.n_jobs, verbose=self.verbose, backend='threading', require="sharedmem")(
delayed(_accumulate_oob_preds)(tree, Xtrain, sinds, alpha_hat, jac_hat, counts, lock)
for tree, sinds in zip(self.estimators_, subsample_inds))
pos_count = (counts > 0)
alpha_hat[pos_count] /= counts[pos_count].reshape((-1, 1))
jac_hat[pos_count] /= counts[pos_count].reshape((-1, 1))
invjac = np.linalg.pinv(jac_hat.reshape((-1, self.n_outputs_, self.n_outputs_)))
oob_preds = np.einsum('ijk,ik->ij', invjac, alpha_hat)[:, :self.n_relevant_outputs_]
oob_preds[~pos_count] = np.nan
return oob_preds