diff --git a/flusher/flusher/db.py b/flusher/flusher/db.py index 7d47ee984..bf1b6e090 100644 --- a/flusher/flusher/db.py +++ b/flusher/flusher/db.py @@ -484,3 +484,15 @@ def Column(*args, **kwargs): Column("order", sa.String), Column("last_update", CustomDateTime, index=True), ) + +relayer_tx_stat_days = sa.Table( + "relayer_tx_stat_days", + metadata, + Column("date", CustomDate, primary_key=True), + Column("port", sa.String, primary_key=True), + Column("channel", sa.String, primary_key=True), + Column("address", sa.String, primary_key=True), + Column("ibc_received_txs", sa.Integer), + Column("last_update_at", CustomDateTime), + sa.ForeignKeyConstraint(["port", "channel"], ["channels.port", "channels.channel"]), +) diff --git a/flusher/flusher/handler.py b/flusher/flusher/handler.py index 4478d57e9..6fe84ba57 100644 --- a/flusher/flusher/handler.py +++ b/flusher/flusher/handler.py @@ -38,6 +38,7 @@ counterparty_chains, connections, channels, + relayer_tx_stat_days, ) @@ -48,6 +49,9 @@ def __init__(self, conn): def get_transaction_id(self, tx_hash): return self.conn.execute(select([transactions.c.id]).where(transactions.c.hash == tx_hash)).scalar() + def get_transaction_sender(self, id): + return self.conn.execute(select([transactions.c.sender]).where(transactions.c.id == id)).scalar() + def get_validator_id(self, val): return self.conn.execute(select([validators.c.id]).where(validators.c.operator_address == val)).scalar() @@ -81,6 +85,14 @@ def get_data_source_id(self, id): def get_oracle_script_id(self, id): return self.conn.execute(select([oracle_scripts.c.id]).where(oracle_scripts.c.id == id)).scalar() + def get_ibc_received_txs(self, date, port, channel, address): + msg = {"date": date, "port": port, "channel": channel, "address": address} + condition = True + for col in relayer_tx_stat_days.primary_key.columns.values(): + condition = (col == msg[col.name]) & condition + + return self.conn.execute(select([relayer_tx_stat_days.c.ibc_received_txs]).where(condition)).scalar() + def handle_new_block(self, msg): self.conn.execute(blocks.insert(), msg) @@ -434,17 +446,22 @@ def handle_update_data_source_requests_count_per_day(self, msg): ) def handle_new_incoming_packet(self, msg): - self.update_last_update_channel(msg['dst_port'], msg['dst_channel'], msg['block_time']) - del msg["block_time"] + self.update_last_update_channel(msg["dst_port"], msg["dst_channel"], msg["block_time"]) msg["tx_id"] = self.get_transaction_id(msg["hash"]) del msg["hash"] + + msg["sender"] = self.get_transaction_sender(msg["tx_id"]) + self.handle_set_relayer_tx_stat_days(msg["dst_port"], msg["dst_channel"], msg["block_time"], msg["sender"]) + del msg["block_time"] + del msg["sender"] + self.conn.execute( insert(incoming_packets).values(**msg).on_conflict_do_nothing(constraint="incoming_packets_pkey") ) def handle_new_outgoing_packet(self, msg): - self.update_last_update_channel(msg['src_port'], msg['src_channel'], msg['block_time']) + self.update_last_update_channel(msg["src_port"], msg["src_channel"], msg["block_time"]) del msg["block_time"] msg["tx_id"] = self.get_transaction_id(msg["hash"]) @@ -455,7 +472,7 @@ def handle_new_outgoing_packet(self, msg): ) def handle_update_outgoing_packet(self, msg): - self.update_last_update_channel(msg['src_port'], msg['src_channel'], msg['block_time']) + self.update_last_update_channel(msg["src_port"], msg["src_channel"], msg["block_time"]) del msg["block_time"] condition = True @@ -508,7 +525,37 @@ def handle_set_channel(self, msg): def update_last_update_channel(self, port, channel, timestamp): self.conn.execute( - channels.update().where((channels.c.port == port) & (channels.c.channel == channel)).values( - last_update=timestamp - ) + channels.update() + .where((channels.c.port == port) & (channels.c.channel == channel)) + .values(last_update=timestamp) ) + + def handle_set_relayer_tx_stat_days(self, port, channel, timestamp, address): + relayer_tx_stat_day = { + "date": timestamp, + "port": port, + "channel": channel, + "address": address, + "last_update_at": timestamp, + } + + if ( + self.get_ibc_received_txs( + relayer_tx_stat_day["date"], + relayer_tx_stat_day["port"], + relayer_tx_stat_day["channel"], + relayer_tx_stat_day["address"], + ) + is None + ): + relayer_tx_stat_day["ibc_received_txs"] = 1 + self.conn.execute(relayer_tx_stat_days.insert(), relayer_tx_stat_day) + else: + condition = True + for col in relayer_tx_stat_days.primary_key.columns.values(): + condition = (col == relayer_tx_stat_day[col.name]) & condition + self.conn.execute( + relayer_tx_stat_days.update() + .where(condition) + .values(ibc_received_txs=relayer_tx_stat_days.c.ibc_received_txs + 1, last_update_at=timestamp) + ) diff --git a/hasura/hasura-metadata/tables.yaml b/hasura/hasura-metadata/tables.yaml index 69f619ac8..1f2e87d04 100644 --- a/hasura/hasura-metadata/tables.yaml +++ b/hasura/hasura-metadata/tables.yaml @@ -168,6 +168,16 @@ remote_table: name: outgoing_packets schema: public + - name: relayer_tx_stat_days + using: + manual_configuration: + column_mapping: + channel: channel + port: port + insertion_order: null + remote_table: + name: relayer_tx_stat_days + schema: public - table: name: connections schema: public @@ -495,6 +505,20 @@ - name: oracle_script using: foreign_key_constraint_on: oracle_script_id +- table: + name: relayer_tx_stat_days + schema: public + object_relationships: + - name: channel_by_channel_port + using: + manual_configuration: + column_mapping: + channel: channel + port: port + insertion_order: null + remote_table: + name: channels + schema: public - table: name: reporters schema: public