Source code for dualbounds.varite

"""
Methods for bounding Var(E[Y(1) - Y(0)]), the
variance of the individual treatment effect (ITE).
"""
import numpy as np
from scipy import stats
from .generic import DualBounds

def compute_analytical_varite_bound(
	n,
	y0_dists,
	y1_dists,
	reps=100,
):
	"""
	Semi-analytical bounds on :math:`Var(Y(0) - Y(1))`.

	Unlike dual bounds, this function is not
	robust to model misspecification.
	
	Parameters
	----------
	n : int
		Number of observations.
	y0_dists : stats.rv_continuous / stats.rv_discrete
		batched scipy distribution of shape (n,) where the ith
		distribution is the conditional law of Yi(0) | Xi
	y1_dists : stats.rv_continuous / stats.rv_discrete
		batched scipy distribution of shape (n,) where the ith
		distribution is the conditional law of Yi(1) | Xi
	reps : int
		Number of samples to take from each distribution.

	Returns
	-------
	lower : float
		Lower bound on Var(Y(1) - Y(0))
	upper : float
		Upper bound on Var(Y(1) - Y(0))
	"""
	# Sample coupled r.v.s
	U = np.random.uniform(size=(reps, n))
	y1 = y1_dists.ppf(U)
	y0l = y0_dists.ppf(U)
	y0u = y0_dists.ppf(1-U)
	# Evaluate bounds
	lower = np.std(y1-y0l)**2
	upper = np.std(y1 - y0u)**2
	return lower, upper

def varite_delta_method_se(
	sbetas, skappa1s, skappa0s
):
	# estimate
	hat_beta = sbetas.mean()
	hat_kappa1 = skappa1s.mean()
	hat_kappa0 = skappa0s.mean()
	ate = hat_kappa1 - hat_kappa0 # average treatment effect
	hattheta = hat_beta - ate**2
	# standard error
	hatSigma = np.cov(
		np.stack([sbetas, skappa1s, skappa0s], axis=0)
	) # 3 x 3 cov matrix
	grad = np.array(
		[1, - 2 * ate, 2 * ate]
	)
	# estimate
	se = np.sqrt(grad @ hatSigma @ grad / len(sbetas))
	return hattheta, se

[docs] class VarITEDualBounds(DualBounds): """ Computes dual bounds on :math:`Var(Y(1) - Y(0)).` The signature of this class is identical to the ``generic.DualBounds`` class. However, the input ``f`` will be ignored. """ def __init__(self, *args, **kwargs): # Initialize with correct f function kwargs['f'] = lambda y0, y1, x: (y0-y1)**2 super().__init__(*args, **kwargs)
[docs] def results(self, minval: float=0, maxval: float=np.inf): # minval is always zero return super().results(minval=0, maxval=maxval)
def _compute_final_bounds(self, aipw=True, alpha=0.05): """ Computes final bounds based in (A)IPW summands, using the delta method for Var(Y(1) - Y(0)). """ self._compute_ipw_summands() summands = self.aipw_summands if aipw else self.ipw_summands self._compute_cond_means() # Note: the notation follows Appendix A.2 of # https://arxiv.org/pdf/2310.08115.pdf (version 1) ests = [] ses = [] bounds = [] scale = stats.norm.ppf(1-alpha/2) for lower in [1, 0]: # part. identifiable component sbetas = summands[1-lower] # kappa1 = E[Y(1)], kappa0 = E[Y(0)] are ident components skappa1s = self.W * (self.y - self.mu1) / self.pis + self.mu1 skappa0s = (1-self.W) * (self.y - self.mu0) / (1-self.pis) + self.mu0 # estimate hattheta, se = varite_delta_method_se( sbetas=sbetas, skappa1s=skappa1s, skappa0s=skappa0s ) ests.append(hattheta) ses.append(se) if lower: bounds.append(hattheta - scale * se) else: bounds.append(hattheta + scale * se) self.estimates = np.array(ests) self.ses = np.array(ses) self.cis = np.array(bounds) return dict( estimates=self.estimates, ses=self.ses, cis=self.cis )