Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/aurras-runtime-lite-dev' into AV…
Browse files Browse the repository at this point in the history
…-243-Implementation-of-context
  • Loading branch information
shanithkk committed Apr 24, 2024
2 parents 9439bca + 3e8af84 commit 8274248
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 9 deletions.
33 changes: 24 additions & 9 deletions event_feed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

[package]
name = 'event_feed'
authors = ['The HugoByte Team <[email protected]>']
Expand All @@ -9,15 +8,31 @@ version = '0.0.1'
[dependencies]
subxt = '0.35.3'
dotenv = '0.15.0'
serde_json = "1.0.116"
serde ={ version = "1.0.198", features = ["derive"]}
dirs = "5.0.1"
envy = "0.4"
anyhow = "1.0.82"
serde_json = '1.0.116'
dirs = '5.0.1'
envy = '0.4'
anyhow = '1.0.82'
icon-sdk = '1.2.0'
runtime = {path = "../runtime/lite"}
async-trait = '0.1.80'

[dependencies.ethers]
version = '2.0.8'
default_features = false
features = [
'ws',
'rustls',
]

[dependencies.serde]
version = '1.0.198'
features = ['derive']

[dependencies.runtime]
path = '../runtime/lite'

[dependencies.tokio]
version = '1.36.0'
features = ['macros', 'time', 'rt-multi-thread']
features = [
'macros',
'time',
'rt-multi-thread',
]
10 changes: 10 additions & 0 deletions event_feed/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,13 @@ pub struct ProducerConfig {
#[serde(default)]
pub ssb_invite: String,
}

#[derive(Deserialize, Debug, Clone)]
pub struct FeederEvent {
pub name: String,
pub contract: String,
pub data: Value,
pub block_number: usize,
pub tx_hash: Option<String>,
pub log_index: usize,
}
106 changes: 106 additions & 0 deletions event_feed/src/eth/feeder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use super::*;
pub struct EthFeed {
eth_service: Provider<Ws>,
events: Vec<(String, H256)>,
contracts: Vec<String>,
}

impl EthFeed {
pub async fn new(config: ChainConfig) -> Result<EthFeed> {
let events = config
.event_filter
.split(';')
.filter(|evt| !evt.is_empty())
.map(|evt| (evt.to_string(), H256::from(keccak256(evt))))
.collect::<Vec<(String, H256)>>();

let contracts = config
.contract_filter
.split(',')
.map(|evt| evt.to_string())
.filter(|evt| !evt.is_empty())
.collect::<Vec<String>>();

let client = Provider::<Ws>::connect(config.node_url).await?;

let eth_feed = EthFeed {
eth_service: client,
events,
contracts,
};

Ok(eth_feed)
}

pub async fn event_feed(&self, cb: &dyn Fn(Vec<FeederEvent>)) -> Result<()> {
let client = Arc::new(&self.eth_service);

let last_block = client
.get_block(BlockNumber::Latest)
.await?
.unwrap()
.number
.unwrap();

let events = self.events.iter().map(|e| e.0.clone());

let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;

while let Some(log) = stream.next().await {
if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address))
{
let tx_hash = match log.transaction_hash {
Some(hash) => hash,
None => {
let event = self.events.iter().find(|e| e.1 == log.topics[0]);

cb(vec![FeederEvent {
name: event.unwrap().0.clone(),
contract: format!("{:?}", &log.address),
data: json!({ "topics": log.topics, "data": log.data.to_vec() }),
block_number: log.block_number.unwrap_or_default().as_usize(),
tx_hash: None,
log_index: log.log_index.unwrap_or_default().as_usize(),
}]);
continue;
}
};

let tx_receipt = client.get_transaction_receipt(tx_hash).await;

if let Ok(Some(tx_receipt)) = tx_receipt {
let mut logs = Vec::<FeederEvent>::new();

for log in tx_receipt.logs.iter() {
if self.contracts.is_empty()
|| self.contracts.contains(&format!("{:?}", &log.address))
{
for evt in self.events.iter() {
if log.topics[0] == evt.1 {
logs.push(FeederEvent {
name: evt.0.clone(),
contract: format!("{:?}", &log.address),
data: json!({ "topics": log.topics, "data": log.data.to_vec() }),
block_number: log.block_number.unwrap_or_default().as_usize(),
tx_hash: Some(format!("{:?}", tx_hash)),
log_index: log.log_index.unwrap_or_default().as_usize(),
});

break;
}
}
}
}

if !logs.is_empty() {
cb(logs);
}
}
}
}

Ok(())
}
}
13 changes: 13 additions & 0 deletions event_feed/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::common::{ChainConfig, FeederEvent};
use anyhow::Result;
use ethers::{
prelude::Ws,
providers::{Middleware, Provider, StreamExt},
types::{BlockNumber, Filter, H256},
utils::keccak256,
};
use serde_json::json;
use std::sync::Arc;

mod feeder;
pub use feeder::*;

0 comments on commit 8274248

Please sign in to comment.