-
Notifications
You must be signed in to change notification settings - Fork 0
/
AIT_condition.py
146 lines (113 loc) · 4.78 KB
/
AIT_condition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import numpy as np
import pandas as pd
import rpy2.robjects as robjects
import os
from rpy2.robjects import pandas2ri
import indTest.HSIC2 as fasthsic
from rpy2.robjects import r
import rpy2.robjects.packages as rpackages
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
def AIT_test(data, Z, **params):
alpha = params.get('alpha', 10 / data.shape[0])
verbose = params.get('verbose', False)
relation = params.get('relation', 'nonlinear')
indexs = list(data.columns)
if 'Treatment' not in indexs or 'Outcome' not in indexs:
print('Please ensure the input is the variable of data!')
exit(-1)
if any(col.startswith('W') for col in data.columns): # Determine whether covariates exist.
if verbose: print("There are covariates.")
if relation == 'linear':
A, Z_data = linear_get_A_with_W(data, Z)
else:
A, Z_data = cf_with_W(data, Z)
else:
if verbose: print("There are no covariates.")
if relation == 'linear':
A = linear_get_A(data, Z)
Z_data = data[Z].values.reshape(-1, 1)
else:
A, Z_data = cf_no_W(data, Z)
pValue_Z = fasthsic.test(A, Z_data, alpha=alpha, verbose=verbose)
if pValue_Z < alpha :
valid_IV = False
else:
valid_IV = True
return {'IV_validity':valid_IV,'pValue_Z': pValue_Z}
def linear_get_A(df, Z):
X_data = df['Treatment'].values.reshape(-1)
Y_data = df['Outcome'].values.reshape(-1)
Z_data = df[Z].values.reshape(-1)
cov_YZ_given_W = np.cov(Y_data, Z_data)[0, 1]
cov_XZ_given_W = np.cov(X_data, Z_data)[0, 1]
if cov_XZ_given_W == 0:
raise ValueError("Covariance of X and Z is zero, cannot divide by zero")
f_hat = cov_YZ_given_W / cov_XZ_given_W
A = Y_data - f_hat * X_data
if len(A.shape) == 1 : A = A.reshape(-1, 1)
return A
def linear_get_A_with_W(data, Z):
X_data = data['Treatment'].values.reshape(-1, 1)
Y_data = data['Outcome'].values.reshape(-1, 1)
Z_data = data[Z].values.reshape(-1, 1)
W_data = data.filter(like='W').values
# Linear regression
model_YW = LinearRegression().fit(W_data, Y_data)
residual_Y = Y_data - model_YW.predict(W_data)
model_XW = LinearRegression().fit(W_data, X_data)
residual_X = X_data - model_XW.predict(W_data)
model_ZW = LinearRegression().fit(W_data, Z_data)
residual_Z = Z_data - model_ZW.predict(W_data)
if len(residual_Y.shape) == 2: residual_Y = residual_Y.reshape(-1)
if len(residual_X.shape) == 2: residual_X = residual_X.reshape(-1)
if len(residual_Z.shape) == 2: residual_Z = residual_Z.reshape(-1)
cov_YZ_given_W = np.cov(residual_Y, residual_Z)[0, 1]
cov_XZ_given_W = np.cov(residual_X, residual_Z)[0, 1]
if cov_XZ_given_W == 0:
raise ValueError("Covariance of X and Z is zero, cannot divide by zero")
f_hat = cov_YZ_given_W / cov_XZ_given_W
A = residual_Y - f_hat * residual_X
if len(A.shape) == 1: A = A.reshape(-1, 1)
return A, residual_Z.reshape(-1, 1)
def cf_no_W(data, Z):
if not rpackages.isinstalled('readxl'):
rpackages.importr('readxl')
if not rpackages.isinstalled('Formula'):
rpackages.importr('Formula')
# path = os.path.join('control_IV/controlfunctionIV-main/R/using_cf.R')
robjects.r.source('pretest.R')
robjects.r.source('cf.R')
path = os.path.join('using_cf.R')
robjects.r.source(path)
pandas2ri.activate()
r_dataframe = pandas2ri.py2rpy(data)
result = robjects.r.using_R_cf_no_W(r_dataframe, Z)
A = np.array(result).reshape(-1, 1)
Z_data = data[Z].values.reshape(-1, 1)
return A, Z_data
def cf_with_W(data, Z):
if not rpackages.isinstalled('readxl'):
rpackages.importr('readxl')
if not rpackages.isinstalled('Formula'):
rpackages.importr('Formula')
# path = os.path.join('control_IV/controlfunctionIV-main/R/using_cf.R')
robjects.r.source('pretest.R')
robjects.r.source('cf.R')
path = os.path.join('using_cf.R')
robjects.r.source(path)
pandas2ri.activate()
r_dataframe = pandas2ri.py2rpy(data)
result = robjects.r.using_R_cf_with_W(r_dataframe, Z)
A = np.array(result).reshape(-1, 1)
W_data = data.filter(like='W')
residual_Z = random_forest_residuals(data[Z], W_data)
return A, residual_Z.reshape(-1, 1)
def random_forest_residuals(Z_data, Ws_data):
combined_data = pd.concat([Z_data, Ws_data], axis=1).dropna()
dependent_var_clean = combined_data.iloc[:, 0]
independent_vars_clean = combined_data.iloc[:, 1:]
model = RandomForestRegressor(n_estimators=100)
model.fit(independent_vars_clean, dependent_var_clean.values.ravel())
residuals = dependent_var_clean.values.ravel() - model.predict(independent_vars_clean)
return residuals