diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 7824c816a..9ec5da783 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -20,15 +20,15 @@ use std::time::Duration; use anyhow::anyhow; use rand::Rng; -use server::AppendEntryServiceImpl; use tokio::sync::broadcast; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::task::JoinHandle; -use tonic::transport::Server; use tonic::Request; use crate::eth::primitives::Hash; +#[allow(unused_imports)] +use crate::eth::primitives::TransactionExecution; use crate::eth::storage::StratusStorage; use crate::ext::spawn_named; use crate::ext::traced_sleep; @@ -43,14 +43,12 @@ pub mod append_entry { #[allow(unused_imports)] use append_entry::append_entry_service_client::AppendEntryServiceClient; use append_entry::append_entry_service_server::AppendEntryService; -use append_entry::append_entry_service_server::AppendEntryServiceServer; use append_entry::RequestVoteRequest; use append_entry::TransactionExecutionEntry; use self::append_log_entries_storage::AppendLogEntriesStorage; use self::log_entry::LogEntryData; use super::primitives::Bytes; -use super::primitives::TransactionExecution; use crate::config::RunWithImporterConfig; use crate::eth::miner::Miner; use crate::eth::primitives::Block; @@ -216,9 +214,9 @@ impl Consensus { //TODO replace this for a synchronous call let rx_pending_txs: broadcast::Receiver = miner.notifier_pending_txs.subscribe(); let rx_blocks: broadcast::Receiver = miner.notifier_blocks.subscribe(); - Self::initialize_transaction_execution_queue(Arc::clone(&consensus)); - Self::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); - Self::initialize_server(Arc::clone(&consensus)); + propagation::initialize_transaction_execution_queue(Arc::clone(&consensus)); + propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks); + server::initialize_server(Arc::clone(&consensus)); } Self::initialize_heartbeat_timer(Arc::clone(&consensus)); @@ -437,95 +435,6 @@ impl Consensus { }); } - #[allow(dead_code)] - fn initialize_transaction_execution_queue(consensus: Arc) { - // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; - // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes - - const TASK_NAME: &str = "consensus::transaction_execution_queue"; - - spawn_named(TASK_NAME, async move { - let interval = Duration::from_millis(40); - loop { - if GlobalState::is_shutdown_warn(TASK_NAME) { - return; - }; - - tokio::time::sleep(interval).await; - - propagation::handle_transaction_executions(Arc::clone(&consensus)).await; - } - }); - } - - /// This channel broadcasts blocks and transactons executions to followers. - /// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation. - //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one - #[allow(dead_code)] - fn initialize_append_entries_channel( - consensus: Arc, - mut rx_pending_txs: broadcast::Receiver, - mut rx_blocks: broadcast::Receiver, - ) { - const TASK_NAME: &str = "consensus::block_and_executions_sender"; - spawn_named(TASK_NAME, async move { - loop { - tokio::select! { - _ = GlobalState::wait_shutdown_warn(TASK_NAME) => { - return; - }, - Ok(tx) = rx_pending_txs.recv() => { - tracing::debug!("Attempting to receive transaction execution"); - if Self::is_leader() { - tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers"); - if tx.is_local() { - tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now"); - continue; - } - - let transaction = vec![tx.to_append_entry_transaction()]; - let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); - if Arc::clone(&consensus).broadcast_sender.send(transaction_entry).is_err() { - tracing::debug!("failed to broadcast transaction"); - } - } - }, - Ok(block) = rx_blocks.recv() => { - propagation::handle_block_entry(Arc::clone(&consensus), block).await; - }, - else => { - tokio::task::yield_now().await; - }, - } - } - }); - } - - #[allow(dead_code)] - fn initialize_server(consensus: Arc) { - const TASK_NAME: &str = "consensus::server"; - spawn_named(TASK_NAME, async move { - tracing::info!("Starting append entry service at address: {}", consensus.grpc_address); - let addr = consensus.grpc_address; - - let append_entry_service = AppendEntryServiceImpl { - consensus: Mutex::new(consensus), - }; - - let shutdown = GlobalState::wait_shutdown_warn(TASK_NAME); - - let server = Server::builder() - .add_service(AppendEntryServiceServer::new(append_entry_service)) - .serve_with_shutdown(addr, shutdown) - .await; - - if let Err(e) = server { - let message = GlobalState::shutdown_from("consensus", &format!("failed to create server at {}", addr)); - tracing::error!(reason = ?e, %message); - } - }); - } - fn set_role(&self, role: Role) { if ROLE.load(Ordering::SeqCst) == role as u8 { tracing::info!(role = ?role, "role remains the same"); diff --git a/src/eth/consensus/propagation.rs b/src/eth/consensus/propagation.rs index 8aaeb1e62..6114bbc01 100644 --- a/src/eth/consensus/propagation.rs +++ b/src/eth/consensus/propagation.rs @@ -4,6 +4,7 @@ use std::time::Duration; use anyhow::anyhow; use anyhow::Result; +use tokio::sync::broadcast; use tonic::Request; use super::Block; @@ -15,6 +16,8 @@ use crate::eth::consensus::append_entry::AppendBlockCommitResponse; use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest; use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse; use crate::eth::consensus::append_entry::StatusCode; +use crate::eth::primitives::TransactionExecution; +use crate::ext::spawn_named; use crate::ext::traced_sleep; use crate::ext::SleepReason; use crate::GlobalState; @@ -299,3 +302,66 @@ async fn send_append_entry_request( Ok(response) } + +#[allow(dead_code)] +pub fn initialize_transaction_execution_queue(consensus: Arc) { + // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; + // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes + + const TASK_NAME: &str = "consensus::transaction_execution_queue"; + + spawn_named(TASK_NAME, async move { + let interval = Duration::from_millis(40); + loop { + if GlobalState::is_shutdown_warn(TASK_NAME) { + return; + }; + + tokio::time::sleep(interval).await; + + handle_transaction_executions(Arc::clone(&consensus)).await; + } + }); +} + +/// This channel broadcasts blocks and transactons executions to followers. +/// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation. +//TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one +#[allow(dead_code)] +pub fn initialize_append_entries_channel( + consensus: Arc, + mut rx_pending_txs: broadcast::Receiver, + mut rx_blocks: broadcast::Receiver, +) { + const TASK_NAME: &str = "consensus::block_and_executions_sender"; + spawn_named(TASK_NAME, async move { + loop { + tokio::select! { + _ = GlobalState::wait_shutdown_warn(TASK_NAME) => { + return; + }, + Ok(tx) = rx_pending_txs.recv() => { + if Consensus::is_leader() { + tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers"); + if tx.is_local() { + tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now"); + continue; + } + + let transaction = vec![tx.to_append_entry_transaction()]; + let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); + if Arc::clone(&consensus).broadcast_sender.send(transaction_entry).is_err() { + tracing::debug!("failed to broadcast transaction"); + } + } + }, + Ok(block) = rx_blocks.recv() => { + handle_block_entry(Arc::clone(&consensus), block).await; + }, + else => { + tokio::task::yield_now().await; + }, + } + } + }); +} diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 739d7b34e..6a054d839 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use tokio::sync::Mutex; +use tonic::transport::Server; use tonic::Request; use tonic::Response; use tonic::Status; @@ -16,6 +17,7 @@ use super::append_entry::AppendTransactionExecutionsResponse; use super::append_entry::RequestVoteRequest; use super::append_entry::RequestVoteResponse; use super::append_entry::StatusCode; +use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryServiceServer; use crate::eth::consensus::AppendEntryService; use crate::eth::consensus::LogEntryData; use crate::eth::consensus::PeerAddress; @@ -24,8 +26,10 @@ use crate::eth::miner::block_from_propagation; use crate::eth::primitives::LocalTransactionExecution; use crate::eth::primitives::TransactionExecution; use crate::eth::Consensus; +use crate::ext::spawn_named; #[cfg(feature = "metrics")] use crate::infra::metrics; +use crate::GlobalState; #[cfg(feature = "metrics")] mod label { @@ -34,6 +38,31 @@ mod label { pub(super) const REQUEST_VOTE: &str = "request_vote"; } +#[allow(dead_code)] +pub fn initialize_server(consensus: Arc) { + const TASK_NAME: &str = "consensus::server"; + spawn_named(TASK_NAME, async move { + tracing::info!("Starting append entry service at address: {}", consensus.grpc_address); + let addr = consensus.grpc_address; + + let append_entry_service = AppendEntryServiceImpl { + consensus: Mutex::new(consensus), + }; + + let shutdown = GlobalState::wait_shutdown_warn(TASK_NAME); + + let server = Server::builder() + .add_service(AppendEntryServiceServer::new(append_entry_service)) + .serve_with_shutdown(addr, shutdown) + .await; + + if let Err(e) = server { + let message = GlobalState::shutdown_from("consensus", &format!("failed to create server at {}", addr)); + tracing::error!(reason = ?e, %message); + } + }); +} + pub struct AppendEntryServiceImpl { pub consensus: Mutex>, } diff --git a/src/eth/consensus/tests/test_simple_blocks.rs b/src/eth/consensus/tests/test_simple_blocks.rs index ce75c9c20..ecfcdedf1 100644 --- a/src/eth/consensus/tests/test_simple_blocks.rs +++ b/src/eth/consensus/tests/test_simple_blocks.rs @@ -13,7 +13,7 @@ use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntr use crate::eth::consensus::append_entry::AppendBlockCommitRequest; use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest; use crate::eth::consensus::append_entry::StatusCode; -use crate::eth::consensus::AppendEntryServiceImpl; +use crate::eth::consensus::server::AppendEntryServiceImpl; use crate::eth::consensus::Role; use crate::eth::consensus::TransactionExecutionEntry; use crate::eth::primitives::Address;