Skip to content

Commit

Permalink
Finish isolating raft (#1490)
Browse files Browse the repository at this point in the history
* chore: move more functions into propagation

* chore: move more propagation helpers out of consensus

* lint

* chore: isolate raft related methods

* lint
  • Loading branch information
renancloudwalk authored Jul 21, 2024
1 parent 9bce5d9 commit 288b510
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 97 deletions.
101 changes: 5 additions & 96 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ use std::time::Duration;

use anyhow::anyhow;
use rand::Rng;
use server::AppendEntryServiceImpl;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tonic::transport::Server;
use tonic::Request;

use crate::eth::primitives::Hash;
#[allow(unused_imports)]
use crate::eth::primitives::TransactionExecution;
use crate::eth::storage::StratusStorage;
use crate::ext::spawn_named;
use crate::ext::traced_sleep;
Expand All @@ -43,14 +43,12 @@ pub mod append_entry {
#[allow(unused_imports)]
use append_entry::append_entry_service_client::AppendEntryServiceClient;
use append_entry::append_entry_service_server::AppendEntryService;
use append_entry::append_entry_service_server::AppendEntryServiceServer;
use append_entry::RequestVoteRequest;
use append_entry::TransactionExecutionEntry;

use self::append_log_entries_storage::AppendLogEntriesStorage;
use self::log_entry::LogEntryData;
use super::primitives::Bytes;
use super::primitives::TransactionExecution;
use crate::config::RunWithImporterConfig;
use crate::eth::miner::Miner;
use crate::eth::primitives::Block;
Expand Down Expand Up @@ -216,9 +214,9 @@ impl Consensus {
//TODO replace this for a synchronous call
let rx_pending_txs: broadcast::Receiver<TransactionExecution> = miner.notifier_pending_txs.subscribe();
let rx_blocks: broadcast::Receiver<Block> = miner.notifier_blocks.subscribe();
Self::initialize_transaction_execution_queue(Arc::clone(&consensus));
Self::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks);
Self::initialize_server(Arc::clone(&consensus));
propagation::initialize_transaction_execution_queue(Arc::clone(&consensus));
propagation::initialize_append_entries_channel(Arc::clone(&consensus), rx_pending_txs, rx_blocks);
server::initialize_server(Arc::clone(&consensus));
}
Self::initialize_heartbeat_timer(Arc::clone(&consensus));

Expand Down Expand Up @@ -437,95 +435,6 @@ impl Consensus {
});
}

#[allow(dead_code)]
fn initialize_transaction_execution_queue(consensus: Arc<Consensus>) {
// XXX FIXME: deal with the scenario where a transactionHash arrives after the block;
// in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes

const TASK_NAME: &str = "consensus::transaction_execution_queue";

spawn_named(TASK_NAME, async move {
let interval = Duration::from_millis(40);
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return;
};

tokio::time::sleep(interval).await;

propagation::handle_transaction_executions(Arc::clone(&consensus)).await;
}
});
}

/// This channel broadcasts blocks and transactons executions to followers.
/// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation.
//TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one
#[allow(dead_code)]
fn initialize_append_entries_channel(
consensus: Arc<Consensus>,
mut rx_pending_txs: broadcast::Receiver<TransactionExecution>,
mut rx_blocks: broadcast::Receiver<Block>,
) {
const TASK_NAME: &str = "consensus::block_and_executions_sender";
spawn_named(TASK_NAME, async move {
loop {
tokio::select! {
_ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
return;
},
Ok(tx) = rx_pending_txs.recv() => {
tracing::debug!("Attempting to receive transaction execution");
if Self::is_leader() {
tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers");
if tx.is_local() {
tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now");
continue;
}

let transaction = vec![tx.to_append_entry_transaction()];
let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction);
if Arc::clone(&consensus).broadcast_sender.send(transaction_entry).is_err() {
tracing::debug!("failed to broadcast transaction");
}
}
},
Ok(block) = rx_blocks.recv() => {
propagation::handle_block_entry(Arc::clone(&consensus), block).await;
},
else => {
tokio::task::yield_now().await;
},
}
}
});
}

#[allow(dead_code)]
fn initialize_server(consensus: Arc<Consensus>) {
const TASK_NAME: &str = "consensus::server";
spawn_named(TASK_NAME, async move {
tracing::info!("Starting append entry service at address: {}", consensus.grpc_address);
let addr = consensus.grpc_address;

let append_entry_service = AppendEntryServiceImpl {
consensus: Mutex::new(consensus),
};

let shutdown = GlobalState::wait_shutdown_warn(TASK_NAME);

let server = Server::builder()
.add_service(AppendEntryServiceServer::new(append_entry_service))
.serve_with_shutdown(addr, shutdown)
.await;

if let Err(e) = server {
let message = GlobalState::shutdown_from("consensus", &format!("failed to create server at {}", addr));
tracing::error!(reason = ?e, %message);
}
});
}

fn set_role(&self, role: Role) {
if ROLE.load(Ordering::SeqCst) == role as u8 {
tracing::info!(role = ?role, "role remains the same");
Expand Down
66 changes: 66 additions & 0 deletions src/eth/consensus/propagation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use anyhow::anyhow;
use anyhow::Result;
use tokio::sync::broadcast;
use tonic::Request;

use super::Block;
Expand All @@ -15,6 +16,8 @@ use crate::eth::consensus::append_entry::AppendBlockCommitResponse;
use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest;
use crate::eth::consensus::append_entry::AppendTransactionExecutionsResponse;
use crate::eth::consensus::append_entry::StatusCode;
use crate::eth::primitives::TransactionExecution;
use crate::ext::spawn_named;
use crate::ext::traced_sleep;
use crate::ext::SleepReason;
use crate::GlobalState;
Expand Down Expand Up @@ -299,3 +302,66 @@ async fn send_append_entry_request(

Ok(response)
}

#[allow(dead_code)]
pub fn initialize_transaction_execution_queue(consensus: Arc<Consensus>) {
// XXX FIXME: deal with the scenario where a transactionHash arrives after the block;
// in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes

const TASK_NAME: &str = "consensus::transaction_execution_queue";

spawn_named(TASK_NAME, async move {
let interval = Duration::from_millis(40);
loop {
if GlobalState::is_shutdown_warn(TASK_NAME) {
return;
};

tokio::time::sleep(interval).await;

handle_transaction_executions(Arc::clone(&consensus)).await;
}
});
}

/// This channel broadcasts blocks and transactons executions to followers.
/// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation.
//TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one
#[allow(dead_code)]
pub fn initialize_append_entries_channel(
consensus: Arc<Consensus>,
mut rx_pending_txs: broadcast::Receiver<TransactionExecution>,
mut rx_blocks: broadcast::Receiver<Block>,
) {
const TASK_NAME: &str = "consensus::block_and_executions_sender";
spawn_named(TASK_NAME, async move {
loop {
tokio::select! {
_ = GlobalState::wait_shutdown_warn(TASK_NAME) => {
return;
},
Ok(tx) = rx_pending_txs.recv() => {
if Consensus::is_leader() {
tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers");
if tx.is_local() {
tracing::debug!(tx_hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now");
continue;
}

let transaction = vec![tx.to_append_entry_transaction()];
let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction);
if Arc::clone(&consensus).broadcast_sender.send(transaction_entry).is_err() {
tracing::debug!("failed to broadcast transaction");
}
}
},
Ok(block) = rx_blocks.recv() => {
handle_block_entry(Arc::clone(&consensus), block).await;
},
else => {
tokio::task::yield_now().await;
},
}
}
});
}
29 changes: 29 additions & 0 deletions src/eth/consensus/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;

use tokio::sync::Mutex;
use tonic::transport::Server;
use tonic::Request;
use tonic::Response;
use tonic::Status;
Expand All @@ -16,6 +17,7 @@ use super::append_entry::AppendTransactionExecutionsResponse;
use super::append_entry::RequestVoteRequest;
use super::append_entry::RequestVoteResponse;
use super::append_entry::StatusCode;
use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntryServiceServer;
use crate::eth::consensus::AppendEntryService;
use crate::eth::consensus::LogEntryData;
use crate::eth::consensus::PeerAddress;
Expand All @@ -24,8 +26,10 @@ use crate::eth::miner::block_from_propagation;
use crate::eth::primitives::LocalTransactionExecution;
use crate::eth::primitives::TransactionExecution;
use crate::eth::Consensus;
use crate::ext::spawn_named;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::GlobalState;

#[cfg(feature = "metrics")]
mod label {
Expand All @@ -34,6 +38,31 @@ mod label {
pub(super) const REQUEST_VOTE: &str = "request_vote";
}

#[allow(dead_code)]
pub fn initialize_server(consensus: Arc<Consensus>) {
const TASK_NAME: &str = "consensus::server";
spawn_named(TASK_NAME, async move {
tracing::info!("Starting append entry service at address: {}", consensus.grpc_address);
let addr = consensus.grpc_address;

let append_entry_service = AppendEntryServiceImpl {
consensus: Mutex::new(consensus),
};

let shutdown = GlobalState::wait_shutdown_warn(TASK_NAME);

let server = Server::builder()
.add_service(AppendEntryServiceServer::new(append_entry_service))
.serve_with_shutdown(addr, shutdown)
.await;

if let Err(e) = server {
let message = GlobalState::shutdown_from("consensus", &format!("failed to create server at {}", addr));
tracing::error!(reason = ?e, %message);
}
});
}

pub struct AppendEntryServiceImpl {
pub consensus: Mutex<Arc<Consensus>>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/eth/consensus/tests/test_simple_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::eth::consensus::append_entry::append_entry_service_server::AppendEntr
use crate::eth::consensus::append_entry::AppendBlockCommitRequest;
use crate::eth::consensus::append_entry::AppendTransactionExecutionsRequest;
use crate::eth::consensus::append_entry::StatusCode;
use crate::eth::consensus::AppendEntryServiceImpl;
use crate::eth::consensus::server::AppendEntryServiceImpl;
use crate::eth::consensus::Role;
use crate::eth::consensus::TransactionExecutionEntry;
use crate::eth::primitives::Address;
Expand Down

0 comments on commit 288b510

Please sign in to comment.