Source code for dualbounds.lee

import warnings
import numpy as np
import pandas as pd
import cvxpy as cp
from scipy import stats
from . import utilities
from . import dist_reg
from .utilities import BatchedCategorical
from . import interpolation, generic
from .delta import _delta_bootstrap_ses
from typing import Optional, Union


### Helper functions ###

def compute_cvar(dists, n: int, alpha: float, lower: bool=True, m: int=1000):
	"""
	Computes cvar using quantile approximation with m values.

	Parameters
	----------
	dists : stats.dist
		scipy distribution function of shape n
	n : int
		Batch dimension
	alpha : array or float	
		float or n-length array
	lower : bool
		If true, computes the lower CVAR. 
		Else computes the upper CVAR.
	m : int
		Number of interpolation points
	
	Returns
	-------
	cvars : array
		n-length array.
		E[Y | Y <= Q_{alpha}(Y)] from Y ~ dists if lower = True.
		If lower = False, replaces <= with >=.
	"""

	if isinstance(alpha, float) or isinstance(alpha, int):
		alpha = alpha * np.ones((n))
	# find quantiles
	if lower:
		qs = np.linspace(1/(m+1), alpha, m)
	else:
		qs = np.linspace(alpha, m/(m+1), m)
	# take average
	qmc = dists.ppf(q=qs)
	if qmc.shape != (m, n):
		raise ValueError(f"Unexpected shape of qmc={qmc.shape}")
	cvar_est = qmc.mean(axis=0)
	return cvar_est

[docs] def compute_analytical_lee_bound( s0_probs, s1_probs, y0_dists=None, y1_dists=None, # optional, y0_probs=None, y1_probs=None, y0_vals=None, y1_vals=None, m=1000, ): """ Helper function to compute semi-analytical Lee Bounds. Unlike dual bounds, this function is not at all robust to model misspecification. The estimand is :math:`E[Y(1) - Y(0) | S(0) = S(1) = 1]` where :math:`Y(1), Y(0)` are potential outcomes and :math:`S(1), S(0)` are post-treatment selection events. Parameters ---------- s0_probs : np.array n-length array where s0_probs[i] = P(S(0) = 1 | Xi) s1_probs : np.array n-length array where s1_probs[i] = P(S(1) = 1 | Xi) y0_dists : np.array batched scipy distribution of shape (n,) where the ith distribution is the conditional law of Yi(0) | S(0) = 1, Xi y1_dists : np.array batched scipy distribution of shape (n,) where the ith distribution is the conditional law of Yi(1) | S(1) = 1, Xi y0_vals : np.array nvals0-length array of values y0 can take. y0_probs : np.array (n, nvals0)-length array where y0_probs[i, j] = P(Y(0) = yvals0[j] | S(0) = 1, Xi) y1_vals : np.array (n,nvals1) array of values y1 can take. y1_probs : np.array (n, nvals1) array where y0_probs[i, j] = P(Y(1) = yvals1[j] | S(1) = 1, Xi) m : int Number of quantile discretizations to use when computing CVAR. m = 1000 (default) is more than sufficient. Returns ------- agg_bounds : np.array (2,)-length array of lower and upper bound. This integrates across all n y0_dists/y1_dists, etc. cond_bounds : np.array (2, n)-length array where bounds[0,i] is the ith lower bound and bounds[1,i] is the ith upper bound conditional on :math:`X_i`. """ # Parse arguments n = s0_probs.shape[0] if y0_dists is None: y0_dists = BatchedCategorical( vals=y0_vals, probs=y0_probs ) if y1_dists is None: y1_dists = BatchedCategorical( vals=y1_vals, probs=y1_probs ) # always-takers share alphas = s0_probs / s1_probs if np.any(alphas > 1): raise ValueError(f"Monotonicity is violated for indices of alphas={alphas[alphas > 1]}") # compute E[Y(1) | Y(1) >= Q(alpha), S(1)=1] # (or <= depending on the value of lower) cvars_lower = compute_cvar(y1_dists, n, alpha=alphas, lower=True, m=m) cvars_upper = compute_cvar(y1_dists, n, alpha=1-alphas, lower=False, m=m) y0ms = y0_dists.mean() cond_bounds = np.stack([cvars_lower - y0ms, cvars_upper - y0ms], axis=0) # per-X bounds agg_bounds = np.mean(cond_bounds * s0_probs, axis=1) / np.mean(s0_probs) # aggregated bounds return agg_bounds, cond_bounds
def _lee_bound_no_covariates( y: np.array, W: np.array, S: np.array, pis: np.array ): """ Helper function which gets called by the public variant. """ # compute P(S | W) s0_prob = np.array([np.mean(S[W == 0])]) s1_prob = np.array([np.mean(S[W == 1])]) s1_prob = np.maximum(s0_prob, s1_prob) # compute P(Y(0)) flags0 = (W == 0) & (S == 1) y0_vals = y[flags0] y0_probs = 1 / (1-pis[flags0]); y0_probs /= y0_probs.sum() inds0 = np.argsort(y0_vals) y0_vals = y0_vals[inds0]; y0_probs = y0_probs[inds0] # compute P(Y(1)) flags1 = (W == 1) & (S == 1) y1_vals = y[flags1] y1_probs = 1 / (1-pis[flags1]); y1_probs /= y1_probs.sum() inds1 = np.argsort(y1_vals) y1_vals = y1_vals[inds1]; y1_probs = y1_probs[inds1] # Construct argument list args = dict( s0_probs=s0_prob, s1_probs=s1_prob, y0_probs=y0_probs.reshape(1, -1), y0_vals=y0_vals.reshape(1, -1), y1_probs=y1_probs.reshape(1, -1), y1_vals=y1_vals.reshape(1, -1) ) # compute lower, upper bounds return compute_analytical_lee_bound(**args)[1][:, 0]
[docs] def lee_bound_no_covariates( outcome: np.array, treatment: np.array, selections: np.array, propensities: Optional[np.array]=None, clusters: Optional[np.array]=None, B: int=200, alpha: float=0.05, verbose=False, ): """ Computes plug-in Lee bounds without using covariates. Parameters ---------- outcome : np.array n-length array of outcomes (y) treatment : np.array n-length array of treatments (W). selections : np.array n-length array of selection indicators (S). propensities : np.array n-length array of propensity scores (pis). Default: all equal to treatment.mean(). clusters : np.array Optional n-length array of clusters, so ``clusters[i] = j`` indicates that observation i is in cluster j. B : int Number of bootstrap replications to compute standard errors. Defaults to 0 (no standard errors). alpha : float nominal Type I error level. verbose : bool Show progress bar while bootstrapping if verbose=True. Returns ------- results : dict Dictionary containing up to three keys: - estimates: 2-length array of lower/upper estimates. - ses: 2-length array of lower/upper standard errors. - cis: 2-length array of lower/upper confidence intervals. """ # Infer propensities if propensities is None: propensities = np.ones(len(treatment)) * treatment.mean() # Create estimates estimates = _lee_bound_no_covariates( y=outcome, W=treatment, S=selections, pis=propensities, ) if B == 0: return dict(estimates=estimates) # Compute bootstrapped SEs data = np.stack([outcome, treatment, propensities, selections], axis=1) func = lambda data: _lee_bound_no_covariates( y=data[:, 0], W=data[:, 1], pis=data[:, 2], S=data[:, 3], ) ses = utilities.cluster_bootstrap_se( data=data, clusters=clusters, func=func, B=B, verbose=verbose, )[0] cis = estimates.copy() cis[0] -= stats.norm.ppf(1-alpha/2) * ses[0] cis[1] += stats.norm.ppf(1-alpha/2) * ses[1] return dict( estimates=estimates, ses=ses, cis=cis, )
def lee_delta_method_se( sbetas, skappas, sgammas ): # estimate hat_beta = sbetas.mean() hat_kappa = skappas.mean() hat_gamma = sgammas.mean() hattheta = (hat_beta - hat_kappa) / hat_gamma # standard error hatSigma = np.cov( np.stack([sbetas, skappas, sgammas], axis=0) ) # 3 x 3 cov matrix grad = np.array([ 1 / hat_gamma, - 1 / hat_gamma, -(hat_beta - hat_kappa) / (hat_gamma**2) ]) # estimate se = np.sqrt(grad @ hatSigma @ grad / len(sbetas)) return hattheta, se
[docs] class LeeDualBounds(generic.DualBounds): """ Computes dual bounds on the ATE under selection bias. Precisely, this class bounds :math:`E[Y(1) - Y(0) | S(0) = S(1) = 1]` where :math:`Y(1), Y(0)` are potential outcomes and :math:`S(1), S(0)` are post-treatment selection events. These bounds assume monotonicity, i.e., :math:`S(1) >= S(0)` a.s. (see Lee 2009). Parameters ---------- selections : np.array n-length array-like of binary selection indicators outcome : np.array | pd.Series n-length array of outcome measurements (Y). treatment : np.array | pd.Series n-length array of binary treatment (W). covariates : np.array | pd.Series (n, p)-shaped array of covariates (X). propensities : np.array | pd.Series n-length array-like of propensity scores :math:`P(W=1 | X)`. If ``None``, will be estimated from the data. outcome_model : str | dist_reg.DistReg The model for estimating the law of :math:`Y | X, W`. Two options: - A str identifier, e.g., 'ridge', 'lasso', 'elasticnet', 'randomforest', 'knn'. - An object inheriting from ``dist_reg.DistReg``. E.g., when ``outcome`` is continuous, the default is ``outcome_model=dist_reg.CtsDistReg(model_type='ridge')``. propensity_model : str | sklearn classifier How to estimate the propensity scores if they are not provided. Two options: - A str identifier, e.g., 'ridge', 'lasso', 'elasticnet', 'randomforest', 'knn'. - An sklearn classifier, e.g., ``sklearn.linear_model.LogisticRegressionCV()``. selection_model : str | dist_reg.BinaryDistReg How to estimate the selection probabilities :math:`P(S =1 | W, X)`. Two options: - A str identifier, i.e., 'monotone_logistic', 'ridge', 'lasso'. - An object inheriting from ``dist_reg.BinaryDistReg``. The default is ``monotone_logistic``. support : np.array Optional support of the outcome, if known and discrete. Defaults to ``None`` (inferred from the data). model_kwargs : dict Additional kwargs for the ``outcome_model``, e.g., ``feature_transform``. See :class:`dualbounds.dist_reg.CtsDistReg` or :class:`dualbounds.dist_reg.BinaryDistReg` for more kwargs. """ def __init__( self, selections: Union[np.array, pd.Series], *args, selection_model: Optional[Union[str, dist_reg.BinaryDistReg]]=None, **kwargs ): # Main initialization kwargs['f'] = None super().__init__(*args, **kwargs) # Initialization for S self.selection_model = selection_model if isinstance(selections, pd.Series) or isinstance(selections, pd.DataFrame): selections = selections.values.reshape(self.n) self.S = utilities._binarize_variable(selections, var_name='selections') def _ensure_feasibility( self, i, nu0, nu1, lower, ymin, ymax, grid_size=10000, ): """ ensures nu0 + nu1 <= fvals (if lower) or nu0 + nu1 >= fvals (if upper) by performing a gridsearch. Parameters ---------- nu1 : nvals1-length array note nu1[0] is the dual variable for when s1=1. Returns ------- new_nu0 : np.array ``nvals0`` length array of new dual vars. for Y(0) new_nu1 : np.array ``nvals1`` length array of new dual vars. for Y(1) dx : float Constant which can be subtracted off to obtain valid dual variables. I.e., new_nk = nuk - dx/2. """ y1_vals = self.y1_vals_adj[i] if y1_vals[0] != 0: raise ValueError( "Expected y1_vals[0] to equal zero; this should be the case when s1 = 0." ) dxs = [] new_yvals = np.linspace(ymin, ymax, grid_size) interp_nu = self.interp_fn( x=y1_vals[1:], y=nu1[1:], newx=new_yvals, ) # only three options due to monotonicity # if lower: nu0[s0] + nu1[s1 * y1] <= s0 s1 y1 # if not lower: nu0[s0] + nu1[s1 * y1] >= s0 s1 y1 for s0, s1 in zip( [0, 0, 1], [0, 1, 1] ): if s1 == 0: dxs.append(nu0[s0] + nu1[0]) else: deltas = nu0[s0] + interp_nu - s0 * s1 * new_yvals if lower: dxs.append(np.max(deltas)) else: dxs.append(np.min(deltas)) # return valid dual variables if lower: dx = np.max(np.array(dxs)) else: dx = np.min(np.array(dxs)) return nu0 - dx/2, nu1 - dx/2, dx
[docs] def compute_dual_variables( self, s0_probs, s1_probs, y1_dists=None, y1_vals=None, y1_probs=None, verbose=False, nvals=100, ymin=None, ymax=None, **kwargs, ): """ Estimates dual variables using the outcome model. We generally recommend that the user call .fit() instead of calling this function directly. Parameters ---------- s0_probs : np.array n-length array where s0_probs[i] = :math:`P(S_i(0) = 1 | X_i)`. s1_probs : np.array n-length array where s1_probs[i] = :math:`P(S_i(1) = 1 | X_i)`. y1_dists : list The ith distribution of y1_dists represents the conditional law of :math:`Y_i(1) | X_i, S_i(1) =1`. There are two input formats: - batched scipy distribution of shape (n,) - list of scipy dists whose shapes add up to n. y1_vals : np.array (n, nvals1)-length array where ``y1_vals[i]`` is the support of :math:`Y_i(1)`. Ignored if ``y1_dists`` is provided. y1_probs : np.array (n, nvals1)-length array where ``y1_probs[i, j]`` is the estimated probability that :math:`Y_i(1)` equals ``y1_vals[i, j].`` verbose : bool If True, prints progress reports. nvals : int Number of values to use when discretizing Y(1). ymin : float Minimum value of Y(1) to use numerically. ymax : float Maximum value of Y(1) to use numerically. kwargs : dict kwargs for _ensure_feasibility method. Includes ymin, ymax, grid_size. """ # Constants for solver if verbose: print("Estimating optimal dual variables.") n = s0_probs.shape[0] self.nvals0 = 2 # because S is binary if self.discrete: self.nvals1 = len(self.support) + 1 else: self.nvals1 = nvals self.interp_fn = interpolation.linear_interpolate ymin = ymin if ymin is not None else self.y.min() ymax = ymax if ymax is not None else self.y.max() # discretize if Y is continuous if y1_vals is None or y1_probs is None: # tolerance parameter min_quantile = min([np.min(s0_probs/s1_probs), np.min(1 - s0_probs/s1_probs)]) # law of Y(1) | S(1) = 1, Xi y1_vals, y1_probs = self._discretize( y1_dists, nvals=self.nvals1-1, min_quantile=min_quantile/2, ninterp=1, ymin=ymin, ymax=ymax, ) # ensure y1_vals, y1_probs are sorted self.y1_vals, self.y1_probs = utilities._sort_disc_dist(vals=y1_vals, probs=y1_probs) del y1_vals, y1_probs ## Adjust to make it law of Y(1) S(1) | Xi # note: it is important that the value when S(1) = 0 # is the first value on the second axis # so that the monotonicity constraint is correct. self.y1_vals_adj = np.concatenate( [np.zeros((n, 1)), self.y1_vals], axis=1 ) self.y1_probs_adj = np.concatenate( [ 1 - s1_probs.reshape(-1, 1), s1_probs.reshape(-1, 1) * self.y1_probs ], axis=1 ) # useful constants s0_vals = np.array([0, 1]).reshape(-1, 1) # Initialize results self.n = self.y1_vals.shape[0] self.nu0s = np.zeros((2, n, self.nvals0)) # first dimension = [lower, upper] self.nu1s = np.zeros((2, n, self.nvals1)) self.hatnu0s = np.zeros((2, self.n)) self.hatnu1s = np.zeros((2, self.n)) # estimated cond means of nu0s, nu1s self.c0s = np.zeros((2, self.n)) self.c1s = np.zeros((2, self.n)) # objective values self.objvals = np.zeros((2, self.n)) self.dxs = np.zeros((2, self.n)) # loop through for i in utilities.vrange(self.n, verbose=verbose): # set parameter values fvals = ( s0_vals * self.y1_vals_adj[i].reshape(1, -1) ).astype(float) # below is the max val. used instead of inf to relax # constraints. this provably has no effect compared # to using np.inf, but allows the use of ot instead # of cvxpy, leading to a substantial speedup. max_fval = np.abs(fvals).max() * 1e7 # helpful concatenation probs0 = np.array([1 - s0_probs[i], s0_probs[i]]) # solve for lower in [1, 0]: # Relax constraints due to monotonicity fvals[1][0] = max_fval if lower else -max_fval nu0x, nu1x, objval = self._solve_single_instance( i=i, probs0=probs0, probs1=self.y1_probs_adj[i], y0_vals=np.array([0,1]), y1_vals=self.y1_vals_adj[i], fvals=fvals, not_binding=np.zeros(fvals.shape).astype(bool), lower=lower ) self.objvals[1 - lower, i] = objval if not self.discrete: nu0x, nu1x, dx = self._ensure_feasibility( i=i, nu0=nu0x, nu1=nu1x, lower=lower, ymin=ymin, ymax=ymax, **kwargs, ) else: dx = 0 # Save intermediate quantities self.nu0s[1 - lower, i] = nu0x self.nu1s[1 - lower, i] = nu1x self.c0s[1 - lower, i] = nu0x @ probs0 self.c1s[1 - lower, i] = nu1x @ self.y1_probs_adj[i] self.dxs[1 - lower, i] = dx self._compute_realized_dual_variables(y=self.y, S=self.S)
def _compute_realized_dual_variables(self, y=None, S=None): y = self.y if y is None else y S = self.S if S is None else S ### Compute realized hatnu1s/hatnu0s self.hatnu0s = np.zeros((2, self.n)) self.hatnu1s = np.zeros((2, self.n)) for i in range(self.n): for lower in [0, 1]: nu0x = self.nu0s[1-lower, i] nu1x = self.nu1s[1-lower, i] # Set values self.hatnu0s[1 - lower, i] = nu0x[S[i]] if S[i] == 0: self.hatnu1s[1 - lower, i] = nu1x[0] if not self.discrete and S[i] == 1: self.hatnu1s[1 - lower, i] = self.interp_fn( x=self.y1_vals_adj[i][1:], y=nu1x[1:], newx=y[i], )[0] if self.discrete and S[i] == 1: j = np.argmin(np.abs(self.y1_vals_adj[i][1:] - y[i])) self.hatnu1s[1 - lower, i] = nu1x[j+1]
[docs] def cross_fit( self, nfolds: int=5, suppress_warning: bool=False, verbose: bool=True, ): """ Cross-fits the outcome and selection models. Parameters ---------- nfolds : int Number of folds to use in cross-fitting. suppress_warning : bool If True, suppresses the warning about manual crossfitting. verbose : bool If True, prints progress reports. Returns ------- s0_probs : np.array n-length array where s0_probs[i] = :math:`P(S_i(0) = 1 | X_i)`. s1_probs : np.array n-length array where s1_probs[i] = :math:`P(S_i(1) = 1 | X_i)`. y0_dists : np.array list of batched scipy distributions whose shapes sum to n. the ith dist. is the conditional law of :math:`Y_i(0) | S_i(0) = 1, X_i`. y1_dists : list list of batched scipy distributions whose shapes sum to n. the ith dist. is the conditional law of :math:`Y_i(1) | S_i(1) = 1, X_i`. """ # estimate selection probs if self.s0_probs is None or self.s1_probs is None: if self.selection_model is None: self.selection_model = 'monotone_logistic' self.selection_model = generic.get_default_model( outcome_model=self.selection_model, # the following args are ignored if selection_model already # inherits from dist_reg.DistReg class support=set([0,1]), discrete=True, monotonicity=True, how_transform='intercept', ) if verbose: print("Cross-fitting the selection model.") sout = dist_reg.cross_fit_predictions( W=self.W, X=self.X, y=self.S, nfolds=nfolds, model=self.selection_model, verbose=verbose, probs_only=True, ) counterfactuals, self.selection_model_fits, self.S_oos_preds = sout self.s0_probs, self.s1_probs = counterfactuals elif not suppress_warning: warnings.warn( generic.CROSSFIT_WARNING.replace("y0_", "s0_").replace("y1_", "s1_") ) # Estimate outcome model if self.y0_dists is None or self.y1_dists is None: self.outcome_model = generic.get_default_model( discrete=self.discrete, support=self.support, outcome_model=self.outcome_model, ) if verbose: print("Cross-fitting the outcome model.") yout = dist_reg.cross_fit_predictions( W=self.W, X=self.X, S=self.S, y=self.y, nfolds=nfolds, model=self.outcome_model, verbose=verbose, ) counterfactuals, self.model_fits, self.oos_dist_preds = yout self.y0_dists, self.y1_dists = counterfactuals elif not suppress_warning: warnings.warn(generic.CROSSFIT_WARNING) # return return self.s0_probs, self.s1_probs, self.y0_dists, self.y1_dists
[docs] def fit( self, nfolds=5, alpha=0.05, aipw=True, s0_probs=None, s1_probs=None, y0_dists=None, y1_dists=None, suppress_warning=False, verbose=True, **solve_kwargs, ): """ Main function which (1) performs cross-fitting, (2) computes optimal dual variables, and (3) computes final dual bounds. Parameters ---------- nfolds : int Number of folds to use when cross-fitting. Defaults to 5, alpha : float Nominal coverage level. Defaults to 0.05. aipw : bool If true, returns AIPW estimator. s0_probs : np.array Optional n-length array where s0_probs[i] = :math:`P(S_i(0) = 1 | X_i)`. If not provided, will be estimated from the data. s1_probs : np.array Optional n-length array where s1_probs[i] = :math:`P(S_i(1) = 1 | X_i)`. If not provided, will be estimated from the data. y0_dists : np.array Optional list of batched scipy distributions whose shapes sum to n. the ith dist. is the conditional law of :math:`Y_i(0) | S_i(0) = 1, X_i`. If not provided, will be estimated from the data. y1_dists : np.array Optional list of batched scipy distributions whose shapes sum to n. The ith dist. is the conditional law of :math:`Y_i(1) | S_i(1) = 1, X_i`. If not provided, will be estimated from the data. suppress_warning : bool If True, suppresses warning about cross-fitting. verbose : bool If True, gives occasional progress reports.. solve_kwargs : dict kwargs to self.compute_dual_variables(), e.g., ``verbose``, ``nvals``, ``grid_size`` Returns ------- self : object """ # Save data self.s0_probs, self.s1_probs = s0_probs, s1_probs self.y0_dists, self.y1_dists = y0_dists, y1_dists # if pis not supplied: will use cross-fitting if self.pis is None: self.fit_propensity_scores(verbose=verbose, nfolds=nfolds) # fit outcome models using cross-fitting self.cross_fit( verbose=verbose, nfolds=nfolds, suppress_warning=suppress_warning, ) # compute dual variables self.compute_dual_variables( s0_probs=self.s0_probs, s1_probs=self.s1_probs, y1_dists=self.y1_dists, ymin=self.y.min(), ymax=self.y.max(), verbose=verbose, **solve_kwargs, ) # compute dual bounds self._compute_final_bounds(aipw=aipw, alpha=alpha) return self
def _compute_final_bounds(self, aipw=True, alpha=0.05): """ Computes final bounds based in (A)IPW summands, using the delta method for E[Y(1) - Y(0) | S(1) = S(0) = 1]. Uses the bootstrap for clustered standard errors. """ self._compute_ipw_summands() summands = self.aipw_summands if aipw else self.ipw_summands self._compute_cond_means() self.y0s0_cond_means = self.mu0 * self.s0_probs self.s_probs = self.s0_probs.copy() self.s_probs[self.W == 1] = self.s1_probs[self.W == 1] ests = [] ses = [] bounds = [] scale = stats.norm.ppf(1-alpha/2) # kappa = E[Y(0) S(0)] skappas = (1 - self.W) * (self.y * self.S - self.y0s0_cond_means) skappas = skappas / (1-self.pis) skappas += self.y0s0_cond_means self.skappas = skappas # gamma = E[S(0)] sgammas = (1-self.W) * (self.S - self.s0_probs) / (1-self.pis) sgammas += self.s0_probs self.sgammas = sgammas if self.clusters is None: for lower in [1, 0]: # beta = part. identifiable component E[Y(1) S(0)] sbetas = summands[1-lower] hattheta, se = lee_delta_method_se( sbetas=sbetas, skappas=skappas, sgammas=sgammas, ) ests.append(hattheta) ses.append(se) if lower: bounds.append(hattheta - scale * se) else: bounds.append(hattheta + scale * se) self.estimates = np.array(ests) self.ses = np.array(ses) self.cis = np.array(bounds) return dict( estimates=self.estimates, ses=self.ses, cis=self.cis ) else: self.estimates, self.ses, self.cis = _delta_bootstrap_ses( h=lambda fval, z0, z1: (fval - z0[0]) / z0[1], summands=summands, z0summands=np.stack([skappas, sgammas], axis=1), z1summands=np.zeros((self.n, 1)), clusters=self.clusters, alpha=alpha, ) # Return return dict( estimates=self.estimates, ses=self.ses, cis=self.cis, )
[docs] def summary(self, minval=-np.inf, maxval=np.inf): """ Prints a summary of main results from the class. Parameters ---------- minval : float Analytical lower bound on estimand used to clip results. Defaults to -np.inf. maxval : float Analytical upper bound on estimand used to clip results. Defaults to np.inf. """ print("___________________Inference_____________________") print(self.results(minval=minval, maxval=maxval)) print() print("________________Selection model__________________") sumstats = dist_reg._evaluate_model_predictions( y=self.S, haty=self.s_probs ) print(sumstats) print() print("_________________Outcome model___________________") self._compute_oos_resids() sumstats = dist_reg._evaluate_model_predictions( y=self.y[self.S == 1], haty=self.oos_preds[self.S == 1], ) print(sumstats) print() print("________________Treatment model__________________") sumstats = dist_reg._evaluate_model_predictions( y=self.W, haty=self.pis ) print(sumstats) print()