forked from YthanZhang/TDX-to-vnpy-converter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
converter.py
144 lines (107 loc) · 3.98 KB
/
converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from datetime import datetime, timedelta, time
import pandas as pd
# Exchange lookup table, loaded once at import time from a GBK-encoded CSV in
# the current working directory. get_exchange_from_symbol() filters it on the
# column named "交易代码" and returns the third column of the first match.
exchange_df = pd.read_csv("exchange_list.csv", encoding="GBK")
def get_exchange_from_symbol(symbol: str) -> str:
    """Return the exchange code for a symbol read from a TDX export.

    The last two characters of ``symbol`` are dropped and the remainder is
    matched against the "交易代码" column of the module-level ``exchange_df``
    table. When a row matches, the value in the third column of the first
    matching row is returned.

    When no row matches and the remainder is longer than four characters,
    its first four characters select a fallback: "1000" maps to "SSE" and
    "9000" maps to "SZSE".

    Returns:
        The exchange code, or the literal string "None" (not the ``None``
        object) when nothing matches.
    """
    code = symbol[:-2]

    matches = exchange_df.loc[exchange_df["交易代码"] == code]
    if not matches.empty:
        return matches.iloc[0, 2]

    # The prefix fallback only applies to codes longer than four characters.
    prefix_exchanges = {"1000": "SSE", "9000": "SZSE"}
    if len(code) > 4 and code[:4] in prefix_exchanges:
        return prefix_exchanges[code[:4]]

    return "None"
def symbol_transform(original: str, exchange: str) -> str:
    """Rewrite a source symbol into ``<root><digit><digit>`` form.

    The root is ``original`` without its last two characters; the suffix is
    the final character of ``original`` repeated twice (e.g. "AGL9" ->
    root "AG", suffix "99").

    Args:
        original: Source symbol; must be non-empty.
        exchange: Exchange code. For "CZCE" and "CFFEX" the root is
            upper-cased; for every other value it is lower-cased.

    Returns:
        The case-adjusted root followed by the doubled final character.

    Raises:
        IndexError: If ``original`` is an empty string.
    """
    suffix = original[-1] * 2
    root = original[:-2]

    if exchange in ("CZCE", "CFFEX"):
        root = root.upper()
    else:
        root = root.lower()

    return root + suffix
def load_tdx(path: str) -> tuple[str, str, list[list]]:
    """Read a GBK-encoded TDX text export and split it into fields.

    The first whitespace-separated token of the first line is taken as the
    raw symbol. The first two lines and the last two entries produced by
    splitting the text on "\\n" are discarded; every remaining line is split
    on whitespace into one row of string fields.

    Args:
        path: Path of the text file to read.

    Returns:
        A ``(symbol, exchange, grid)`` tuple where ``exchange`` comes from
        ``get_exchange_from_symbol(raw_symbol)``, ``symbol`` is
        ``symbol_transform(raw_symbol, exchange)`` and ``grid`` is the list
        of per-line field lists.

    Raises:
        IndexError: If the first line of the file contains no tokens.
    """
    with open(path, mode="r", encoding="GBK") as f:
        text = f.read()

    lines = text.split("\n")
    raw_symbol = lines[0].split()[0]

    exchange = get_exchange_from_symbol(raw_symbol)

    # NOTE(review): the [2:-2] slice presumably skips two header lines, one
    # footer line and the empty string left by a trailing newline — confirm
    # against a real export, since a file without that trailing newline
    # would lose its last data row here.
    grid = [line.split() for line in lines[2:-2]]

    symbol = symbol_transform(raw_symbol, exchange)
    return symbol, exchange, grid
def tdx_date_to_datetime(date: str) -> str:
    """Append the fixed time "09:00:00" to a date string.

    Args:
        date: Date text; it is used verbatim, with no parsing or validation.

    Returns:
        ``date`` followed by a space and "09:00:00".
    """
    return " ".join((date, "09:00:00"))
def output_csv_daily(path: str, grid: list[list], symbol: str, exchange: str):
    """Write daily rows to a CSV file.

    Each row of ``grid`` is read positionally: field 0 is the date (passed
    through ``tdx_date_to_datetime``), and fields 1-6 are open, high, low,
    close, volume and open interest, copied unchanged.

    The CSV columns are, in order: datetime, open, high, low, close, volume,
    open_interest, symbol, exchange. ``symbol`` and ``exchange`` are repeated
    on every row. No index column is written.

    Args:
        path: Destination CSV path.
        grid: Rows of fields, each with at least seven entries.
        symbol: Value for the ``symbol`` column.
        exchange: Value for the ``exchange`` column.
    """
    # Grid fields 1..6, in the order they appear in each row.
    value_columns = ("open", "high", "low", "close", "volume", "open_interest")

    table = {"datetime": [tdx_date_to_datetime(row[0]) for row in grid]}
    for position, name in enumerate(value_columns, start=1):
        table[name] = [row[position] for row in grid]

    row_count = len(table["open"])
    table["symbol"] = [symbol] * row_count
    table["exchange"] = [exchange] * row_count

    pd.DataFrame(table).to_csv(path, index=False)
def tdx_date_time_to_datetime(date_str: str, time_str: str,
                              adjust_end_time=False) -> str:
    """Combine a date and an "HHMM" time into "YYYY-MM-DD HH:MM:00" text.

    Args:
        date_str: Date text. It must be in "%Y-%m-%d" form when
            ``adjust_end_time`` is true; otherwise it is used verbatim.
        time_str: Time text whose first two characters are the hour and
            whose remaining characters are the minute.
        adjust_end_time: When false, the combined string is returned with
            no parsing. When true, the timestamp is moved back by one
            minute, and additionally by one day if its time of day (before
            the one-minute shift) is 21:00 or later.

    Returns:
        The resulting timestamp as a string.

    Raises:
        ValueError: If ``adjust_end_time`` is true and the combined string
            does not parse as "%Y-%m-%d %H:%M:%S".
    """
    stamp_format = "%Y-%m-%d %H:%M:%S"
    stamp = date_str + " " + ":".join((time_str[:2], time_str[2:], "00"))

    if adjust_end_time:
        moment = datetime.strptime(stamp, stamp_format)
        shift = timedelta(minutes=1)
        # NOTE(review): timestamps at or after 21:00 are also moved back one
        # calendar day. A plain one-day shift turns a Monday date into a
        # Sunday — confirm whether weekends/holidays need handling.
        if moment.time() >= time(21):
            shift += timedelta(days=1)
        return (moment - shift).strftime(stamp_format)

    return stamp
def output_csv_minute(path: str, grid: list[list], symbol: str, exchange: str,
                      time_adjust: bool):
    """Write minute rows to a CSV file.

    Each row of ``grid`` is read positionally: fields 0 and 1 are the date
    and time, merged by ``tdx_date_time_to_datetime`` (with ``time_adjust``
    forwarded as its ``adjust_end_time`` argument); fields 2-7 are open,
    high, low, close, volume and open interest, copied unchanged.

    The CSV columns are, in order: datetime, open, high, low, close, volume,
    open_interest, symbol, exchange. ``symbol`` and ``exchange`` are repeated
    on every row. No index column is written.

    Args:
        path: Destination CSV path.
        grid: Rows of fields, each with at least eight entries.
        symbol: Value for the ``symbol`` column.
        exchange: Value for the ``exchange`` column.
        time_adjust: Whether timestamps are shifted by the date/time helper.
    """
    # Grid fields 2..7, in the order they appear in each row.
    value_columns = ("open", "high", "low", "close", "volume", "open_interest")

    table = {
        "datetime": [
            tdx_date_time_to_datetime(row[0], row[1], time_adjust)
            for row in grid
        ],
    }
    for position, name in enumerate(value_columns, start=2):
        table[name] = [row[position] for row in grid]

    row_count = len(table["open"])
    table["symbol"] = [symbol] * row_count
    table["exchange"] = [exchange] * row_count

    pd.DataFrame(table).to_csv(path, index=False)
if __name__ == "__main__":
    # Script entry point: convert one hard-coded minute file to CSV, with
    # the timestamp adjustment switched on.
    source_path_ = "tdx_1m/AGL9.txt"
    target_path_ = "csv_1m/AGL9.csv"

    symbol_, exchange_, tdx_grid_ = load_tdx(source_path_)
    output_csv_minute(target_path_, tdx_grid_, symbol_, exchange_, True)