Skip to content

Commit

Permalink
Merge branch 'main' into cf_poc
Browse files Browse the repository at this point in the history
  • Loading branch information
carneiro-cw authored Jun 9, 2024
2 parents d81faa2 + 525a76a commit d1e7e44
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 252 deletions.
332 changes: 151 additions & 181 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ display_json = "=0.2.1"
prost = "=0.12.6"
serde = "=1.0.203"
serde_json = "=1.0.117"
serde_urlencoded = "=0.7.1"
serde_with = "=3.8.1"

# parallelism
Expand All @@ -71,7 +72,7 @@ rlp = "=0.5.2"
triehash = "=0.8.4"

# network
jsonrpsee = { version = "=0.22.5", features = ["server", "client"] }
jsonrpsee = { version = "=0.23.0", features = ["server", "client"] }
k8s-openapi = { version = "=0.21.1", optional = true, features = ["v1_27"] }
kube = { version = "=0.90.0", optional = true, features = ["runtime", "derive"] }
raft = { version = "=0.7.0", optional = true }
Expand Down Expand Up @@ -176,7 +177,7 @@ path = "src/bin/relayer.rs"
# ------------------------------------------------------------------------------

[features]
default = ["metrics", "tracing", "rocks", "kubernetes"]
default = ["metrics", "tracing", "rocks"]

# Application is running in develoment mode.
dev = []
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.run_stratus
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ENV CARGO_PROFILE_RELEASE_DEBUG=1
ENV LOG_FORMAT=json
ENV NO_COLOR=1

RUN cargo build --release --bin stratus --features metrics
RUN cargo build --release --bin stratus --features metrics,kubernetes


# Runtime
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile.run_with_importer
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN apt-get install -y libclang-dev cmake protobuf-compiler
ENV CARGO_PROFILE_RELEASE_DEBUG=1
ENV LOG_FORMAT=json
ENV NO_COLOR=1
RUN cargo build --release --bin run-with-importer --features metrics,rocks
RUN cargo build --release --bin run-with-importer --features metrics,rocks,kubernetes

# Runtime
FROM rust:1.75 as runtime
Expand Down
2 changes: 1 addition & 1 deletion e2e-contracts/integration/hardhat.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const config: HardhatUserConfig = {
},
},
stratus: {
url: "http://localhost:3000",
url: "http://localhost:3000?app=e2e",
accounts: {
mnemonic: ACCOUNTS_MNEMONIC,
},
Expand Down
2 changes: 1 addition & 1 deletion e2e-contracts/integration/test/helpers/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export let ETHERJS = new JsonRpcProvider(
export function updateProviderUrl(providerName: string) {
switch (providerName) {
case 'stratus':
providerUrl = 'http://localhost:3000';
providerUrl = 'http://localhost:3000?app=e2e';
break;
case 'hardhat':
providerUrl = 'http://localhost:8545';
Expand Down
2 changes: 1 addition & 1 deletion e2e/hardhat.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const config: HardhatUserConfig = {
},
},
stratus: {
url: "http://localhost:3000",
url: "http://localhost:3000?app=e2e",
accounts: {
mnemonic: ACCOUNTS_MNEMONIC,
},
Expand Down
65 changes: 54 additions & 11 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod forward_to;

use std::collections::HashMap;
#[cfg(feature = "kubernetes")]
use std::env;
use std::net::UdpSocket;
use std::sync::atomic::AtomicU64;
Expand All @@ -24,6 +25,8 @@ use tokio::sync::mpsc::{self};
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
#[cfg(feature = "kubernetes")]
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::transport::Server;
use tonic::Request;
Expand Down Expand Up @@ -62,6 +65,7 @@ use crate::eth::primitives::Block;
use crate::infra::metrics;

const RETRY_DELAY: Duration = Duration::from_millis(10);
const PEER_DISCOVERY_DELAY: Duration = Duration::from_secs(30);

#[derive(Clone, Debug, PartialEq)]
enum Role {
Expand Down Expand Up @@ -136,7 +140,6 @@ struct Peer {
match_index: u64,
next_index: u64,
role: Role,
term: u64,
receiver: Arc<Mutex<broadcast::Receiver<Block>>>,
}

Expand All @@ -149,7 +152,7 @@ pub struct Consensus {
storage: Arc<StratusStorage>,
peers: Arc<RwLock<HashMap<PeerAddress, PeerTuple>>>,
direct_peers: Vec<String>,
voted_for: Mutex<Option<PeerAddress>>,
voted_for: Mutex<Option<PeerAddress>>, //essential to ensure that a server only votes once per term
current_term: AtomicU64,
last_arrived_block_number: AtomicU64, //TODO use a true index for both executions and blocks, currently we use something like Bully algorithm so block number is fine
role: RwLock<Role>,
Expand Down Expand Up @@ -205,10 +208,21 @@ impl Consensus {
/// Initializes the heartbeat and election timers.
/// This function periodically checks if the node should start a new election based on the election timeout.
/// The timer is reset when an `AppendEntries` request is received, ensuring the node remains a follower if a leader is active.
///
/// When there are healthy peers we need to wait for the grace period of discovery
/// to avoid starting an election too soon (due to the leader not being discovered yet)
fn initialize_heartbeat_timer(consensus: Arc<Consensus>) {
named_spawn("consensus::heartbeat_timer", async move {
if consensus.peers.read().await.is_empty() {
tracing::info!("no peers, starting hearbeat timer immediately");
Self::start_election(Arc::clone(&consensus)).await;
} else {
traced_sleep(PEER_DISCOVERY_DELAY, SleepReason::Interval).await;
tracing::info!("waiting for peer discovery grace period");
}

let timeout = consensus.heartbeat_timeout;
loop {
let timeout = consensus.heartbeat_timeout;
tokio::select! {
_ = traced_sleep(timeout, SleepReason::Interval) => {
if !consensus.is_leader().await {
Expand Down Expand Up @@ -325,7 +339,7 @@ impl Consensus {

fn initialize_periodic_peer_discovery(consensus: Arc<Consensus>) {
named_spawn("consensus::peer_discovery", async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
let mut interval = tokio::time::interval(PEER_DISCOVERY_DELAY);
loop {
tracing::info!("starting periodic peer discovery");
Self::discover_peers(Arc::clone(&consensus)).await;
Expand Down Expand Up @@ -428,11 +442,13 @@ impl Consensus {
(last_arrived_block_number - 2) <= storage_block_number
}

#[cfg(feature = "kubernetes")]
fn current_node() -> Option<String> {
let pod_name = env::var("MY_POD_NAME").ok()?;
Some(pod_name.trim().to_string())
}

#[cfg(feature = "kubernetes")]
fn current_namespace() -> Option<String> {
let namespace = env::var("NAMESPACE").ok()?;
Some(namespace.trim().to_string())
Expand Down Expand Up @@ -468,12 +484,41 @@ impl Consensus {
let mut new_peers: Vec<(PeerAddress, Peer)> = Vec::new();

#[cfg(feature = "kubernetes")]
if let Ok(k8s_peers) = Self::discover_peers_kubernetes(Arc::clone(&consensus)).await {
new_peers.extend(k8s_peers);
{
let mut attempts = 0;
let max_attempts = 100;

while attempts < max_attempts {
match Self::discover_peers_kubernetes(Arc::clone(&consensus)).await {
Ok(k8s_peers) => {
new_peers.extend(k8s_peers);
tracing::info!("discovered {} peers from kubernetes", new_peers.len());
break;
}
Err(e) => {
attempts += 1;
tracing::warn!("failed to discover peers from Kubernetes (attempt {}/{}): {:?}", attempts, max_attempts, e);

if attempts >= max_attempts {
tracing::error!("exceeded maximum attempts to discover peers from kubernetes. initiating shutdown.");
GlobalState::shutdown_from("consensus", "failed to discover peers from Kubernetes");
}

// Optionally, sleep for a bit before retrying
sleep(Duration::from_millis(100)).await;
}
}
}
}

if let Ok(env_peers) = Self::discover_peers_env(&consensus.direct_peers, Arc::clone(&consensus)).await {
new_peers.extend(env_peers);
match Self::discover_peers_env(&consensus.direct_peers, Arc::clone(&consensus)).await {
Ok(env_peers) => {
tracing::info!("discovered {} peers from env", env_peers.len());
new_peers.extend(env_peers);
}
Err(e) => {
tracing::warn!("failed to discover peers from env: {:?}", e);
}
}

let mut peers_lock = consensus.peers.write().await;
Expand Down Expand Up @@ -532,7 +577,6 @@ impl Consensus {
match_index: 0,
next_index: 0,
role: Role::Follower, // FIXME it won't be always follower, we need to check the leader or candidates
term: 0, // Replace with actual term
receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())),
};
peers.push((peer_address.clone(), peer));
Expand Down Expand Up @@ -577,7 +621,6 @@ impl Consensus {
match_index: 0,
next_index: 0,
role: Role::Follower, //FIXME it wont be always follower, we need to check the leader or candidates
term: 0, // Replace with actual term
receiver: Arc::new(Mutex::new(consensus.broadcast_sender.subscribe())),
};
peers.push((PeerAddress::new(address, jsonrpc_port, grpc_port), peer));
Expand Down Expand Up @@ -653,7 +696,7 @@ impl Consensus {
#[cfg(feature = "metrics")]
metrics::inc_append_entries(start.elapsed());

tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, term = peer.term, "current follower state on election"); //TODO also move this to metrics
tracing::info!(match_index = peer.match_index, next_index = peer.next_index, role = ?peer.role, "current follower state on election"); //TODO also move this to metrics

match StatusCode::try_from(response.status) {
Ok(StatusCode::AppendSuccess) => Ok(()),
Expand Down
4 changes: 4 additions & 0 deletions src/eth/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
//! Ethereum JSON-RPC server.
mod rpc_client_app;
mod rpc_context;
mod rpc_error;
mod rpc_http_middleware;
mod rpc_middleware;
mod rpc_parser;
mod rpc_server;
mod rpc_subscriptions;

use rpc_client_app::RpcClientApp;
use rpc_context::RpcContext;
pub use rpc_error::RpcError;
use rpc_http_middleware::RpcHttpMiddleware;
use rpc_middleware::RpcMiddleware;
use rpc_parser::next_rpc_param;
use rpc_parser::next_rpc_param_or_default;
Expand Down
20 changes: 20 additions & 0 deletions src/eth/rpc/rpc_client_app.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::fmt::Display;

#[derive(Debug, Clone, Default)]
pub enum RpcClientApp {
/// Client application identified itself.
Identified(String),

/// Client application is unknown.
#[default]
Unknown,
}

impl Display for RpcClientApp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RpcClientApp::Identified(name) => write!(f, "{}", name),
RpcClientApp::Unknown => write!(f, "unknown"),
}
}
}
58 changes: 58 additions & 0 deletions src/eth/rpc/rpc_http_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use core::future::Future;
use core::pin::Pin;
use std::collections::HashMap;

use futures::TryFutureExt;
use jsonrpsee::client_transport::ws::Uri;
use jsonrpsee::core::BoxError;
use jsonrpsee::server::HttpBody;
use jsonrpsee::server::HttpRequest;
use jsonrpsee::server::HttpResponse;
use tower::Service;

use crate::eth::rpc::RpcClientApp;

#[derive(Debug, Clone, derive_new::new)]
pub struct RpcHttpMiddleware<S> {
service: S,
}

impl<S> Service<HttpRequest<HttpBody>> for RpcHttpMiddleware<S>
where
S: Service<HttpRequest, Response = HttpResponse>,
S::Error: Into<BoxError> + 'static,
S::Future: Send + 'static,
{
type Response = S::Response;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut request: HttpRequest<HttpBody>) -> Self::Future {
let client_app = parse_client_app(request.uri());
request.extensions_mut().insert(client_app);

Box::pin(self.service.call(request).map_err(Into::into))
}
}

/// Extracts the client application name from the `app` query parameter.
fn parse_client_app(uri: &Uri) -> RpcClientApp {
let Some(query_params_str) = uri.query() else { return RpcClientApp::Unknown };

let query_params: HashMap<String, String> = match serde_urlencoded::from_str(query_params_str) {
Ok(url) => url,
Err(e) => {
tracing::error!(reason = ?e, "failed to parse http request query parameters");
return RpcClientApp::Unknown;
}
};

match query_params.get("app") {
Some(app) => RpcClientApp::Identified(app.to_owned()),
None => RpcClientApp::Unknown,
}
}
Loading

0 comments on commit d1e7e44

Please sign in to comment.