This repository has been archived by the owner on Oct 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
check_airdrop.py
143 lines (125 loc) · 5.7 KB
/
check_airdrop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from dotenv import load_dotenv
from substrateinterface import SubstrateInterface, Keypair, ExtrinsicReceipt
import os
import time
import redis
from scalecodec.types import GenericExtrinsic
from redis import Redis
# from db.base import *
# from crawler import *
# from ssl import SSLEOFError, SSLError
# from post_api import *
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, AsyncEngine
from sqlalchemy import select, insert, or_
from sqlalchemy.engine.result import ScalarResult
import asyncio
import json
from common_fn import *
from substrateinterface.exceptions import SubstrateRequestException
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException, WebSocketException, WebSocketBadStatusException
load_dotenv()
async def get_asset_hub_batchall_by_block_num(substrate: SubstrateInterface, block_num: int):
res = []
try:
block_hash = substrate.get_block_hash(block_num)
txs = substrate.get_extrinsics(block_hash=block_hash)
for index, tx in enumerate(txs):
issuer = "162aMTRcXF27yNeNE82SfZj5KWH94sBtivvy7a5uef2ry81r"
# 如果签名地址是singer = tx.value.get("address")
if tx.value.get("address") == issuer and tx.value.get("call").get("call_function") == "batch_all":
extrinsic_hash = tx.value["extrinsic_hash"]
# print("dest: ", dest)
receipt = ExtrinsicReceipt(substrate, extrinsic_hash=extrinsic_hash,
block_hash=block_hash,
block_number=block_num,
extrinsic_idx=index, finalized=True)
if receipt.is_success:
batch_all = tx.value.get("call")["call_args"][0]["value"]
for mint in batch_all[:-1]:
if mint["call_function"] == "mint":
call_args = mint["call_args"]
dest = call_args[1]["value"]
amount = call_args[2]["value"]
print(f"user:{dest}, amount: {amount}")
async with async_session() as session:
async with session.begin():
stmt = insert(Airdrop).where(Airdrop.extrinsic_hash == extrinsic_hash)
rs = await session.scalars(stmt)
for r in rs:
# 把状态改成成功
r.status = 4
with open("airdrop.txt", 'a') as file:
file.write(f"{dest}, {amount}\n")
print("extrinsic_hash:", extrinsic_hash)
res.append(extrinsic_hash)
return res
except (SubstrateRequestException, WebSocketConnectionClosedException, WebSocketTimeoutException, WebSocketException, WebSocketBadStatusException) as e:
raise e
async def get_success_hash_but_fail_in_mysql(hash: str):
r = []
amt = 0
async with async_session() as session:
async with session.begin():
# 查询异常或者失败的交易
stmt = select(Airdrop).where(or_(Airdrop.status == 10, Airdrop.status == 3)).where(Airdrop.extrinsic_hash == hash)
res: ScalarResult = await session.scalars(stmt)
if len(res.all()) > 0:
print(f"{hash} 交易在数据库中失败或者异常, 但是在链上已经成功")
for r in res:
print("r:", r)
amt += r.amount
print("该笔交易的总金额是: ", amt)
if amt > 0:
return [hash, amt]
return []
async def main():
s = connect_substrate()
result = []
_range1 = list(range(6033389, 6033657))
_range2 = list(range(6035918, 6035969))
_range1.extend(_range2)
for num in _range1:
try:
print("区块高度:", num)
hashes = await get_asset_hub_batchall_by_block_num(s, num)
print(hashes)
for hash in hashes:
res = await get_success_hash_but_fail_in_mysql(hash)
if len(res) > 0:
result.append(res)
except (SubstrateRequestException, WebSocketConnectionClosedException, WebSocketTimeoutException, WebSocketException, WebSocketBadStatusException) as e:
try:
s = connect_substrate()
except Exception as e:
pass
print("result:", result)
amount = 0
for r in result:
amount += r[1]
print(f"数据库失败或者异常,但是链上成功的金额有: {amount}")
# 把状态是10的,全部改成0
while True:
async with async_session() as session:
async with session.begin():
stmt = select(Airdrop).where(Airdrop.status == 10)
res = await session.scalar(stmt)
if res is None:
break
res.status = 0
# 对账
async with async_session() as session:
async with session.begin():
stmt = select(func.sum(Airdrop.amount)).where(Airdrop.status == 4)
amount = await session.scalar(stmt)
if int(amount) == 18312933206:
print("对账成功")
else:
exit("对账失败,请重新对账")
async def test():
async with async_session() as session:
async with session.begin():
stmt = select(func.sum(AlreadyAirdrop.amount)).where(1==1)
res = await session.scalar(stmt)
print("res:", res)
if __name__ == "__main__":
asyncio.run(main(), debug=True)