From 3e8af84f1859f962d4df71937fc611a3c6d010e6 Mon Sep 17 00:00:00 2001 From: Ajay Kumar <87293689+ajaykumargdr@users.noreply.github.com> Date: Tue, 23 Apr 2024 21:19:28 +0530 Subject: [PATCH] feat: implement eth event feed (#125) * chore: add eth event feed with filtering based on events and contract addresses * chore: filter the logs inside the tx receipt & use a common type for feeding the event --- event_feed/Cargo.toml | 32 ++++++++--- event_feed/src/common/mod.rs | 13 ++++- event_feed/src/eth/feeder.rs | 106 +++++++++++++++++++++++++++++++++++ event_feed/src/eth/mod.rs | 13 +++++ 4 files changed, 154 insertions(+), 10 deletions(-) create mode 100644 event_feed/src/eth/feeder.rs diff --git a/event_feed/Cargo.toml b/event_feed/Cargo.toml index fd292565..66a95766 100644 --- a/event_feed/Cargo.toml +++ b/event_feed/Cargo.toml @@ -1,4 +1,3 @@ - [package] name = 'event_feed' authors = ['The HugoByte Team '] @@ -9,14 +8,31 @@ version = '0.0.1' [dependencies] subxt = '0.35.3' dotenv = '0.15.0' -serde_json = "1.0.116" -serde ={ version = "1.0.198", features = ["derive"]} -dirs = "5.0.1" -envy = "0.4" -anyhow = "1.0.82" +serde_json = '1.0.116' +dirs = '5.0.1' +envy = '0.4' +anyhow = '1.0.82' icon-sdk = '1.2.0' -runtime = {path = "../runtime/lite"} + +[dependencies.ethers] +version = '2.0.8' +default_features = false +features = [ + 'ws', + 'rustls', +] + +[dependencies.serde] +version = '1.0.198' +features = ['derive'] + +[dependencies.runtime] +path = '../runtime/lite' [dependencies.tokio] version = '1.36.0' -features = ['macros', 'time', 'rt-multi-thread'] +features = [ + 'macros', + 'time', + 'rt-multi-thread', +] diff --git a/event_feed/src/common/mod.rs b/event_feed/src/common/mod.rs index 705f3deb..2e49b533 100644 --- a/event_feed/src/common/mod.rs +++ b/event_feed/src/common/mod.rs @@ -5,8 +5,7 @@ use serde_json::Value; mod kuska_client; pub use kuska_client::*; - -pub trait EventFeeder{ +pub trait EventFeeder { fn event_feed(&self, cb: fn(events: Vec)); } @@ -30,3 +29,13 @@ pub struct ProducerConfig { #[serde(default)] pub ssb_invite: String, } + +#[derive(Deserialize, Debug, Clone)] +pub struct FeederEvent { + pub name: String, + pub contract: String, + pub data: Value, + pub block_number: usize, + pub tx_hash: Option, + pub log_index: usize, +} diff --git a/event_feed/src/eth/feeder.rs b/event_feed/src/eth/feeder.rs new file mode 100644 index 00000000..0511f743 --- /dev/null +++ b/event_feed/src/eth/feeder.rs @@ -0,0 +1,106 @@ +use super::*; +pub struct EthFeed { + eth_service: Provider, + events: Vec<(String, H256)>, + contracts: Vec, +} + +impl EthFeed { + pub async fn new(config: ChainConfig) -> Result { + let events = config + .event_filter + .split(';') + .filter(|evt| !evt.is_empty()) + .map(|evt| (evt.to_string(), H256::from(keccak256(evt)))) + .collect::>(); + + let contracts = config + .contract_filter + .split(',') + .map(|evt| evt.to_string()) + .filter(|evt| !evt.is_empty()) + .collect::>(); + + let client = Provider::::connect(config.node_url).await?; + + let eth_feed = EthFeed { + eth_service: client, + events, + contracts, + }; + + Ok(eth_feed) + } + + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { + let client = Arc::new(&self.eth_service); + + let last_block = client + .get_block(BlockNumber::Latest) + .await? + .unwrap() + .number + .unwrap(); + + let events = self.events.iter().map(|e| e.0.clone()); + + let filter = Filter::new().from_block(last_block - 25).events(events); + + let mut stream = client.subscribe_logs(&filter).await?; + + while let Some(log) = stream.next().await { + if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address)) + { + let tx_hash = match log.transaction_hash { + Some(hash) => hash, + None => { + let event = self.events.iter().find(|e| e.1 == log.topics[0]); + + cb(vec![FeederEvent { + name: event.unwrap().0.clone(), + contract: format!("{:?}", &log.address), + data: json!({ "topics": log.topics, "data": log.data.to_vec() }), + block_number: log.block_number.unwrap_or_default().as_usize(), + tx_hash: None, + log_index: log.log_index.unwrap_or_default().as_usize(), + }]); + continue; + } + }; + + let tx_receipt = client.get_transaction_receipt(tx_hash).await; + + if let Ok(Some(tx_receipt)) = tx_receipt { + let mut logs = Vec::::new(); + + for log in tx_receipt.logs.iter() { + if self.contracts.is_empty() + || self.contracts.contains(&format!("{:?}", &log.address)) + { + for evt in self.events.iter() { + if log.topics[0] == evt.1 { + logs.push(FeederEvent { + name: evt.0.clone(), + contract: format!("{:?}", &log.address), + data: json!({ "topics": log.topics, "data": log.data.to_vec() }), + block_number: log.block_number.unwrap_or_default().as_usize(), + tx_hash: Some(format!("{:?}", tx_hash)), + log_index: log.log_index.unwrap_or_default().as_usize(), + }); + + break; + } + } + } + } + + if !logs.is_empty() { + cb(logs); + } + } + } + } + + Ok(()) + } +} diff --git a/event_feed/src/eth/mod.rs b/event_feed/src/eth/mod.rs index e69de29b..6abf509d 100644 --- a/event_feed/src/eth/mod.rs +++ b/event_feed/src/eth/mod.rs @@ -0,0 +1,13 @@ +use crate::common::{ChainConfig, FeederEvent}; +use anyhow::Result; +use ethers::{ + prelude::Ws, + providers::{Middleware, Provider, StreamExt}, + types::{BlockNumber, Filter, H256}, + utils::keccak256, +}; +use serde_json::json; +use std::sync::Arc; + +mod feeder; +pub use feeder::*;