Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementation of context for event feeder #126

Merged
merged 4 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions event_feed/src/common/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use crate::IconFeed;
use crate::PolkadotFeed;
use subxt::PolkadotConfig;
use anyhow::Result;

pub enum Context {
PolkadotFeed(PolkadotFeed<PolkadotConfig>),
IconFeed(IconFeed),
}

impl Context {
pub async fn feed_events(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()>{
match self {
Context::PolkadotFeed(feed) => feed.event_feed(cb).await,
Context::IconFeed(feed) => {
feed.event_feed(cb).await
}
}
}
}
7 changes: 4 additions & 3 deletions event_feed/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use serde::Deserialize;
use serde_json::Value;
mod kuska_client;
pub use kuska_client::*;
mod context;
pub use context::*;

pub trait EventFeeder {
fn event_feed(&self, cb: fn(events: Vec<serde_json::Value>));
}
pub use super::*;

#[derive(Deserialize, Debug, Clone)]
pub struct ChainConfig {
pub chain: String,
pub node_url: String,
#[serde(default)]
pub event_filter: String,
Expand Down
2 changes: 1 addition & 1 deletion event_feed/src/icon/feeder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl IconFeed {
Ok(events_filter & score_filter)
}

pub async fn event_feed(&self, cb: fn(events: Vec<serde_json::Value>)) -> Result<()> {
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()> {
let mut latest_height = get_icon_block_height(&self.icon_service).await?;
let mut old_height = latest_height - 1;

Expand Down
6 changes: 6 additions & 0 deletions event_feed/src/icon/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod tests {
#[tokio::test]
async fn test_filter_events_true() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "ICXTransfer".to_string(),
contract_filter: "".to_string(),
Expand All @@ -30,6 +31,7 @@ mod tests {
#[tokio::test]
async fn test_filter_events_false() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "ICXIssued".to_string(),
contract_filter: "".to_string(),
Expand All @@ -52,6 +54,7 @@ mod tests {
#[tokio::test]
async fn test_filter_score_true() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "".to_string(),
contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(),
Expand All @@ -73,6 +76,7 @@ mod tests {
#[tokio::test]
async fn test_filter_score_false() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "".to_string(),
contract_filter: "cx21e94c08c03daee80c25d8ee3ea22a20786ec231".to_string(),
Expand All @@ -94,6 +98,7 @@ mod tests {
#[tokio::test]
async fn test_filter_event_and_score_true() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "Transfer".to_string(),
contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(),
Expand All @@ -115,6 +120,7 @@ mod tests {
#[tokio::test]
async fn test_filter_event_and_score_false() {
let config = ChainConfig {
chain:"Icon".to_string(),
node_url: "https://api.icon.community/api/v3".to_string(),
event_filter: "ICXIssued".to_string(),
contract_filter: "cx88fd7df7ddff82f7cc735c871dc519838cb235bb".to_string(),
Expand Down
44 changes: 28 additions & 16 deletions event_feed/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::common::Context;
use crate::common::Producer;
use common::{ChainConfig, EventFeeder, ProducerConfig};
use common::{ChainConfig, ProducerConfig};
use icon::IconFeed;
use substrate::types::PolkadotFeed;
use subxt::PolkadotConfig;

Expand All @@ -18,19 +20,29 @@ async fn main() {
let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap();
ssb_client.accept_invite().await.unwrap();

let polkadot = PolkadotFeed::<PolkadotConfig>::new(chain_config).await;
polkadot
.event_feed(&|e| {
for i in e {
tokio::spawn(async move {
let producer_config = envy::from_env::<ProducerConfig>().unwrap();
let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap();
ssb_client
.publish_feed(i)
.await
.expect("Failed to send event");
});
}
})
.await;
let feed: Context = match chain_config.clone().chain.to_lowercase().as_str() {
"substrate" => {
let polkadot_client = PolkadotFeed::<PolkadotConfig>::new(chain_config.clone()).await.unwrap();
Context::PolkadotFeed(polkadot_client)
}
"icon" => {
let icon_client = IconFeed::new(chain_config.clone()).unwrap();
Context::IconFeed(icon_client)
}
_ => panic!("Invalid Chain"),
};

let _ = feed.feed_events(&|e| {
for i in e {
tokio::spawn(async move {
let producer_config = envy::from_env::<ProducerConfig>().unwrap();
let mut ssb_client = Producer::new(producer_config.clone()).await.unwrap();
ssb_client
.publish_feed(i)
.await
.expect("Failed to send event");
});
}
})
.await;
}
7 changes: 4 additions & 3 deletions event_feed/src/substrate/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
pub mod types;
use types::*;
#[cfg(test)]
mod tests;
pub mod types;
use serde::{Deserialize, Serialize};
use subxt::{Config, OnlineClient, PolkadotConfig};
use subxt::{OnlineClient, PolkadotConfig};
use anyhow::Result;

#[subxt::subxt(runtime_metadata_path = "./src/common/utils/polkadot_metadata_full.scale")]
pub mod polkadot {}
Expand Down
29 changes: 20 additions & 9 deletions event_feed/src/substrate/tests.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
use tests::types::PolkadotFeed;

use super::*;
#[cfg(test)]
use crate::ChainConfig;
#[tokio::test]
async fn test_filter_events() {
let config = ChainConfig {
chain: "Susbtrate".to_string(),
node_url: "wss://rococo-rpc.polkadot.io".to_string(),
event_filter: "balances=Transfer".to_string(),
contract_filter: "".to_string(),
};
let polkadot = PolkadotFeed::<PolkadotConfig>::new(config).await;
let result = polkadot
.split_filter();
let polkadot = PolkadotFeed::<PolkadotConfig>::new(config).await.unwrap();
let result = polkadot.split_filter();

assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])]));
assert_eq!(
result,
std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"])])
);
}

#[tokio::test]
async fn test_filter_events_argument_with_no_method() {
let config = ChainConfig {
chain: "Susbtrate".to_string(),
node_url: "wss://rococo-rpc.polkadot.io".to_string(),
event_filter: "balances=Transfer;system".to_string(),
contract_filter: "".to_string(),
};
let polkadot = PolkadotFeed::<PolkadotConfig>::new(config).await;
let result = polkadot
.split_filter();
let polkadot = PolkadotFeed::<PolkadotConfig>::new(config).await.unwrap();
let result = polkadot.split_filter();

assert_eq!(result, std::collections::HashMap::from([("balances".to_string(), vec!["Transfer"]), ("system".to_string(), vec![])]));
assert_eq!(
result,
std::collections::HashMap::from([
("balances".to_string(), vec!["Transfer"]),
("system".to_string(), vec![])
])
);
}

18 changes: 9 additions & 9 deletions event_feed/src/substrate/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::common::ChainConfig;

#[derive(Debug,Clone)]
#[derive(Debug, Clone)]
pub struct PolkadotFeed<T: subxt::Config> {
chain_config: ChainConfig,

Expand All @@ -10,14 +10,14 @@ pub struct PolkadotFeed<T: subxt::Config> {
}

impl<T: subxt::Config> PolkadotFeed<T> {
pub async fn new(chain_config: ChainConfig) -> PolkadotFeed<PolkadotConfig> {
pub async fn new(chain_config: ChainConfig) -> Result<PolkadotFeed<PolkadotConfig>> {
let client = OnlineClient::<PolkadotConfig>::from_url(&chain_config.node_url)
.await
.unwrap();
PolkadotFeed {
?;
Ok(PolkadotFeed {
chain_config,
client,
}
})
}
}

Expand All @@ -26,8 +26,8 @@ impl<T: subxt::Config> PolkadotFeed<T> {
/// asynchronous function that takes a callback function `cb` as a parameter. This function
/// subscribes to finalized blocks on the Polkadot chain and processes the extrinsics and events
/// within those blocks.
pub async fn event_feed(&self, cb: &dyn Fn( Vec<serde_json::Value>)) {
let mut blocks_sub = self.client.blocks().subscribe_finalized().await.unwrap();
pub async fn event_feed(&self, cb: &dyn Fn(Vec<serde_json::Value>)) -> Result<()>{
let mut blocks_sub = self.client.blocks().subscribe_finalized().await?;

// For each block, print a bunch of information about it:
loop {
Expand Down Expand Up @@ -67,8 +67,8 @@ impl<T: subxt::Config> PolkadotFeed<T> {
method: event.variant_name().to_string(),
field_value: data,
};
let serialize_event = serde_json::to_value(&decode_event);
fetched_events.push(serialize_event.unwrap());
let serialize_event = serde_json::to_value(&decode_event)?;
fetched_events.push(serialize_event);
}
}
}
Expand Down