# NeuralNetworkTorch.py
import numpy as np
import matplotlib.pyplot as plt
import torch


class NeuralNetworkTorch(torch.nn.Module):
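    """Fully connected feedforward network for regression: Tanh hidden
    layers followed by a linear output layer. Inputs and targets are
    standardized internally during training, and use() returns
    predictions in the original target units."""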
    def __init__(self, n_inputs, n_hiddens_list, n_outputs, device='cpu'):
        super().__init__()
        self.n_inputs = n_inputs
        self.n_hiddens_list = n_hiddens_list
        self.n_outputs = n_outputs
        self.device = device
        self.n_layers = len(n_hiddens_list) + 1
        self.layers = torch.nn.ModuleList()
        for n_units in n_hiddens_list:
            self.layers.append(self._make_tanh_layer(n_inputs, n_units))
            n_inputs = n_units
        self.layers.append(torch.nn.Linear(n_inputs, n_outputs))
        self.stand_params = None
        self.error_trace = []
        self.error_trace_val = []
    def _make_tanh_layer(self, n_inputs, n_units):
        return torch.nn.Sequential(torch.nn.Linear(n_inputs, n_units),
                                   torch.nn.Tanh())
    def __repr__(self):
        return f'NeuralNetworkTorch({self.n_inputs}, {self.n_hiddens_list}, {self.n_outputs}, device={self.device})'
    def forward(self, Xst):
        Ys = [Xst]
        for layer in self.layers:
            Ys.append(layer(Ys[-1]))
        return Ys[1:]  # outputs of every layer; the input X itself is excluded
    def train(self, Xtrain, Ttrain, n_epochs=10, learning_rate=0.01,
              method='adam', verbose=True, Xval=None, Tval=None):
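        """Train with MSE loss on standardized data. Xtrain and Ttrain may
        be numpy arrays or torch tensors. If a validation set (Xval, Tval)
        is given, the weights from the epoch with the lowest validation
        error are restored when training finishes. Note that this method
        shadows torch.nn.Module.train."""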
        # Accept numpy arrays or torch tensors.
        if isinstance(Xtrain, np.ndarray):
            Xtrain = torch.from_numpy(Xtrain.astype(np.float32))
        if isinstance(Ttrain, np.ndarray):
            Ttrain = torch.from_numpy(Ttrain.astype(np.float32))
        if Xval is not None:
            if isinstance(Xval, np.ndarray):
                Xval = torch.from_numpy(Xval.astype(np.float32))
            if isinstance(Tval, np.ndarray):
                Tval = torch.from_numpy(Tval.astype(np.float32))

        # Standardize inputs and targets using training-set statistics.
        self.stand_params = self.calc_standardize_parameters(Xtrain, Ttrain)
        Xtrain = self.standardize_X(Xtrain)
        Ttrain = self.standardize_T(Ttrain)
        if Xval is not None:
            Xval = self.standardize_X(Xval)
            Tval = self.standardize_T(Tval)

        if method == 'sgd':
            optimizer = torch.optim.SGD(self.parameters(), lr=learning_rate)
        elif method == 'adam':
            optimizer = torch.optim.Adam(self.parameters(), lr=learning_rate)
        else:
            raise ValueError("train: method must be 'sgd' or 'adam'.")

        error_f = torch.nn.MSELoss()

        self.best_epoch = None
        best_mse = None
        best_weights = self.get_all_weights()

        print_every = max(1, n_epochs // 10)
        for epoch in range(n_epochs):
            Ytrain = self.forward(Xtrain)[-1]  # output of the last layer
            mse = error_f(Ytrain, Ttrain)
            optimizer.zero_grad()
            mse.backward()
            optimizer.step()

            # Store RMSE as a plain float so the traces can be plotted directly.
            self.error_trace.append(mse.sqrt().item())

            if Xval is not None:
                Yval = self.forward(Xval)[-1]
                mse_val = error_f(Yval, Tval)
                self.error_trace_val.append(mse_val.sqrt().item())
                # Keep the weights that achieve the lowest validation error.
                if best_mse is None or mse_val < best_mse:
                    best_mse = mse_val
                    best_weights = self.get_all_weights()
                    self.best_epoch = epoch

            if verbose and ((epoch + 1) % print_every == 0 or epoch == n_epochs - 1):
                if Xval is not None:
                    print(f'Epoch {epoch+1} RMSE train {self.error_trace[-1]:.4f} val {self.error_trace_val[-1]:.4f}')
                else:
                    print(f'Epoch {epoch+1} RMSE {self.error_trace[-1]:.4f}')

        if Xval is not None:
            self.set_all_weights(best_weights)

        return self
    def use(self, X, return_hidden_layer_outputs=False):
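        """Apply the trained network to X (numpy array or tensor) and
        return predictions as a numpy array in the original target units.
        With return_hidden_layer_outputs=True, also return a list of each
        hidden layer's outputs."""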
        if isinstance(X, np.ndarray):
            X = torch.from_numpy(X.astype(np.float32))
        Xst = self.standardize_X(X)
        Ys = self.forward(Xst)
        Y = Ys[-1]
        Y = self.unstandardize_T(Y)
        Zs = Ys[:-1]
        Y = Y.detach().cpu().numpy()
        Zs = [Z.detach().cpu().numpy() for Z in Zs]
        return (Y, Zs) if return_hidden_layer_outputs else Y
    def get_error_trace(self):
        return self.error_trace

    def get_error_traces(self):
        return self.error_trace, self.error_trace_val, self.best_epoch
    def calc_standardize_parameters(self, X, T):
        Xmeans = X.mean(axis=0)
        Xstds = X.std(axis=0)
        # Constant columns have zero std; replace with the mean of the
        # nonzero stds to avoid dividing by zero when standardizing.
        Xstds[Xstds == 0] = Xstds[Xstds > 0].mean(axis=0)
        if T is None:
            return {'Xmeans': Xmeans, 'Xstds': Xstds}
        else:
            Tmeans = T.mean(axis=0)
            Tstds = T.std(axis=0)
            return {'Xmeans': Xmeans, 'Xstds': Xstds, 'Tmeans': Tmeans, 'Tstds': Tstds}
    def standardize_X(self, X):
        return (X - self.stand_params['Xmeans']) / self.stand_params['Xstds']

    def unstandardize_X(self, Xst):
        return Xst * self.stand_params['Xstds'] + self.stand_params['Xmeans']

    def standardize_T(self, T):
        return (T - self.stand_params['Tmeans']) / self.stand_params['Tstds']

    def unstandardize_T(self, Tst):
        return Tst * self.stand_params['Tstds'] + self.stand_params['Tmeans']
    def get_Ws(self):
        # Return each layer's weights as one matrix per layer, with the
        # bias row stacked on top of the (inputs x units) weight matrix.
        Ws = []
        for layer in self.layers:
            W_and_bias = list(layer.parameters())
            W = W_and_bias[0].detach().cpu().numpy()
            Wbias = W_and_bias[1].detach().cpu().numpy().T.reshape(1, -1)
            if W.ndim == 4:
                W = np.moveaxis(W, 0, 3)  # first dim is units; move it to the last, fourth, dim
                n_units = Wbias.shape[-1]
                W = W.reshape(-1, n_units)
            else:
                W = W.T
            Ws.append(np.vstack((Wbias, W)))
        return Ws
    def get_all_weights(self):
        return torch.nn.utils.parameters_to_vector(self.parameters())

    def set_all_weights(self, all_weights):
        torch.nn.utils.vector_to_parameters(all_weights, self.parameters())
if __name__ == "__main__":
    # Quick sanity check: fit the linear relation T = 0.1 * X.
    X = np.arange(10).reshape(-1, 1)
    T = X * 0.1

    nnet = NeuralNetworkTorch(1, [10], 1)
    nnet.train(X, T, n_epochs=100)

    # Predict the targets for the training inputs.
    results = nnet.use(X)

    plt.plot(nnet.error_trace)
    plt.xlabel('Epoch')
    plt.ylabel('RMSE (standardized)')
    plt.show()
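
# A minimal sketch (not in the original script) of training with a
# validation set, which activates the best-epoch weight restoration in
# train(); the 8/2 split of this toy data is illustrative only:
#
#     Xtrain, Ttrain = X[:8], T[:8]
#     Xval, Tval = X[8:], T[8:]
#     nnet = NeuralNetworkTorch(1, [10], 1)
#     nnet.train(Xtrain, Ttrain, n_epochs=100, Xval=Xval, Tval=Tval)
#     train_rmse, val_rmse, best_epoch = nnet.get_error_traces()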