Skip to content

Commit

Permalink
feat: Add requests per second to the "config.json"
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 25, 2024
1 parent f8ec7af commit 0244330
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 13 deletions.
15 changes: 11 additions & 4 deletions ghost-crab-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,19 @@ pub struct BlockHandler {
pub step: u64,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct NetworkConfig {
pub rpc_url: String,
pub requests_per_second: u64,
}

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Config {
pub data_sources: HashMap<String, DataSource>,
pub templates: HashMap<String, Template>,
pub networks: HashMap<String, String>,
pub networks: HashMap<String, NetworkConfig>,
pub block_handlers: HashMap<String, BlockHandler>,
}

Expand Down Expand Up @@ -103,9 +110,9 @@ fn parse_config(config_string: &str) -> Result<Config, ConfigError> {

fn replace_env_vars(config: &mut Config) -> Result<(), ConfigError> {
for (_key, value) in &mut config.networks {
if value.starts_with('$') {
*value = env::var(&value[1..])
.map_err(|_| ConfigError::EnvVarNotFound(value[1..].to_string()))?;
if value.rpc_url.starts_with('$') {
(*value).rpc_url = env::var(&value.rpc_url[1..])
.map_err(|_| ConfigError::EnvVarNotFound(value.rpc_url[1..].to_string()))?;
}
}

Expand Down
19 changes: 15 additions & 4 deletions ghost-crab-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream {
let config = config::load().unwrap();
let source = config.block_handlers.get(name).expect("Source not found.");

let rpc_url = config.networks.get(&source.network).expect("RPC url not found for network");
let rpc_url = Literal::string(&rpc_url);
let network_config =
config.networks.get(&source.network).expect("RPC url not found for network");
let rpc_url = Literal::string(&network_config.rpc_url);
let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second);

let step = Literal::u64_suffixed(source.step);
let start_block = Literal::u64_suffixed(source.start_block);
Expand Down Expand Up @@ -71,6 +73,10 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream {
String::from(#rpc_url)
}

fn rate_limit(&self) -> u64 {
#requests_per_second
}

fn start_block(&self) -> u64 {
#start_block
}
Expand Down Expand Up @@ -161,8 +167,9 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool)
start_block = Literal::u64_suffixed(source.start_block);
};

let rpc_url = config.networks.get(&network).expect("RPC url not found for network");
let rpc_url = Literal::string(&rpc_url);
let network_config = config.networks.get(&network).expect("RPC url not found for network");
let rpc_url = Literal::string(&network_config.rpc_url);
let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second);

let abi = Literal::string(&abi);
let network = Literal::string(&network);
Expand Down Expand Up @@ -237,6 +244,10 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool)
String::from(#rpc_url)
}

fn rate_limit(&self) -> u64 {
#requests_per_second
}

fn execution_mode(&self) -> ExecutionMode {
#execution_mode
}
Expand Down
1 change: 1 addition & 0 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait BlockHandler {
fn network(&self) -> String;
fn rpc_url(&self) -> String;
fn start_block(&self) -> u64;
fn rate_limit(&self) -> u64;
fn execution_mode(&self) -> ExecutionMode;
}

Expand Down
9 changes: 7 additions & 2 deletions ghost-crab/src/cache/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,19 @@ impl RPCManager {
RPCManager { rpcs: HashMap::new() }
}

pub async fn get_or_create(&mut self, network: String, rpc_url: String) -> CacheProvider {
pub async fn get_or_create(
&mut self,
network: String,
rpc_url: String,
rate_limit: u64,
) -> CacheProvider {
if let Some(provider) = self.rpcs.get(&rpc_url) {
return provider.clone();
}

let cache = load_cache(&network).unwrap();
let cache_layer = CacheLayer::new(cache);
let rate_limit_layer = RateLimitLayer::new(10_000, Duration::from_secs(1));
let rate_limit_layer = RateLimitLayer::new(rate_limit, Duration::from_secs(1));

let client = ClientBuilder::default()
.layer(cache_layer)
Expand Down
1 change: 1 addition & 0 deletions ghost-crab/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub trait EventHandler {
fn address(&self) -> Address;
fn network(&self) -> String;
fn rpc_url(&self) -> String;
fn rate_limit(&self) -> u64;
fn execution_mode(&self) -> ExecutionMode;
fn event_signature(&self) -> String;
}
Expand Down
14 changes: 11 additions & 3 deletions ghost-crab/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ impl Indexer {
return;
}

let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await;
let provider = self
.rpc_manager
.get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit())
.await;

self.handlers.push(ProcessEventsInput {
start_block: handler.start_block(),
Expand All @@ -61,7 +64,10 @@ impl Indexer {
}

pub async fn load_block_handler(&mut self, handler: BlockHandlerInstance) {
let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await;
let provider = self
.rpc_manager
.get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit())
.await;

self.block_handlers.push(ProcessBlocksInput {
handler,
Expand Down Expand Up @@ -91,7 +97,9 @@ impl Indexer {
while let Some(template) = self.rx.recv().await {
let network = template.handler.network();
let rpc_url = template.handler.rpc_url();
let provider = self.rpc_manager.get_or_create(network, rpc_url).await;
let rate_limit = template.handler.rate_limit();

let provider = self.rpc_manager.get_or_create(network, rpc_url, rate_limit).await;

let handler = ProcessEventsInput {
start_block: template.start_block,
Expand Down

0 comments on commit 0244330

Please sign in to comment.