Source code for dualbounds.utilities

import copy
import os
import sys
import time
import numpy as np
import pandas as pd
from scipy import stats
from multiprocessing import Pool
from functools import partial
from itertools import product
from tqdm.auto import tqdm
from typing import Optional, Union

def elapsed(t0):
	return np.around(time.time() - t0, 2)

def vrange(n, verbose=False):
	if not verbose:
		return range(n)
	else:
		return tqdm(list(range(n)))

def haslength(x):
	try:
		len(x)
		return True
	except:
		return False

def floatable(x):
	try:
		float(x)
		return True
	except:
		return False

def itemize(x):
	try:
		return x.item()
	except AttributeError:
		return x

### For processing data
def _binarize_variable(x: np.array, var_name: str):
	"""
	Converts x to a binary r.v. and raises an error
	if it contains more than 2 values.
	"""
	# Check if nans
	if x.dtype in [int, np.int32, np.int64, np.float32, np.float64]:
		if np.any(np.isnan(x)):
			raise ValueError(f"{var_name} has missing values.")

	# Check if binary
	vals = set(list(np.unique(x)))
	if len(vals) > 2:
		raise ValueError(f"{var_name} is not binary and takes {len(vals)} values.")
	elif len(vals - set([0,1])) == 0:
		return x.astype(int)
	else:
		vals = sorted(list(vals))
		print(f"For {var_name}, replacing {vals[0]} with 0 and {vals[1]} with 1.")
		return (x == list(vals)[1]).astype(int)

def process_covariates(data: pd.DataFrame):
	"""
	Performs rudimentary pre-processing of covariates.

	Parameters
	----------
	data : pd.DataFrame
		Dataframe of covariates.

	Returns
	-------
	processed : pd.DataFrame
		DataFrame of preprocessed covariates.
	"""
	cts_covs = []
	discrete_covs = []
	for c in data.columns:
		if np.all(data[c].apply(floatable)):
			cts_covs.append(c)
		else:
			discrete_covs.append(c)

	# Get dummies and return
	return pd.get_dummies(
		data[discrete_covs + cts_covs],
		columns=discrete_covs,
		dummy_na=True,
		drop_first=True,
	).astype(float)

def preprocess_clusters(clusters):
	"""
	Parameters
	----------
	clusters : np.array
		n-length array of cluster indicators.

	Returns
	-------
	new_clusters : np.array
		n-length array of clusters which takes values
		from 0 to len(np.unique(clusters))-1.
	"""
	new_clusters = np.zeros(clusters.shape)
	for i, x in enumerate(np.sort(np.unique(clusters))):
		new_clusters[clusters == x] = i
	return new_clusters.astype(int)

def cluster_bootstrap_se(
	data: np.array,
	clusters: Optional[callable]=None,
	func: Optional[callable]=None,
	B: int=1000,
	verbose: bool=False,
):
	"""
	Computes clustered bootstrap on func(data).

	Parameters
	----------
	data : np.array
		array whose zeroth axis reflects the number of observations n.
		Typically a n-shaped or (n,d)-shaped array.
	func : callable
		A callable that maps an np.array to a scalar or array.
		Defaults to ``func=lambda x: x.mean()``.
	clusters : np.array
		n-shaped array where clusters[i] = j means that observation
		i is in the jth cluster.
	B : int
		Number of bootstrap draws to use. Ignored unless func or
		clusters are provided.
	verbose : bool
		If True, provides a progress bar.

	Returns
	-------
	se : float | np.array
		standard error of the estimator.
	bs_ests : np.array
		Array of bootstrapped estimators.
	"""
	# parse function
	n = len(data)
	if func is None:
		func = lambda x: x.mean()
	# Non-clustered case
	bs_ests = []
	if clusters is None:
		for b in vrange(B, verbose=verbose):
			inds = np.random.choice(n, n, replace=True)
			bs_ests.append(func(data[inds]))
	# Clustered case
	else:
		# Preprocess clusters
		clusters = preprocess_clusters(clusters).astype(int)
		n_clusters = len(np.unique(clusters))
		# data_clusters[j] = observations from cluster j
		data_clusters = [
			data[clusters==i] for i in range(n_clusters)
		]
		# Resample clusters uniformly at random
		for b in vrange(B, verbose=verbose):
			bs_clusters = np.random.choice(clusters, n_clusters, replace=True)
			new_data = np.concatenate(
				[data_clusters[i] for i in bs_clusters], axis=0
			)
			bs_ests.append(func(new_data))
	bs_ests = np.stack(bs_ests, axis=0)
	return bs_ests.std(axis=0), bs_ests

[docs] def compute_est_bounds( summands: np.array, clusters: Optional[callable]=None, func: Optional[callable]=None, B: int=1000, alpha: float=0.05, ): """ Helper to computes confidence intervals. Specifically, computes intervals for ``func(summands[k].mean(axis=-1))`` Provides lower CI for k=0 and upper CI for k=1. Parameters ---------- summands : np.array (2, n)-shaped array or (2, n, d)-shaped array func : callable A callable that maps a 1D np.array to a scalar. Defaults to ``func=lambda x: x.mean()``. clusters : np.array n-shaped array where clusters[i] = j means that observation i is in the jth cluster. B : int Number of bootstrap draws to use. Ignored unless func or clusters are provided. alpha : float Nominal level. Returns ------- ests : np.array 2-shaped array of lower and upper estimators (sample mean). ses : np.array 2-shaped array of standard errors. bounds : np.array 2-shaped array of lower/upper confidence bounds. Notes ----- This is meant primarily for internal use in the DualBounds class. """ scale = stats.norm.ppf(1-alpha/2) n = summands.shape[1] # In the simplest case, use analytical bounds if func is None and clusters is None: ests = summands.mean(axis=1) ses = summands.std(axis=1) / np.sqrt(n) # else, use the bootstrap/delta method else: if func is None: func = lambda x: x.mean() new_func = lambda data: itemize(func(data.mean(axis=0))) # Estimators ests = np.array([new_func(summands[k]) for k in [0,1]]) ses = np.zeros(2) for k in [0,1]: ses[k] = itemize(cluster_bootstrap_se( data=summands[k], clusters=clusters, func=new_func, B=B, )[0]) return ests, ses, np.array([ ests[0] - scale * ses[0], ests[1] + scale * ses[1] ])
def _sort_disc_dist(vals, probs): """ Parameters ---------- vals : (n, nvals)-shaped array. probs : (n, nvals)-shaped array. probs.sum(axis=1) == 1 Returns ------- vals_new : (n, nvals) array ``vals`` sorted so that vals[i] is in increasing order. probs_new : (n, nvals) array ``probs`` sorted in the corresponding order as vals. """ inds = np.argsort(vals, axis=1) new_vals = np.take_along_axis(vals, inds, axis=1) new_probs = np.take_along_axis(probs, inds, axis=1) return new_vals, new_probs class ConstantDist: def __init__(self, loc=0, scale=1): self.loc = loc self.scale = scale def rvs(self, size): return self.loc + self.scale * np.ones(size)
[docs] class BatchedCategorical: """ Batched discrete (categorical) distribution. This class supports the small set of operations needed by the ``DualBounds`` class. Parameters ---------- vals : np.array (n, nvals)-shaped array of supports. probs : np.array (n, nvals)-shaped array of probabilities. """ def __init__( self, vals, probs ): # sort self.n, self.nvals = vals.shape self.vals, self.probs = _sort_disc_dist(vals=vals, probs=probs) self.cumprobs = np.cumsum(self.probs, axis=1) # validate args if np.any(self.probs < 0): raise ValueError("probs must be nonnegative") if np.any(np.abs(self.probs.sum(axis=1)-1) > 1e-5): raise ValueError("probs.sum(axis=1) must equal 1")
[docs] @classmethod def from_scipy( cls, scipy_bernoulli ): """ Instantiates distribution from a 1D scipy.stats.bernoulli object. Parameters ---------- scipy_dist : scipy.stats.bernoulli """ # Probabilities probs = scipy_bernoulli.mean() # Values n = len(probs) vals = np.zeros((n, 2)); vals[:, 1] += 1 # Return return cls( vals=vals, probs=np.stack([1-probs, probs], axis=1) )
[docs] @classmethod def from_binary_probs( cls, probs ): """ Instantiates distribution for a binary-valued variable. Parameters ---------- probs : np.array n-length array of P(Yi = 1) """ n = len(probs) probs_stacked = np.stack([1-probs, probs], axis=1) vals = np.zeros((n, 2)) vals[:, 1] += 1 return cls(vals=vals, probs=probs_stacked)
[docs] def mean(self): """ Returns ------- mu : np.array n-length mean of discrete distributions. """ return np.sum(self.vals * self.probs, axis=1)
[docs] def ppf(self, q): """ Parameters ---------- q : np.array (m,n)-shaped array of desired quantiles, between 0 and 1. Returns ------- quantile_values : np.array The desired quantile values of the categorical distributions. """ m = q.shape[0] # To be compatible with scipy, if q is (m, 1) # change it to (m,) if len(q.shape) == 2: if q.shape[1] == 1: q = q[:, 0] # Vectorize if len(q.shape) == 1: q = np.stack([q for _ in range(self.n)], axis=1) # use a for loop to save memory qvals = np.zeros((m, self.n)) for i in range(self.n): flags = self.cumprobs[i].reshape(self.nvals, 1) >= q[:, i] qvals[:, i] = self.vals[i][np.argmax(flags, axis=0)] return qvals
[docs] def rvs(self): """ Returns ------- y : array n-length array of a single RV drawn from each categorical. """ u = np.random.uniform(size=(self.n, 1)) inds = (u < self.cumprobs).argmax(axis=1) return self.vals[(np.arange(self.n), inds)]
def _adjust_support_size_unbatched( vals, probs, new_nvals, ymin, ymax ): """ Adjust categorical distribution to have a support of size new_nvals. Parameters ---------- vals : np.array nvals-length array of support. must be sorted. probs : np.array nvals-length array of probabilities new_nvals : int Desired size of support. ymin : float Minimum value for support ymax : float Maximum value for support """ nvals = len(vals) vals = vals.copy(); probs = probs.copy() if np.any(vals != np.sort(vals)): raise ValueError("vals must be sorted") # Case 0: no changes if nvals == new_nvals: return vals, probs # Case 1: just add padding elif nvals < new_nvals: pad = np.random.uniform( ymin, ymax, size=new_nvals - nvals ) vals = np.concatenate([vals, pad], axis=0) probs = np.concatenate([probs, np.zeros(new_nvals - nvals)], axis=0) return vals, probs # Case 2: greedily merge support points else: probs = np.maximum(probs, 1e-8) probs /= probs.sum() while len(vals) > new_nvals: # Probability of equaling the left merge mixtures = probs[:-1] / (probs[:-1] + probs[1:]) # candidate values for the post-merge support point candvals = vals[1:] * (1 - mixtures) + vals[:-1] * mixtures # E[|X - Y|] under best coupling costs = (candvals - vals[1:])**2 * probs[1:] costs += (candvals - vals[:-1])**2 * probs[:-1] # pick the best variant istar = np.argmin(costs) vals[istar] = np.nan vals[istar + 1] = candvals[istar] probs[istar + 1] += probs[istar] probs[istar] = np.nan # drop na vals vals = vals[~np.isnan(vals)] probs = probs[~np.isnan(probs)] return vals, probs
[docs] def adjust_support_size( vals, probs, new_nvals, ymin, ymax ): """ Adjust categorical distribution to have support of size ``new_nvals``. Parameters ---------- vals : np.array (n, nvals)-length array of support. vals[i] must be sorted for each i. probs : np.array (n, nvals)-length array of probabilities of original distributions. new_nvals : int Desired size of support. ymin : float Minimum value for support. ymax : float Maximum value for support. Returns ------- new_vals : np.array (n, new_nvals)-array of supports. new_probs : np.array (n, new_nvals)-array of probabilities. """ n, nvals = vals.shape new_vals = np.zeros((n, new_nvals)) new_probs = np.zeros((n, new_nvals)) for i in range(n): new_vals[i], new_probs[i] = _adjust_support_size_unbatched( vals[i], probs[i], new_nvals, ymin=ymin, ymax=ymax, ) return new_vals, new_probs
[docs] def weighted_quantile(values, weights, quantiles, _old_style=True): """ Very close to numpy.percentile, but supports weights. Parameters ---------- values : np.array n-length array of data weights : np.array n-length array of sample weights quantiles : array-like d-length array of desired quantiles Returns ------- results : np.array d-length array of weighted quantiles. """ if np.any(quantiles < 0) or np.any(quantiles > 1): raise ValueError("Quantiles must be in [0,1]") # Sort values sorter = np.argsort(values) values = values[sorter] weights = weights[sorter] # Compute quantiles cdf = np.cumsum(weights) - 0.5 * weights if _old_style: cdf -= cdf[0] cdf /= cdf[-1] else: cdf /= np.sum(weights) # Interp + return return np.interp(quantiles, cdf, values)
def parse_dist( dist, loc=0, scale=1, mu=None, sd=None, **kwargs ): ## variant 1 based on mu/sd dist = dist.lower() if mu is not None and sd is not None and dist != 'bernoulli': # get SD correct temp = parse_dist(dist, loc=0, scale=sd, **kwargs) scale = sd / temp.std() temp = parse_dist(dist, loc=0, scale=sd*scale, **kwargs) shift = mu - temp.mean() return parse_dist(dist, loc=shift, scale=sd*scale, **kwargs) elif mu is not None and sd is not None and dist == 'bernoulli': # Note: Cannot enforce mean/sd for Bernoulli dist. # So we just use loc/scale. loc = copy.copy(mu) scale = copy.copy(sd) mu = None sd = None elif mu is not None or sd is not None: raise ValueError("Must provide both of mu and sd, not just one.") ## variant 2 based on location/scale # sometimes return a regular dist object if not isinstance(dist, str): return dist # Parse if dist == 'constant': return ConstantDist(loc=loc, scale=scale) if dist == 'bernoulli': prob = np.exp(loc / scale) prob = prob / (1 + prob) return stats.bernoulli(p=prob) if dist in ['gaussian', 'normal']: return stats.norm(loc=loc, scale=scale) if dist == 'invchi2': df = kwargs.pop("df", 4) return stats.chi2(df=df, loc=loc, scale=scale/df) if dist == 't' or dist == 'tdist': df = kwargs.pop("df", 4) return stats.t(loc=loc, scale=scale, df=df) if dist == 'cauchy': return stats.cauchy(loc=loc, scale=scale) if dist == 'laplace': return stats.laplace(loc=loc, scale=scale) if dist in ['uniform', 'unif']: return stats.uniform(loc=loc-scale/2, scale=scale) if dist in ['expo', 'expon', 'exponential']: return stats.expon(loc=loc-scale, scale=scale) if dist == 'gamma': a = kwargs.pop("a", 5) return stats.gamma(loc=loc-a*scale, scale=scale, a=a) if dist == 'skewnorm': a = kwargs.pop("a", 5) return stats.skewnorm(loc=loc, scale=scale, a=a) if dist == 'powerlaw': a = kwargs.pop("a", 5) return stats.powerlaw(loc=loc, scale=scale, a=a) else: raise ValueError(f"Dist {dist} is unrecognized.")