Skip to content

Commit

Permalink
Adds execution_mode to handler
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 19, 2024
1 parent 8142372 commit 33f014a
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 127 deletions.
211 changes: 98 additions & 113 deletions ghost-crab-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate proc_macro;
use ghost_crab_common::config::Config;
use ghost_crab_common::config::{Config, ExecutionMode};
use proc_macro::TokenStream;
use proc_macro2::{Ident, Literal};
use quote::{format_ident, quote};
Expand All @@ -8,122 +8,12 @@ use syn::{parse_macro_input, ItemFn};

#[proc_macro_attribute]
pub fn event_handler(metadata: TokenStream, input: TokenStream) -> TokenStream {
let (name, event_name) = get_source_and_event(metadata);
let config = get_config();

let source = config.data_sources.get(&name).expect("Source not found.");
let abi = Literal::string(&source.abi.clone());

let parsed = parse_macro_input!(input as ItemFn);
let fn_name = parsed.sig.ident.clone();
let fn_args = parsed.sig.inputs.clone();
let fn_body = parsed.block.clone();
let ctx = get_context_identifier(parsed);

let contract_name = format_ident!("{}Contract", fn_name);
let data_source = Literal::string(&name);

TokenStream::from(quote! {
sol!(
#[sol(rpc)]
#contract_name,
#abi
);

pub struct #fn_name;

impl #fn_name {
pub fn new() -> Arc<Box<(dyn Handler + Send + Sync)>> {
Arc::new(Box::new(#fn_name {}))
}
}

#[async_trait]
impl Handler for #fn_name {
async fn handle(&self, #fn_args) {
let decoded_log = #ctx
.log
.log_decode::<#contract_name::#event_name>()
.unwrap();

let event = decoded_log.data();

#fn_body
}

fn get_source(&self) -> String {
String::from(#data_source)
}

fn is_template(&self) -> bool {
false
}

fn get_event_signature(&self) -> String {
#contract_name::#event_name::SIGNATURE.to_string()
}
}
})
create_handler(metadata, input, false)
}

#[proc_macro_attribute]
pub fn template(metadata: TokenStream, input: TokenStream) -> TokenStream {
let (name, event_name) = get_source_and_event(metadata);
let config = get_config();

let source = config.templates.get(&name).expect("Source not found.");
let abi = Literal::string(&source.abi.clone());

let parsed = parse_macro_input!(input as ItemFn);
let fn_name = parsed.sig.ident.clone();
let fn_args = parsed.sig.inputs.clone();
let fn_body = parsed.block.clone();
let ctx = get_context_identifier(parsed);

let contract_name = format_ident!("{}Contract", fn_name);
let data_source = Literal::string(&name);

TokenStream::from(quote! {
sol!(
#[sol(rpc)]
#contract_name,
#abi
);

pub struct #fn_name;

impl #fn_name {
pub fn new() -> Arc<Box<(dyn Handler + Send + Sync)>> {
Arc::new(Box::new(#fn_name {}))
}
}

#[async_trait]
impl Handler for #fn_name {
async fn handle(&self, #fn_args) {
let decoded_log = #ctx
.log
.log_decode::<#contract_name::#event_name>()
.unwrap();

let event = decoded_log.data();

#fn_body
}

fn get_source(&self) -> String {
String::from(#data_source)
}

fn is_template(&self) -> bool {
true
}

fn get_event_signature(&self) -> String {
#contract_name::#event_name::SIGNATURE.to_string()
}
}
})
create_handler(metadata, input, true)
}

#[proc_macro_attribute]
Expand Down Expand Up @@ -222,3 +112,98 @@ fn get_context_identifier(parsed: ItemFn) -> Ident {

return ctx;
}

fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool) -> TokenStream {
let (name, event_name) = get_source_and_event(metadata);
let config = get_config();

let abi = if is_template {
config.templates.get(&name).expect("Source not found.").abi.clone()
} else {
config.data_sources.get(&name).expect("Source not found.").abi.clone()
};

let abi = Literal::string(&abi);

let execution_mode = if is_template {
if let Some(execution_mode) =
config.templates.get(&name).expect("Source not found.").execution_mode.clone()
{
execution_mode
} else {
ExecutionMode::Parallel
}
} else {
if let Some(execution_mode) =
config.data_sources.get(&name).expect("Source not found.").execution_mode.clone()
{
execution_mode
} else {
ExecutionMode::Parallel
}
};

let execution_mode = match execution_mode {
ExecutionMode::Parallel => quote! {
ExecutionMode::Parallel
},
ExecutionMode::Serial => quote! {
ExecutionMode::Serial
},
};

let parsed = parse_macro_input!(input as ItemFn);
let fn_name = parsed.sig.ident.clone();
let fn_args = parsed.sig.inputs.clone();
let fn_body = parsed.block.clone();
let ctx = get_context_identifier(parsed);

let contract_name = format_ident!("{}Contract", fn_name);
let data_source = Literal::string(&name);

TokenStream::from(quote! {
sol!(
#[sol(rpc)]
#contract_name,
#abi
);

pub struct #fn_name;

impl #fn_name {
pub fn new() -> Arc<Box<(dyn Handler + Send + Sync)>> {
Arc::new(Box::new(#fn_name {}))
}
}

#[async_trait]
impl Handler for #fn_name {
async fn handle(&self, #fn_args) {
let decoded_log = #ctx
.log
.log_decode::<#contract_name::#event_name>()
.unwrap();

let event = decoded_log.data();

#fn_body
}

fn get_source(&self) -> String {
String::from(#data_source)
}

fn is_template(&self) -> bool {
#is_template
}

fn execution_mode(&self) -> ExecutionMode {
#execution_mode
}

fn get_event_signature(&self) -> String {
#contract_name::#event_name::SIGNATURE.to_string()
}
}
})
}
2 changes: 1 addition & 1 deletion ghost-crab/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub trait Handler {
async fn handle(&self, params: Context);
fn get_source(&self) -> String;
fn is_template(&self) -> bool;
fn execution_mode(&self) -> ExecutionMode;
fn get_event_signature(&self) -> String;
}

Expand All @@ -32,5 +33,4 @@ pub struct HandlerConfig {
pub handler: HandleInstance,
pub provider: RootProvider<Http<Client>>,
pub templates: TemplateManager,
pub execution_mode: ExecutionMode,
}
4 changes: 0 additions & 4 deletions ghost-crab/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ impl TemplateManager {
let source = config.templates.get(&template.handler.get_source()).unwrap();

let provider = RPC_MANAGER.lock().await.get(source.network.clone()).await;
let execution_mode = source.execution_mode.unwrap_or(ExecutionMode::Parallel);

self.tx
.send(HandlerConfig {
Expand All @@ -35,7 +34,6 @@ impl TemplateManager {
provider,
handler: template.handler,
templates: self.clone(),
execution_mode,
})
.await
.unwrap();
Expand Down Expand Up @@ -82,7 +80,6 @@ impl Indexer {

let source = self.config.data_sources.get(&handler.get_source()).unwrap();
let provider = RPC_MANAGER.lock().await.get(source.network.clone()).await;
let execution_mode = source.execution_mode.unwrap_or(ExecutionMode::Parallel);

self.handlers.push(HandlerConfig {
start_block: source.start_block,
Expand All @@ -91,7 +88,6 @@ impl Indexer {
provider,
handler,
templates: self.templates.clone(),
execution_mode,
});
}

Expand Down
11 changes: 2 additions & 9 deletions ghost-crab/src/process_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,13 @@ use alloy::rpc::types::eth::Filter;
use ghost_crab_common::config::ExecutionMode;

pub async fn process_logs(
HandlerConfig {
start_block,
step,
address,
handler,
provider,
templates,
execution_mode,
}: HandlerConfig,
HandlerConfig { start_block, step, address, handler, provider, templates }: HandlerConfig,
) {
let mut current_block = start_block;
let event_signature = handler.get_event_signature();
let address = address.parse::<Address>().unwrap();

let execution_mode = handler.execution_mode();
let mut block_manager = LatestBlockManager::new(1000, provider.clone());

loop {
Expand Down

0 comments on commit 33f014a

Please sign in to comment.