diff --git a/event_feed/src/common/context.rs b/event_feed/src/common/context.rs new file mode 100644 index 0000000..0bdac6b --- /dev/null +++ b/event_feed/src/common/context.rs @@ -0,0 +1,20 @@ +use crate::IconFeed; +use crate::PolkadotFeed; +use subxt::PolkadotConfig; +use anyhow::Result; + +pub enum Context { + PolkadotFeed(PolkadotFeed), + IconFeed(IconFeed), +} + +impl Context { + pub async fn feed_events(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + match self { + Context::PolkadotFeed(feed) => feed.event_feed(cb).await, + Context::IconFeed(feed) => { + feed.event_feed(cb).await + } + } + } +} diff --git a/event_feed/src/common/mod.rs b/event_feed/src/common/mod.rs index 2e49b53..06e26d9 100644 --- a/event_feed/src/common/mod.rs +++ b/event_feed/src/common/mod.rs @@ -4,13 +4,14 @@ use serde::Deserialize; use serde_json::Value; mod kuska_client; pub use kuska_client::*; +mod context; +pub use context::*; -pub trait EventFeeder { - fn event_feed(&self, cb: fn(events: Vec)); -} +pub use super::*; #[derive(Deserialize, Debug, Clone)] pub struct ChainConfig { + pub chain: String, pub node_url: String, #[serde(default)] pub event_filter: String, diff --git a/event_feed/src/icon/feeder.rs b/event_feed/src/icon/feeder.rs index 60db7e8..ae19174 100644 --- a/event_feed/src/icon/feeder.rs +++ b/event_feed/src/icon/feeder.rs @@ -79,7 +79,7 @@ impl IconFeed { Ok(events_filter & score_filter) } - pub async fn event_feed(&self, cb: fn(events: Vec)) -> Result<()> { + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()> { let mut latest_height = get_icon_block_height(&self.icon_service).await?; let mut old_height = latest_height - 1; diff --git a/event_feed/src/icon/tests.rs b/event_feed/src/icon/tests.rs index 523009c..72644b7 100644 --- a/event_feed/src/icon/tests.rs +++ b/event_feed/src/icon/tests.rs @@ -4,6 +4,7 @@ mod tests { #[tokio::test] async fn test_filter_events_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXTransfer".to_string(), contract_filter: "".to_string(), @@ -30,6 +31,7 @@ mod tests { #[tokio::test] async fn test_filter_events_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "".to_string(), @@ -52,6 +54,7 @@ mod tests { #[tokio::test] async fn test_filter_score_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), @@ -73,6 +76,7 @@ mod tests { #[tokio::test] async fn test_filter_score_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "".to_string(), contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(), @@ -94,6 +98,7 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_true() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "Transfer".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), @@ -115,6 +120,7 @@ mod tests { #[tokio::test] async fn test_filter_event_and_score_false() { let config = ChainConfig { + chain:"Icon".to_string(), node_url: "https://api.icon.community/api/v3".to_string(), event_filter: "ICXIssued".to_string(), contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(), diff --git a/event_feed/src/main.rs b/event_feed/src/main.rs index cb0927d..0598beb 100644 --- a/event_feed/src/main.rs +++ b/event_feed/src/main.rs @@ -1,5 +1,7 @@ +use crate::common::Context; use crate::common::Producer; -use common::{ChainConfig, EventFeeder, ProducerConfig}; +use common::{ChainConfig, ProducerConfig}; +use icon::IconFeed; use substrate::types::PolkadotFeed; use subxt::PolkadotConfig; @@ -18,19 +20,29 @@ async fn main() { let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); ssb_client.accept_invite().await.unwrap(); - let polkadot = PolkadotFeed::::new(chain_config).await; - polkadot - .event_feed(&|e| { - for i in e { - tokio::spawn(async move { - let producer_config = envy::from_env::().unwrap(); - let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); - ssb_client - .publish_feed(i) - .await - .expect("Failed to send event"); - }); - } - }) - .await; + let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() { + "substrate" => { + let polkadot_client = PolkadotFeed::::new(chain_config.clone()).await.unwrap(); + Context::PolkadotFeed(polkadot_client) + } + "icon" => { + let icon_client = IconFeed::new(chain_config.clone()).unwrap(); + Context::IconFeed(icon_client) + } + _ => panic!("Invalid Chain"), + }; + + let _ = feed.feed_events(&|e| { + for i in e { + tokio::spawn(async move { + let producer_config = envy::from_env::().unwrap(); + let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap(); + ssb_client + .publish_feed(i) + .await + .expect("Failed to send event"); + }); + } + }) + .await; } diff --git a/event_feed/src/substrate/mod.rs b/event_feed/src/substrate/mod.rs index e0a64bf..85745ff 100644 --- a/event_feed/src/substrate/mod.rs +++ b/event_feed/src/substrate/mod.rs @@ -1,8 +1,9 @@ -pub mod types; -use types::*; +#[cfg(test)] mod tests; +pub mod types; use serde::{Deserialize, Serialize}; -use subxt::{Config, OnlineClient, PolkadotConfig}; +use subxt::{OnlineClient, PolkadotConfig}; +use anyhow::Result; #[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")] pub mod polkadot {} diff --git a/event_feed/src/substrate/tests.rs b/event_feed/src/substrate/tests.rs index ed60e4b..ac13de1 100644 --- a/event_feed/src/substrate/tests.rs +++ b/event_feed/src/substrate/tests.rs @@ -1,30 +1,41 @@ +use tests::types::PolkadotFeed; + use super::*; +#[cfg(test)] use crate::ChainConfig; #[tokio::test] async fn test_filter_events() { let config = ChainConfig { + chain: "Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await; - let result = polkadot - .split_filter(); + let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let result = polkadot.split_filter(); - assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])])); + assert_eq!( + result, + std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])]) + ); } #[tokio::test] async fn test_filter_events_argument_with_no_method() { let config = ChainConfig { + chain: "Susbtrate".to_string(), node_url: "wss://rococo-rpc.polkadot.io".to_string(), event_filter: "balances=Transfer;system".to_string(), contract_filter: "".to_string(), }; - let polkadot = PolkadotFeed::::new(config).await; - let result = polkadot - .split_filter(); + let polkadot = PolkadotFeed::::new(config).await.unwrap(); + let result = polkadot.split_filter(); - assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"]), ("system".to_string(), vec![])])); + assert_eq!( + result, + std::collections::HashMap::from([ + ("balances".to_string(), vec!["Transfer"]), + ("system".to_string(), vec![]) + ]) + ); } - diff --git a/event_feed/src/substrate/types.rs b/event_feed/src/substrate/types.rs index c476bb1..dad7803 100644 --- a/event_feed/src/substrate/types.rs +++ b/event_feed/src/substrate/types.rs @@ -1,7 +1,7 @@ use super::*; use crate::common::ChainConfig; -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub struct PolkadotFeed { chain_config: ChainConfig, @@ -10,14 +10,14 @@ pub struct PolkadotFeed { } impl PolkadotFeed { - pub async fn new(chain_config: ChainConfig) -> PolkadotFeed { + pub async fn new(chain_config: ChainConfig) -> Result> { let client = OnlineClient::::from_url(&chain_config.node_url) .await - .unwrap(); - PolkadotFeed { + ?; + Ok(PolkadotFeed { chain_config, client, - } + }) } } @@ -26,8 +26,8 @@ impl PolkadotFeed { /// asynchronous function that takes a callback function `cb` as a parameter. This function /// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events /// within those blocks. - pub async fn event_feed(&self, cb: &dyn Fn( Vec)) { - let mut blocks_sub = self.client.blocks().subscribe_finalized().await.unwrap(); + pub async fn event_feed(&self, cb: &dyn Fn(Vec)) -> Result<()>{ + let mut blocks_sub = self.client.blocks().subscribe_finalized().await?; // For each block, print a bunch of information about it: loop { @@ -67,8 +67,8 @@ impl PolkadotFeed { method: event.variant_name().to_string(), field_value: data, }; - let serialize_event = serde_json::to_value(&decode_event); - fetched_events.push(serialize_event.unwrap()); + let serialize_event = serde_json::to_value(&decode_event)?; + fetched_events.push(serialize_event); } } }