preprocess.py
import os

import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler


class StockPreprocessor:
    def __init__(self, stock_fns=["aa.us.txt"], window_size=250, train=0.8, test=0.2,
                 sma_or_ema=1, smoothing_window_size=50):
        # A caller may pass either a single stock filename or a list of filenames;
        # always store a list internally.
        self.stock_fns = stock_fns if isinstance(stock_fns, list) else [stock_fns]
        self.WINDOW_SIZE = window_size
        self.TRAIN = train
        self.TEST = test
        self.data = []
        self.train_data = []
        self.test_data = []
        # 0 = Simple Moving Average, 1 = Exponential Moving Average,
        # any other value = no smoothing.
        self.sma_or_ema = sma_or_ema
        self.smoothing_window_size = smoothing_window_size
        self.normalization_window_size = 1500
        # Iterate over all the stock files that belong to this dataset.
        for stock_fn in self.stock_fns:
            # Read this stock's data into a pandas DataFrame, sorted chronologically.
            path = os.path.join("data", "Stocks", stock_fn)
            data_csv = pd.read_csv(path, header=0).sort_values('Date')
            close_prices = data_csv.loc[:, 'Close'].to_numpy()
            # Keep the raw DataFrame on the instance so the series can be plotted later.
            self.data.append(data_csv)
            print("Num rows in {}: {}".format(stock_fn, len(data_csv)))
            # Extract training and testing windows and append them to the existing splits.
            train_windows, test_windows = self.preprocess_stocks(close_prices)
            self.train_data += train_windows
            self.test_data += test_windows
    def preprocess_stocks(self, stock_data):
        # Split the series into training and testing segments.
        train = stock_data[: int(self.TRAIN * len(stock_data))].reshape(-1, 1)
        test = stock_data[int(self.TRAIN * len(stock_data)):].reshape(-1, 1)
        # An earlier version min-max scaled each whole split to [0, 1] here (fit on
        # train, transform test); per-window scaling in create_windows is used instead.
        # scaler = MinMaxScaler()
        # train = scaler.fit_transform(train).reshape(-1)
        # test = scaler.transform(test).reshape(-1)
        if self.sma_or_ema == 0:
            # Simple moving average smoothing.
            train = self.simple_mov_avg(train)
            test = self.simple_mov_avg(test)
        elif self.sma_or_ema == 1:
            # Exponential moving average smoothing.
            train = self.exp_mov_avg(train)
            test = self.exp_mov_avg(test)
        train_windows = self.create_windows(train)
        test_windows = self.create_windows(test)
        return train_windows, test_windows
    # Optional -- Exponential Moving Average (EMA) smoothing.
    def exp_mov_avg(self, stock_data):
        EMA = 0.0
        # General formula: gamma = 2 / (window_size + 1), e.g. a 20-day window gives
        # 0.0952, 50 days gives 0.0392, and 100 days gives 0.0198. Typically 12- or
        # 26-day windows are used for short-term EMAs, and 50- or 100-day windows for
        # long-term EMAs.
        gamma = 2 / (self.smoothing_window_size + 1)
        for index in range(len(stock_data)):
            EMA = gamma * stock_data[index] + (1 - gamma) * EMA
            stock_data[index] = EMA
        return stock_data
    # Optional -- Simple Moving Average (SMA) smoothing.
    def simple_mov_avg(self, stock_data):
        smoothed_data = [np.average(stock_data[(i - self.smoothing_window_size):i])
                         for i in range(self.smoothing_window_size, len(stock_data) + 1)]
        return np.reshape(smoothed_data, (-1, 1))
    def create_windows(self, stock_data):
        output = []
        # Two alternative normalization schemes (a fixed normalization_window_size
        # look-back, and scaling on the input window alone) were tried here and set
        # aside; the current approach fits the scaler on all data seen up to the end
        # of the current window.
        for index in range(len(stock_data) - self.WINDOW_SIZE - 1):
            new_stock_data = stock_data[0:(index + self.WINDOW_SIZE)]
            scaler = MinMaxScaler()
            scaler.fit(new_stock_data)
            data_input = scaler.transform(new_stock_data[index:(index + self.WINDOW_SIZE)]).reshape(-1)
            data_label = scaler.transform(stock_data[index + self.WINDOW_SIZE].reshape(1, -1))
            output.append((data_input, float(data_label)))
        return output
    def get_splits(self):
        return self.train_data, self.test_data

    def get_all_data(self):
        return self.train_data + self.test_data
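

# A minimal usage sketch, not part of the original module: it assumes a Kaggle-style
# price file at data/Stocks/aa.us.txt with 'Date' and 'Close' columns, matching the
# default stock_fns above.
if __name__ == "__main__":
    preprocessor = StockPreprocessor(stock_fns="aa.us.txt", window_size=250,
                                     sma_or_ema=1, smoothing_window_size=50)
    train_data, test_data = preprocessor.get_splits()
    print("train windows: {}, test windows: {}".format(len(train_data), len(test_data)))
    # Each element is a (window, label) pair: a length-250 normalized input window
    # and the scaled next-day closing price.
    window, label = train_data[0]
    print(window.shape, label)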