Skip to content

Commit

Permalink
rpc: re-use server builder per rpc interface (#6652)
Browse files Browse the repository at this point in the history
This PR changes that the server builder is created once and
shared/cloned for each connection to avoid some extra overhead to
construct this for each connection (as it was before).

I don't know why I constructed a new builder for each connection because
it's not needed but shouldn't make a big difference to my understanding.

---------

Co-authored-by: command-bot <>
(cherry picked from commit e1add3e)
  • Loading branch information
niklasad1 authored and github-actions[bot] committed Dec 13, 2024
1 parent 981d6c0 commit 4ddeb0f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 55 deletions.
13 changes: 13 additions & 0 deletions prdoc/pr_6652.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: "rpc server: re-use server builder per rpc interface"

doc:
- audience: Node Dev
description: |
This changes that the RPC server builder is re-used for each RPC interface which is more efficient than to build it for every connection.

crates:
- name: sc-rpc-server
bump: patch
101 changes: 50 additions & 51 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,56 @@ where
local_addrs.push(local_addr);
let cfg = cfg.clone();

let id_provider2 = id_provider.clone();
let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = listener.rpc_settings();

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

tokio_handle.spawn(async move {
loop {
let (sock, remote_addr, rpc_cfg) = tokio::select! {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Expand All @@ -130,56 +175,10 @@ where
_ = cfg.stop_handle.clone().shutdown() => break,
};

let RpcSettings {
batch_config,
max_connections,
max_payload_in_mb,
max_payload_out_mb,
max_buffer_capacity_per_connection,
max_subscriptions_per_connection,
rpc_methods,
rate_limit_trust_proxy_headers,
rate_limit_whitelisted_ips,
host_filter,
cors,
rate_limit,
} = rpc_cfg;

let http_middleware = tower::ServiceBuilder::new()
.option_layer(host_filter)
// Proxy `GET /health, /health/readiness` requests to the internal
// `system_health` method.
.layer(NodeHealthProxyLayer::default())
.layer(cors);

let mut builder = jsonrpsee::server::Server::builder()
.max_request_body_size(max_payload_in_mb.saturating_mul(MEGABYTE))
.max_response_body_size(max_payload_out_mb.saturating_mul(MEGABYTE))
.max_connections(max_connections)
.max_subscriptions_per_connection(max_subscriptions_per_connection)
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(30))
.inactive_limit(Duration::from_secs(60))
.max_failures(3),
)
.set_http_middleware(http_middleware)
.set_message_buffer_capacity(max_buffer_capacity_per_connection)
.set_batch_request_config(batch_config)
.custom_tokio_runtime(cfg.tokio_handle.clone());

if let Some(provider) = id_provider2.clone() {
builder = builder.set_id_provider(provider);
} else {
builder = builder.set_id_provider(RandomStringIdProvider::new(16));
};

let service_builder = builder.to_service_builder();
let deny_unsafe = deny_unsafe(&local_addr, &rpc_methods);

let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let service_builder2 = service_builder.clone();
let rate_limit_whitelisted_ips2 = rate_limit_whitelisted_ips.clone();

let svc =
tower::service_fn(move |mut req: http::Request<hyper::body::Incoming>| {
Expand All @@ -192,14 +191,14 @@ where
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

let rate_limit_cfg = if rate_limit_whitelisted_ips
let rate_limit_cfg = if rate_limit_whitelisted_ips2
.iter()
.any(|ips| ips.contains(proxy_ip.unwrap_or(ip)))
{
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is trusted, disabling rate-limit", proxy_ip);
None
} else {
if !rate_limit_whitelisted_ips.is_empty() {
if !rate_limit_whitelisted_ips2.is_empty() {
log::debug!(target: "rpc", "ip={ip}, proxy_ip={:?} is not trusted, rate-limit enabled", proxy_ip);
}
rate_limit
Expand Down
10 changes: 6 additions & 4 deletions substrate/client/rpc-servers/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,19 @@ pub(crate) struct Listener {

impl Listener {
/// Accepts a new connection.
pub(crate) async fn accept(
&mut self,
) -> std::io::Result<(tokio::net::TcpStream, SocketAddr, RpcSettings)> {
pub(crate) async fn accept(&mut self) -> std::io::Result<(tokio::net::TcpStream, SocketAddr)> {
let (sock, remote_addr) = self.listener.accept().await?;
Ok((sock, remote_addr, self.cfg.clone()))
Ok((sock, remote_addr))
}

/// Returns the local address the listener is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

pub fn rpc_settings(&self) -> RpcSettings {
self.cfg.clone()
}
}

pub(crate) fn host_filtering(enabled: bool, addr: SocketAddr) -> Option<HostFilterLayer> {
Expand Down

0 comments on commit 4ddeb0f

Please sign in to comment.