# Source code for econml.iv.sieve._tsls

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

"""Provides a non-parametric two-stage least squares instrumental variable estimator."""

import numpy as np
from copy import deepcopy
from sklearn import clone
from sklearn.linear_model import LinearRegression
from ...utilities import (shape, transpose, reshape, cross_product, ndim, size,
_deprecate_positional, check_input_arrays)
from ..._cate_estimator import BaseCateEstimator, LinearCateEstimator
from numpy.polynomial.hermite_e import hermeval
from sklearn.base import TransformerMixin
from sklearn.preprocessing import PolynomialFeatures
from itertools import product

[docs]class HermiteFeatures(TransformerMixin):
"""
Featurizer that returns(unscaled) Hermite function evaluations.

The evaluated functions are of degrees 0..degree, differentiated shift times.

If the input has shape(n, x) and joint is False, the output will have shape(n, (degree+ 1)×x) if shift is 0.
If the input has shape(n, x) and joint is True, the output will have shape(n, (degree+ 1) ^ x) if shift is 0.
In either case, if shift is nonzero there will be shift additional dimensions of size x
between the first and last.
"""

[docs]    def __init__(self, degree, shift=0, joint=False):
self._degree = degree
self._shift = shift
self._joint = joint

def _column_feats(self, X, shift):
"""
Apply Hermite function evaluations of degrees 0..degree differentiated shift times.

When applied to the column X of shape(n,), the resulting array has shape(n, (degree + 1)).
"""
assert ndim(X) == 1
# this will have dimension (d,) + shape(X)
coeffs = np.identity(self._degree + shift + 1)[:, shift:]
feats = ((-1) ** shift) * hermeval(X, coeffs) * np.exp(-X * X / 2)
# send the first dimension to the end
return transpose(feats)

[docs]    def fit(self, X):
"""Fits the data(a NOP for this class) and returns self."""
return self

[docs]    def transform(self, X):
"""
Transform the data by applying the appropriate Hermite functions.

Parameters
----------
X: array_like
2-dimensional array of input features

Returns
-------
The transformed data
"""
assert ndim(X) == 2
n = shape(X)
ncols = shape(X)
columns = []
for indices in product(*[range(ncols) for i in range(self._shift)]):
if self._joint:
columns.append(cross_product(*[self._column_feats(X[:, i], indices.count(i))
for i in range(shape(X))]))
else:
indices = set(indices)
if self._shift == 0:  # return features for all columns:
columns.append(np.hstack([self._column_feats(X[:, i], self._shift) for i in range(shape(X))]))
# columns are featurized independently; partial derivatives are only non-zero
# when taken with respect to the same column each time
elif len(indices) == 1:
index = list(indices)
feats = self._column_feats(X[:, index], self._shift)
columns.append(np.hstack([feats if i == index else np.zeros(shape(feats))
for i in range(shape(X))]))
else:
columns.append(np.zeros((n, (self._degree + 1) * ncols)))
return reshape(np.hstack(columns), (n,) + (ncols,) * self._shift + (-1,))

[docs]class DPolynomialFeatures(TransformerMixin):
"""
Featurizer that returns the derivatives of :class:~sklearn.preprocessing.PolynomialFeatures features in
a way that's compativle with the expectations of :class:.NonparametricTwoStageLeastSquares's
dt_featurizer parameter.

If the input has shape (n, x) and
:meth:PolynomialFeatures.transform<sklearn.preprocessing.PolynomialFeatures.transform> returns an output
of shape (n, f), then :meth:.transform will return an array of shape (n, x, f).

Parameters
----------
degree: integer, default = 2
The degree of the polynomial features.

interaction_only: boolean, default = False
If true, only derivatives of interaction features are produced: features that are products of at most degree
distinct input features (so not x ** 2, x * x ** 3, etc.).

include_bias: boolean, default = True
If True (default), then include the derivative of a bias column, the feature in which all polynomial powers
are zero.
"""

[docs]    def __init__(self, degree=2, interaction_only=False, include_bias=True):
self.F = PolynomialFeatures(degree=degree, interaction_only=interaction_only, include_bias=include_bias)

[docs]    def fit(self, X, y=None):
"""
Compute number of output features.

Parameters
----------
X : array-like, shape (n_samples, n_features)
The data.
y : array, optional
Not used

Returns
-------
self : instance
"""
return self

[docs]    def transform(self, X):
"""
Transform data to derivatives of polynomial features

Parameters
----------
X: array-like, shape (n_samples, n_features)
The data to transform, row by row.

Returns
-------
XP: array-like, shape (n_samples, n_features, n_output_features)
The matrix of features, where n_output_features is the number of features that
would be returned from :class:~sklearn.preprocessing.PolynomialFeatures.
"""
self.F.fit(X)
powers = self.F.powers_
result = np.zeros(X.shape + (self.F.n_output_features_,))
for i in range(X.shape):
p = powers.copy()
c = powers[:, i]
p[:, i] -= 1
M = np.float_power(X[:, np.newaxis, :], p[np.newaxis, :, :])
result[:, i, :] = c[np.newaxis, :] * np.prod(M, axis=-1)
return result

def _add_ones(arr):
"""Add a column of ones to the front of an array."""
return np.hstack([np.ones((shape(arr), 1)), arr])

def _add_zeros(arr):
"""Add a column of zeros to the front of an array."""
return np.hstack([np.zeros((shape(arr), 1)), arr])

[docs]class SieveTSLS(BaseCateEstimator):
"""
Non-parametric instrumental variables estimator.

Supports the use of arbitrary featurizers for the features, treatments, and instruments.

Parameters
----------
t_featurizer: transformer
Featurizer used to transform the treatments

x_featurizer: transformer
Featurizer used to transform the raw features

z_featurizer: transformer
Featurizer used to transform the instruments

dt_featurizer: transformer
Featurizer used to transform the treatments for the computation of the marginal effect.
This should produce a 3-dimensional array, containing the per-treatment derivative of
each transformed treatment. That is, given a treatment array of shape(n, dₜ),
the output should have shape(n, dₜ, fₜ), where fₜ is the number of columns produced by t_featurizer.

"""

[docs]    def __init__(self, *,
t_featurizer,
x_featurizer,
z_featurizer,
dt_featurizer):
self._t_featurizer = clone(t_featurizer, safe=False)
self._x_featurizer = clone(x_featurizer, safe=False)
self._z_featurizer = clone(z_featurizer, safe=False)
self._dt_featurizer = clone(dt_featurizer, safe=False)
# don't fit intercept; manually add column of ones to the data instead;
# this allows us to ignore the intercept when computing marginal effects
self._model_T = LinearRegression(fit_intercept=False)
self._model_Y = LinearRegression(fit_intercept=False)
super().__init__()

[docs]    @_deprecate_positional("X, W, and Z should be passed by keyword only. In a future release "
"we will disallow passing X, W, and Z by position.", ['X', 'W', 'Z'])
@BaseCateEstimator._wrap_fit
def fit(self, Y, T, Z, X=None, W=None, *, inference=None):
"""
Estimate the counterfactual model from data, i.e. estimates functions τ(·, ·, ·), ∂τ(·, ·).

Parameters
----------
Y: (n × d_y) matrix
Outcomes for each sample
T: (n × dₜ) matrix
Treatments for each sample
X: optional(n × dₓ) matrix
Features for each sample
W: optional(n × d_w) matrix
Controls for each sample
Z: optional(n × d_z) matrix
Instruments for each sample
inference: string, :class:.Inference instance, or None
Method for performing inference.  This estimator supports 'bootstrap'
(or an instance of :class:.BootstrapInference)

Returns
-------
self

"""
Y, T, X, W, Z = check_input_arrays(Y, T, X, W, Z)
if X is None:
X = np.empty((shape(Y), 0))
if W is None:
W = np.empty((shape(Y), 0))
assert shape(Y) == shape(T) == shape(X) == shape(W) == shape(Z)

# make T 2D if if was a vector
if ndim(T) == 1:
T = reshape(T, (-1, 1))

# store number of columns of W so that we can create correctly shaped zero array in effect and marginal effect
self._d_w = shape(W)

# two stage approximation
# first, get basis expansions of T, X, and Z
ft_X = self._x_featurizer.fit_transform(X)
ft_Z = self._z_featurizer.fit_transform(Z)
ft_T = self._t_featurizer.fit_transform(T)
# TODO: is it right that the effective number of intruments is the
#       product of ft_X and ft_Z, not just ft_Z?
assert shape(ft_T) <= shape(ft_X) * shape(ft_Z), ("There can be no more T features than the product "
"of the number of X and Z features; otherwise "
"there is not enough information to identify their "
"structure")

# regress T expansion on X,Z expansions concatenated with W
features = _add_ones(np.hstack([W, cross_product(ft_X, ft_Z)]))
self._model_T.fit(features, ft_T)
# predict ft_T from interacted ft_X, ft_Z
ft_T_hat = self._model_T.predict(features)
self._model_Y.fit(_add_ones(np.hstack([W, cross_product(ft_T_hat, ft_X)])), Y)

[docs]    def effect(self, X=None, T0=0, T1=1):
"""
Calculate the heterogeneous treatment effect τ(·,·,·).

The effect is calculated between the two treatment points
conditional on a vector of features on a set of m test samples {T0ᵢ, T1ᵢ, Xᵢ}.

Parameters
----------
T0: (m × dₜ) matrix or vector of length m
Base treatments for each sample
T1: (m × dₜ) matrix or vector of length m
Target treatments for each sample
X: optional (m × dₓ) matrix
Features for each sample

Returns
-------
τ: (m × d_y) matrix
Heterogeneous treatment effects on each outcome for each sample
Note that when Y is a vector rather than a 2-dimensional array, the corresponding
singleton dimension will be collapsed (so this method will return a vector)

"""
if ndim(T0) == 0:
T0 = np.full((1 if X is None else shape(X),) + self._d_t, T0)
if ndim(T1) == 0:
T1 = np.full((1 if X is None else shape(X),) + self._d_t, T1)
if ndim(T0) == 1:
T0 = reshape(T0, (-1, 1))
if ndim(T1) == 1:
T1 = reshape(T1, (-1, 1))
if X is None:
X = np.empty((shape(T0), 0))
assert shape(T0) == shape(T1)
assert shape(T0) == shape(X)

W = np.zeros((shape(T0), self._d_w))  # can set arbitrarily since values will cancel
ft_X = self._x_featurizer.transform(X)
ft_T0 = self._t_featurizer.transform(T0)
ft_T1 = self._t_featurizer.transform(T1)
Y0 = self._model_Y.predict(_add_ones(np.hstack([W, cross_product(ft_T0, ft_X)])))
Y1 = self._model_Y.predict(_add_ones(np.hstack([W, cross_product(ft_T1, ft_X)])))
return Y1 - Y0

[docs]    def marginal_effect(self, T, X=None):
"""
Calculate the heterogeneous marginal effect ∂τ(·, ·).

The marginal effect is calculated around a base treatment
point conditional on a vector of features on a set of m test samples {Tᵢ, Xᵢ}.

Parameters
----------
T: (m × dₜ) matrix
Base treatments for each sample
X: optional(m × dₓ) matrix
Features for each sample

Returns
-------
grad_tau: (m × d_y × dₜ) array
Heterogeneous marginal effects on each outcome for each sample
Note that when Y or T is a vector rather than a 2-dimensional array,
the corresponding singleton dimensions in the output will be collapsed
(e.g. if both are vectors, then the output of this method will also be a vector)
"""
if X is None:
X = np.empty((shape(T), 0))
assert shape(T) == shape(X)

ft_X = self._x_featurizer.transform(X)
n = shape(T)
dT = self._dt_featurizer.transform(T if ndim(T) == 2 else reshape(T, (-1, 1)))
W = np.zeros((size(T), self._d_w))
# dT should be an n×dₜ×fₜ array (but if T was a vector, or if there is only one feature,
# dT may be only 2-dimensional)
# promote dT to 3D if necessary (e.g. if T was a vector)
if ndim(dT) < 3:
dT = reshape(dT, (n, 1, shape(dT)))

# reshape ft_X and dT to allow cross product (result has shape n×dₜ×fₜ×f_x)
features = reshape(ft_X, (n, 1, 1, -1)) * reshape(dT, shape(dT) + (1,))
features = transpose(features, [0, 1, 3, 2])  # swap last two dims to match cross_product
features = reshape(features, (size(T), -1))
output = self._model_Y.predict(_add_zeros(np.hstack([W, features])))
output = reshape(output, shape(T) + shape(output)[1:])
if ndim(output) == 3:
return transpose(output, (0, 2, 1))  # transpose trailing T and Y dims
else:
return output