import copy
import warnings
import numpy as np
import ot
from scipy import stats
from . import utilities, dist_reg, interpolation
from .utilities import BatchedCategorical
import pandas as pd
import cvxpy as cp
# typing
from scipy.stats._distn_infrastructure import rv_generic
import sklearn.base
from typing import Optional, Union
MIN_NVALS = 7
DISC_THRESH = 2 # treat vars. with <= DISC_THRESH values as discrete
CROSSFIT_WARNING = """
==================================================
Not fitting a model because y0_dists/y1_dists were
directly provided. Please ensure cross-fitting is
employed correctly, else inference will be invalid
(see https://arxiv.org/abs/2310.08115). To suppress
this warning, set ``suppress_warning=True``.
==================================================
"""
def get_default_model(discrete, support, outcome_model=None, **model_kwargs):
# Prevent errors when eps_dist is provided for discrete data
if discrete:
model_kwargs.pop("eps_dist", None)
# Handle the case where we have a list of outcome models
if isinstance(outcome_model, list):
return [
get_default_model(outcome_model=x, discrete=discrete, support=support, **model_kwargs)
for x in outcome_model
]
if isinstance(outcome_model, dist_reg.DistReg):
return outcome_model
outcome_model = 'ridge' if outcome_model is None else outcome_model
if not discrete:
return dist_reg.CtsDistReg(
model_type=outcome_model, **model_kwargs
)
elif discrete and set(support) == set([0, 1]):
return dist_reg.BinaryDistReg(
model_type=outcome_model, **model_kwargs
)
else:
raise NotImplementedError("Currently no default for non-binary discrete data")
def infer_discrete(discrete, support, y):
n = len(y)
### Check if discrete
if n <= 10:
if discrete is None:
raise ValueError("Please specify the value of discrete as n <= 10")
if discrete and support is None:
raise ValueError("Please specify the value of support as n <= 10")
if support is None:
support = np.unique(y)
if discrete is None:
if len(support) <= DISC_THRESH:
discrete = True
else:
discrete = False
# Adjust support to avoid being misleading
# in the continuous case
if not discrete:
support = None
return discrete, support
def _default_ylims(
y,
y0_min=None,
y1_min=None,
y0_max=None,
y1_max=None,
**kwargs # ignored
):
# Computes default bounds on y
if y0_min is None:
y0_min = 1.5 * y.min() - y.max() / 2
if y1_min is None:
y1_min = 1.5 * y.min() - y.max() / 2
if y0_max is None:
y0_max = 1.5 * y.max() - y.min() / 2
if y1_max is None:
y1_max = 1.5 * y.max() - y.min() / 2
return y0_min, y1_min, y0_max, y1_max
def _dualvals_are_too_large(nu0, nu1, almost_inf):
return max(np.max(np.abs(nu0)), np.max(np.abs(nu1))) > 0.1 * almost_inf
[docs]
class DualBounds:
"""
Computes dual bounds on :math:`E[f(Y(0),Y(1), X)].`
Here, :math:`X` are covariates and :math:`Y(0), Y(1)` are
potential outcomes.
Parameters
----------
f : function
Function which defines the partially identified estimand.
Must be a function of three arguments: y0, y1, x
(in that order). E.g.,
``f = lambda y0, y1, x : y0 <= y1``
outcome : np.array | pd.Series
n-length array of outcome measurements (Y).
treatment : np.array | pd.Series
n-length array of binary treatment (W).
covariates : np.array | pd.Series
(n, p)-shaped array of covariates (X).
propensities : np.array | pd.Series
n-length array of propensity scores :math:`P(W=1 | X)`.
If ``None``, will be estimated from the data.
clusters : np.array | pd.Series
Optional n-length array of clusters, so ``clusters[i] = j``
indicates that observation i is in cluster j.
outcome_model : str | dist_reg.DistReg | list
The model for estimating the law of :math:`Y | X, W`.
Three options:
- A str identifier, e.g., 'ridge', 'lasso', 'elasticnet', 'randomforest', 'knn'.
- An object inheriting from ``dist_reg.DistReg``.
- A list of ``dist_reg.DistReg`` objects to automatically choose between.
E.g., when ``outcome`` is continuous, the default is
``outcome_model=dist_reg.CtsDistReg(model_type='ridge')``.
propensity_model : str | sklearn classifier
How to estimate the propensity scores if they are not provided.
Two options:
- A str identifier, e.g., 'ridge', 'lasso', 'elasticnet', 'randomforest', 'knn'.
- An sklearn classifier, e.g., ``sklearn.linear_model.LogisticRegressionCV()``.
model_selector : dist_reg.ModelSelector
A ModelSelector object which can choose between several outcome models.
The default performs within-fold nested cross-validation. Note: this
argument is ignored unless ``outcome_model`` is a list.
discrete : bool
If True, treats the outcome as a discrete variable.
Defaults to ``None`` (inferred from the data).
support : np.array
Optional support of the outcome, if known and discrete.
Defaults to ``None`` (inferred from the data).
support_restriction : function
Boolean-valued function of y0, y1, x where
``support_restriction(y0, y1, x) = False`` asserts that
y0, y1, x is not in the support of :math:`Y(0), Y(1), X`.
Defaults to ``None`` (no a-priori support restrictions).
See the user guide for important usage tips.
model_kwargs : dict
Additional kwargs for the ``outcome_model``, e.g.,
``feature_transform``. See
:class:`dualbounds.dist_reg.CtsDistReg` or
:class:`dualbounds.dist_reg.BinaryDistReg` for more kwargs.
Notes
-----
``DualBounds`` will do limited preprocessing to (e.g.) create
dummies for discrete covariates. However, we recommended doing
custom preprocessing for optimal results.
Examples
--------
Here we fit DualBounds on :math:`P(Y(0) < Y(1))` based on
synthetic regression data: ::
import dualbounds as db
# Generate synthetic data
data = db.gen_data.gen_regression_data(n=900, p=30)
# Initialize dual bounds object
dbnd = db.generic.DualBounds(
f=lambda y0, y1, x: y0 < y1,
covariates=data['X'],
treatment=data['W'],
outcome=data['y'],
propensities=data['pis'],
outcome_model='ridge',
)
# Compute dual bounds and observe output
dbnd.fit(alpha=0.05).summary()
"""
def __init__(
self,
f: callable,
outcome: Union[np.array, pd.Series],
treatment: Union[np.array, pd.Series],
covariates: Optional[Union[np.array, pd.DataFrame]]=None,
propensities: Optional[Union[np.array, pd.Series]]=None,
clusters: Optional[Union[np.array, pd.Series]]=None,
outcome_model: Union[str, dist_reg.DistReg, list]='ridge',
propensity_model: Union[str, sklearn.base.BaseEstimator]='ridge',
model_selector: Optional[dist_reg.ModelSelector]=None,
discrete: Optional[np.array]=None,
support: Optional[np.array]=None,
support_restriction: Optional[callable]=None,
**model_kwargs,
) -> None:
# Estimand
self.f = f
### Process outcome
self.y = outcome
if isinstance(self.y, pd.Series) or isinstance(self.y, pd.DataFrame):
self.y = self.y.values
if np.any(np.isnan(self.y)):
raise ValueError("outcome contains nans")
if len(self.y.shape) > 1:
if len(self.y.shape) == 2 and self.y.shape[1] == 1:
self.y = self.y.flatten()
else:
raise ValueError("outcome should be a flat array not a matrix")
self.n = len(self.y)
### Treatment
if isinstance(treatment, pd.Series) or isinstance(treatment, pd.DataFrame):
treatment = treatment.values.reshape(self.n)
self.W = utilities._binarize_variable(treatment, var_name='treatment')
### Process covariates
self.X = covariates
if self.X is None:
self.X = np.zeros((self.n, 1))
# limited preprocessing of covariates
if isinstance(self.X, pd.DataFrame):
self.X = utilities.process_covariates(self.X)
self.cov_names = self.X.columns
self.X = self.X.values
else:
if len(self.X.shape) == 1:
self.X = self.X.reshape(-1, 1)
self.cov_names = np.arange(self.X.shape[1])
# fill NAs if existing
if np.any(np.isnan(self.X)):
self.X = self.X.copy()
naninds = np.where(np.isnan(self.X))
self.X[naninds] = np.take(np.nanmean(self.X, axis=0), naninds[1])
### process propensities
self.pis = propensities
if isinstance(self.pis, pd.Series) or isinstance(self.pis, pd.DataFrame):
self.pis = self.pis.values.reshape(self.n)
if self.pis is not None:
if np.any(np.isnan(self.pis)):
raise ValueError(f"propensities (pis) are provided but contains missing values")
if np.any(self.pis < 0) or np.any(self.pis > 1):
raise ValueError(f"propensities (pis) do not lie within [0,1]")
### Clusters
if clusters is not None:
if isinstance(clusters, pd.Series) or isinstance(clusters, pd.DataFrame):
clusters = clusters.values.reshape(self.n)
self.clusters = clusters
### Check if discrete
self.discrete, self.support = infer_discrete(
discrete=discrete, support=support, y=self.y,
)
self.outcome_model = outcome_model
self.propensity_model = propensity_model
self.model_selector = model_selector
self.model_kwargs = model_kwargs
## Support restrictions
self.support_restriction = support_restriction
## Initialize core objects
self.y0_dists = None
self.y1_dists = None
self.oos_dist_preds = None
def _apply_ot_fn(
self, fn: callable, y0: Union[float, np.array], y1: Union[float, np.array], **kwargs
):
"""
Applies fn and infers appropriate broadcasting for the OT setting.
Parameters
----------
y0 : float | np.array
y1 : float | np.array
kwargs : dict
Other parameters, e.g., x, or w0 and w1 in the IV setting
(since DualIVBounds inherits this function).
Returns
-------
fn_val : float | np.array
Array of shape (len(y0) x len(y1)).
Notes
-----
Internally, this is used to apply self.f and self.support_restriction.
"""
y0_haslength = utilities.haslength(y0)
y1_haslength = utilities.haslength(y1)
# Handle the scalar case
if not y0_haslength and not y1_haslength:
return fn(y0=y0, y1=y1, **kwargs)
# Otherwise vectorize appropriately
if not y0_haslength:
y0 = np.array([y0])
if not y1_haslength:
y1 = np.array([y1])
# Adding extra zeros ensures correct broadcasting
orig = fn(y0=y0.reshape(-1, 1), y1=y1.reshape(1, -1), **kwargs)
return orig + np.zeros((len(y0), len(y1)))
def _discretize(
self,
ydists,
nvals,
ymin,
ymax,
min_quantile=0.001,
ninterp=None,
min_yprob=1e-8,
):
"""
Helper method which discretizes Y before solving +
interpolating to obtain dual variables.
Notes
-----
For discrete Y: extracts conditional PMF/support points.
For continuous Y: discretizes ydists along evenly spaced
quantiles. Also adds extra points near the edge of the
support to ensure numerical stability.
"""
# allow for batched setting
if not isinstance(ydists, list):
ydists = [ydists]
### Discrete case ###
if self.discrete:
yvals = []
yprobs = []
for ydist in ydists:
if not isinstance(ydist, utilities.BatchedCategorical):
msg = "self.discrete=True, so the predicted ydists must be a BatchedCategorical"
msg += "distributions. To get discrete-like behavior using other distribution"
msg += "types, set discrete=False and grid_size=0."
raise ValueError(msg)
yvals.append(ydist.vals)
yprobs.append(ydist.probs)
return np.concatenate(yvals, axis=0), np.concatenate(yprobs, axis=0)
### Continuous case ###
if ninterp is None:
ninterp = min(max(int(0.1 * (nvals-2)), 1), 5)
# Case 2(a): we already have a discrete distribution
if isinstance(ydists[0], utilities.BatchedCategorical):
for ii, ydist in enumerate(ydists):
if not isinstance(ydist, utilities.BatchedCategorical):
raise ValueError(
f"ydists[0] is a BatchedCategorical, ydists[{ii}] is not---mixing cts/discrete dists is unsupported."
)
# Extract and adjust everything to ensure the same support size
nvals_adj = nvals - 2 * ninterp
dists_adj = [
utilities.adjust_support_size(
vals=ydist.vals, probs=ydist.probs, new_nvals=nvals_adj, ymin=ymin, ymax=ymax
)
for ydist in ydists
]
yvals = np.concatenate([x[0] for x in dists_adj], axis=0)
yprobs = np.concatenate([x[1] for x in dists_adj], axis=0)
# Case 2(b): we have a continuous distribution
else:
if nvals <= MIN_NVALS:
raise ValueError(f"nvals must be larger than {nvals}.")
# make sure we get small enough quantiles
alpha = min_quantile
if alpha is None:
alpha = 1 / (2*nvals)
alpha = min(1/(2*nvals), max(alpha, 1e-8))
# num of interp. pts between min/max quantiles
# and ymin/ymax, added to ensure feasbility
nvals_adj = nvals - 2 * ninterp - 3
### Main discretization based on evenly spaced quantiles
# choose endpoints of bins for disc. approx
endpoints = np.sort(np.concatenate(
[[0, alpha],
np.linspace(1/nvals_adj, (nvals_adj-1)/nvals_adj, nvals_adj),
[1-alpha, 1]],
))
qs = (endpoints[1:] + endpoints[0:-1])/2
# loop through batches and concatenate
yvals = []
for dists in ydists:
yvals.append(dists.ppf(qs.reshape(-1, 1)).T)
yvals = np.concatenate(yvals, axis=0)
n = len(yvals)
# concatenate y probs
yprobs = endpoints[1:] - endpoints[0:-1]
yprobs = np.stack([yprobs for _ in range(n)], axis=0)
### Insert additional support points with zero prob
if ninterp > 0:
to_add = np.linspace(ymin, yvals.min(axis=1), ninterp+1)[0:-1].T
to_add = np.concatenate(
[to_add, np.linspace(ymax, yvals.max(axis=1), ninterp+1)[0:-1].T],
axis=1
)
yvals = np.concatenate([to_add, yvals], axis=1)
yprobs = np.concatenate([np.zeros(to_add.shape), yprobs], axis=1)
# Minimum yprob for numerical stability; then return
yprobs = np.maximum(yprobs, min_yprob)
yprobs /= yprobs.sum(axis=1).reshape(-1, 1)
return yvals, yprobs
def _ensure_feasibility(
self,
i,
nu0,
nu1,
lower,
y0_min,
y0_max,
y1_min,
y1_max,
grid_size=100,
tol=5e-4,
):
"""
ensures nu0 + nu1 <= fvals (if lower)
or nu0 + nu1 >= fvals (if upper)
by performing a gridsearch.
Parameters
----------
i : int
Index of which data-point we are performing
this operation for.
nu0 : np.array
nvals0-length array of dual variables
associated with Y(0)
nu1 : np.array
nvals1-length array of dual variables
associated with Y(1)
lower : bool
Specifies lower vs. upper bound.
ymin : float
Minimum value of y
ymax : float
Maximum value of y
grid_size : int
Grid size along each dimension (y(0) and y(1)).
Returns
-------
new_y0_vals : np.array
``nvals0 + grid_size`` length array of new y0_vals.
*Exact size may change to avoid duplicate values
new_nu0 : np.array
``nvals0 + grid_size`` length array of interpolated
and feasible dual vars. for Y(0).
new_y1_vals : np.array
``nvals1 + grid_size`` length array of new y1_vals
new_nu1 : np.array
``nvals1 + grid_size`` length array of interpolated
and feasible dual vars. for Y(1).
*Exact size may change to avoid duplicate values.
dx : float
Maximum numerical error induced by interpolation
process.
objval_diff : np.array
The estimated objective value change for the new
dual variables.
"""
obj_orig = nu0 @ self.y0_probs[i] + nu1 @ self.y1_probs[i]
if grid_size > 0:
# interpolate to compute new dual variables
new_y0_vals = np.unique(np.sort(np.concatenate([
np.linspace(y0_min, y0_max, grid_size), self.y0_vals[i]
])))
new_y1_vals = np.unique(np.sort(np.concatenate([
np.linspace(y1_min, y1_max, grid_size), self.y1_vals[i]
])))
new_nu0 = self.interp_fn(
x=self.y0_vals[i], y=nu0, newx=new_y0_vals,
)
new_nu1 = self.interp_fn(
x=self.y1_vals[i], y=nu1, newx=new_y1_vals,
)
# compute required bounds
fvals = self._apply_ot_fn(
fn=self.f,
y0=new_y0_vals,
y1=new_y1_vals,
x=self.X[i],
)
# support restrictions relax constraints
if self.support_restriction is not None:
not_in_support = ~(self._apply_ot_fn(
fn=self.support_restriction,
y0=new_y0_vals,
y1=new_y1_vals,
x=self.X[i],
).astype(bool))
# Relax constraints
#almost_inf = 1e5 * (np.abs(fvals) + 1).max()
if lower:
fvals[not_in_support] = np.inf
else:
fvals[not_in_support] = - np.inf
## Adjust elementwise (as opposed to a constant subtraction)
# Calculate how far we are from feasibility
deltas = new_nu0.reshape(-1, 1) + new_nu1.reshape(1, -1)
deltas = deltas - fvals
if lower:
deltas0 = deltas.max(axis=1)
dx0 = np.mean(np.maximum(deltas0, 0))
deltas1 = deltas.max(axis=0)
dx1 = np.mean(np.maximum(deltas1, 0))
adj_axis = 1 if dx1 <= dx0 else 0
else:
deltas0 = deltas.min(axis=1)
dx0 = np.mean(np.minimum(deltas0, 0))
deltas1 = deltas.min(axis=0)
dx1 = np.mean(np.minimum(deltas1, 0))
adj_axis = 1 if dx1 >= dx0 else 0
# Adjust and recompute interp_nu0/interp_nu1
if adj_axis == 0:
new_nu0 -= deltas0
adj_nu0 = self.interp_fn(
x=new_y0_vals, y=new_nu0, newx=self.y0_vals[i],
)
adj_nu1 = nu1
else:
new_nu1 -= deltas1
adj_nu1 = self.interp_fn(
x=new_y1_vals, y=new_nu1, newx=self.y1_vals[i],
)
adj_nu0 = nu0
## Track change in objective
new_objval = adj_nu0 @ self.y0_probs[i] + adj_nu1 @ self.y1_probs[i]
objval_diff = obj_orig - new_objval
else:
new_y0_vals = self.y0_vals[i]
new_nu0 = nu0
new_y1_vals = self.y1_vals[i]
new_nu1 = nu1
objval_diff = 0
# return
return new_y0_vals, new_nu0, new_y1_vals, new_nu1, objval_diff
def _ensure_primal_feasibility(
self,
y0_probs: np.array,
y1_probs: np.array,
y0_vals: np.array,
y1_vals: np.array,
not_in_support: np.array,
i: int,
fuzz=0.0001,
solver='SCIPY',
min_prob=0, ## new
):
"""
fuzz : float
Adds fuzz * np.random.uniform() to nonzero values to push
jointprobs into the interior of the primal feasible set.
"""
nvals0 = len(y0_vals)
nvals1 = len(y1_vals)
min_ratio = cp.Parameter()
# Create variable
jointprobs = cp.Variable((nvals0, nvals1), pos=True)
y0p = jointprobs.sum(axis=1)
y1p = jointprobs.sum(axis=0)
constraints = [
cp.sum(jointprobs)==1,
jointprobs[not_in_support] == 0,
y0p >= y0_probs * min_ratio,
y1p >= y1_probs * min_ratio,
]
diffs0 = y0_vals[1:] - y0_vals[:-1]
diffs1 = y1_vals[1:] - y1_vals[:-1]
obj = cp.abs(cp.cumsum(y0p) - np.cumsum(y0_probs))[:-1] @ diffs0
obj += cp.abs(cp.cumsum(y1p) - np.cumsum(y1_probs))[:-1] @ diffs1
problem = cp.Problem(cp.Minimize(obj), constraints)
for mr in [1/2, 1/5, 1/100, 0]:
min_ratio.value = mr
problem.solve(solver=solver)
if problem.status in ['optimal', 'optimal_inaccurate']:
break
if problem.status not in ['optimal', 'optimal_inaccurate']:
raise ValueError(f"For i={i}, failed to enforce primal feasibility.")
# Clip
jp = jointprobs.value
jp += fuzz * np.random.uniform(0, 1, size=jp.shape) / (nvals0 * nvals1)
jp[not_in_support] = 0
jp = jp / jp.sum()
return jp.sum(axis=1), jp.sum(axis=0)
def _solve_single_instance(
self,
i: int,
probs0: np.array,
probs1: np.array,
y0_vals: np.array,
y1_vals: np.array,
fvals: np.array,
not_binding: np.array,
lower: bool,
dual_strategy='ot',
lp_solver='SCIPY',
qp_solver='CLARABEL',
se_solver='CLARABEL',
enforce_primal_feas=True,
debug=False,
**kwargs,
):
"""
Parameters
----------
not_binding : boolean array
not_binding[i,j] = 1 iff the (i,j)th
dual constraint should be ignored.
debug : bool
If True, becomes highly verbose.
dual_strategy : str
One of 'ot', 'lp', 'qp', 'se'
"""
if not lower:
nu0, nu1, objval = self._solve_single_instance(
probs0=probs0,
probs1=probs1,
y0_vals=y0_vals,
y1_vals=y1_vals,
fvals=-1*fvals,
i=i,
not_binding=not_binding,
lower=1,
dual_strategy=dual_strategy,
lp_solver=lp_solver,
qp_solver=qp_solver,
se_solver=se_solver,
debug=debug,
enforce_primal_feas=enforce_primal_feas,
**kwargs,
)
return -nu0, -nu1, -objval
### Parse dual solver
dual_strategy = str(dual_strategy).lower()
if dual_strategy not in ['ot', 'lp', 'qp', 'se']:
raise ValueError(
f"dual_strategy={dual_strategy} must be one of 'ot', 'lp', 'qp', 'se'"
)
### Structure
# 1. initial OT solve [used by ot]
# 2. Ensure_primal_feas [used by all if any constraints are not binding]
# 3. OT solve [used by ot]
# 4. LP solve [used by lp]
# 5. QP solve [used by qp]
### 1. Initial OT solve
if dual_strategy == 'ot':
# Use a hack for non-binding constraints.
# This only influences the OT solver.
almost_inf = 1e5 * np.abs(fvals).max()
if np.any(not_binding):
fvals = fvals.copy()
fvals[not_binding] = almost_inf
# Attempt to solve with pyot
objval, log = ot.lp.emd2(
a=probs0,
b=probs1,
M=fvals,
log=True,
**kwargs,
)
nu0, nu1 = log['u'], log['v']
too_large = _dualvals_are_too_large(nu0, nu1, almost_inf)
if (not too_large) or (not np.any(not_binding)):
nu1 += nu0.mean()
nu0 -= nu0.mean()
return nu0, nu1, objval
### 2. Ensure primal feasibility
if np.any(not_binding) and enforce_primal_feas:
if debug:
print(f"Ensuring primal feasibility. enforce_primal_feas={enforce_primal_feas}")
probs0, probs1 = self._ensure_primal_feasibility(
y0_probs=probs0,
y1_probs=probs1,
y0_vals=y0_vals,
y1_vals=y1_vals,
not_in_support=not_binding,
i=i,
)
## 3. Second OT solve
if dual_strategy == 'ot':
if debug:
print("Trying secondary ot.lp optimization.")
objval, log = ot.lp.emd2(
a=probs0,
b=probs1,
M=fvals,
log=True,
**kwargs,
)
nu0, nu1 = log['u'], log['v']
nu1 += nu0.mean()
nu0 -= nu0.mean()
if _dualvals_are_too_large(nu0, nu1, almost_inf):
dual_strategy = 'se'
## 4/5: solve with LP/QP.
if dual_strategy in ['lp', 'qp', 'se']:
###NOTE: this is 25% faster but less general bc we can't swap in
# new solvers.
# c = np.concatenate([y0_probs, y1_probs])
# i0s, i1s = np.where(in_support)
# col_inds = np.concatenate([i0s, i1s + nvals0])
# row_inds = np.concatenate([np.arange(len(i0s)), np.arange(len(i0s))])
# b = fvals[(i0s, i1s)]
# A_sparse = scipy.sparse.csr_matrix(
# (np.ones(len(row_inds)), (row_inds, col_inds)), shape=(len(i0s), len(c))
# )
# out_upper = scipy.optimize.linprog(
# c=-c, A_ub=A_sparse, b_ub=-b, bounds=(None, None),
# )
nu0 = cp.Variable(len(probs0))
nu1 = cp.Variable(len(probs1))
nusum = cp.reshape(nu0, (-1, 1)) + cp.reshape(nu1, (1, -1))
linobj = nu0 @ probs0 + nu1 @ probs1
binding = (~not_binding).astype(int)
constraints = [cp.multiply(binding, nusum) <= cp.multiply(binding, fvals)]
if dual_strategy == 'lp':
if debug:
print(f"Solving linear problem with {lp_solver} (not PyOT).")
problem = cp.Problem(cp.Maximize(linobj), constraints=constraints)
problem.solve(solver=lp_solver)
if debug:
print(f"LP problem status is {problem.status}.")
elif dual_strategy == 'qp':
if debug:
print(f"Solving quadratic program with {qp_solver} (not PyOT).")
pi = self.pis[i]
pcat = np.maximum(
np.concatenate([probs0 / (1-pi), probs1 / pi]),
1e-6
)
quad_term = (pcat @ cp.hstack([nu0, nu1]))**2 / self.n
problem = cp.Problem(
cp.Maximize(linobj - quad_term), constraints=constraints
)
problem.solve(solver=qp_solver)
# maximize mean - 2 standard errors
else:
if debug:
print(f"Solving se-adjusted problem with {se_solver} (not PyOT).")
pcat = np.concatenate([probs0, probs1])
pi = self.pis[i]
Q = np.diag(np.concatenate([probs0 / (1-pi), probs1 / pi])) - np.outer(pcat, pcat)
evals, evecs = np.linalg.eigh(Q)
sqrtQ = evecs @ np.diag(np.sqrt(np.maximum(evals, 0))) @ evecs.T
stdev = cp.norm2(sqrtQ @ cp.hstack([nu0, nu1]))
problem = cp.Problem(
cp.Maximize(linobj - 2 * stdev / np.sqrt(self.n)), constraints=constraints
)
problem.solve(solver=se_solver)
# Warn the user of failures
if problem.status not in ['optimal', 'optimal_inaccurate']:
msg = "The final bounds may be vacuous."
if problem.status == 'unbounded':
msg = f"Dual solver failed due to primal infeasibility for i={i}, lower={lower}. " + msg
else:
msg = f"Dual solver failed for i={i}, lower={lower} with status {problem.status}. " + msg
warnings.warn(msg)
nu0 = np.zeros(len(probs0))
nu1 = np.zeros(len(probs1))
objval = 0
else:
nu0 = nu0.value
nu1 = nu1.value
objval = linobj.value
# center
if dual_strategy not in ['qp', 'se']:
nu1 += nu0.mean()
nu0 -= nu0.mean()
return nu0, nu1, objval
[docs]
def compute_dual_variables(
self,
y0_dists: Optional[list]=None,
y0_vals: Optional[np.array]=None,
y0_probs: Optional[np.array]=None,
y1_dists: Optional[list]=None,
y1_vals: Optional[np.array]=None,
y1_probs: Optional[np.array]=None,
verbose: bool=True,
min_quantile: Optional[float]=None,
ninterp: Optional[int]=None,
dual_strategy: str='ot',
lp_solver: str='SCIPY',
qp_solver: str='CLARABEL',
se_solver: str='CLARABEL',
nvals0: int=100,
nvals1: int=100,
interp_fn: callable=interpolation.adaptive_interpolate,
y0_min: Optional[float]=None,
y0_max: Optional[float]=None,
y1_min: Optional[float]=None,
y1_max: Optional[float]=None,
**kwargs,
):
"""
Estimates dual variables using the outcome model.
We generally recommend that the user call .fit() instead of
calling this function directly.
Parameters
----------
y0_dists : list
The ith distribution of y0_dists represents the conditional
law of :math:`Y_i(0) | X_i`. There are two input formats:
- batched scipy distribution of shape (n,)
- list of scipy dists whose shapes add up to n.
y0_vals : list
Alternatively, specify a (n, nvals0)-length array
where ``y0_vals[i]`` is the support of :math:`Y_i(0)`.
Ignored if ``y0_dists`` is provided.
y0_probs : np.array
A (n, nvals0)-length array where ``y0_probs[i, j]``
is the estimated probability that :math:`Y_i(0)`
equals ``y0_vals[i, j].``
y1_dists : list
The ith distribution of y1_dists represents the conditional
law of :math:`Y_i(1) | X_i`. There are two input formats:
- batched scipy distribution of shape (n,)
- list of scipy dists whose shapes add up to n.
y1_vals : np.array
(n, nvals1)-length array where ``y1_vals[i]`` is the support
of :math:`Y_i(1)`. Ignored if ``y1_dists`` is provided.
y1_probs : np.array
(n, nvals1)-length array where ``y1_probs[i, j]``
is the estimated probability that :math:`Y_i(1)`
equals ``y1_vals[i, j].``
dual_strategy : str
Specifies the strategy used to find the dual variables.
One of the following:
- 'ot': solves a standard-form optimal transport problem.
- 'lp': solves a full linear program.
- 'qp': solves a quadratic program which approximately accounts for standard errors.
- 'se': solves a convex program which exactly accounts for standard errors.
'ot' is the default and the fastest, but 'se' can reduce
standard errors in noisy problems.
lp_solver : str
When ``dual_strategy='lp'``, specifies which cvxpy solver
to use to solve the optimal transport LP. Default: SCIPY.
qp_solver : str
When ``dual_strategy='qp'``, specifies which cvxpy solver
to use to solve the optimal transport QP. Default: CLARABEL.
se_solver : str
When ``dual_strategy='se'``, specifies which cvxpy solver
to use to solve the convex program. Default: CLARABEL.
min_quantile : float
Minimum quantile to consider when discretizing.
Defaults to 1 / (2*nvals).
nvals0 : int
How many values to use to discretize Y(0).
Defaults to 100. Ignored for discrete Y.
nvals1 : int
How many values to use to discretize Y(1).
Defaults to 100. Ignored for discrete Y.
interp_fn : function
An interpolation function with the same input/output
signature as ``interpolation.adaptive_interpolate``,
which is the default. Ignored for discrete Y.
y0_min : float
Minimum support for Y(0).
Defaults to ``self.y.min() - 0.5 * (self.y.max() - self.y.min())``
y1_min : float
Minimum support for Y(1).
Defaults to ``self.y.min() - 0.5 * (self.y.max() - self.y.min())``
y0_max : float
Maximum support for Y(0).
Defaults to ``self.y.max() + 0.5 * (self.y.max() - self.y.min())``
y1_max : float
Maximum support for Y(1).
Defaults to ``self.y.max() + 0.5 * (self.y.max() - self.y.min())``
kwargs : dict
kwargs for ``_ensure_feasibility`` method, e.g., ``grid_size``.
Returns
-------
None
"""
### Key quantities for optimizer
# to ensure numerical stability, we add extra quantiles
if verbose:
print("Estimating optimal dual variables.")
if min([nvals0, nvals1]) <= MIN_NVALS:
raise ValueError(f"nvals0={nvals0}, nvals1={nvals1} must be larger than {MIN_NVALS}")
# If discrete=True and ydists are BatchedCategorical distributions,
# these values are ignored.
self.nvals0 = nvals0
self.nvals1 = nvals1
self.interp_fn = interp_fn
# max/min yvals
y0_min, y1_min, y0_max, y1_max = _default_ylims(
self.y, y0_min, y1_min, y0_max=y0_max, y1_max=y1_max
)
# this discretizes if Y is continuous
if y0_vals is None or y0_probs is None:
y0_vals, y0_probs = self._discretize(
y0_dists, nvals=self.nvals0, min_quantile=min_quantile,
ymin=y0_min, ymax=y0_max, ninterp=ninterp,
)
if self.discrete:
self.nvals0 = y0_vals.shape[1]
if y1_vals is None or y1_probs is None:
y1_vals, y1_probs = self._discretize(
y1_dists, nvals=self.nvals1, min_quantile=min_quantile,
ymin=y1_min, ymax=y1_max, ninterp=ninterp,
)
if self.discrete:
self.nvals1 = y1_vals.shape[1]
# ensure y1_vals, y1_probs are sorted
self.y0_vals, self.y0_probs = utilities._sort_disc_dist(y0_vals, probs=y0_probs)
self.y1_vals, self.y1_probs = utilities._sort_disc_dist(y1_vals, probs=y1_probs)
# Delete old values
del y0_vals, y1_vals, y0_probs, y1_probs
# Initialize results
self.nu0s = np.zeros((2, self.n, self.nvals0)) # first dimension = [lower, upper]
self.nu1s = np.zeros((2, self.n, self.nvals1))
# Realized dual variables
self.hatnu0s = np.zeros((2, self.n))
self.hatnu1s = np.zeros((2, self.n))
# estimated cond means of nu0s, nu1s
self.c0s = np.zeros((2, self.n))
self.c1s = np.zeros((2, self.n))
# objective values
self.objvals = np.zeros((2, self.n))
self.dxs = np.zeros((2, self.n)) # ignored, only for backward compatability
self.objdiffs = np.zeros((2, self.n))
# loop through
for i in utilities.vrange(self.n, verbose=verbose):
# set parameter values
fvals = self._apply_ot_fn(
fn=self.f,
y0=self.y0_vals[i],
y1=self.y1_vals[i],
x=self.X[i],
)
if self.support_restriction is not None:
not_binding = ~(self._apply_ot_fn(
fn=self.support_restriction,
y0=self.y0_vals[i],
y1=self.y1_vals[i],
x=self.X[i],
).astype(bool))
else:
not_binding = np.zeros(fvals.shape).astype(bool)
# solve
for lower in [1, 0]:
nu0x, nu1x, objval = self._solve_single_instance(
probs0=self.y0_probs[i],
probs1=self.y1_probs[i],
y0_vals=self.y0_vals[i],
y1_vals=self.y1_vals[i],
fvals=fvals,
lower=lower,
i=i,
not_binding=not_binding,
dual_strategy=dual_strategy,
lp_solver=lp_solver,
qp_solver=qp_solver,
se_solver=se_solver,
)
self.objvals[1 - lower, i] = objval
# Save solutions
self.nu0s[1 - lower, i] = nu0x
self.nu1s[1 - lower, i] = nu1x
self.c0s[1 - lower, i] = nu0x @ self.y0_probs[i]
self.c1s[1 - lower, i] = nu1x @ self.y1_probs[i]
# Compute realized dual variables
self._interpolate_and_ensure_feas(
yi=self.y[i],
i=i,
lower=lower,
y0_min=y0_min,
y0_max=y0_max,
y1_min=y1_min,
y1_max=y1_max,
**kwargs
)
def _interpolate_and_ensure_feas(
self, yi, lower, i, y0_min, y1_min, y0_max, y1_max, **kwargs
):
"""
This is used in two places:
1. _compute_realized_dual_variables
2. Main loop of compute_dual_variables
We use this in the main loop of compute_dual_variables
instead of just calling _compute_realized_dual_variables
so that the progress report is accurate.
"""
nu0x = self.nu0s[1-lower, i]
nu1x = self.nu1s[1-lower, i]
# Ensure feasibility
if not self.discrete:
y0v, nu0adj, y1v, nu1adj, odx = self._ensure_feasibility(
i=i, nu0=nu0x, nu1=nu1x, lower=lower,
y0_min=y0_min, y0_max=y0_max,
y1_min=y1_min, y1_max=y1_max,
**kwargs
)
self.objdiffs[1-lower, i] = odx
# interpolate to find realized values
self.hatnu0s[1 - lower, i] = self.interp_fn(
x=y0v, y=nu0adj, newx=yi,
)[0]
self.hatnu1s[1 - lower, i] = self.interp_fn(
x=y1v, y=nu1adj, newx=yi,
)[0]
else:
j0 = np.argmin(np.abs(self.y0_vals[i] - yi))#.item()
j1 = np.argmin(np.abs(self.y1_vals[i] - yi))#.item()
self.hatnu0s[1 - lower, i] = nu0x[j0]
self.hatnu1s[1 - lower, i] = nu1x[j1]
def _compute_realized_dual_variables(self, y=None, **kwargs):
"""
Helper function which applies interpolation
(for continuous Y) to compute realized dual variables.
It also ensures feasibility through a 2D gridsearch.
This is used primarily in unit tests.
"""
y = self.y if y is None else y
# Create ylims
y0_min, y1_min, y0_max, y1_max = _default_ylims(y=y, **kwargs)
kwargs['y0_min'] = y0_min
kwargs['y1_min'] = y1_min
kwargs['y0_max'] = y0_max
kwargs['y1_max'] = y1_max
### Compute realized hatnu1s/hatnu0s
for i in range(self.n):
for lower in [0, 1]:
self._interpolate_and_ensure_feas(
i=i,
yi=y[i],
lower=lower,
**kwargs
)
def _compute_ipw_summands(self):
"""
Helper method to compute (A)IPW estimator summands.
Must be run after running ``self.compute_dual_variables``
"""
# initialize outputs
self.ipw_summands = np.zeros((2, self.n))
self.aipw_summands = np.zeros((2, self.n))
# compute IPW summands and AIPW summands
for lower in [0, 1]:
# IPW
self.ipw_summands[1-lower] = self.hatnu1s[1-lower] / self.pis
self.ipw_summands[1-lower][self.W == 0] = (
self.hatnu0s[1-lower] / (1 - self.pis)
)[self.W == 0]
# AIPW
mus = self.c1s[1-lower] + self.c0s[1-lower]
aipw1 = (self.hatnu1s[1-lower] - self.c1s[1-lower]) / self.pis + mus
aipw0 = (self.hatnu0s[1-lower] - self.c0s[1-lower]) / (1-self.pis) + mus
self.aipw_summands[1-lower] = aipw1 * self.W + (1 - self.W) * aipw0
def _compute_final_bounds(self, aipw=True, alpha=0.05):
"""
Computes final results from the estimated dual variables.
"""
self._compute_ipw_summands()
self.estimates, self.ses, self.cis = utilities.compute_est_bounds(
summands=self.aipw_summands if aipw else self.ipw_summands,
alpha=alpha,
clusters=self.clusters,
)
return dict(
estimates=self.estimates,
ses=self.ses,
cis=self.cis,
)
def _compute_cond_means(self):
"""
Helper function which computes self.mu0 = E[Y(0) | X],
self.mu1 = E[Y(1) | X]. Can only be run after the
conditional distributions have been estimated.
"""
# make lists
if isinstance(self.y0_dists, list):
y0_dists = self.y0_dists
else:
y0_dists = [self.y0_dists]
if isinstance(self.y1_dists, list):
y1_dists = self.y1_dists
else:
y1_dists = [self.y1_dists]
# Compute
self.mu0 = np.concatenate([x.mean() for x in y0_dists])
self.mu1 = np.concatenate([x.mean() for x in y1_dists])
[docs]
def fit_propensity_scores(
self, nfolds: int, clip: float=1e-2, verbose: bool=True,
):
"""
Cross-fits the propensity scores.
Parameters
----------
nfolds : int
Number of folds.
clip : float
Clip propensity scores to be in [clip, 1-clip].
verbose : bool
If True, prints progress reports.
Returns
-------
None
"""
# Parse model
if self.propensity_model is None:
self.propensity_model = 'ridge'
if isinstance(self.propensity_model, str):
model_cls = dist_reg.parse_model_type(
self.propensity_model, discrete=True
)
self.propensity_model = model_cls()
# Loop through
if verbose:
print("Fitting propensity scores.")
starts, ends = dist_reg.create_folds(n=self.n, nfolds=nfolds)
# loop through and fit
self.propensity_model_fits = []
self.pis = np.zeros(self.n)
for ii in utilities.vrange(len(starts), verbose=verbose):
start, end = starts[ii], ends[ii]
# Pick out data from the other folds
not_in_fold = [
i for i in np.arange(self.n) if i < start or i >= end
]
# Fit
model_fit = copy.deepcopy(self.propensity_model)
model_fit.fit(
X=self.X[not_in_fold], y=self.W[not_in_fold]
)
self.propensity_model_fits.append(model_fit)
# Predict out of sample
self.pis[start:end] = model_fit.predict_proba(
self.X[start:end]
)[:, 1]
# Clip
self.pis = np.minimum(np.maximum(self.pis, clip), 1-clip)
[docs]
def cross_fit(
self,
nfolds: int=5,
suppress_warning: bool=False,
verbose: bool=True,
weight_by_propensities: bool=False,
):
"""
Cross-fits the outcome model.
Parameters
----------
nfolds : int
Number of folds to use in cross-fitting.
suppress_warning : bool
If True, suppresses a potential warning about cross-fitting.
verbose : bool
If True, prints progress reports.
weight_by_propensities : bool
If True, when cross-fitting the outcome model, upweights
observations with low propensity scores.
Returns
-------
y0_dists : list
list of batched scipy distributions whose shapes sum to n.
the ith distribution is the out-of-sample estimate of
the conditional law of :math:`Y_i(0) | X[i]`
y1_dists : list
list of batched scipy distributions whose shapes sum to n.
the ith distribution is the out-of-sample estimate of
the conditional law of :math:`Y_i(1) | X[i]`
"""
# if pis not supplied: will use cross-fitting
if self.pis is None:
self.fit_propensity_scores(nfolds=nfolds, verbose=verbose)
# Fit model
if self.y0_dists is None or self.y1_dists is None:
# Note: this returns the existing model
# if an existing model is provided
self.outcome_model = get_default_model(
discrete=self.discrete,
support=self.support,
outcome_model=self.outcome_model,
**self.model_kwargs
)
if verbose:
print("Cross-fitting the outcome model.")
# Possibly create sample weights
if weight_by_propensities:
self.sample_weight = 1 / self.pis.copy()
self.sample_weight[self.W == 0] = 1 / (1 - self.pis[self.W == 0])
else:
self.sample_weight = None
y_out = dist_reg.cross_fit_predictions(
W=self.W, X=self.X, y=self.y,
sample_weight=self.sample_weight,
nfolds=nfolds,
model=self.outcome_model,
model_selector=self.model_selector,
verbose=verbose,
)
counterfactuals, self.model_fits, self.oos_dist_preds = y_out
self.y0_dists = counterfactuals[0]
self.y1_dists = counterfactuals[1]
elif not suppress_warning:
warnings.warn(CROSSFIT_WARNING)
return self.y0_dists, self.y1_dists
def _compute_oos_resids(self):
# compute out-of-sample predictions
self._compute_cond_means()
self.oos_preds = self.mu0.copy()
self.oos_preds[self.W == 1] = self.mu1[self.W == 1]
# residuals and return
self.oos_resids = self.y - self.oos_preds
return self.oos_resids
[docs]
def fit(
self,
nfolds: int = 5,
aipw: bool = True,
alpha: float = 0.05,
y0_dists: Optional[list[rv_generic]] = None,
y1_dists: Optional[list[rv_generic]] = None,
verbose: bool = True,
suppress_warning: bool = False,
weight_by_propensities: bool = False,
**solve_kwargs,
):
"""
Main function which (1) performs cross-fitting, (2) computes
optimal dual variables, and (3) computes final dual bounds.
Parameters
----------
nfolds : int
Number of folds to use when cross-fitting. Defaults to 5.
alpha : float
Nominal coverage level. Defaults to 0.05.
aipw : bool
If true, returns AIPW estimator.
y0_dists : list
The ith distribution of y0_dists represents the conditional
law of :math:`Y_i(0) | X_i`. There are two input formats:
- batched scipy distribution of shape (n,)
- list of scipy dists whose shapes add up to n.
This is an optional input; if provided, ``outcome_model``
will be ignored.
y1_dists : list
The ith distribution of y1_dists represents the conditional
law of :math:`Y_i(1) | X_i`. There are two input formats:
- batched scipy distribution of shape (n,)
- list of scipy dists whose shapes add up to n.
This is an optional input; if provided, ``outcome_model``
will be ignored.
verbose : bool
If True, gives occasional progress reports.
suppress_warning : bool
If True, suppresses a warning about cross-fitting.
weight_by_propensities : bool
If True, when cross-fitting the outcome model, upweights
observations with low propensity scores.
solve_kwargs : dict
Additional (optional) kwargs for the ``compute_dual_variables``
method, e.g. ``nvals0``, ``nvals1``, ``grid_size``.
Returns
-------
self
"""
# Fit model of W | X and Y | X if not provided
self.y0_dists, self.y1_dists = y0_dists, y1_dists
self.cross_fit(
nfolds=nfolds,
suppress_warning=suppress_warning,
verbose=verbose,
weight_by_propensities=weight_by_propensities,
)
# compute dual variables
self.compute_dual_variables(
y0_dists=self.y0_dists,
y1_dists=self.y1_dists,
verbose=verbose,
**solve_kwargs,
)
# compute dual bounds
self.alpha = alpha
self._compute_final_bounds(aipw=aipw, alpha=alpha)
return self
def _plug_in_results(self):
pests, pses, pcis = utilities.compute_est_bounds(
summands=self.objvals,
alpha=self.alpha
)
return pd.DataFrame(
np.stack(
[pests, pses, pcis],
axis=0
),
index=['Estimate', 'SE', 'Conf. Int.'],
columns=['Lower', 'Upper']
)
[docs]
def results(self, minval: float=-np.inf, maxval: float=np.inf):
"""
Returns a dataframe of key inferential results.
Parameters
----------
minval : float
Analytical lower bound on estimand used to clip results.
Defaults to -np.inf.
maxval : float
Analytical upper bound on estimand used to clip results.
Defaults to np.inf.
Returns
-------
results : pd.DataFrame
DataFrame of key inferential results.
"""
self.results_ = pd.DataFrame(
np.stack(
[
np.clip(self.estimates, minval, maxval),
self.ses,
np.clip(self.cis, minval, maxval),
],
axis=0
),
index=['Estimate', 'SE', 'Conf. Int.'],
columns=['Lower', 'Upper']
)
return self.results_
[docs]
def plot_dual_variables(self, i=0):
"""
Plots the estimated dual variables for the ith data-point.
Parameters
----------
i : int
Integer ranging from 0 to n-1, specifying which datapoint to plot.
"""
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
for name, j, ax in zip(['Lower', 'Upper'], [0,1], axes):
# Plot dual variables
for yvals, nus, color, label in zip(
[self.y0_vals[i], self.y1_vals[i]],
[self.nu0s[j, i], self.nu1s[j, i]],
['red', 'blue'],
['Control', 'Treatment'],
):
axes[j].scatter(yvals, nus, color=color, label=label)
# Plot realized value
hatnu = self.hatnu0s[j, i] if self.W[i] == 0 else self.hatnu1s[j, i]
axes[j].axvline(self.y[i], color='black', label='Realized outcome value')
axes[j].scatter(self.y[i], hatnu, color='red' if self.W[i] == 0 else 'blue')
axes[j].set(xlabel='Outcome', ylabel='Dual variable', title=f'{name} Bound')
axes[j].legend()
plt.show()
[docs]
def diagnostics(self, plot=False, aipw=True):
"""
Reports a set of technical diagnostics.
Parameters
----------
plot : bool
If True, creates a set of diagnostic plots.
Returns
-------
df : pd.DataFrame
DataFrame of technical diagnostic information.
Notes
-----
Please see the user guide for more details on the
meaning of the outputs.
"""
summands = self.aipw_summands if aipw else self.ipw_summands
aipw_name = 'AIPW' if aipw else 'IPW'
# Plot AIPW dual summands
if plot:
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(10, 4))
for j, name in zip([0,1], ['Lower', 'Upper']):
for w, color, label in zip([0,1], ['red', 'blue'], ['Control', 'Treatment']):
axes[j].scatter(
self.y[self.W == w],
self.aipw_summands[j][self.W == w],
color=color,
label=label
)
axes[j].legend()
axes[j].set(xlabel='Outcome', ylabel=f'Dual {aipw_name} Summand', title=f"{name} Dual Bound")
plt.show()
# Leverage
s2s = (summands - summands.mean(axis=1).reshape(-1, 1))**2
leverages = np.max(s2s, axis=1) / s2s.sum(axis=1)
# Max contribution
max_contribs = np.array(
[summands[0].min() / self.n, summands[1].max() / self.n]
)
return pd.DataFrame(
np.stack(
[self.objdiffs.mean(axis=1), leverages, max_contribs],
axis=0
),
index=['Loss from gridsearch', 'Max leverage', f'Worst dual {aipw_name} summand'],
columns=['Lower', 'Upper']
)
[docs]
def eval_outcome_model(self):
"""
Thinly wraps ``dist_reg._evaluate_model_predictions``.
Returns
-------
sumstats : pd.DataFrame
DataFrame summarizing goodness-of-fit metrics for
the cross-fit outcome model.
"""
self._compute_oos_resids()
return dist_reg._evaluate_model_predictions(
y=self.y, haty=self.oos_preds
)
[docs]
def eval_treatment_model(self):
"""
Thinly wraps ``dist_reg._evaluate_model_predictions``.
Returns
-------
sumstats : pd.DataFrame
DataFrame summarizing goodness-of-fit metrics for
the cross-fit propensity scores.
"""
return dist_reg._evaluate_model_predictions(
y=self.W, haty=self.pis
)
[docs]
def summary(self, minval: float=-np.inf, maxval: float=np.inf):
"""
Prints a summary of main results from the class.
Parameters
----------
minval : float
Analytical lower bound on estimand used to clip results.
Defaults to -np.inf.
maxval : float
Analytical upper bound on estimand used to clip results.
Defaults to np.inf.
Returns
-------
None
Notes
-----
To access the results a DataFrame, call:
- ``DualBounds.results()`` for inferential results
- ``DualBounds.eval_outcome_model()`` for outcome model metrics
- ``DualBounds.eval_treatment_model()`` for treatment model metrics
- ``DualBounds._plug_in_results()`` for nonrobust plug-in bounds
- ``DualBounds.diagnostics()`` for technical diagnostic information
"""
print("___________________Inference_____________________")
print(self.results(minval=minval, maxval=maxval))
print()
print("_________________Outcome model___________________")
print(self.eval_outcome_model())
print()
print("_________________Treatment model_________________")
print(self.eval_treatment_model())
print()
print("______________Nonrobust plug-in bounds___________")
print(self._plug_in_results())
print()
# possibly print diagnostics. This logic is useful
# because some classes inheriting from
# DualBounds (e.g. VarCATEDualBounds) don't produce diagnostics.
diagnostics = self.diagnostics(plot=False)
if diagnostics is not None:
print("_______________Technical diagnostics_____________")
print(diagnostics)
print()
def _plug_in_no_covariates(
f: callable,
y: np.array,
W: np.array,
pis: Optional[np.array]=None,
max_nvals: int=1000,
) -> float:
"""
Internal function which does the work for plug_in_no_covariates
and gets called in each bootstrap iteration.
"""
n = len(y)
# Create empirical distributions for treatment/control
y0_vals = y[W == 0]
y0_probs = 1 / (1-pis[W==0]); y0_probs /= y0_probs.sum()
y1_vals = y[W == 1]
y1_probs = 1 / (pis[W==1]); y1_probs /= y1_probs.sum()
# Reduce dimension to prevent memory errors for huge datasets
qs = np.linspace(1/(max_nvals+1), max_nvals/(max_nvals+1), max_nvals)
if len(y0_vals) > max_nvals:
y0_vals = utilities.weighted_quantile(y0_vals, y0_probs, quantiles=qs)
y0_probs = np.ones(len(y0_vals)) / len(y0_vals)
if len(y1_vals) > max_nvals:
y1_vals = utilities.weighted_quantile(y1_vals, y1_probs, quantiles=qs)
y1_probs = np.ones(len(y1_vals)) / len(y1_vals)
# Fvals
fvals = f(y0_vals.reshape(-1, 1), y1_vals.reshape(1, -1), x=0)
# lower and upper
lower = ot.lp.emd2(a=y0_probs, b=y1_probs, M=fvals)
upper = -ot.lp.emd2(a=y0_probs, b=y1_probs, M=-fvals)
return np.array([lower, upper])
[docs]
def plug_in_no_covariates(
outcome: np.array,
treatment: np.array,
f: callable,
propensities: Optional[np.array]=None,
clusters: Optional[np.array]=None,
B: int=0,
verbose: bool=True,
alpha: float=0.05,
max_nvals: int=1000,
_which_bound='both',
) -> dict:
"""
Computes plug-in bounds on :math:`E[f(Y(0),Y(1))]` without using covariates.
Parameters
----------
outcome : np.array
n-length array of outcomes (y)
treatment : np.array
n-length array of treatments (W).
f : function
f(y0, y1, x) defines the objective.
propensities : np.array
n-length array of propensity scores (pis).
Default: all equal to treatment.mean().
clusters : np.array
Optional n-length array of clusters, so ``clusters[i] = j``
indicates that observation i is in cluster j.
B : int
Number of bootstrap replications to compute standard errors.
Defaults to 0 (no standard errors).
verbose : bool
Show progress bar while bootstrapping if verbose=True.
alpha : float
nominal Type I error level.
max_nvals : int
Maximum dimension of optimal transport problem.
_which_bound : str
One of 'both', 'lower', 'upper'.
Returns
-------
results : dict
Dictionary containing up to three keys:
- estimates: 2-length array of lower/upper estimates.
- ses: 2-length array of lower/upper standard errors.
- cis: 2-length array of lower/upper confidence intervals.
These arrays will be length 1 (instead of 2) if which_bound != 'both'.
"""
# Infer propensities
if propensities is None:
propensities = np.ones(len(treatment)) * treatment.mean()
# Create estimates
estimates = _plug_in_no_covariates(
f=f,
y=outcome,
W=treatment,
pis=propensities,
max_nvals=max_nvals
)
if B == 0:
return dict(estimates=estimates)
# Compute bootstrapped SEs
data = np.stack([outcome, treatment, propensities], axis=1)
func = lambda data: _plug_in_no_covariates(
f=f,
y=data[:, 0],
W=data[:, 1],
pis=data[:, 2],
max_nvals=max_nvals,
)
ses = utilities.cluster_bootstrap_se(
data=data,
clusters=clusters,
func=func,
B=B,
verbose=verbose,
)[0]
cis = estimates.copy()
cis[0] -= stats.norm.ppf(1-alpha/2) * ses[0]
cis[1] += stats.norm.ppf(1-alpha/2) * ses[1]
return dict(
estimates=estimates,
ses=ses,
cis=cis,
)