Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics for async rpcs #3016

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
713 changes: 658 additions & 55 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ members = [
"crates/pos/types/pow-types",
"crates/pos/types/types",
"crates/pos/config/management/network-address-encryption",
"crates/rpc/rpc-middlewares",
]

resolver = "2"
Expand Down Expand Up @@ -194,6 +195,7 @@ cfx-rpc = { path = "./crates/rpc/rpc-eth-impl" }
cfx-rpc-utils = { path = "./crates/rpc/rpc-utils" }
cfx-rpc-builder = { path = "./crates/rpc/rpc-builder" }
cfx-rpc-cfx-impl = { path = "./crates/rpc/rpc-cfx-impl" }
cfx-rpc-middlewares = { path = "./crates/rpc/rpc-middlewares" }
bounded-executor = { path = "./crates/pos/common/bounded-executor" }
diem-channel = { path = "./crates/pos/common/channel", package = "channel" }
channel = { path = "./crates/pos/common/channel" }
Expand Down Expand Up @@ -320,7 +322,7 @@ secp256k1 = "0.30.0"
clap = "2"
rand = "0.7"
rand_xorshift = "0.2"
rand_08 = { package = "rand", version = "0.8"}
rand_08 = { package = "rand", version = "0.8" }
log = "0.4"
log4rs = "1.2.0"
env_logger = "0.11"
Expand Down Expand Up @@ -349,6 +351,7 @@ docopt = "1.0"
vergen = "7.0.0"
target_info = "0.1"
libc = "0.2"
rustls = "0.21"

# conflux forked crates
rocksdb = { git = "https://github.com/Conflux-Chain/rust-rocksdb.git", rev = "3773afe5b953997188f37c39308105b5deb0faac" }
Expand Down
1 change: 1 addition & 0 deletions crates/client/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ pub fn initialize_not_light_node_modules(
let async_eth_rpc_http_server =
tokio_runtime.block_on(launch_async_rpc_servers(
conf.rpc_impl_config(),
conf.raw_conf.throttling_conf.clone(),
conf.raw_conf.public_evm_rpc_async_apis.clone(),
consensus.clone(),
sync.clone(),
Expand Down
13 changes: 8 additions & 5 deletions crates/client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,19 @@ where

// start espace rpc server v2(async)
pub async fn launch_async_rpc_servers(
config: RpcImplConfiguration, apis: RpcModuleSelection,
consensus: SharedConsensusGraph, sync: SharedSynchronizationService,
tx_pool: SharedTransactionPool, addr: Option<SocketAddr>,
rpc_conf: RpcImplConfiguration, throttling_conf_file: Option<String>,
apis: RpcModuleSelection, consensus: SharedConsensusGraph,
sync: SharedSynchronizationService, tx_pool: SharedTransactionPool,
addr: Option<SocketAddr>,
) -> Result<Option<RpcServerHandle>, String> {
if addr.is_none() {
return Ok(None);
}

let enable_metrics = rpc_conf.enable_metrics;

let rpc_module_builder =
RpcModuleBuilder::new(config, consensus, sync, tx_pool);
RpcModuleBuilder::new(rpc_conf, consensus, sync, tx_pool);

info!(
"Enabled evm async rpc modules: {:?}",
Expand All @@ -545,7 +548,7 @@ pub async fn launch_async_rpc_servers(
.with_http_address(addr.unwrap());

let server_handle = server_config
.start(&transport_rpc_modules)
.start(&transport_rpc_modules, throttling_conf_file, enable_metrics)
.await
.map_err(|e| e.to_string())?;

Expand Down
13 changes: 11 additions & 2 deletions crates/rpc/rpc-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ strum = { workspace = true, features = ["derive"] }
alloy-primitives = { workspace = true}
thiserror = { workspace = true }

jsonrpc-core ={ workspace = true}
jsonrpsee-core = { workspace = true }
jsonrpsee-types = { workspace = true }
jsonrpsee = { workspace = true, features = ["server"] }
# tower-http = { workspace = true, features = ["full"] }
tower = { workspace = true, features = ["full"] }
http.workspace = true
pin-project = { workspace = true }
Expand All @@ -31,4 +31,13 @@ cfx-rpc-eth-api = { workspace = true }
cfxcore = { workspace = true }
cfx-types = { workspace = true }
cfx-rpc-eth-types = { workspace = true }
cfx-rpc-cfx-types = { workspace = true }
cfx-rpc-cfx-types = { workspace = true }
throttling = { workspace = true }
cfx-util-macros = { workspace = true }
log = { workspace = true }
cfx-rpc-utils = { workspace = true }
cfx-rpc-middlewares = { workspace = true }
futures-util = { workspace = true, features = ["io", "async-await-macro"] }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
164 changes: 75 additions & 89 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ mod error;
mod id_provider;
mod module;

use cfx_rpc_middlewares::{Metrics, Throttle};
pub use error::*;
pub use id_provider::EthSubscriptionIdProvider;
use log::debug;
pub use module::{EthRpcModule, RpcModuleSelection};

use cfx_rpc::{helpers::ChainInfo, *};
Expand Down Expand Up @@ -267,7 +269,7 @@ impl RpcRegistryInner {
/// started, See also [`ServerBuilder::build`] and
/// [`Server::start`](jsonrpsee::server::Server::start).
#[derive(Debug)]
pub struct RpcServerConfig<RpcMiddleware = Identity> {
pub struct RpcServerConfig {
/// Configs for JSON-RPC Http.
http_server_config: Option<ServerBuilder<Identity, Identity>>,
/// Allowed CORS Domains for http
Expand All @@ -280,12 +282,12 @@ pub struct RpcServerConfig<RpcMiddleware = Identity> {
ws_cors_domains: Option<String>,
/// Address where to bind the ws server to
ws_addr: Option<SocketAddr>,
/// Configurable RPC middleware
#[allow(dead_code)]
rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
// /// Configurable RPC middleware
// #[allow(dead_code)]
// rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
}

impl Default for RpcServerConfig<Identity> {
impl Default for RpcServerConfig {
fn default() -> Self {
Self {
http_server_config: None,
Expand All @@ -294,7 +296,7 @@ impl Default for RpcServerConfig<Identity> {
ws_server_config: None,
ws_cors_domains: None,
ws_addr: None,
rpc_middleware: RpcServiceBuilder::new(),
// rpc_middleware: RpcServiceBuilder::new(),
}
}
}
Expand Down Expand Up @@ -337,21 +339,21 @@ impl RpcServerConfig {
}
}

impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
impl RpcServerConfig {
/// Configure rpc middleware
pub fn set_rpc_middleware<T>(
self, rpc_middleware: RpcServiceBuilder<T>,
) -> RpcServerConfig<T> {
RpcServerConfig {
http_server_config: self.http_server_config,
http_cors_domains: self.http_cors_domains,
http_addr: self.http_addr,
ws_server_config: self.ws_server_config,
ws_cors_domains: self.ws_cors_domains,
ws_addr: self.ws_addr,
rpc_middleware,
}
}
// pub fn set_rpc_middleware<T>(
// self, rpc_middleware: RpcServiceBuilder<T>,
// ) -> RpcServerConfig<T> {
// RpcServerConfig {
// http_server_config: self.http_server_config,
// http_cors_domains: self.http_cors_domains,
// http_addr: self.http_addr,
// ws_server_config: self.ws_server_config,
// ws_cors_domains: self.ws_cors_domains,
// ws_addr: self.ws_addr,
// rpc_middleware,
// }
// }

/// Configure the cors domains for http _and_ ws
pub fn with_cors(self, cors_domain: Option<String>) -> Self {
Expand Down Expand Up @@ -426,9 +428,20 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
// Returns the [`RpcServerHandle`] with the handle to the started servers.
pub async fn start(
self, modules: &TransportRpcModules,
throttling_conf_file: Option<String>, enable_metrics: bool,
) -> Result<RpcServerHandle, RpcError> {
let mut http_handle = None;
let mut ws_handle = None;
// TODO: handle enable metrics
debug!("enable metrics: {}", enable_metrics);

let rpc_middleware = RpcServiceBuilder::new()
.layer_fn(move |s| {
Throttle::new(
throttling_conf_file.as_ref().map(|s| s.as_str()),
"rpc",
s,
)
})
.layer_fn(|s| Metrics::new(s));

let http_socket_addr =
self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Expand Down Expand Up @@ -468,21 +481,7 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {

if let Some(builder) = self.http_server_config {
let server = builder
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(cors)?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware.clone().layer(
// modules
// .http
// .as_ref()
// .or(modules.ws.as_ref())
// .map(RpcRequestMetrics::same_port)
// .unwrap_or_default(),
// ),
// )
.set_rpc_middleware(rpc_middleware)
.build(http_socket_addr)
.await
.map_err(|err| {
Expand All @@ -501,38 +500,33 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
modules.http.as_ref().or(modules.ws.as_ref())
{
let handle = server.start(module.clone());
http_handle = Some(handle.clone());
ws_handle = Some(handle);
let http_handle = Some(handle.clone());
let ws_handle = Some(handle);

return Ok(RpcServerHandle {
http_local_addr: Some(addr),
ws_local_addr: Some(addr),
http: http_handle,
ws: ws_handle,
});
}
return Ok(RpcServerHandle {
http_local_addr: Some(addr),
ws_local_addr: Some(addr),
http: http_handle,
ws: ws_handle,
});

return Err(RpcError::Custom(
"No valid RpcModule found from modules".to_string(),
));
}
}

let mut ws_local_addr = None;
let mut ws_server = None;
let mut http_local_addr = None;
let mut http_server = None;

let mut result = RpcServerHandle {
http_local_addr: None,
ws_local_addr: None,
http: None,
ws: None,
};
if let Some(builder) = self.ws_server_config {
let server = builder
.ws_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.
// ws_cors_domains.clone())?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware
// .clone()
// .layer(modules.ws.as_ref().
// map(RpcRequestMetrics::ws).unwrap_or_default()),
// )
.set_rpc_middleware(rpc_middleware.clone())
.build(ws_socket_addr)
.await
.map_err(|err| {
Expand All @@ -543,24 +537,20 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
RpcError::server_error(err, ServerKind::WS(ws_socket_addr))
})?;

ws_local_addr = Some(addr);
ws_server = Some(server);
let ws_local_addr = Some(addr);
let ws_server = Some(server);
let ws_handle = ws_server.map(|ws_server| {
ws_server.start(modules.ws.clone().expect("ws server error"))
});

result.ws = ws_handle;
result.ws_local_addr = ws_local_addr;
}

if let Some(builder) = self.http_server_config {
let server = builder
.http_only()
// .set_http_middleware(
// tower::ServiceBuilder::new()
// .option_layer(Self::maybe_cors_layer(self.
// http_cors_domains.clone())?)
// .option_layer(Self::maybe_jwt_layer(self.
// jwt_secret)), )
// .set_rpc_middleware(
// self.rpc_middleware.clone().layer(
// modules.http.as_ref().map(RpcRequestMetrics::http).
// unwrap_or_default(), ),
// )
.set_rpc_middleware(rpc_middleware)
.build(http_socket_addr)
.await
.map_err(|err| {
Expand All @@ -572,22 +562,18 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
let local_addr = server.local_addr().map_err(|err| {
RpcError::server_error(err, ServerKind::Http(http_socket_addr))
})?;
http_local_addr = Some(local_addr);
http_server = Some(server);
let http_local_addr = Some(local_addr);
let http_server = Some(server);
let http_handle = http_server.map(|http_server| {
http_server
.start(modules.http.clone().expect("http server error"))
});

result.http = http_handle;
result.http_local_addr = http_local_addr;
}

http_handle = http_server.map(|http_server| {
http_server.start(modules.http.clone().expect("http server error"))
});
ws_handle = ws_server.map(|ws_server| {
ws_server.start(modules.ws.clone().expect("ws server error"))
});
Ok(RpcServerHandle {
http_local_addr,
ws_local_addr,
http: http_handle,
ws: ws_handle,
})
Ok(result)
}
}

Expand Down
31 changes: 31 additions & 0 deletions crates/rpc/rpc-middlewares/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "cfx-rpc-middlewares"
edition = "2021"
version.workspace = true
authors.workspace = true
description.workspace = true
documentation.workspace = true
homepage.workspace = true
keywords.workspace = true
repository.workspace = true
license-file.workspace = true

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
jsonrpc-core = { workspace = true }
jsonrpsee = { workspace = true ,features = ["server","ws-client"]}
rustls = { workspace = true, features = ["dangerous_configuration"] }
throttling = { workspace = true }
futures-util = { workspace = true }
log = { workspace = true }
jsonrpsee-types = { workspace = true }
cfx-rpc-utils = { workspace = true }
cfx-util-macros = { workspace = true }
lazy_static = { workspace = true }
parking_lot = { workspace = true }
metrics = { workspace = true }
futures = { workspace = true }
tracing-subscriber = {version = "=0.3.0",features = ["env-filter"]}
anyhow = {workspace = true}
tokio = { workspace = true }
Loading