Source code for dualbounds.gen_data

"""
Generate synthetic data for tests and illustrations.
"""

import numpy as np
from scipy import stats
from scipy.special import logsumexp
from .utilities import parse_dist, BatchedCategorical
from typing import Optional, Union

def heteroskedastic_scale(X, heterosked='constant'):
	n, p = X.shape
	if heterosked == 'constant' or heterosked == 'none':
		scale = np.ones(n)
	elif heterosked == 'linear':
		scale = np.abs(X[:, 0])
	elif heterosked == 'norm':
		scale = np.power(X, 2).sum(axis=-1)
	elif heterosked == 'invnorm':
		scale = 1 / (np.power(X, 2).sum(axis=-1))
	elif heterosked == 'exp_linear':
		scale = np.sqrt(np.exp(X[:, 0] + X[:, 1]))
	else:
		raise ValueError(f"Unrecognized heterosked={heterosked}")

	# normalize to ensure Var(epsilon) = 1 marginally
	scale /= np.sqrt(np.power(scale, 2).sum() / n)
	return scale

def create_cov(p, covmethod='identity'):
	covmethod = str(covmethod).lower()
	if covmethod == 'identity':
		return np.eye(p)
	else:
		raise ValueError(f"Unrecognized covmethod={covmethod}")

def _sample_norm_vector(dim, norm, sparsity=0):
	""" samples x ~ N(0, dim) and then normalizes so |x|_2 = norm """
	if norm == 0:
		return np.zeros(dim)
	# Possibly select sparse subset
	if sparsity > 0:
		x = np.zeros(dim)
		n_nonzero = int(max(1, np.ceil((1-sparsity) * dim)))
		x[np.random.choice(dim, n_nonzero, replace=False)] = _sample_norm_vector(
			dim=n_nonzero, norm=norm, sparsity=0,
		)
		return x
	# Else create a dense vector
	else:
		x = np.random.randn(dim)
		return x / np.sqrt(np.power(x, 2).sum() / norm)

[docs] def gen_regression_data( n: int, p: int, lmda_dist: str='constant', eps_dist: str='gaussian', heterosked: str='constant', tauv: float=1, # Var(Y(1) | X ) / Var(Y(0) | X) r2: float=0.95, sparsity: float=0, interactions: bool=True, tau: float=3, betaW_norm: float=0, covmethod: str='identity', dgp_seed: int=1, sample_seed: Optional[int]=None, ): """ Samples a synthetic regression dataset. Parameters ---------- n : int Number of observations. p : int Number of covariates lmda_dist : str str specifying the distribution of lmdai, where Xi = lmdai * N(0, Sigma), so the covariates are elliptically distributed. eps_dist : str str specifying the distribution of the residuals. See ``utilities.parse_dist`` for the list of options. heterosked : str str specifying the type of heteroskedasticity. Defaults to ``constant``. tauv : float Ratio of Var(Y(1) | X) / Var(Y(0) | X) r2 : float Population r^2 of 1 - E[Var(Y | X)] / Var(Y). sparsity : float Proportion of covariates with zero coefficients. Defaults to zero (no sparsity). interactions : bool If True (default), Y = X beta + W * X * beta_int + epsilon. Else, the interactions between the treatment and the covariates are ommitted. tau : float Average treatment effect. betaW_norm : float E[W | X] = logistic(X @ betaW). This parameter controls the norm of betaW and thus the variance of the propensity scores. covmethod : str str identifier for how to generate the covariance matrix. dgp_seed : int Random seed for the data-generating parameters. sample_seed : int Random seed for the randomness from sampling. Returns ------- data : dict Dictionary with keys ``X`` (covariates), ``y`` (response), ``W`` (treatment), ``pis`` (true propensity scores), ``beta``, ``beta_int``, ``betaW``, and more. """ # create parameters np.random.seed(dgp_seed) Sigma = create_cov(p=p, covmethod=covmethod) # Create beta beta_norm = r2 / (1 - r2) if r2 > 0 else 0 # No interactions with treatment if not interactions: beta = _sample_norm_vector(p, beta_norm, sparsity=sparsity) beta_int = np.zeros(p) # interactions with treatment else: beta = _sample_norm_vector(2*p, beta_norm, sparsity=sparsity) beta_int = beta[p:].copy() beta = beta[0:p].copy() # Create beta_W (for W | X) betaW = _sample_norm_vector(p, betaW_norm, sparsity=sparsity) # sample X np.random.seed(sample_seed) X = np.random.randn(n, p) L = np.linalg.cholesky(Sigma) X = X @ L.T lmdas = parse_dist(lmda_dist).rvs(size=n) lmdas /= np.sqrt(np.power(lmdas, 2).mean()) X = X * lmdas.reshape(-1, 1) # sample W muW = X @ betaW pis = np.exp(muW) pis = pis / (1 + pis) # clip in truth pis = np.maximum(np.minimum(pis, 1-1e-3), 1e-3) W = np.random.binomial(1, pis) # conditional mean of Y mu = X @ beta # conditional variance of Y sigmas = heteroskedastic_scale(X, heterosked=heterosked) # allow for sigmas to depend on W sigmas0 = sigmas.copy() sigmas1 = tauv * sigmas.copy() denom = np.sqrt((np.mean(sigmas0**2) + np.mean(sigmas1**2)) / 2) sigmas0 /= denom; sigmas1 /= denom sigmas0 = np.maximum(1e-5, sigmas0) sigmas1 = np.maximum(1e-5, sigmas1) # Sample Y y0_dists = parse_dist( eps_dist, loc=mu, scale=sigmas0 ) cates = X @ beta_int + tau y1_dists = parse_dist( eps_dist, loc=mu+cates, scale=sigmas1 ) Y0 = y0_dists.rvs(); Y1 = y1_dists.rvs() Y = Y0.copy(); Y[W == 1] = Y1[W == 1] # return everything out = dict( X=X, W=W, y=Y, y0_dists=y0_dists, y1_dists=y1_dists, pis=pis, Sigma=Sigma, beta=beta, beta_int=beta_int, cates=y1_dists.mean() - y0_dists.mean(), betaW=betaW, ) # # for convenience # if eps_dist == 'bernoulli': # out.update(dict( # _y0_dists_4input=BatchedCategorical.from_scipy(y0_dists), # _y1_dists_4input=BatchedCategorical.from_scipy(y1_dists), # )) # else: # out.update(dict( # _y0_dists_4input=y0_dists, # _y1_dists_4input=y1_dists, # )) return out
[docs] def gen_lee_bound_data( stau: float=1, betaS_norm: float=1, **kwargs ): """ Generates synthetic datasets with selection bias. Parameters ---------- stau : float In the logistic regression of S on W and X, stau is the coefficient on W. betaS_norm : float norm of coefficients in logistic regression of S on W and X. """ # Generate regression data output = gen_regression_data(**kwargs) X, W = output['X'], output['W'] n, p = X.shape # create DGP for S | X np.random.seed(kwargs.get("dgp_seed", 1)) betaS = _sample_norm_vector(p, norm=betaS_norm) # sample S | X np.random.seed(kwargs.get("sample_seed", None)) muS0 = X @ betaS muS1 = muS0 + stau s0_probs = np.exp(muS0) / (1 + np.exp(muS0)) s1_probs = np.exp(muS1) / (1 + np.exp(muS1)) S0 = np.random.binomial(1, s0_probs) S1 = np.random.binomial(1, s1_probs) S = S0.copy(); S[W == 1] = S1[W == 1] # save and return for key, val in zip( ['s0_probs', 's1_probs', 'S'], [s0_probs, s1_probs, S] ): output[key] = val return output
def gen_iv_data( n: int, p: int, betaZ_norm: float=0, betaW_norm: float=0.5, r2: float=0.95, sparsity: float=0, eps_dist: str='gaussian', lmda_dist: str='constant', interactions: bool=True, tau: float=3, tauv: float=1, tauZ: float=1, tau_conf: float=0, heterosked: str='constant', dgp_seed: int=1, sample_seed: Optional[int]=None, ): """ Generates IV data. Parameters ---------- n : int Number of observations. p : int Number of covariates lmda_dist : str str specifying the distribution of lmdai, where Xi = lmdai * N(0, Sigma), so the covariates are elliptically distributed. eps_dist : str str specifying the distribution of the residuals. See ``utilities.parse_dist`` for the list of options. heterosked : str str specifying the type of heteroskedasticity. Defaults to ``constant``. tau : float Average treatment effect. tauv : float Ratio of Var(Y(1) | X) / Var(Y(0) | X) r2 : float Population r^2 of 1 - E[Var(Y | X)] / Var(Y). sparsity : float Proportion of covariates with zero coefficients. Defaults to zero (no sparsity). interactions : bool If True, there are interactions in the regression of Y on X. betaZ_norm: float Norm of beta in logistic regression of Z on X. betaW_norm : float Norm of beta in logistic regression of W on X, Z. tauZ : float Coefficient of Z in logistic regression of W on X, Z. dgp_seed : int Random seed for the data-generating parameters. sample_seed : int Random seed for the randomness from sampling. Returns ------- data_dictionary : dict Dictionary of various data-generating parameters plus raw data. """ # create parameters np.random.seed(dgp_seed) # Create beta beta_norm = r2 / (1 - r2) if r2 > 0 else 0 # No interactions with treatment if not interactions: beta = _sample_norm_vector(p, beta_norm, sparsity=sparsity) beta_int = np.zeros(p) # interactions with treatment else: beta = _sample_norm_vector(2*p, beta_norm, sparsity=sparsity) beta_int = beta[p:].copy() beta = beta[0:p].copy() # Create beta_W (for W | X) betaW = _sample_norm_vector(p, betaW_norm, sparsity=sparsity) betaZ = _sample_norm_vector(p, betaZ_norm, sparsity=sparsity) # sample X np.random.seed(sample_seed) X = np.random.randn(n, p) lmdas = parse_dist(lmda_dist).rvs(size=n) lmdas /= np.sqrt(np.power(lmdas, 2).mean()) X = X * lmdas.reshape(-1, 1) ### Sample Z | X muZ = X @ betaZ pis = np.exp(muZ) pis = pis / (1 + pis) # clip in truth pis = np.maximum(np.minimum(pis, 1-1e-3), 1e-3) Z = np.random.binomial(1, pis).astype(int) ### Sample W | Z, X. Here U are meant to be unmeasured. wprobs = np.stack([X @ betaW, X @ betaW + tauZ], axis=1) wprobs = np.exp(wprobs) / (1 + np.exp(wprobs)) U = np.random.uniform(size=n) W01 = (U.reshape(n, 1) <= wprobs).astype(int) W = W01[(np.arange(n), Z)] ### Sample Y | Z, W, X ## (a) conditional variance of Y sigmas = heteroskedastic_scale(X, heterosked=heterosked) # allow for sigmas to depend on W sigmas0 = sigmas.copy() sigmas1 = tauv * sigmas.copy() denom = np.sqrt((np.mean(sigmas0**2) + np.mean(sigmas1**2)) / 2) sigmas0 /= denom; sigmas1 /= denom ## (b) conditional mean of Y | X, W U mu0 = X @ beta + tau_conf * np.sign(U - 1/2) _y0_dists = parse_dist( eps_dist, mu=mu0, sd=sigmas0 ) cates = X @ beta_int + tau mu1 = mu0 + cates _y1_dists = parse_dist( eps_dist, mu=mu1, sd=sigmas1 ) Y0 = _y0_dists.rvs(); Y1 = _y1_dists.rvs() Y = Y0.copy(); Y[W == 1] = Y1[W == 1] ## (c) ydists for external use. # ydists[z][w] are the laws of Y(w) | W(z) = w ydists = [[], []] if tau_conf != 0: warnings.warn("Oracle ydists not available when tau_conf != 0; need to implement ScipyMixture.") else: for z in [0,1]: for w, muY, sigmaY in zip([0,1], [mu0, mu1], [sigmas0, sigmas1]): ydists[z].append(parse_dist(eps_dist, mu=muY, sd=sigmaY)) return dict( y=Y, W=W, Z=Z, X=X, pis=pis, cates=_y1_dists.mean() - _y0_dists.mean(), # betas beta=beta, beta_int=beta_int, betaZ=betaZ, betaW=betaW, # distributions wprobs=wprobs, ydists=ydists, ) # class ScipyMixture: # """ # A finite mixture of scipy distributions. # Parameters # --------- # distributions : rv_generic # A list of scipy distributions of shape (n,). # proportions : np.array # The mixture proportions. Must sum to 1. # Notes # ----- # This is only used within the ``gen_iv_data`` # function to calculate the law of Y(w) | W(z) = w # when tau_conf != 0. # """ # def __init__( # self, distributions, proportions # ): # self.dists = distributions # self.props = proportions # if not np.allclose(np.sum(self.props), 1): # raise ValueError("proportions must sum to 1") # def mean(self): # """ # Returns # ------- # mu : np.array # n-shaped array of means of mixture distribution. # """ # mus = np.stack([d.mean() * p for d, p in zip(self.dists, self.props)], axis=0) # return np.sum(mus, axis=0) # def cdf(self, x): # """ # Parameters # ---------- # x : np.array # Inputs to CDF. # Returns # ------- # cdf : np.array # CDF realized values. # """ # cdfs = np.stack([d.cdf() * p for d, p in zip(self.dists, self.props)], axis=0) # return np.sum(cdfs, axis=0) # def ppf(self, qs): # """ # Parameters # ---------- # qs : np.array # Inputs to quantile function. # Returns # ------- # vals : np.array # Quantile function evaluated at qs. # """ # pass