From 12473d2f4113c872984b085f4a6e9cd055f8aa64 Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Fri, 7 Jun 2024 15:33:51 -0300 Subject: [PATCH 1/6] feat: upgrade jsonrpsee (#1036) --- Cargo.lock | 331 ++++++++++++++----------------- Cargo.toml | 2 +- src/eth/rpc/rpc_middleware.rs | 22 +- src/eth/rpc/rpc_server.rs | 65 +++--- src/eth/rpc/rpc_subscriptions.rs | 15 +- 5 files changed, 202 insertions(+), 233 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5ad71514a..c73b1207d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -439,12 +439,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" -[[package]] -name = "base64" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" - [[package]] name = "base64" version = "0.21.7" @@ -575,15 +569,6 @@ dependencies = [ "wyz", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -735,6 +720,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -828,6 +819,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "console-api" version = "0.6.0" @@ -1203,7 +1204,7 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", + "block-buffer", "const-oid", "crypto-common", "subtle", @@ -2112,6 +2113,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2119,22 +2121,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-rustls" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" -dependencies = [ - "futures-util", - "http 0.2.12", - "hyper 0.14.28", - "log", - "rustls 0.21.12", - "rustls-native-certs 0.6.3", - "tokio", - "tokio-rustls 0.24.1", -] - [[package]] name = "hyper-rustls" version = "0.27.1" @@ -2146,11 +2132,11 @@ dependencies = [ "hyper 1.3.1", "hyper-util", "log", - "rustls 0.23.5", - "rustls-native-certs 0.7.0", + "rustls", + "rustls-native-certs", "rustls-pki-types", "tokio", - "tokio-rustls 0.26.0", + "tokio-rustls", "tower-service", ] @@ -2379,6 +2365,26 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.31" @@ -2426,9 +2432,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfdb12a2381ea5b2e68c3469ec604a007b367778cdb14d09612c8069ebd616ad" +checksum = "67210bd846b2dca59dc73f34717d6e1589305d506cdd6f14c849115d08e40876" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -2442,40 +2448,44 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4978087a58c3ab02efc5b07c5e5e2803024536106fd5506f558db172c889b3aa" +checksum = "e4c171d64176ae8f57eec75bca9f9dda2b2f746314adef9160a5bbbb2a7c82cb" dependencies = [ + "base64 0.22.0", "futures-channel", "futures-util", "gloo-net", - "http 0.2.12", + "http 1.1.0", "jsonrpsee-core", "pin-project", - "rustls-native-certs 0.7.0", + "rustls", "rustls-pki-types", + "rustls-platform-verifier", "soketto", "thiserror", "tokio", - "tokio-rustls 0.25.0", + "tokio-rustls", "tokio-util", "tracing", "url", - "webpki-roots", ] [[package]] name = "jsonrpsee-core" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4b257e1ec385e07b0255dde0b933f948b5c8b8c28d42afda9587c3a967b896d" +checksum = "5e966c12c8b6c1790ce67683c792cea9dd250860df49f6b64f1b1ff6c6946850" dependencies = [ "anyhow", "async-trait", "beef", + "bytes", "futures-timer", "futures-util", - "hyper 0.14.28", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", "jsonrpsee-types", "parking_lot", "pin-project", @@ -2492,15 +2502,20 @@ dependencies = [ [[package]] name = "jsonrpsee-http-client" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ccf93fc4a0bfe05d851d37d7c32b7f370fe94336b52a2f0efc5f1981895c2e5" +checksum = "bb0187f1969287e5890d84460fe9f6556d98231e70b06179d4416e5c8c47167d" dependencies = [ "async-trait", - "hyper 0.14.28", - "hyper-rustls 0.24.2", + "base64 0.22.0", + "http-body 1.0.0", + "hyper 1.3.1", + "hyper-rustls", + "hyper-util", "jsonrpsee-core", "jsonrpsee-types", + "rustls", + "rustls-platform-verifier", "serde", "serde_json", "thiserror", @@ -2512,13 +2527,17 @@ dependencies = [ [[package]] name = "jsonrpsee-server" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12d8b6a9674422a8572e0b0abb12feeb3f2aeda86528c80d0350c2bd0923ab41" +checksum = "ba63ec742f5f9c4016ca4c443d04dc3ca56e240a26ebe6e56609a5109bd9d13f" dependencies = [ + "anyhow", "futures-util", - "http 0.2.12", - "hyper 0.14.28", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", "jsonrpsee-core", "jsonrpsee-types", "pin-project", @@ -2536,12 +2555,13 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "150d6168405890a7a3231a3c74843f58b8959471f6df76078db2619ddee1d07d" +checksum = "328c33717b7bdc4f47cdf31c21d5449c5b713a800cb05cb767841ccf2944f9d9" dependencies = [ "anyhow", "beef", + "http 1.1.0", "serde", "serde_json", "thiserror", @@ -2549,9 +2569,9 @@ dependencies = [ [[package]] name = "jsonrpsee-wasm-client" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f448d8eacd945cc17b6c0b42c361531ca36a962ee186342a97cdb8fca679cd77" +checksum = "6c1c1b2f71c32f763d85c1b9836c1a522377b3972177a76b3e66ba42f2582929" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -2560,11 +2580,11 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.22.5" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58b9db2dfd5bb1194b0ce921504df9ceae210a345bc2f6c5a61432089bbab070" +checksum = "aeb22661e1c018eb503d5a47be2d39a176de832b047d2757fa86b3d2b34fc84e" dependencies = [ - "http 0.2.12", + "http 1.1.0", "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", @@ -2656,15 +2676,15 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.3.1", - "hyper-rustls 0.27.1", + "hyper-rustls", "hyper-timeout 0.5.1", "hyper-util", "jsonpath-rust", "k8s-openapi", "kube-core", "pem", - "rustls 0.23.5", - "rustls-pemfile 2.1.2", + "rustls", + "rustls-pemfile", "secrecy", "serde", "serde_json", @@ -3174,12 +3194,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "opaque-debug" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" - [[package]] name = "open-fastrlp" version = "0.1.4" @@ -4046,7 +4060,7 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls-pemfile 2.1.2", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", @@ -4343,57 +4357,19 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" -dependencies = [ - "log", - "ring", - "rustls-webpki 0.101.7", - "sct", -] - -[[package]] -name = "rustls" -version = "0.22.4" +version = "0.23.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4ef73721ac7bcd79b2b315da7779d8fc09718c6b3d2d1b2d94850eb8c18432" -dependencies = [ - "log", - "ring", - "rustls-pki-types", - "rustls-webpki 0.102.3", - "subtle", - "zeroize", -] - -[[package]] -name = "rustls" -version = "0.23.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afabcee0551bd1aa3e18e5adbf2c0544722014b899adb31bd186ec638d3da97e" +checksum = "a218f0f6d05669de4eabfb24f31ce802035c952429d037507b4a4a39f0e60c5b" dependencies = [ "log", "once_cell", "ring", "rustls-pki-types", - "rustls-webpki 0.102.3", + "rustls-webpki", "subtle", "zeroize", ] -[[package]] -name = "rustls-native-certs" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" -dependencies = [ - "openssl-probe", - "rustls-pemfile 1.0.4", - "schannel", - "security-framework", -] - [[package]] name = "rustls-native-certs" version = "0.7.0" @@ -4401,21 +4377,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" dependencies = [ "openssl-probe", - "rustls-pemfile 2.1.2", + "rustls-pemfile", "rustls-pki-types", "schannel", "security-framework", ] -[[package]] -name = "rustls-pemfile" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" -dependencies = [ - "base64 0.21.7", -] - [[package]] name = "rustls-pemfile" version = "2.1.2" @@ -4428,25 +4395,42 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.5.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] -name = "rustls-webpki" -version = "0.101.7" +name = "rustls-platform-verifier" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +checksum = "b5f0d26fa1ce3c790f9590868f0109289a044acb954525f933e2aa3b871c157d" dependencies = [ - "ring", - "untrusted", + "core-foundation", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-roots", + "winapi", ] +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84e217e7fdc8466b5b35d30f8c0a30febd29173df4a3a0c2115d306b9c4117ad" + [[package]] name = "rustls-webpki" -version = "0.102.3" +version = "0.102.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3bce581c0dd41bce533ce695a1437fa16a7ab5ac3ccfa99fe1a620a7885eabf" +checksum = "ff448f7e92e913c4b7d4c6d8e4540a1724b319b4152b8aef6d4cf8339712b33e" dependencies = [ "ring", "rustls-pki-types", @@ -4477,6 +4461,15 @@ version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e86697c916019a8588c99b5fac3cead74ec0b4b819707a682fd4d23fa0ce1ba1" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scale-info" version = "2.11.2" @@ -4540,16 +4533,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" -[[package]] -name = "sct" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" -dependencies = [ - "ring", - "untrusted", -] - [[package]] name = "seahash" version = "4.1.0" @@ -4609,6 +4592,7 @@ dependencies = [ "core-foundation", "core-foundation-sys", "libc", + "num-bigint", "security-framework-sys", ] @@ -4893,19 +4877,6 @@ dependencies = [ "unsafe-libyaml", ] -[[package]] -name = "sha-1" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" -dependencies = [ - "block-buffer 0.9.0", - "cfg-if", - "cpufeatures", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "sha1" version = "0.10.6" @@ -5095,18 +5066,18 @@ dependencies = [ [[package]] name = "soketto" -version = "0.7.1" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" +checksum = "37468c595637c10857701c990f93a40ce0e357cedb0953d1c26c8d8027f9bb53" dependencies = [ - "base64 0.13.1", + "base64 0.22.0", "bytes", "futures", - "http 0.2.12", + "http 1.1.0", "httparse", "log", "rand", - "sha-1", + "sha1", ] [[package]] @@ -5772,34 +5743,13 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-rustls" -version = "0.24.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" -dependencies = [ - "rustls 0.21.12", - "tokio", -] - -[[package]] -name = "tokio-rustls" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" -dependencies = [ - "rustls 0.22.4", - "rustls-pki-types", - "tokio", -] - [[package]] name = "tokio-rustls" version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.5", + "rustls", "rustls-pki-types", "tokio", ] @@ -6250,6 +6200,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -6392,6 +6352,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys 0.52.0", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index b296c4ffe..bc8707a57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,7 +68,7 @@ rlp = "=0.5.2" triehash = "=0.8.4" # network -jsonrpsee = { version = "=0.22.5", features = ["server", "client"] } +jsonrpsee = { version = "=0.23.0", features = ["server", "client"] } k8s-openapi = { version = "=0.21.1", optional = true, features = ["v1_27"] } kube = { version = "=0.90.0", optional = true, features = ["runtime", "derive"] } raft = { version = "=0.7.0", optional = true } diff --git a/src/eth/rpc/rpc_middleware.rs b/src/eth/rpc/rpc_middleware.rs index 124650d67..d34218c00 100644 --- a/src/eth/rpc/rpc_middleware.rs +++ b/src/eth/rpc/rpc_middleware.rs @@ -10,6 +10,9 @@ use std::sync::atomic::Ordering; use std::task::Poll; use std::time::Instant; +use futures::future::BoxFuture; +use jsonrpsee::server::middleware::rpc::layer::ResponseFuture; +use jsonrpsee::server::middleware::rpc::RpcService; use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::types::Params; use jsonrpsee::MethodResponse; @@ -36,15 +39,12 @@ static ACTIVE_REQUESTS: AtomicU64 = AtomicU64::new(0); // ----------------------------------------------------------------------------- #[derive(Debug, derive_new::new)] -pub struct RpcMiddleware { - service: S, +pub struct RpcMiddleware { + service: RpcService, } -impl<'a, S> RpcServiceT<'a> for RpcMiddleware -where - S: RpcServiceT<'a> + Send + Sync, -{ - type Future = RpcResponse; +impl<'a> RpcServiceT<'a> for RpcMiddleware { + type Future = RpcResponse<'a>; fn call(&self, request: jsonrpsee::types::Request<'a>) -> Self::Future { // extract signature if available @@ -99,9 +99,9 @@ fn extract_function_from_transaction(params: Params) -> Option { +pub struct RpcResponse<'a> { #[pin] - future_response: F, + future_response: ResponseFuture>, id: String, method: String, @@ -109,8 +109,8 @@ pub struct RpcResponse { start: Instant, } -impl> Future for RpcResponse { - type Output = F::Output; +impl<'a> Future for RpcResponse<'a> { + type Output = MethodResponse; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { // poll future diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index ee8a87437..a691edcdc 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -12,6 +12,7 @@ use jsonrpsee::server::RpcModule; use jsonrpsee::server::RpcServiceBuilder; use jsonrpsee::server::Server; use jsonrpsee::types::Params; +use jsonrpsee::Extensions; use jsonrpsee::IntoSubscriptionCloseResponse; use jsonrpsee::PendingSubscriptionSink; use serde_json::json; @@ -190,20 +191,20 @@ fn register_methods(mut module: RpcModule) -> anyhow::Result, ctx: Arc) -> anyhow::Result { +async fn debug_set_head(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, number) = next_rpc_param::(params.sequence())?; ctx.storage.reset(number).await?; Ok(serde_json::to_value(number).expect_infallible()) } #[cfg(feature = "dev")] -async fn evm_mine(_params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn evm_mine(_params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { ctx.miner.mine_local_and_commit().await?; Ok(serde_json::to_value(true).expect_infallible()) } #[cfg(feature = "dev")] -async fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { use crate::eth::primitives::UnixTime; use crate::log_and_err; @@ -217,7 +218,7 @@ async fn evm_set_next_block_timestamp(params: Params<'_>, ctx: Arc) } #[cfg(feature = "dev")] -async fn debug_read_all_slots(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn debug_read_all_slots(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, address) = next_rpc_param::
(params.sequence())?; Ok(serde_json::to_value(ctx.storage.read_all_slots(&address).await?).expect_infallible()) } @@ -226,21 +227,21 @@ async fn debug_read_all_slots(params: Params<'_>, ctx: Arc) -> anyho // Status // ----------------------------------------------------------------------------- -async fn net_listening(params: Params<'_>, arc: Arc) -> anyhow::Result { - stratus_readiness(params, arc).await +async fn net_listening(params: Params<'_>, arc: Arc, ext: Extensions) -> anyhow::Result { + stratus_readiness(params, arc, ext).await } -async fn stratus_startup(_: Params<'_>, _: Arc) -> anyhow::Result { +async fn stratus_startup(_: Params<'_>, _: Arc, _: Extensions) -> anyhow::Result { Ok(json!(true)) } -async fn stratus_readiness(_: Params<'_>, context: Arc) -> anyhow::Result { +async fn stratus_readiness(_: Params<'_>, context: Arc, _: Extensions) -> anyhow::Result { let should_serve = context.consensus.should_serve().await; tracing::info!("stratus_readiness: {}", should_serve); Ok(json!(should_serve)) } -async fn stratus_liveness(_: Params<'_>, _: Arc) -> anyhow::Result { +async fn stratus_liveness(_: Params<'_>, _: Arc, _: Extensions) -> anyhow::Result { Ok(json!(true)) } @@ -249,17 +250,17 @@ async fn stratus_liveness(_: Params<'_>, _: Arc) -> anyhow::Result, ctx: Arc) -> String { +async fn net_version(_: Params<'_>, ctx: Arc, _: Extensions) -> String { ctx.chain_id.to_string() } #[tracing::instrument(name = "rpc::eth_chainId", parent = None, skip_all)] -async fn eth_chain_id(_: Params<'_>, ctx: Arc) -> String { +async fn eth_chain_id(_: Params<'_>, ctx: Arc, _: Extensions) -> String { hex_num(ctx.chain_id) } #[tracing::instrument(name = "rpc::web3_clientVersion", parent = None, skip_all)] -async fn web3_client_version(_: Params<'_>, ctx: Arc) -> String { +async fn web3_client_version(_: Params<'_>, ctx: Arc, _: Extensions) -> String { ctx.client_version.to_owned() } @@ -268,7 +269,7 @@ async fn web3_client_version(_: Params<'_>, ctx: Arc) -> String { // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_gasPrice", parent = None, skip_all)] -async fn eth_gas_price(_: Params<'_>, _: Arc) -> String { +async fn eth_gas_price(_: Params<'_>, _: Arc, _: Extensions) -> String { hex_zero() } @@ -277,23 +278,23 @@ async fn eth_gas_price(_: Params<'_>, _: Arc) -> String { // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_blockNumber", parent = None, skip_all)] -async fn eth_block_number(_params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_block_number(_params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let number = ctx.storage.read_mined_block_number().await?; Ok(serde_json::to_value(number).expect_infallible()) } #[tracing::instrument(name = "rpc::eth_getBlockByHash", parent = None, skip_all, fields(filter, found, number))] -async fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc) -> anyhow::Result { - eth_get_block_by_selector(params, ctx).await +async fn eth_get_block_by_hash(params: Params<'_>, ctx: Arc, ext: Extensions) -> anyhow::Result { + eth_get_block_by_selector(params, ctx, ext).await } #[tracing::instrument(name = "rpc::eth_getBlockByNumber", parent = None, skip_all, fields(filter, found, number))] -async fn eth_get_block_by_number(params: Params<'_>, ctx: Arc) -> anyhow::Result { - eth_get_block_by_selector(params, ctx).await +async fn eth_get_block_by_number(params: Params<'_>, ctx: Arc, ext: Extensions) -> anyhow::Result { + eth_get_block_by_selector(params, ctx, ext).await } #[inline(always)] -async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, block_selection) = next_rpc_param::(params.sequence())?; let (_, full_transactions) = next_rpc_param::(params)?; @@ -323,7 +324,7 @@ async fn eth_get_block_by_selector(params: Params<'_>, ctx: Arc) -> } #[tracing::instrument(name = "rpc::eth_getUncleByBlockHashAndIndex", parent = None, skip_all)] -async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc) -> anyhow::Result { +async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc, _: Extensions) -> anyhow::Result { Ok(JsonValue::Null) } @@ -332,7 +333,7 @@ async fn eth_get_uncle_by_block_hash_and_index(_params: Params<'_>, _ctx: Arc, ctx: Arc) -> anyhow::Result { +async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; Span::with(|s| s.rec_str("hash", &hash)); @@ -349,7 +350,7 @@ async fn eth_get_transaction_by_hash(params: Params<'_>, ctx: Arc) - } #[tracing::instrument(name = "rpc::eth_getTransactionReceipt", parent = None, skip_all, fields(hash, found))] -async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, hash) = next_rpc_param::(params.sequence())?; Span::with(|s| s.rec_str("hash", &hash)); @@ -365,7 +366,7 @@ async fn eth_get_transaction_receipt(params: Params<'_>, ctx: Arc) - } #[tracing::instrument(name = "rpc::eth_estimateGas", parent = None, skip_all)] -async fn eth_estimate_gas(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_estimate_gas(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, call) = next_rpc_param::(params.sequence())?; match ctx.executor.local_call(call, StoragePointInTime::Present).await { @@ -384,7 +385,7 @@ async fn eth_estimate_gas(params: Params<'_>, ctx: Arc) -> anyhow::R } #[tracing::instrument(name = "rpc::eth_call", parent = None, skip_all, fields(from, to))] -async fn eth_call(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_call(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, call) = next_rpc_param::(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -407,7 +408,7 @@ async fn eth_call(params: Params<'_>, ctx: Arc) -> anyhow::Result, ctx: Arc) -> anyhow::Result { +async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, data) = next_rpc_param::(params.sequence())?; let tx = parse_rpc_rlp::(&data)?; @@ -452,7 +453,7 @@ async fn eth_send_raw_transaction(params: Params<'_>, ctx: Arc) -> a // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_getLogs", parent = None, skip_all)] -async fn eth_get_logs(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_logs(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (_, filter_input) = next_rpc_param::(params.sequence())?; let filter = filter_input.parse(&ctx.storage).await?; @@ -465,12 +466,12 @@ async fn eth_get_logs(params: Params<'_>, ctx: Arc) -> anyhow::Resul // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_accounts", parent = None, skip_all)] -async fn eth_accounts(_: Params<'_>, _ctx: Arc) -> anyhow::Result { +async fn eth_accounts(_: Params<'_>, _ctx: Arc, _: Extensions) -> anyhow::Result { Ok(json!([])) } #[tracing::instrument(name = "rpc::eth_getTransactionCount", parent = None, skip_all, fields(address))] -async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -484,7 +485,7 @@ async fn eth_get_transaction_count(params: Params<'_>, ctx: Arc) -> } #[tracing::instrument(name = "rpc::eth_getBalance", parent = None, skip_all, fields(address))] -async fn eth_get_balance(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_balance(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -499,7 +500,7 @@ async fn eth_get_balance(params: Params<'_>, ctx: Arc) -> anyhow::Re } #[tracing::instrument(name = "rpc::eth_getCode", parent = None, skip_all, fields(address))] -async fn eth_get_code(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_code(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (_, block_selection) = next_rpc_param_or_default::(params)?; @@ -518,7 +519,7 @@ async fn eth_get_code(params: Params<'_>, ctx: Arc) -> anyhow::Resul // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_subscribe", parent = None, skip_all)] -async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc) -> impl IntoSubscriptionCloseResponse { +async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx: Arc, _: Extensions) -> impl IntoSubscriptionCloseResponse { let (params, kind) = next_rpc_param::(params.sequence())?; match kind.deref() { "newPendingTransactions" => { @@ -551,7 +552,7 @@ async fn eth_subscribe(params: Params<'_>, pending: PendingSubscriptionSink, ctx // ----------------------------------------------------------------------------- #[tracing::instrument(name = "rpc::eth_getStorageAt", parent = None, skip_all, fields(address, index))] -async fn eth_get_storage_at(params: Params<'_>, ctx: Arc) -> anyhow::Result { +async fn eth_get_storage_at(params: Params<'_>, ctx: Arc, _: Extensions) -> anyhow::Result { let (params, address) = next_rpc_param::
(params.sequence())?; let (params, index) = next_rpc_param::(params)?; let (_, block_selection) = next_rpc_param_or_default::(params)?; diff --git a/src/eth/rpc/rpc_subscriptions.rs b/src/eth/rpc/rpc_subscriptions.rs index c4dbaf5e9..175b6ad27 100644 --- a/src/eth/rpc/rpc_subscriptions.rs +++ b/src/eth/rpc/rpc_subscriptions.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use futures::join; use itertools::Itertools; +use jsonrpsee::ConnectionId; use jsonrpsee::SubscriptionMessage; use jsonrpsee::SubscriptionSink; use tokio::sync::broadcast; @@ -31,8 +32,6 @@ const CLEANING_FREQUENCY: Duration = Duration::from_secs(10); /// Timeout used when sending notifications to subscribers. const NOTIFICATION_TIMEOUT: Duration = Duration::from_secs(1); -type SubscriptionId = usize; - #[cfg(feature = "metrics")] mod label { pub(super) const PENDING_TXS: &str = "newPendingTransactions"; @@ -200,15 +199,15 @@ impl RpcSubscriptionsHandles { /// Active client subscriptions. #[derive(Debug, Default)] pub struct RpcSubscriptionsConnected { - new_pending_txs: RwLock>, - new_heads: RwLock>, - logs: RwLock>, + new_pending_txs: RwLock>, + new_heads: RwLock>, + logs: RwLock>, } impl RpcSubscriptionsConnected { /// Adds a new subscriber to `newPendingTransactions` event. pub async fn add_new_pending_txs(&self, sink: SubscriptionSink) { - tracing::debug!(id = %sink.connection_id(), "subscribing to newPendingTransactions event"); + tracing::debug!(id = %sink.connection_id().0, "subscribing to newPendingTransactions event"); let mut subs = self.new_pending_txs.write().await; subs.insert(sink.connection_id(), sink); @@ -218,7 +217,7 @@ impl RpcSubscriptionsConnected { /// Adds a new subscriber to `newHeads` event. pub async fn add_new_heads(&self, sink: SubscriptionSink) { - tracing::debug!(id = %sink.connection_id(), "subscribing to newHeads event"); + tracing::debug!(id = %sink.connection_id().0, "subscribing to newHeads event"); let mut subs = self.new_heads.write().await; subs.insert(sink.connection_id(), sink); @@ -228,7 +227,7 @@ impl RpcSubscriptionsConnected { /// Adds a new subscriber to `logs` event. pub async fn add_logs(&self, sink: SubscriptionSink, filter: LogFilter) { - tracing::debug!(id = %sink.connection_id(), ?filter, "subscribing to logs event"); + tracing::debug!(id = %sink.connection_id().0, ?filter, "subscribing to logs event"); let mut subs = self.logs.write().await; subs.insert(sink.connection_id(), (sink, filter)); From 1d14a5360eca880a7c3392e59772d3f37e018b7d Mon Sep 17 00:00:00 2001 From: Renato Dinhani <101204870+dinhani-cw@users.noreply.github.com> Date: Fri, 7 Jun 2024 19:03:12 -0300 Subject: [PATCH 2/6] feat: track client app (#1039) --- Cargo.lock | 1 + Cargo.toml | 1 + e2e-contracts/integration/hardhat.config.ts | 2 +- e2e-contracts/integration/test/helpers/rpc.ts | 2 +- e2e/hardhat.config.ts | 2 +- src/eth/rpc/mod.rs | 4 ++ src/eth/rpc/rpc_client_app.rs | 20 +++++++ src/eth/rpc/rpc_http_middleware.rs | 58 +++++++++++++++++++ src/eth/rpc/rpc_middleware.rs | 8 +++ src/eth/rpc/rpc_server.rs | 2 + 10 files changed, 97 insertions(+), 3 deletions(-) create mode 100644 src/eth/rpc/rpc_client_app.rs create mode 100644 src/eth/rpc/rpc_http_middleware.rs diff --git a/Cargo.lock b/Cargo.lock index c73b1207d..3812f38bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5387,6 +5387,7 @@ dependencies = [ "sentry-tracing", "serde", "serde_json", + "serde_urlencoded", "serde_with 3.8.1", "sqlx", "stringreader", diff --git a/Cargo.toml b/Cargo.toml index bc8707a57..147f64f20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ display_json = "=0.2.1" prost = "=0.12.6" serde = "=1.0.203" serde_json = "=1.0.117" +serde_urlencoded = "=0.7.1" serde_with = "=3.8.1" # parallelism diff --git a/e2e-contracts/integration/hardhat.config.ts b/e2e-contracts/integration/hardhat.config.ts index 67a877c6e..ba8e684cf 100644 --- a/e2e-contracts/integration/hardhat.config.ts +++ b/e2e-contracts/integration/hardhat.config.ts @@ -30,7 +30,7 @@ const config: HardhatUserConfig = { }, }, stratus: { - url: "http://localhost:3000", + url: "http://localhost:3000?app=e2e", accounts: { mnemonic: ACCOUNTS_MNEMONIC, }, diff --git a/e2e-contracts/integration/test/helpers/rpc.ts b/e2e-contracts/integration/test/helpers/rpc.ts index 19d493438..db37a1b98 100644 --- a/e2e-contracts/integration/test/helpers/rpc.ts +++ b/e2e-contracts/integration/test/helpers/rpc.ts @@ -39,7 +39,7 @@ export let ETHERJS = new JsonRpcProvider( export function updateProviderUrl(providerName: string) { switch (providerName) { case 'stratus': - providerUrl = 'http://localhost:3000'; + providerUrl = 'http://localhost:3000?app=e2e'; break; case 'hardhat': providerUrl = 'http://localhost:8545'; diff --git a/e2e/hardhat.config.ts b/e2e/hardhat.config.ts index 255d49daa..8454d1f60 100644 --- a/e2e/hardhat.config.ts +++ b/e2e/hardhat.config.ts @@ -42,7 +42,7 @@ const config: HardhatUserConfig = { }, }, stratus: { - url: "http://localhost:3000", + url: "http://localhost:3000?app=e2e", accounts: { mnemonic: ACCOUNTS_MNEMONIC, }, diff --git a/src/eth/rpc/mod.rs b/src/eth/rpc/mod.rs index 61eb52f50..62591fa9c 100644 --- a/src/eth/rpc/mod.rs +++ b/src/eth/rpc/mod.rs @@ -1,14 +1,18 @@ //! Ethereum JSON-RPC server. +mod rpc_client_app; mod rpc_context; mod rpc_error; +mod rpc_http_middleware; mod rpc_middleware; mod rpc_parser; mod rpc_server; mod rpc_subscriptions; +use rpc_client_app::RpcClientApp; use rpc_context::RpcContext; pub use rpc_error::RpcError; +use rpc_http_middleware::RpcHttpMiddleware; use rpc_middleware::RpcMiddleware; use rpc_parser::next_rpc_param; use rpc_parser::next_rpc_param_or_default; diff --git a/src/eth/rpc/rpc_client_app.rs b/src/eth/rpc/rpc_client_app.rs new file mode 100644 index 000000000..3bb9271cb --- /dev/null +++ b/src/eth/rpc/rpc_client_app.rs @@ -0,0 +1,20 @@ +use std::fmt::Display; + +#[derive(Debug, Clone, Default)] +pub enum RpcClientApp { + /// Client application identified itself. + Identified(String), + + /// Client application is unknown. + #[default] + Unknown, +} + +impl Display for RpcClientApp { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RpcClientApp::Identified(name) => write!(f, "{}", name), + RpcClientApp::Unknown => write!(f, "unknown"), + } + } +} diff --git a/src/eth/rpc/rpc_http_middleware.rs b/src/eth/rpc/rpc_http_middleware.rs new file mode 100644 index 000000000..1d244ff33 --- /dev/null +++ b/src/eth/rpc/rpc_http_middleware.rs @@ -0,0 +1,58 @@ +use core::future::Future; +use core::pin::Pin; +use std::collections::HashMap; + +use futures::TryFutureExt; +use jsonrpsee::client_transport::ws::Uri; +use jsonrpsee::core::BoxError; +use jsonrpsee::server::HttpBody; +use jsonrpsee::server::HttpRequest; +use jsonrpsee::server::HttpResponse; +use tower::Service; + +use crate::eth::rpc::RpcClientApp; + +#[derive(Debug, Clone, derive_new::new)] +pub struct RpcHttpMiddleware { + service: S, +} + +impl Service> for RpcHttpMiddleware +where + S: Service, + S::Error: Into + 'static, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + self.service.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, mut request: HttpRequest) -> Self::Future { + let client_app = parse_client_app(request.uri()); + request.extensions_mut().insert(client_app); + + Box::pin(self.service.call(request).map_err(Into::into)) + } +} + +/// Extracts the client application name from the `app` query parameter. +fn parse_client_app(uri: &Uri) -> RpcClientApp { + let Some(query_params_str) = uri.query() else { return RpcClientApp::Unknown }; + + let query_params: HashMap = match serde_urlencoded::from_str(query_params_str) { + Ok(url) => url, + Err(e) => { + tracing::error!(reason = ?e, "failed to parse http request query parameters"); + return RpcClientApp::Unknown; + } + }; + + match query_params.get("app") { + Some(app) => RpcClientApp::Identified(app.to_owned()), + None => RpcClientApp::Unknown, + } +} diff --git a/src/eth/rpc/rpc_middleware.rs b/src/eth/rpc/rpc_middleware.rs index d34218c00..d4f851134 100644 --- a/src/eth/rpc/rpc_middleware.rs +++ b/src/eth/rpc/rpc_middleware.rs @@ -24,6 +24,7 @@ use crate::eth::primitives::SoliditySignature; use crate::eth::primitives::TransactionInput; use crate::eth::rpc::next_rpc_param; use crate::eth::rpc::parse_rpc_rlp; +use crate::eth::rpc::RpcClientApp; use crate::if_else; #[cfg(feature = "metrics")] use crate::infra::metrics; @@ -55,8 +56,12 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware { _ => None, }; + // extract client app + let client = request.extensions().get::().unwrap_or(&RpcClientApp::Unknown).clone(); + // trace request tracing::info!( + %client, id = %request.id, %method, function = %function.clone().unwrap_or_default(), @@ -73,6 +78,7 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware { } RpcResponse { + client, id: request.id.to_string(), method: method.to_string(), function, @@ -103,6 +109,7 @@ pub struct RpcResponse<'a> { #[pin] future_response: ResponseFuture>, + client: RpcClientApp, id: String, method: String, function: Option, @@ -125,6 +132,7 @@ impl<'a> Future for RpcResponse<'a> { let response_success = response.is_success(); let response_result = response.as_result(); tracing::info!( + client = %proj.client, id = %proj.id, method = %proj.method, function = %proj.function.clone().unwrap_or_default(), diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index a691edcdc..daafe5378 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -39,6 +39,7 @@ use crate::eth::rpc::rpc_internal_error; use crate::eth::rpc::rpc_invalid_params_error; use crate::eth::rpc::RpcContext; use crate::eth::rpc::RpcError; +use crate::eth::rpc::RpcHttpMiddleware; use crate::eth::rpc::RpcMiddleware; use crate::eth::rpc::RpcSubscriptions; use crate::eth::storage::StratusStorage; @@ -98,6 +99,7 @@ pub async fn serve_rpc( // configure middleware let rpc_middleware = RpcServiceBuilder::new().layer_fn(RpcMiddleware::new); let http_middleware = tower::ServiceBuilder::new() + .layer_fn(RpcHttpMiddleware::new) .layer(ProxyGetRequestLayer::new("/startup", "stratus_startup").unwrap()) .layer(ProxyGetRequestLayer::new("/readiness", "stratus_readiness").unwrap()) .layer(ProxyGetRequestLayer::new("/liveness", "stratus_liveness").unwrap()); From bea170029b6ace4ba0203177e4b9591a81fdbbe6 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Sat, 8 Jun 2024 13:05:31 -0300 Subject: [PATCH 3/6] fix: health and readiness probe (#1038) it was not returning the actual status code --- src/eth/rpc/rpc_server.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/eth/rpc/rpc_server.rs b/src/eth/rpc/rpc_server.rs index daafe5378..585d1eaa9 100644 --- a/src/eth/rpc/rpc_server.rs +++ b/src/eth/rpc/rpc_server.rs @@ -240,11 +240,20 @@ async fn stratus_startup(_: Params<'_>, _: Arc, _: Extensions) -> an async fn stratus_readiness(_: Params<'_>, context: Arc, _: Extensions) -> anyhow::Result { let should_serve = context.consensus.should_serve().await; tracing::info!("stratus_readiness: {}", should_serve); - Ok(json!(should_serve)) + + if should_serve { + Ok(json!(true)) + } else { + Err(RpcError::Response(rpc_internal_error("Service Not Ready".to_string()))) + } } async fn stratus_liveness(_: Params<'_>, _: Arc, _: Extensions) -> anyhow::Result { - Ok(json!(true)) + if !GlobalState::is_shutdown() { + Ok(json!(true)) + } else { + Err(RpcError::Response(rpc_internal_error("Service Unhealthy".to_string()))) + } } // ----------------------------------------------------------------------------- From 5fd57d89d0d6ff1cbe02b2d7907c7df0a54350d6 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Sat, 8 Jun 2024 13:38:13 -0300 Subject: [PATCH 4/6] fix: discovery delay for new pods (#1040) --- src/eth/consensus/mod.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 59fd4fa6b..e87ce3574 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -62,6 +62,7 @@ use crate::eth::primitives::Block; use crate::infra::metrics; const RETRY_DELAY: Duration = Duration::from_millis(10); +const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30); #[derive(Clone, Debug, PartialEq)] enum Role { @@ -149,7 +150,7 @@ pub struct Consensus { storage: Arc, peers: Arc>>, direct_peers: Vec, - voted_for: Mutex>, + voted_for: Mutex>, //essential to ensure that a server only votes once per term current_term: AtomicU64, last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine role: RwLock, @@ -204,10 +205,21 @@ impl Consensus { /// Initializes the heartbeat and election timers. /// This function periodically checks if the node should start a new election based on the election timeout. /// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active. + /// + /// When there are healthy peers we need to wait for the grace period of discovery + /// to avoid starting an election too soon (due to the leader not being discovered yet) fn initialize_heartbeat_timer(consensus: Arc) { named_spawn("consensus::heartbeat_timer", async move { + if consensus.peers.read().await.is_empty() { + tracing::info!("no peers, starting hearbeat timer immediately"); + Self::start_election(Arc::clone(&consensus)).await; + } else { + traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await; + tracing::info!("waiting for peer discovery grace period"); + } + + let timeout = consensus.heartbeat_timeout; loop { - let timeout = consensus.heartbeat_timeout; tokio::select! { _ = traced_sleep(timeout, SleepReason::Interval) => { if !consensus.is_leader().await { @@ -324,7 +336,7 @@ impl Consensus { fn initialize_periodic_peer_discovery(consensus: Arc) { named_spawn("consensus::peer_discovery", async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); + let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY); loop { tracing::info!("starting periodic peer discovery"); Self::discover_peers(Arc::clone(&consensus)).await; From 3c98711fc647f5456fd7a92eab3c7aecc7ecfdde Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Sat, 8 Jun 2024 14:27:28 -0300 Subject: [PATCH 5/6] fix: remove term from peers (#1041) --- src/eth/consensus/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index e87ce3574..d64e7ef11 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -137,7 +137,6 @@ struct Peer { match_index: u64, next_index: u64, role: Role, - term: u64, receiver: Arc>>, } @@ -542,7 +541,6 @@ impl Consensus { match_index: 0, next_index: 0, role: Role::Follower, // FIXME it won't be always follower, we need to check the leader or candidates - term: 0, // Replace with actual term receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())), }; peers.push((peer_address.clone(), peer)); @@ -587,7 +585,6 @@ impl Consensus { match_index: 0, next_index: 0, role: Role::Follower, //FIXME it wont be always follower, we need to check the leader or candidates - term: 0, // Replace with actual term receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())), }; peers.push((PeerAddress::new(address, jsonrpc_port, grpc_port), peer)); @@ -663,7 +660,7 @@ impl Consensus { #[cfg(feature = "metrics")] metrics::inc_append_entries(start.elapsed()); - tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state on election"); //TODO also move this to metrics + tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, "current follower state on election"); //TODO also move this to metrics match StatusCode::try_from(response.status) { Ok(StatusCode::AppendSuccess) => Ok(()), From 525a76a9f9f6c8379331940177cdcb262d7ad48f Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Sun, 9 Jun 2024 11:53:28 -0300 Subject: [PATCH 6/6] Fix kube discovery (#1043) * chore: prepare discovery for sepukku if it lose its peers to prevent split brain * fix: discovery * lint --- Cargo.toml | 2 +- docker/Dockerfile.run_stratus | 2 +- docker/Dockerfile.run_with_importer | 2 +- src/eth/consensus/mod.rs | 42 ++++++++++++++++++++++++++--- 4 files changed, 41 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 147f64f20..6701ac09c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -174,7 +174,7 @@ path = "src/bin/relayer.rs" # ------------------------------------------------------------------------------ [features] -default = ["metrics", "tracing", "rocks", "kubernetes"] +default = ["metrics", "tracing", "rocks"] # Application is running in develoment mode. dev = [] diff --git a/docker/Dockerfile.run_stratus b/docker/Dockerfile.run_stratus index c03d667b1..6d9f422ed 100644 --- a/docker/Dockerfile.run_stratus +++ b/docker/Dockerfile.run_stratus @@ -19,7 +19,7 @@ ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV LOG_FORMAT=json ENV NO_COLOR=1 -RUN cargo build --release --bin stratus --features metrics +RUN cargo build --release --bin stratus --features metrics,kubernetes # Runtime diff --git a/docker/Dockerfile.run_with_importer b/docker/Dockerfile.run_with_importer index 661049eaa..b9f03bb01 100644 --- a/docker/Dockerfile.run_with_importer +++ b/docker/Dockerfile.run_with_importer @@ -18,7 +18,7 @@ RUN apt-get install -y libclang-dev cmake protobuf-compiler ENV CARGO_PROFILE_RELEASE_DEBUG=1 ENV LOG_FORMAT=json ENV NO_COLOR=1 -RUN cargo build --release --bin run-with-importer --features metrics,rocks +RUN cargo build --release --bin run-with-importer --features metrics,rocks,kubernetes # Runtime FROM rust:1.75 as runtime diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index d64e7ef11..976b39143 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,6 +1,7 @@ pub mod forward_to; use std::collections::HashMap; +#[cfg(feature = "kubernetes")] use std::env; use std::net::UdpSocket; use std::sync::atomic::AtomicU64; @@ -24,6 +25,8 @@ use tokio::sync::mpsc::{self}; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::task::JoinHandle; +#[cfg(feature = "kubernetes")] +use tokio::time::sleep; use tonic::transport::Channel; use tonic::transport::Server; use tonic::Request; @@ -438,11 +441,13 @@ impl Consensus { (last_arrived_block_number - 2) <= storage_block_number } + #[cfg(feature = "kubernetes")] fn current_node() -> Option { let pod_name = env::var("MY_POD_NAME").ok()?; Some(pod_name.trim().to_string()) } + #[cfg(feature = "kubernetes")] fn current_namespace() -> Option { let namespace = env::var("NAMESPACE").ok()?; Some(namespace.trim().to_string()) @@ -477,12 +482,41 @@ impl Consensus { let mut new_peers: Vec<(PeerAddress, Peer)> = Vec::new(); #[cfg(feature = "kubernetes")] - if let Ok(k8s_peers) = Self::discover_peers_kubernetes(Arc::clone(&consensus)).await { - new_peers.extend(k8s_peers); + { + let mut attempts = 0; + let max_attempts = 100; + + while attempts < max_attempts { + match Self::discover_peers_kubernetes(Arc::clone(&consensus)).await { + Ok(k8s_peers) => { + new_peers.extend(k8s_peers); + tracing::info!("discovered {} peers from kubernetes", new_peers.len()); + break; + } + Err(e) => { + attempts += 1; + tracing::warn!("failed to discover peers from Kubernetes (attempt {}/{}): {:?}", attempts, max_attempts, e); + + if attempts >= max_attempts { + tracing::error!("exceeded maximum attempts to discover peers from kubernetes. initiating shutdown."); + GlobalState::shutdown_from("consensus", "failed to discover peers from Kubernetes"); + } + + // Optionally, sleep for a bit before retrying + sleep(Duration::from_millis(100)).await; + } + } + } } - if let Ok(env_peers) = Self::discover_peers_env(&consensus.direct_peers, Arc::clone(&consensus)).await { - new_peers.extend(env_peers); + match Self::discover_peers_env(&consensus.direct_peers, Arc::clone(&consensus)).await { + Ok(env_peers) => { + tracing::info!("discovered {} peers from env", env_peers.len()); + new_peers.extend(env_peers); + } + Err(e) => { + tracing::warn!("failed to discover peers from env: {:?}", e); + } } let mut peers_lock = consensus.peers.write().await;