Skip to content

Commit

Permalink
feat: Add rate limit layer
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 24, 2024
1 parent ecf0c73 commit fea27de
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ghost-crab/src/cache/cache_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn contains_invalid_word(input: &[u8]) -> bool {
}

fn cacheable_request(request: &SerializedRequest) -> bool {
if !matches!(request.method(), "eth_getBlockByNumber") {
if !matches!(request.method(), "eth_getBlockByNumber" | "eth_getLogs" | "eth_call") {
return false;
}

Expand Down
13 changes: 11 additions & 2 deletions ghost-crab/src/cache/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ use alloy::providers::RootProvider;
use alloy::rpc::client::ClientBuilder;
use alloy::transports::http::{Client, Http};
use std::collections::HashMap;
use std::time::Duration;

use crate::rate_limit::RateLimit;
use crate::rate_limit::RateLimitLayer;

use super::cache::load_cache;
use super::cache_layer::CacheLayer;
use super::cache_layer::CacheService;

pub type CacheProvider = RootProvider<CacheService<Http<Client>>>;
pub type CacheProvider = RootProvider<CacheService<RateLimit<Http<Client>>>>;

pub struct RPCManager {
rpcs: HashMap<String, CacheProvider>,
Expand All @@ -26,8 +30,13 @@ impl RPCManager {

let cache = load_cache(&network).unwrap();
let cache_layer = CacheLayer::new(cache);
let rate_limit_layer = RateLimitLayer::new(10_000, Duration::from_secs(1));

let client = ClientBuilder::default()
.layer(cache_layer)
.layer(rate_limit_layer)
.http(rpc_url.parse().unwrap());

let client = ClientBuilder::default().layer(cache_layer).http(rpc_url.parse().unwrap());
let provider = ProviderBuilder::new().on_client(client);

self.rpcs.insert(rpc_url.clone(), provider.clone());
Expand Down
4 changes: 3 additions & 1 deletion ghost-crab/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ pub mod block_handler;
pub mod cache;
pub mod event_handler;
pub mod indexer;
pub mod latest_block_manager;
pub mod prelude;

pub use ghost_crab_common::config;
pub use indexer::Indexer;

mod latest_block_manager;
mod rate_limit;
112 changes: 112 additions & 0 deletions ghost-crab/src/rate_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::time::{Instant, Sleep};
use tower::Layer;
use tower::Service;

#[derive(Debug, Copy, Clone)]
pub struct Rate {
limit: u64,
period: Duration,
}

/// Enforces a rate limit on the number of requests the underlying
/// service can handle over a period of time.
#[derive(Debug, Clone)]
pub struct RateLimitLayer {
rate: Rate,
}

impl RateLimitLayer {
/// Create new rate limit layer.
pub fn new(limit: u64, period: Duration) -> Self {
let rate = Rate { limit, period };
RateLimitLayer { rate }
}
}

impl<S> Layer<S> for RateLimitLayer {
type Service = RateLimit<S>;

fn layer(&self, service: S) -> Self::Service {
RateLimit::new(service, self.rate)
}
}

/// Enforces a rate limit on the number of requests the underlying
/// service can handle over a period of time.
#[derive(Debug, Clone)]
pub struct RateLimit<T> {
inner: T,
rate: Rate,
state: Arc<Mutex<State>>,
}

#[derive(Debug)]
struct State {
until: Instant,
reserved: u64,
timer: Pin<Box<Sleep>>,
}

impl<T> RateLimit<T> {
/// Create a new rate limiter
pub fn new(inner: T, rate: Rate) -> Self {
let until = Instant::now() + rate.period;

let state = Arc::new(Mutex::new(State {
until,
reserved: rate.limit,
timer: Box::pin(tokio::time::sleep_until(until)),
}));

RateLimit { inner, rate, state }
}
}

impl<S, Request> Service<Request> for RateLimit<S>
where
S: Service<Request>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;

fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let now = Instant::now();
let mut state = self.state.lock().unwrap();

if now >= state.until {
state.until = now + self.rate.period;
state.reserved = 0;
state.timer.as_mut().reset(now + self.rate.period);
}

if state.reserved >= self.rate.limit {
ctx.waker().wake_by_ref();
let _ = state.timer.as_mut().poll(ctx);
return Poll::Pending;
}

match self.inner.poll_ready(ctx) {
Poll::Ready(value) => {
state.reserved += 1;
Poll::Ready(value)
}
Poll::Pending => {
ctx.waker().wake_by_ref();
let _ = state.timer.as_mut().poll(ctx);
Poll::Pending
}
}
}

fn call(&mut self, request: Request) -> Self::Future {
self.inner.call(request)
}
}

0 comments on commit fea27de

Please sign in to comment.