
chore: add logger for the event feed
Prathiksha-Nataraja committed May 1, 2024
1 parent d3845f5 commit b4b500c
Showing 6 changed files with 80 additions and 8 deletions.
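Every file in this commit applies the same pattern: bring `CoreLogger` and the `Logger` trait in from the `runtime` crate, open a per-module log file, and emit `info`/`debug` lines around the existing control flow. A minimal sketch of that pattern, assuming `CoreLogger::new` and the trait methods behave exactly as they are used in the hunks below (the function name, file name, and message here are placeholders):

use runtime::{logger::CoreLogger, Logger};

fn log_example() {
    // One logger per module, each writing to its own log file.
    let logger = CoreLogger::new(Some("./example-feed.log"));
    logger.info("feed started");
    logger.debug(&format!("processing payload: {:?}", 42));
}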
20 changes: 20 additions & 0 deletions event_feed/src/cosmos/feeder.rs
@@ -1,6 +1,7 @@
use super::*;
use crate::{common, cosmos};
use anyhow::*;
use runtime::{logger::CoreLogger, Logger};
use serde_json::Value;
use tendermint_rpc::event::{Event, EventData};

@@ -62,9 +63,14 @@ impl CosmosFeed {
///
/// The `event_feed` function returns a `Result<()>`.
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let logger = CoreLogger::new(Some("./cosmos-event-feed.log"));
let (client, driver) = WebSocketClient::new(&*self.chain_config.node_url)
.await
.unwrap();
logger.info(&format!(
"Following the publisher {}",
self.chain_config.node_url
));
let driver_handle = tokio::spawn(async move { driver.run().await });

let mut subs = client.subscribe(EventType::NewBlock.into()).await.unwrap();
@@ -91,6 +97,7 @@ impl CosmosFeed {
}

drop(subs);
logger.info("Websocket connection closed!!");
let _ = driver_handle.await;
Ok(())
}
@@ -115,6 +122,8 @@ impl CosmosFeed {
event: &Event,
chain_config: &ChainConfig,
) -> Option<Vec<serde_json::Value>> {
let logger = CoreLogger::new(Some("./event-feed.log"));
logger.debug(&format!("Processing events : {:?}", event));
match event.data.clone() {
EventData::LegacyNewBlock {
ref block,
@@ -124,8 +133,16 @@
let block = block.as_ref().unwrap();
let block_number = block.header.version.block as usize;
let hash_string = block.header.last_commit_hash.map(|h| h.to_string());
logger.info(&format!(
"Processing LegacyNewBlockEvent for block: {}",
block.header.version.block
));

let filtered_events: Vec<Value> = if chain_config.event_filter.is_empty() {
logger.info(&format!(
"Processing all events from block : {}",
block_number
));
result_begin_block
.unwrap()
.events
@@ -141,6 +158,7 @@ impl CosmosFeed {
.map(|e| serde_json::to_value(e).unwrap())
.collect()
} else {
logger.info("Filtering events based on the event name");
result_begin_block
.unwrap()
.events
@@ -161,8 +179,10 @@ impl CosmosFeed {
};

if !filtered_events.is_empty() {
logger.info("returning the filtered events");
Some(filtered_events)
} else {
logger.info("No events matched the filter");
None
}
}
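For context, a hedged sketch of how this feeder's `event_feed` is driven, mirroring the "cosmos" arm of `main.rs` further down; the callback body and error handling are illustrative, and the fragment assumes it runs inside an async function returning `anyhow::Result<()>`:

// ChainConfig is read from the environment exactly as main.rs does.
let chain_config = envy::from_env::<ChainConfig>()?;
let cosmos_feed = CosmosFeed::new(chain_config);
cosmos_feed
    .event_feed(&|events| {
        // `events` is the Vec<serde_json::Value> produced by filter_and_deserialize_events.
        println!("received {} events", events.len());
    })
    .await?;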
12 changes: 12 additions & 0 deletions event_feed/src/eth/feeder.rs
@@ -1,4 +1,6 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};

pub struct EthFeed {
eth_service: Provider<Ws>,
events: Vec<(String, H256)>,
@@ -33,6 +35,7 @@ impl EthFeed {
}

pub async fn event_feed(&self, cb: &dyn Fn(Vec<Value>)) -> Result<()> {
let logger = CoreLogger::new(Some("./eth-event-feed.log"));
let client = Arc::new(&self.eth_service);

let last_block = client
@@ -47,6 +50,10 @@ impl EthFeed {
let filter = Filter::new().from_block(last_block - 25).events(events);

let mut stream = client.subscribe_logs(&filter).await?;
logger.info(&format!(
"Subscribed to events with the filter : {:?}",
filter
));

while let Some(log) = stream.next().await {
if self.contracts.is_empty() || self.contracts.contains(&format!("{:?}", &log.address))
@@ -72,6 +79,10 @@ impl EthFeed {
let tx_receipt = client.get_transaction_receipt(tx_hash).await;

if let Ok(Some(tx_receipt)) = tx_receipt {
logger.info(&format!(
"Received trabsaction receipt for the tx_hash : {:?}",
tx_hash
));
let mut logs = Vec::<Value>::new();

for log in tx_receipt.logs.iter() {
@@ -80,6 +91,7 @@ impl EthFeed {
{
for evt in self.events.iter() {
if log.topics[0] == evt.1 {
logger.info(&format!("Matched event : {:?}", evt.0.clone()));
logs.push(
serde_json::to_value(
FeederEvent {
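The `log.topics[0] == evt.1` comparison above relies on the standard EVM convention that topic 0 of a log is the keccak-256 hash of the event signature, so the `(String, H256)` pairs in `self.events` presumably pair an event name with that hash. A hedged sketch of building one such pair with ethers (the event signature is an example, not one from this project):

use ethers::types::H256;
use ethers::utils::keccak256;

fn transfer_topic() -> (String, H256) {
    // topic0 = keccak256 of the canonical event signature string.
    let topic0 = H256::from(keccak256("Transfer(address,address,uint256)"));
    ("Transfer".to_string(), topic0)
}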
29 changes: 26 additions & 3 deletions event_feed/src/icon/feeder.rs
@@ -1,4 +1,7 @@
use crate::substrate::polkadot::runtime_apis::core::Core;

use super::*;
use runtime::{logger::CoreLogger, Logger};

pub struct IconFeed {
icon_service: IconService,
@@ -34,24 +37,32 @@ impl IconFeed {
}

pub async fn filter(&self, transaction: &Value) -> Result<bool> {
let logger = CoreLogger::new(Some("./icon-event-feed.log"));
let mut events_filter = false;
let mut score_filter = false;

if !self.events.is_empty() || !self.score.is_empty() {
logger.info("Checking the evnt filters or score filters");
let tx_hash: String =
serde_json::from_value(transaction.get("txHash").unwrap().clone()).unwrap();
logger.info(&format!(
"Filtering the events with the tx_hash : {:?}",
tx_hash
));

let event_logs = get_event_logs_by_tx_hash(&self.icon_service, &tx_hash).await?;

for event_log in event_logs {
if !&self.score.is_empty() {
for filter_score in &self.score {
if filter_score == &event_log.score_address {
logger.info(&format!("Matched the score filter : {:?}", filter_score));
score_filter = true;
break;
}
}
} else {
logger.info("No score filter found, allowing all the transactions");
score_filter = true;
}

@@ -64,6 +75,7 @@ impl IconFeed {
}
} else {
events_filter = true;
logger.info("No event filter found, allowing all the transactions");
}

if events_filter && score_filter {
@@ -75,14 +87,19 @@ impl IconFeed {
events_filter = true;
score_filter = true;
}

Ok(events_filter & score_filter)
let result = events_filter && score_filter;
logger.info(&format!("Filtering the result : {:?}", result));
Ok(result)
}

pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let logger = CoreLogger::new(Some("./icon-event-feed.log"));
let mut latest_height = get_icon_block_height(&self.icon_service).await?;
let mut old_height = latest_height - 1;

logger.info(&format!(
"Event feed started at {:?}, {:?}",
latest_height, old_height
));
loop {
if old_height < latest_height {
let block = match self
@@ -113,12 +130,18 @@ impl IconFeed {
}
}

logger.info(&format!(
"Filtered the {:?} transactions",
filtered_tx.len()
));

if !filtered_tx.is_empty() {
cb(filtered_tx)
}

old_height += 1;
} else {
logger.info("No new blocks got detected, sleeping for 1 second");
sleep(Duration::from_secs(1));
}

6 changes: 6 additions & 0 deletions event_feed/src/icon/utils.rs
@@ -1,6 +1,8 @@
use super::*;
use runtime::{logger::CoreLogger, Logger};

pub async fn get_icon_block_height(icon_service: &IconService) -> Result<usize> {
let logger = CoreLogger::new(Some("./icon-event-feed.log"));
let latest_block = icon_service
.get_last_block()
.await
@@ -9,6 +11,7 @@ pub async fn get_icon_block_height(icon_service: &IconService) -> Result<usize>
.and_then(|value| value.get("height"))
.unwrap()
.clone();
logger.info(&format!("Got the latest block height {:?}", latest_block));

Ok(serde_json::from_value(latest_block)?)
}
@@ -17,6 +20,7 @@ pub async fn get_event_logs_by_tx_hash(
icon_service: &IconService,
tx_hash: &str,
) -> Result<Vec<TxEventLog>> {
let logger = CoreLogger::new(Some("./icon-event-feed.log"));
let mut tx_result = icon_service.get_transaction_result(tx_hash).await;

while let Err(error) = tx_result {
@@ -29,6 +33,7 @@ pub async fn get_event_logs_by_tx_hash(
));
}

logger.info(&format!("Retrying the transaction result fetch for tx_hash: {}", tx_hash));
sleep(Duration::from_millis(1000));
tx_result = icon_service.get_transaction_result(tx_hash).await
}
@@ -42,6 +47,7 @@ pub async fn get_event_logs_by_tx_hash(
.clone(),
)
.unwrap();
logger.info(&format!("Fetched {} event logs for the tx_hash {}", event_logs.len(), tx_hash));

Ok(event_logs)
}
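For reference, a short sketch of how these two helpers are consumed by the icon feeder, assuming an `IconService` has already been constructed elsewhere (its setup is not part of this diff) and using a placeholder transaction hash; the fragment runs inside an async context:

// `icon_service` and the tx hash are placeholders for illustration only.
let height = get_icon_block_height(&icon_service).await?;
let event_logs = get_event_logs_by_tx_hash(&icon_service, "0x...").await?;
println!("latest height: {}, event logs for sample tx: {}", height, event_logs.len());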
9 changes: 8 additions & 1 deletion event_feed/src/main.rs
@@ -4,6 +4,7 @@ use common::cosmos::CosmosFeed
use common::{ChainConfig, ProducerConfig};
use eth::EthFeed;
use icon::IconFeed;
use runtime::{logger::CoreLogger, Logger};
use substrate::types::PolkadotFeed;
use subxt::PolkadotConfig;

@@ -12,34 +13,39 @@ mod cosmos;
mod eth;
mod icon;
mod substrate;
// pub use cosmos;

#[tokio::main]
async fn main() {
let logger = CoreLogger::new(Some("./event-feed.log"));
dotenv::dotenv().ok();
let chain_config = envy::from_env::<ChainConfig>().unwrap();
let producer_config = envy::from_env::<ProducerConfig>().unwrap();

let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap();
logger.info("SSB client created");
ssb_client.accept_invite().await.unwrap();

let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() {
"substrate" => {
let polkadot_client = PolkadotFeed::<PolkadotConfig>::new(chain_config.clone())
.await
.unwrap();
logger.info("Polkadot client got created");
Context::PolkadotFeed(polkadot_client)
}
"icon" => {
let icon_client = IconFeed::new(chain_config.clone()).unwrap();
logger.info("Icon client got created");
Context::IconFeed(icon_client)
}
"eth" => {
let eth_client = EthFeed::new(chain_config.clone()).await.unwrap();
logger.info("Eth client got created");
Context::EthFeed(eth_client)
}
"cosmos" => {
let cosmos_client = CosmosFeed::new(chain_config.clone());
logger.info("Cosmos client got created");
Context::CosmosFeed(cosmos_client)
}
_ => panic!("Invalid Chain"),
@@ -48,6 +54,7 @@ async fn main() {
let _ = feed
.feed_events(&|e| {
for i in e {
logger.info(&format!("Process the events : {:?}", i));
tokio::spawn(async move {
let producer_config = envy::from_env::<ProducerConfig>().unwrap();
let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap();
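Both configs are read with `envy::from_env`, which by default maps struct field names to upper-cased environment variables, so the fields visible in this diff (`chain`, `node_url`, `event_filter`) would come from variables like the ones below. This is only a sketch: the config structs may require fields the diff does not show, and the values are hypothetical.

// Hypothetical environment for the cosmos branch of main(); any extra
// required fields on ChainConfig/ProducerConfig would also need to be set.
std::env::set_var("CHAIN", "cosmos");
std::env::set_var("NODE_URL", "ws://localhost:26657/websocket");
std::env::set_var("EVENT_FILTER", "");
let chain_config = envy::from_env::<ChainConfig>().unwrap();
assert_eq!(chain_config.chain.to_lowercase(), "cosmos");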
12 changes: 8 additions & 4 deletions event_feed/src/substrate/types.rs
@@ -1,5 +1,6 @@
use super::*;
use crate::common::ChainConfig;
use runtime::{logger::CoreLogger, Logger};

#[derive(Debug, Clone)]
pub struct PolkadotFeed<T: subxt::Config> {
@@ -11,9 +12,7 @@ pub struct PolkadotFeed<T: subxt::Config> {

impl<T: subxt::Config> PolkadotFeed<T> {
pub async fn new(chain_config: ChainConfig) -> Result<PolkadotFeed<PolkadotConfig>> {
let client = OnlineClient::<PolkadotConfig>::from_url(&chain_config.node_url)
.await
?;
let client = OnlineClient::<PolkadotConfig>::from_url(&chain_config.node_url).await?;
Ok(PolkadotFeed {
chain_config,
client,
@@ -26,13 +25,16 @@ impl<T: subxt::Config> PolkadotFeed<T> {
/// asynchronous function that takes a callback function `cb` as a parameter. This function
/// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events
/// within those blocks.
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()>{
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let logger = CoreLogger::new(Some("./substrate-event-feed.log"));
logger.info("PolkadotEventFeed: event started");
let mut blocks_sub = self.client.blocks().subscribe_finalized().await?;

// For each block, print a bunch of information about it:
loop {
if let Some(block) = blocks_sub.next().await {
let block = block.unwrap();
logger.info("Processing the obtained block number");
let mut fetched_events = Vec::new();
let extrinsics = block.extrinsics().await.unwrap();
for ext in extrinsics.iter() {
@@ -79,9 +81,11 @@ impl<T: subxt::Config> PolkadotFeed<T> {
}

pub fn split_filter(&self) -> std::collections::HashMap<String, Vec<&str>> {
let logger = CoreLogger::new(Some("./substrate-event-feed.log"));
let mut filter_map: std::collections::HashMap<String, Vec<&str>> =
std::collections::HashMap::new();
if !self.chain_config.event_filter.is_empty() {
logger.info("No event filters configured");
for pair in self.chain_config.event_filter.split(';') {
if !pair.contains("=") {
filter_map.insert(pair.to_string(), vec![]);
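As with the cosmos feeder, the substrate feed is driven by constructing the client and handing `event_feed` a callback; a minimal sketch mirroring the "substrate" arm of `main.rs`, again with illustrative error handling inside an async `anyhow::Result` context and `chain_config` loaded as before:

let polkadot_feed = PolkadotFeed::<PolkadotConfig>::new(chain_config.clone()).await?;
polkadot_feed
    .event_feed(&|events| {
        // Each Vec<serde_json::Value> holds the events gathered from one finalized block.
        for event in events {
            println!("finalized-block event: {:?}", event);
        }
    })
    .await?;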
