forked from vkobel/ethdrain
-
Notifications
You must be signed in to change notification settings - Fork 4
/
ethdrain.py
executable file
·316 lines (252 loc) · 12.1 KB
/
ethdrain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
#!/usr/bin/python3
import asyncio
import json
import logging
import multiprocessing as mp
import argparse
import requests
import aiohttp
import psycopg2
import sqlalchemy
from elasticsearch import exceptions as es_exceptions
from elasticdatastore import ElasticDatastore
from postgresqldatastore import PostgreSQLDatastore
from csvdatastore import CSVDatastore
# Send root-logger records of level ERROR and above to error_blocks.log; the
# sync code logs "block: <number>" there for every block it failed to fetch.
logging.basicConfig(filename='error_blocks.log', level=logging.ERROR)
class Ethdrain:
    """Download Ethereum blocks over JSON-RPC and feed them to datastores.

    Every instance owns its own datastore instances (``data_stores``).  History
    sync fetches a fixed list of block numbers concurrently and then asks each
    datastore to ``save()``; continuous sync polls one block at a time starting
    at ``block`` and saves after every block.
    """

    # Datastore classes instantiated and attached to every new instance
    # (registered with load_datastore_classes)
    data_store_classes = ()
    # Maximum number of concurrent JSON-RPC requests during history sync
    sem_size = 256
    # Ethereum JSON-RPC endpoint; the command-line script overrides it (--ethrpcurl)
    eth_url = "http://localhost:8545"
    # Seconds the continuous sync waits before asking again for an unavailable block
    poll_interval = 5

    def __init__(self, block_range, block):
        """
        :param block_range: Block numbers to fetch during history sync.
        :param block: First block number to fetch during continuous sync.
        """
        self.block = block
        self.block_range = block_range
        self.data_stores = list()

    @classmethod
    def load_datastore_classes(cls, *data_store_classes):
        """Register the datastore classes attached to every instance created afterwards."""
        cls.data_store_classes = data_store_classes

    @classmethod
    def _create(cls, block_range, block):
        """Instantiate the class and attach a fresh instance of every registered datastore."""
        inst = cls(block_range, block)
        inst.data_stores.extend(data_class() for data_class in cls.data_store_classes)
        return inst

    @classmethod
    def launch_history_sync(cls, block_range):
        """Fetch every block in *block_range*, then save all datastores.

        Used as the ``multiprocessing.Pool.map`` target by the launcher script,
        so each chunk gets its own instance and its own datastore instances.
        """
        cls._create(block_range, 0).setup_process_history_sync()

    @classmethod
    def launch_continuous_sync(cls, block):
        """Index blocks one by one starting at *block*; this call never returns."""
        cls._create([], block).setup_process_continuous_sync()

    @staticmethod
    def _event_loop():
        """Return the current thread's event loop, creating and installing one if needed.

        ``asyncio.get_event_loop()`` without a current loop is deprecated and
        raises RuntimeError on recent Python versions, hence the fallback.
        """
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            return loop

    def setup_process_history_sync(self):
        """Fetch every block of ``block_range``, then save each datastore and print its result."""
        loop = self._event_loop()
        loop.run_until_complete(self.run_history_sync(self.block_range))
        # Now that everything has been "extracted", perform the "save" action
        for data_store in self.data_stores:
            msg = data_store.save()
            print("{}: {}".format(data_store.__class__.__name__, msg))

    def setup_process_continuous_sync(self):
        """Schedule the continuous indexing coroutine and run the event loop forever."""
        loop = self._event_loop()
        loop.create_task(self.indexNext())
        loop.run_forever()

    async def fetch(self, session, block_nb):
        """Fetch block *block_nb* and pass the JSON-RPC reply to every datastore's ``extract``.

        Requests that fail are logged by ``fetch2`` and the block is skipped.
        """
        response = await self.fetch2(session, block_nb)
        if response is None:
            return
        for data_store in self.data_stores:
            data_store.extract(response)

    async def sema_fetch(self, sem, session, block_nb):
        """Run ``fetch`` for *block_nb* while holding the semaphore *sem*."""
        async with sem:
            await self.fetch(session, block_nb)

    async def run_history_sync(self, block_range):
        """Fetch every block of *block_range* concurrently, at most ``sem_size`` at a time."""
        sem = asyncio.Semaphore(self.__class__.sem_size)
        # One client session for all requests, so connections are reused
        # instead of opening a new one per request.
        async with aiohttp.ClientSession() as session:
            tasks = [asyncio.ensure_future(self.sema_fetch(sem, session, block_nb))
                     for block_nb in block_range]
            await asyncio.gather(*tasks)

    async def fetch2(self, session, block_nb):
        """POST an ``eth_getBlockByNumber`` request for *block_nb* and return the decoded reply.

        :returns: The decoded JSON body, or None when the request failed with an
            aiohttp client error or timed out (the block number is then logged
            and printed).
        """
        try:
            async with session.post(self.__class__.eth_url,
                                    data=Ethdrain.make_request(block_nb),
                                    headers={"content-type": "application/json"}) as response:
                return await response.json()
        except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
            logging.error("block: " + str(block_nb))
            print("Issue with block {}:\n{}\n".format(block_nb, exception))
            return None

    async def indexNext(self):
        """Index blocks one after another, starting at ``self.block``, forever.

        For every available block each datastore gets its ``actions`` list
        reset, then ``extract`` and ``save`` are called and the save result is
        printed; ``self.block`` is then advanced.  When the block has no
        ``result`` yet, the node sent a reply without one, or the request
        failed, the same block is retried after ``poll_interval`` seconds.
        """
        async with aiohttp.ClientSession() as session:
            while True:
                response = await self.fetch2(session, self.block)
                # fetch2 returns None on transport errors; treat that like a
                # block that is not available yet instead of crashing the task.
                if not isinstance(response, dict) or response.get("result") is None:
                    await asyncio.sleep(self.poll_interval)
                    continue
                for data_store in self.data_stores:
                    data_store.actions = list()
                    data_store.extract(response)
                    msg = data_store.save()
                    print("{}: {}".format(data_store.__class__.__name__, msg))
                self.block += 1

    @staticmethod
    def make_request(block_nb, use_hex=True):
        """Build the JSON-RPC ``eth_getBlockByNumber`` request body for *block_nb*.

        :param block_nb: Block number, or a block tag such as ``"latest"`` when
            *use_hex* is False.
        :param use_hex: Encode *block_nb* with ``hex()`` (e.g. 26 -> "0x1a").
        :returns: The request serialized as a JSON string.  The second param is
            always True, which per the Ethereum JSON-RPC spec asks for full
            transaction objects.
        """
        return json.dumps({
            "jsonrpc": "2.0",
            "method": "eth_getBlockByNumber",
            "params": [hex(block_nb) if use_hex else block_nb, True],
            "id": 1
        })
def http_post_request(url, request):
    """Synchronously POST a JSON-RPC payload and return the decoded JSON reply.

    :param url: JSON-RPC endpoint URL.
    :param request: Request body as a JSON string (see ``Ethdrain.make_request``).
    """
    return requests.post(url, data=request,
                         headers={"content-type": "application/json"}).json()


def chunks(lst, nb_chunks=250):
    """Yield successive slices of *lst* holding at most *nb_chunks* items each.

    Despite its name, *nb_chunks* is the size of every chunk, not the number
    of chunks produced.
    """
    for i in range(0, len(lst), nb_chunks):
        yield lst[i:i + nb_chunks]


def read_block_file(path, max_length=8):
    """Return the block numbers listed in *path*, one per line.

    Blank lines and lines whose stripped text is longer than *max_length*
    characters are skipped.
    """
    with open(path) as block_file:
        return [int(line) for line in block_file
                if line.strip() and len(line.strip()) <= max_length]


def build_block_list(start_block, end_block, file_path=None):
    """Return the block numbers to index during the history sync.

    If *file_path* is given the numbers are read from that file; otherwise the
    inclusive range ``start_block..end_block`` is returned.  The range is
    inclusive so that the continuous sync, which resumes at ``end_block + 1``,
    leaves no gap.
    """
    if file_path:
        return read_block_file(file_path)
    return list(range(int(start_block), int(end_block) + 1))


if __name__ == "__main__":
    # Elasticsearch maximum number of connections
    ES_MAXSIZE = 25
    # Elasticsearch default url
    ES_URL = "http://localhost:9200"
    # PostgreSQL default url
    POSTGRES_URL = "postgresql://postgres:postgres@localhost:5432/Ethereum"
    # Ethereum RPC endpoint
    ETH_URL = "http://localhost:8545"
    # Size of the multiprocessing Pool processing the chunks: leave two CPUs
    # free, but mp.Pool needs at least one process.
    POOL_SIZE = max(1, mp.cpu_count() - 2)
    # Supported database outputs
    OUTPUT_ELASTICSEARCH = "elasticsearch"
    OUTPUT_POSTGRES = "postgres"
    OUTPUT_CSV = "csv"
    OUTPUT = OUTPUT_POSTGRES
    # Blocks subtracted from the chain head when the end block is chosen automatically
    BLOCK_WAIT = 10

    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--start', dest='start_block', type=int,
                        help='What block to start indexing. If nothing is provided, the latest block indexed will be used.')
    parser.add_argument('-e', '--end', dest='end_block', type=int,
                        help='What block to finish indexing. If nothing is provided, the latest one will be used.')
    parser.add_argument('-f', '--file', default=None,
                        help='Use an input file, each block number on a new line.')
    parser.add_argument('-es', '--esurl', default=ES_URL,
                        help='The elasticsearch url and port. Accepts all the same parameters needed as a normal Elasticsearch client expects.')
    parser.add_argument('-m', '--esmaxsize', default=ES_MAXSIZE, type=int,
                        help='The elasticsearch max chunk size.')
    parser.add_argument('-pg', '--postgresurl', default=POSTGRES_URL,
                        help='The PostgreSQL url and port. Accepts all the same parameters needed as a normal PostgreSQL client expects.')
    parser.add_argument('-r', '--ethrpcurl', default=ETH_URL,
                        help='The Ethereum RPC node url and port.')
    # `choices` rejects an unknown output system up front, before any network
    # or database work is done.
    parser.add_argument('-o', '--output', default=OUTPUT,
                        choices=(OUTPUT_POSTGRES, OUTPUT_ELASTICSEARCH, OUTPUT_CSV),
                        help='System for output data from Ethereum (may be: "postgres", "elasticsearch","csv").')
    args = parser.parse_args()

    Ethdrain.eth_url = args.ethrpcurl

    # Determine last block number if needed (`is None`, so an explicit 0 is honoured)
    if args.end_block is None:
        latest = http_post_request(Ethdrain.eth_url, Ethdrain.make_request("latest", False))
        args.end_block = int(latest["result"]["number"], 0) - BLOCK_WAIT
        print("Last block automatically set to: {}".format(args.end_block))

    # Determine start block number if needed, asking the selected datastore
    if args.start_block is None:
        if args.output == OUTPUT_ELASTICSEARCH:
            try:
                args.start_block = ElasticDatastore.find_start_block(args.esurl)
            except (es_exceptions.NotFoundError, es_exceptions.RequestError):
                args.start_block = 0
        elif args.output == OUTPUT_POSTGRES:
            try:
                args.start_block = PostgreSQLDatastore.start_block(args.postgresurl)
            except (sqlalchemy.exc.ProgrammingError, psycopg2.ProgrammingError):
                args.start_block = 0
        else:
            try:
                args.start_block = CSVDatastore.request(CSVDatastore.NAME_FILE_BLOCKS)
            except FileNotFoundError:
                args.start_block = 0
        print("Start block automatically set to: {}".format(args.start_block))

    BLOCK_LIST = build_block_list(args.start_block, args.end_block, args.file)
    CHUNKS_ARR = list(chunks(BLOCK_LIST))

    # Setup the selected datastore
    if args.output == OUTPUT_ELASTICSEARCH:
        ElasticDatastore.config(args.esurl, args.esmaxsize)
    elif args.output == OUTPUT_POSTGRES:
        PostgreSQLDatastore.config(args.postgresurl)

    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("~~~~~~~~~~ Ethdrain ~~~~~~~~~~")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("Processing {} blocks split into {} chunks on {} processes:".format(
        len(BLOCK_LIST), len(CHUNKS_ARR), POOL_SIZE
    ))

    if args.output == OUTPUT_ELASTICSEARCH:
        ElasticDatastore.delete_replacement_rows(args.esurl, args.start_block)
        Ethdrain.load_datastore_classes(ElasticDatastore)
    elif args.output == OUTPUT_POSTGRES:
        PostgreSQLDatastore.delete_replacement_rows(args.postgresurl, args.start_block)
        Ethdrain.load_datastore_classes(PostgreSQLDatastore)
    else:
        Ethdrain.load_datastore_classes(CSVDatastore)

    # Leaving the `with` terminates the pool; map() has already returned every
    # result by then, so no work is lost and no worker processes leak.
    with mp.Pool(POOL_SIZE) as pool:
        pool.map(Ethdrain.launch_history_sync, CHUNKS_ARR)
    print("[message] Ended history sync")

    # The history range includes end_block, so continue right after it
    Ethdrain.launch_continuous_sync(args.end_block + 1)