Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
RogerKSI committed Sep 28, 2023
2 parents e51af9a + 05234a8 commit 051ae6b
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 7 deletions.
12 changes: 12 additions & 0 deletions flusher/flusher/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,15 @@ def Column(*args, **kwargs):
Column("order", sa.String),
Column("last_update", CustomDateTime, index=True),
)

relayer_tx_stat_days = sa.Table(
"relayer_tx_stat_days",
metadata,
Column("date", CustomDate, primary_key=True),
Column("port", sa.String, primary_key=True),
Column("channel", sa.String, primary_key=True),
Column("address", sa.String, primary_key=True),
Column("ibc_received_txs", sa.Integer),
Column("last_update_at", CustomDateTime),
sa.ForeignKeyConstraint(["port", "channel"], ["channels.port", "channels.channel"]),
)
61 changes: 54 additions & 7 deletions flusher/flusher/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
counterparty_chains,
connections,
channels,
relayer_tx_stat_days,
)


Expand All @@ -48,6 +49,9 @@ def __init__(self, conn):
def get_transaction_id(self, tx_hash):
return self.conn.execute(select([transactions.c.id]).where(transactions.c.hash == tx_hash)).scalar()

def get_transaction_sender(self, id):
return self.conn.execute(select([transactions.c.sender]).where(transactions.c.id == id)).scalar()

def get_validator_id(self, val):
return self.conn.execute(select([validators.c.id]).where(validators.c.operator_address == val)).scalar()

Expand Down Expand Up @@ -81,6 +85,14 @@ def get_data_source_id(self, id):
def get_oracle_script_id(self, id):
return self.conn.execute(select([oracle_scripts.c.id]).where(oracle_scripts.c.id == id)).scalar()

def get_ibc_received_txs(self, date, port, channel, address):
msg = {"date": date, "port": port, "channel": channel, "address": address}
condition = True
for col in relayer_tx_stat_days.primary_key.columns.values():
condition = (col == msg[col.name]) & condition

return self.conn.execute(select([relayer_tx_stat_days.c.ibc_received_txs]).where(condition)).scalar()

def handle_new_block(self, msg):
self.conn.execute(blocks.insert(), msg)

Expand Down Expand Up @@ -434,17 +446,22 @@ def handle_update_data_source_requests_count_per_day(self, msg):
)

def handle_new_incoming_packet(self, msg):
self.update_last_update_channel(msg['dst_port'], msg['dst_channel'], msg['block_time'])
del msg["block_time"]
self.update_last_update_channel(msg["dst_port"], msg["dst_channel"], msg["block_time"])

msg["tx_id"] = self.get_transaction_id(msg["hash"])
del msg["hash"]

msg["sender"] = self.get_transaction_sender(msg["tx_id"])
self.handle_set_relayer_tx_stat_days(msg["dst_port"], msg["dst_channel"], msg["block_time"], msg["sender"])
del msg["block_time"]
del msg["sender"]

self.conn.execute(
insert(incoming_packets).values(**msg).on_conflict_do_nothing(constraint="incoming_packets_pkey")
)

def handle_new_outgoing_packet(self, msg):
self.update_last_update_channel(msg['src_port'], msg['src_channel'], msg['block_time'])
self.update_last_update_channel(msg["src_port"], msg["src_channel"], msg["block_time"])
del msg["block_time"]

msg["tx_id"] = self.get_transaction_id(msg["hash"])
Expand All @@ -455,7 +472,7 @@ def handle_new_outgoing_packet(self, msg):
)

def handle_update_outgoing_packet(self, msg):
self.update_last_update_channel(msg['src_port'], msg['src_channel'], msg['block_time'])
self.update_last_update_channel(msg["src_port"], msg["src_channel"], msg["block_time"])
del msg["block_time"]

condition = True
Expand Down Expand Up @@ -508,7 +525,37 @@ def handle_set_channel(self, msg):

def update_last_update_channel(self, port, channel, timestamp):
self.conn.execute(
channels.update().where((channels.c.port == port) & (channels.c.channel == channel)).values(
last_update=timestamp
)
channels.update()
.where((channels.c.port == port) & (channels.c.channel == channel))
.values(last_update=timestamp)
)

def handle_set_relayer_tx_stat_days(self, port, channel, timestamp, address):
relayer_tx_stat_day = {
"date": timestamp,
"port": port,
"channel": channel,
"address": address,
"last_update_at": timestamp,
}

if (
self.get_ibc_received_txs(
relayer_tx_stat_day["date"],
relayer_tx_stat_day["port"],
relayer_tx_stat_day["channel"],
relayer_tx_stat_day["address"],
)
is None
):
relayer_tx_stat_day["ibc_received_txs"] = 1
self.conn.execute(relayer_tx_stat_days.insert(), relayer_tx_stat_day)
else:
condition = True
for col in relayer_tx_stat_days.primary_key.columns.values():
condition = (col == relayer_tx_stat_day[col.name]) & condition
self.conn.execute(
relayer_tx_stat_days.update()
.where(condition)
.values(ibc_received_txs=relayer_tx_stat_days.c.ibc_received_txs + 1, last_update_at=timestamp)
)
24 changes: 24 additions & 0 deletions hasura/hasura-metadata/tables.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@
remote_table:
name: outgoing_packets
schema: public
- name: relayer_tx_stat_days
using:
manual_configuration:
column_mapping:
channel: channel
port: port
insertion_order: null
remote_table:
name: relayer_tx_stat_days
schema: public
- table:
name: connections
schema: public
Expand Down Expand Up @@ -495,6 +505,20 @@
- name: oracle_script
using:
foreign_key_constraint_on: oracle_script_id
- table:
name: relayer_tx_stat_days
schema: public
object_relationships:
- name: channel_by_channel_port
using:
manual_configuration:
column_mapping:
channel: channel
port: port
insertion_order: null
remote_table:
name: channels
schema: public
- table:
name: reporters
schema: public
Expand Down

0 comments on commit 051ae6b

Please sign in to comment.