From ba60eaaacf90b4591f4b3aaa6d316cfa5cc33419 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 10:30:29 -0300 Subject: [PATCH 01/65] initial commit --- src/eth/consensus/mod.rs | 63 +++++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 0389bfb0d..b6fc910ae 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -63,7 +63,9 @@ use append_entry::RequestVoteRequest; use append_entry::StatusCode; use append_entry::TransactionExecutionEntry; +use self::log_entry::LogEntry; use self::log_entry::LogEntryData; +use self::append_log_entries_storage::AppendLogEntriesStorage; use super::primitives::TransactionExecution; use super::primitives::TransactionInput; use crate::config::RunWithImporterConfig; @@ -156,6 +158,7 @@ pub struct Consensus { broadcast_sender: broadcast::Sender, //propagates the blocks importer_config: Option, //HACK this is used with sync online only storage: Arc, + log_storage: Arc, peers: Arc>>, direct_peers: Vec, voted_for: Mutex>, //essential to ensure that a server only votes once per term @@ -188,6 +191,7 @@ impl Consensus { let consensus = Self { broadcast_sender, storage, + log_storage: Arc::new(AppendLogEntriesStorage::new("log_storage").unwrap()), // FIXME: use a proper path peers, direct_peers, current_term: AtomicU64::new(0), @@ -412,10 +416,32 @@ impl Consensus { continue; } - //TODO XXX save transaction to appendEntries log - let transaction = vec![tx.to_append_entry_transaction()]; - let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); - if consensus.broadcast_sender.send(transaction_entry).is_err() { + let last_index = consensus.log_storage.get_last_index().unwrap_or(0); + + let current_term = consensus.current_term.load(Ordering::SeqCst); + + let transaction_entry = LogEntry { + term: current_term, + index: last_index + 1, + data: LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), // TODO Check ordering? + }; + + if let Some(existing_entry) = consensus.log_storage.get_entry(transaction_entry.index).unwrap_or(None) { + if existing_entry.term != transaction_entry.term { + consensus.log_storage.delete_entries_from(transaction_entry.index).expect("Failed to delete existing transaction entries"); + } + } + + if let Err(e) = consensus.log_storage.save_entry(&transaction_entry) { + tracing::error!("failed to save transaction log entry: {:?}", e); + } + + // TODO + // If leaderCommit > commitIndex, set commitIndex = + // min(leaderCommit, index of last new entry) + + let transaction_entry_data = LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]); + if consensus.broadcast_sender.send(transaction_entry_data).is_err() { tracing::error!("failed to broadcast transaction"); } } @@ -424,10 +450,33 @@ impl Consensus { if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "received block to send to followers"); - //TODO save block to appendEntries log + let last_index = consensus.log_storage.get_last_index().unwrap_or(0); + + let current_term = consensus.current_term.load(Ordering::SeqCst); + + let block_entry = LogEntry { + term: current_term, + index: last_index + 1, + data: LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), // TODO Check ordering? + }; + + if let Some(existing_entry) = consensus.log_storage.get_entry(block_entry.index).unwrap_or(None) { + if existing_entry.term != block_entry.term { + consensus.log_storage.delete_entries_from(block_entry.index).expect("Failed to delete existing block entries"); + } + } + + if let Err(e) = consensus.log_storage.save_entry(&block_entry) { + tracing::error!("failed to save block log entry: {:?}", e); + } + + // TODO + // If leaderCommit > commitIndex, set commitIndex = + // min(leaderCommit, index of last new entry) + //TODO before saving check if all transaction_hashes are already in the log - let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); - if consensus.broadcast_sender.send(block_entry).is_err() { + let block_entry_data = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); + if consensus.broadcast_sender.send(block_entry_data).is_err() { tracing::error!("failed to broadcast block"); } } From 07178d0022c04a431a00e3ace6897e119bdc81d0 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 10:35:58 -0300 Subject: [PATCH 02/65] merge --- src/eth/consensus/mod.rs | 4 +- src/eth/consensus/server.rs | 172 ++++++++++++++++++++++- src/eth/storage/rocks/rocks_permanent.rs | 2 +- src/eth/storage/stratus_storage.rs | 19 +++ 4 files changed, 192 insertions(+), 5 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index b6fc910ae..71c0aa5d6 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -63,9 +63,9 @@ use append_entry::RequestVoteRequest; use append_entry::StatusCode; use append_entry::TransactionExecutionEntry; +use self::append_log_entries_storage::AppendLogEntriesStorage; use self::log_entry::LogEntry; use self::log_entry::LogEntryData; -use self::append_log_entries_storage::AppendLogEntriesStorage; use super::primitives::TransactionExecution; use super::primitives::TransactionInput; use crate::config::RunWithImporterConfig; @@ -439,7 +439,7 @@ impl Consensus { // TODO // If leaderCommit > commitIndex, set commitIndex = // min(leaderCommit, index of last new entry) - + let transaction_entry_data = LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]); if consensus.broadcast_sender.send(transaction_entry_data).is_err() { tracing::error!("failed to broadcast transaction"); diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 12d879156..97f629927 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -1,5 +1,3 @@ -// append_entry_service.rs - use core::sync::atomic::Ordering; use std::sync::Arc; @@ -150,3 +148,173 @@ impl AppendEntryService for AppendEntryServiceImpl { })) } } + +#[cfg(test)] +mod tests { + use std::net::Ipv4Addr; + use std::net::SocketAddr; + + use tokio::sync::broadcast; + use tokio::sync::Mutex; + use tonic::Request; + + use super::*; + use crate::eth::consensus::BlockEntry; + use crate::eth::storage::StratusStorage; + + // Helper function to create a mock consensus instance + async fn create_mock_consensus() -> Arc { + let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); + let direct_peers = Vec::new(); + let importer_config = None; + let jsonrpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let grpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let (tx_pending_txs, _) = broadcast::channel(10); + let (tx_blocks, _) = broadcast::channel(10); + + Consensus::new( + storage.into(), + direct_peers, + importer_config, + jsonrpc_address, + grpc_address, + tx_pending_txs.subscribe(), + tx_blocks.subscribe(), + ) + .await + } + + #[tokio::test] + async fn test_append_transaction_executions_not_leader() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Simulate the node as not a leader + consensus.set_role(Role::Follower); + + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: vec![], + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_ok()); + + let response = response.unwrap().into_inner(); + assert_eq!(response.status, StatusCode::AppendSuccess as i32); + assert_eq!(response.message, "transaction Executions appended successfully"); + assert_eq!(response.last_committed_block_number, 0); + } + + #[tokio::test] + async fn test_append_transaction_executions_leader() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Simulate the node as a leader + consensus.set_role(Role::Leader); + + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: vec![], + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.message(), "append_transaction_executions called on leader node"); + } + + #[tokio::test] + async fn test_append_block_commit_not_leader() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Simulate the node as not a leader + consensus.set_role(Role::Follower); + + let request = Request::new(AppendBlockCommitRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + block_entry: Some(BlockEntry { + number: 1, + ..Default::default() + }), + }); + + let response = service.append_block_commit(request).await; + assert!(response.is_ok()); + + let response = response.unwrap().into_inner(); + assert_eq!(response.status, StatusCode::AppendSuccess as i32); + assert_eq!(response.message, "Block Commit appended successfully"); + assert_eq!(response.last_committed_block_number, 1); + } + + #[tokio::test] + async fn test_append_block_commit_leader() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Simulate the node as a leader + consensus.set_role(Role::Leader); + + let request = Request::new(AppendBlockCommitRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + block_entry: Some(BlockEntry { + number: 1, + ..Default::default() + }), + }); + + let response = service.append_block_commit(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.message(), "append_block_commit called on leader node"); + } + + #[tokio::test] + async fn test_request_vote() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + let request = Request::new(RequestVoteRequest { + term: 1, + candidate_id: "https://candidate:1234;4321".to_string(), + last_log_index: 0, + last_log_term: 0, + }); + + let response = service.request_vote(request).await; + assert!(response.is_ok()); + + let response = response.unwrap().into_inner(); + assert_eq!(response.term, 1); + //XXX assert_eq!(response.vote_granted, true); + } +} diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index d35de95ba..3a6a1283e 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -35,7 +35,7 @@ impl RocksPermanentStorage { tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder"); } - let path = format!("data/{prefix}-rocksdb"); + let path = format!("{prefix}-rocksdb"); tracing::info!("starting rocksdb storage - at custom path: '{:?}'", path); path } else { diff --git a/src/eth/storage/stratus_storage.rs b/src/eth/storage/stratus_storage.rs index c8e1733c0..91c858a68 100644 --- a/src/eth/storage/stratus_storage.rs +++ b/src/eth/storage/stratus_storage.rs @@ -71,6 +71,25 @@ impl StratusStorage { } } + /// Creates an inmemory stratus storage for testing. + #[cfg(test)] + pub fn mock_new_rocksdb() -> (Self, tempfile::TempDir) { + // Create a unique temporary directory within the ./data directory + let temp_dir = tempfile::TempDir::new().expect("Failed to create temp dir"); + let temp_path = temp_dir.path().to_str().expect("Failed to get temp path").to_string(); + + let rocks_permanent_storage = + crate::eth::storage::RocksPermanentStorage::new(false, Some(temp_path.clone())).expect("Failed to create RocksPermanentStorage"); + + ( + Self { + temp: Arc::new(InMemoryTemporaryStorage::new()), + perm: Arc::new(rocks_permanent_storage), + }, + temp_dir, + ) + } + // ------------------------------------------------------------------------- // Block number // ------------------------------------------------------------------------- From 94c24dce5a2eea1210526a84fccdf9afa1371860 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 11:29:06 -0300 Subject: [PATCH 03/65] implement custom storage prefix --- src/bin/run_with_importer.rs | 1 + src/config.rs | 1 - .../consensus/append_log_entries_storage.rs | 18 +++++++++++++++++- src/eth/consensus/mod.rs | 3 ++- src/eth/consensus/server.rs | 1 + src/main.rs | 1 + 6 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index b97ad20c9..cc5950c07 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -24,6 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let miner = config.miner.init_external_mode(Arc::clone(&storage), None)?; let consensus = Consensus::new( Arc::clone(&storage), + config.clone().storage.perm_storage.rocks_path_prefix, config.clone().candidate_peers.clone(), Some(config.clone()), config.address, diff --git a/src/config.rs b/src/config.rs index 805330802..ac131adbe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -844,7 +844,6 @@ pub struct PermanentStorageConfig { #[arg(long = "perm-storage", env = "PERM_STORAGE")] pub perm_storage_kind: PermanentStorageKind, - #[cfg(feature = "rocks")] /// RocksDB storage path prefix to execute multiple local Stratus instances. #[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX")] pub rocks_path_prefix: Option, diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index f59729d86..1b9657537 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -13,9 +13,25 @@ pub struct AppendLogEntriesStorage { } impl AppendLogEntriesStorage { - pub fn new>(path: P) -> Result { + pub fn new(path: Option) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); + + let path = if let Some(prefix) = path { + // run some checks on the given prefix + assert!(!prefix.is_empty(), "given prefix for RocksDB is empty, try not providing the flag"); + if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 { + tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder"); + } + + let path = format!("{prefix}-log-entries-rocksdb"); + tracing::info!("starting rocksdb log entries storage - at custom path: '{:?}'", path); + path + } else { + tracing::info!("starting rocksdb log entries storage - at default path: 'data/log-entries-rocksdb'"); // TODO: keep inside data? + "data/log-entries-rocksdb".to_string() + }; + let db = DB::open(&opts, path).context("Failed to open RocksDB")?; Ok(Self { db }) } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 71c0aa5d6..7fbaa39cb 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -176,6 +176,7 @@ pub struct Consensus { impl Consensus { pub async fn new( storage: Arc, + log_storage_path: Option, direct_peers: Vec, importer_config: Option, jsonrpc_address: SocketAddr, @@ -191,7 +192,7 @@ impl Consensus { let consensus = Self { broadcast_sender, storage, - log_storage: Arc::new(AppendLogEntriesStorage::new("log_storage").unwrap()), // FIXME: use a proper path + log_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()), peers, direct_peers, current_term: AtomicU64::new(0), diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 97f629927..eb81c543d 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -174,6 +174,7 @@ mod tests { Consensus::new( storage.into(), + None, direct_peers, importer_config, jsonrpc_address, diff --git a/src/main.rs b/src/main.rs index 5fb71b654..5c7980c34 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); let consensus = Consensus::new( Arc::clone(&storage), + config.clone().storage.perm_storage.rocks_path_prefix, config.clone().candidate_peers.clone(), None, config.address, From 20320ca4b4e21e019a131c293e2af06e39f4dcd5 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 11:30:30 -0300 Subject: [PATCH 04/65] fix var name --- src/eth/consensus/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 7fbaa39cb..f7551195c 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -158,7 +158,7 @@ pub struct Consensus { broadcast_sender: broadcast::Sender, //propagates the blocks importer_config: Option, //HACK this is used with sync online only storage: Arc, - log_storage: Arc, + log_entries_storage: Arc, peers: Arc>>, direct_peers: Vec, voted_for: Mutex>, //essential to ensure that a server only votes once per term @@ -192,7 +192,7 @@ impl Consensus { let consensus = Self { broadcast_sender, storage, - log_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()), + log_entries_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()), peers, direct_peers, current_term: AtomicU64::new(0), @@ -417,7 +417,7 @@ impl Consensus { continue; } - let last_index = consensus.log_storage.get_last_index().unwrap_or(0); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); let current_term = consensus.current_term.load(Ordering::SeqCst); @@ -427,13 +427,13 @@ impl Consensus { data: LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), // TODO Check ordering? }; - if let Some(existing_entry) = consensus.log_storage.get_entry(transaction_entry.index).unwrap_or(None) { + if let Some(existing_entry) = consensus.log_entries_storage.get_entry(transaction_entry.index).unwrap_or(None) { if existing_entry.term != transaction_entry.term { - consensus.log_storage.delete_entries_from(transaction_entry.index).expect("Failed to delete existing transaction entries"); + consensus.log_entries_storage.delete_entries_from(transaction_entry.index).expect("Failed to delete existing transaction entries"); } } - if let Err(e) = consensus.log_storage.save_entry(&transaction_entry) { + if let Err(e) = consensus.log_entries_storage.save_entry(&transaction_entry) { tracing::error!("failed to save transaction log entry: {:?}", e); } @@ -451,7 +451,7 @@ impl Consensus { if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "received block to send to followers"); - let last_index = consensus.log_storage.get_last_index().unwrap_or(0); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); let current_term = consensus.current_term.load(Ordering::SeqCst); @@ -461,13 +461,13 @@ impl Consensus { data: LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), // TODO Check ordering? }; - if let Some(existing_entry) = consensus.log_storage.get_entry(block_entry.index).unwrap_or(None) { + if let Some(existing_entry) = consensus.log_entries_storage.get_entry(block_entry.index).unwrap_or(None) { if existing_entry.term != block_entry.term { - consensus.log_storage.delete_entries_from(block_entry.index).expect("Failed to delete existing block entries"); + consensus.log_entries_storage.delete_entries_from(block_entry.index).expect("Failed to delete existing block entries"); } } - if let Err(e) = consensus.log_storage.save_entry(&block_entry) { + if let Err(e) = consensus.log_entries_storage.save_entry(&block_entry) { tracing::error!("failed to save block log entry: {:?}", e); } From c824dc9df8f7711178c35f8d679345ea9f511662 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 11:44:17 -0300 Subject: [PATCH 05/65] fix e2e clean up --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index df235e0c2..f29661e75 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -113,7 +113,7 @@ find_leader() { # Function to remove rocks-path directory remove_rocks_path() { - rm -rf data/ + find . -type d -name "tmp_rocks_*" -print0 | xargs -0 rm -rf } # Function to run the election test From d4bd2efea8f7f66958a2a55861091b5ec9f2fcee Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 11:57:07 -0300 Subject: [PATCH 06/65] change clone --- src/bin/run_with_importer.rs | 2 +- src/main.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index cc5950c07..c177be894 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -24,7 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let miner = config.miner.init_external_mode(Arc::clone(&storage), None)?; let consensus = Consensus::new( Arc::clone(&storage), - config.clone().storage.perm_storage.rocks_path_prefix, + config.storage.perm_storage.rocks_path_prefix.clone(), config.clone().candidate_peers.clone(), Some(config.clone()), config.address, diff --git a/src/main.rs b/src/main.rs index 5c7980c34..c71f97aed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); let consensus = Consensus::new( Arc::clone(&storage), - config.clone().storage.perm_storage.rocks_path_prefix, + config.storage.perm_storage.rocks_path_prefix.clone(), config.clone().candidate_peers.clone(), None, config.address, From 92d0ee381cb7f56bcb0764c11b7a939f446430aa Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 11:59:54 -0300 Subject: [PATCH 07/65] debug logs --- src/eth/consensus/mod.rs | 64 ++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 22 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index f7551195c..de74271e0 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -410,79 +410,99 @@ impl Consensus { loop { tokio::select! { Ok(tx) = rx_pending_txs.recv() => { + tracing::debug!("Attempting to receive transaction execution"); if consensus.is_leader() { - tracing::info!(hash = %tx.hash(), "received transaction execution to send to followers"); + tracing::info!(hash = %tx.hash(), "Leader received transaction execution to send to followers"); if tx.is_local() { - tracing::debug!(hash = %tx.hash(), "skipping local transaction because only external transactions are supported for now"); + tracing::debug!(hash = %tx.hash(), "Skipping local transaction because only external transactions are supported for now"); continue; } + tracing::debug!("Fetching last index from log entries storage"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index fetched"); + tracing::debug!("Loading current term"); let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term loaded"); + tracing::debug!("Creating transaction log entry"); let transaction_entry = LogEntry { term: current_term, index: last_index + 1, - data: LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), // TODO Check ordering? + data: LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), }; + tracing::debug!(index = transaction_entry.index, term = transaction_entry.term, "Transaction log entry created"); + tracing::debug!("Checking for existing entry at new index"); if let Some(existing_entry) = consensus.log_entries_storage.get_entry(transaction_entry.index).unwrap_or(None) { if existing_entry.term != transaction_entry.term { + tracing::debug!(index = transaction_entry.index, "Deleting entries from index due to term mismatch"); consensus.log_entries_storage.delete_entries_from(transaction_entry.index).expect("Failed to delete existing transaction entries"); } } + tracing::debug!("Saving new transaction log entry"); if let Err(e) = consensus.log_entries_storage.save_entry(&transaction_entry) { - tracing::error!("failed to save transaction log entry: {:?}", e); + tracing::error!("Failed to save transaction log entry: {:?}", e); + } else { + tracing::debug!("Transaction log entry saved successfully"); } - // TODO - // If leaderCommit > commitIndex, set commitIndex = - // min(leaderCommit, index of last new entry) - - let transaction_entry_data = LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]); - if consensus.broadcast_sender.send(transaction_entry_data).is_err() { - tracing::error!("failed to broadcast transaction"); + tracing::debug!("Broadcasting transaction"); + if consensus.broadcast_sender.send(transaction_entry.data.clone()).is_err() { + tracing::error!("Failed to broadcast transaction"); + } else { + tracing::debug!("Transaction broadcasted successfully"); } } } Ok(block) = rx_blocks.recv() => { + tracing::debug!("Attempting to receive block"); if consensus.is_leader() { - tracing::info!(number = block.header.number.as_u64(), "received block to send to followers"); + tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); + tracing::debug!("Fetching last index from log entries storage for block"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index for block fetched"); + tracing::debug!("Loading current term for block"); let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term for block loaded"); + tracing::debug!("Creating block log entry"); let block_entry = LogEntry { term: current_term, index: last_index + 1, - data: LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), // TODO Check ordering? + data: LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), }; + tracing::debug!(index = block_entry.index, term = block_entry.term, "Block log entry created"); + tracing::debug!("Checking for existing block entry at new index"); if let Some(existing_entry) = consensus.log_entries_storage.get_entry(block_entry.index).unwrap_or(None) { if existing_entry.term != block_entry.term { + tracing::debug!(index = block_entry.index, "Deleting block entries from index due to term mismatch"); consensus.log_entries_storage.delete_entries_from(block_entry.index).expect("Failed to delete existing block entries"); } } + tracing::debug!("Saving new block log entry"); if let Err(e) = consensus.log_entries_storage.save_entry(&block_entry) { - tracing::error!("failed to save block log entry: {:?}", e); + tracing::error!("Failed to save block log entry: {:?}", e); + } else { + tracing::debug!("Block log entry saved successfully"); } - // TODO - // If leaderCommit > commitIndex, set commitIndex = - // min(leaderCommit, index of last new entry) - - //TODO before saving check if all transaction_hashes are already in the log - let block_entry_data = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); - if consensus.broadcast_sender.send(block_entry_data).is_err() { - tracing::error!("failed to broadcast block"); + tracing::debug!("Broadcasting block"); + if consensus.broadcast_sender.send(block_entry.data.clone()).is_err() { + tracing::error!("Failed to broadcast block"); + } else { + tracing::debug!("Block broadcasted successfully"); } } } else => { + tracing::debug!("No transactions or blocks received, yielding"); tokio::task::yield_now().await; } } From 731efa356ec7bbb149a5166b4dbf1c054a7135a1 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 13:49:45 -0300 Subject: [PATCH 08/65] refactor --- src/eth/consensus/mod.rs | 112 +++++++++++++++++---------------------- 1 file changed, 50 insertions(+), 62 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index de74271e0..59dd51676 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -417,43 +417,17 @@ impl Consensus { tracing::debug!(hash = %tx.hash(), "Skipping local transaction because only external transactions are supported for now"); continue; } - + tracing::debug!("Fetching last index from log entries storage"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); - + tracing::debug!("Loading current term"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); - - tracing::debug!("Creating transaction log entry"); - let transaction_entry = LogEntry { - term: current_term, - index: last_index + 1, - data: LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), - }; - tracing::debug!(index = transaction_entry.index, term = transaction_entry.term, "Transaction log entry created"); - - tracing::debug!("Checking for existing entry at new index"); - if let Some(existing_entry) = consensus.log_entries_storage.get_entry(transaction_entry.index).unwrap_or(None) { - if existing_entry.term != transaction_entry.term { - tracing::debug!(index = transaction_entry.index, "Deleting entries from index due to term mismatch"); - consensus.log_entries_storage.delete_entries_from(transaction_entry.index).expect("Failed to delete existing transaction entries"); - } - } - - tracing::debug!("Saving new transaction log entry"); - if let Err(e) = consensus.log_entries_storage.save_entry(&transaction_entry) { - tracing::error!("Failed to save transaction log entry: {:?}", e); - } else { - tracing::debug!("Transaction log entry saved successfully"); - } - - tracing::debug!("Broadcasting transaction"); - if consensus.broadcast_sender.send(transaction_entry.data.clone()).is_err() { - tracing::error!("Failed to broadcast transaction"); - } else { - tracing::debug!("Transaction broadcasted successfully"); + + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction").is_ok() { + Self::broadcast_log_entry(&consensus, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction"); } } } @@ -461,43 +435,17 @@ impl Consensus { tracing::debug!("Attempting to receive block"); if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); - + tracing::debug!("Fetching last index from log entries storage for block"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index for block fetched"); - + tracing::debug!("Loading current term for block"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term for block loaded"); - - tracing::debug!("Creating block log entry"); - let block_entry = LogEntry { - term: current_term, - index: last_index + 1, - data: LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), - }; - tracing::debug!(index = block_entry.index, term = block_entry.term, "Block log entry created"); - - tracing::debug!("Checking for existing block entry at new index"); - if let Some(existing_entry) = consensus.log_entries_storage.get_entry(block_entry.index).unwrap_or(None) { - if existing_entry.term != block_entry.term { - tracing::debug!(index = block_entry.index, "Deleting block entries from index due to term mismatch"); - consensus.log_entries_storage.delete_entries_from(block_entry.index).expect("Failed to delete existing block entries"); - } - } - - tracing::debug!("Saving new block log entry"); - if let Err(e) = consensus.log_entries_storage.save_entry(&block_entry) { - tracing::error!("Failed to save block log entry: {:?}", e); - } else { - tracing::debug!("Block log entry saved successfully"); - } - - tracing::debug!("Broadcasting block"); - if consensus.broadcast_sender.send(block_entry.data.clone()).is_err() { - tracing::error!("Failed to broadcast block"); - } else { - tracing::debug!("Block broadcasted successfully"); + + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())),"block").is_ok() { + Self::broadcast_log_entry(&consensus, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block"); } } } @@ -531,6 +479,46 @@ impl Consensus { }); } + fn save_log_entry( + consensus: &Arc, + index: u64, + term: u64, + data: LogEntryData, + entry_type: &str, + ) -> Result<(), String> { + tracing::debug!(index, term, "Creating {} log entry", entry_type); + let log_entry = LogEntry { + term, + index, + data, + }; + tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); + + tracing::debug!("Checking for existing {} entry at new index", entry_type); + if let Some(existing_entry) = consensus.log_entries_storage.get_entry(log_entry.index).unwrap_or(None) { + if existing_entry.term != log_entry.term { + tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); + consensus.log_entries_storage.delete_entries_from(log_entry.index).map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; + } + } + + tracing::debug!("Saving new {} log entry", entry_type); + consensus.log_entries_storage.save_entry(&log_entry).map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) + } + + fn broadcast_log_entry( + consensus: &Arc, + data: LogEntryData, + entry_type: &str, + ) { + tracing::debug!("Broadcasting {}", entry_type); + if consensus.broadcast_sender.send(data.clone()).is_err() { + tracing::error!("Failed to broadcast {}", entry_type); + } else { + tracing::debug!("{} broadcasted successfully", entry_type); + } + } + fn set_role(&self, role: Role) { self.role.store(role as u8, Ordering::SeqCst); } From 69599d8794adca123991b3d13308952b07f18861 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 14:41:51 -0300 Subject: [PATCH 09/65] lint --- src/eth/consensus/mod.rs | 46 +++++++++++++++++----------------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 59dd51676..dccd190aa 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -417,15 +417,15 @@ impl Consensus { tracing::debug!(hash = %tx.hash(), "Skipping local transaction because only external transactions are supported for now"); continue; } - + tracing::debug!("Fetching last index from log entries storage"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); - + tracing::debug!("Loading current term"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); - + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction").is_ok() { Self::broadcast_log_entry(&consensus, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction"); } @@ -435,15 +435,15 @@ impl Consensus { tracing::debug!("Attempting to receive block"); if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); - + tracing::debug!("Fetching last index from log entries storage for block"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index for block fetched"); - + tracing::debug!("Loading current term for block"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term for block loaded"); - + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())),"block").is_ok() { Self::broadcast_log_entry(&consensus, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block"); } @@ -479,38 +479,30 @@ impl Consensus { }); } - fn save_log_entry( - consensus: &Arc, - index: u64, - term: u64, - data: LogEntryData, - entry_type: &str, - ) -> Result<(), String> { + fn save_log_entry(consensus: &Arc, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { tracing::debug!(index, term, "Creating {} log entry", entry_type); - let log_entry = LogEntry { - term, - index, - data, - }; + let log_entry = LogEntry { term, index, data }; tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); - + tracing::debug!("Checking for existing {} entry at new index", entry_type); if let Some(existing_entry) = consensus.log_entries_storage.get_entry(log_entry.index).unwrap_or(None) { if existing_entry.term != log_entry.term { tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); - consensus.log_entries_storage.delete_entries_from(log_entry.index).map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; + consensus + .log_entries_storage + .delete_entries_from(log_entry.index) + .map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; } } - + tracing::debug!("Saving new {} log entry", entry_type); - consensus.log_entries_storage.save_entry(&log_entry).map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) + consensus + .log_entries_storage + .save_entry(&log_entry) + .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) } - fn broadcast_log_entry( - consensus: &Arc, - data: LogEntryData, - entry_type: &str, - ) { + fn broadcast_log_entry(consensus: &Arc, data: LogEntryData, entry_type: &str) { tracing::debug!("Broadcasting {}", entry_type); if consensus.broadcast_sender.send(data.clone()).is_err() { tracing::error!("Failed to broadcast {}", entry_type); From 0f813536c73aefcd0924c6ce87fa531c3f05c6ef Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 14:53:12 -0300 Subject: [PATCH 10/65] test fix --- src/eth/consensus/server.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 5b0f8dfa4..9867a3a26 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -171,6 +171,7 @@ mod tests { // Helper function to create a mock consensus instance async fn create_mock_consensus() -> Arc { let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); + let (_log_entries_storage, tmpdir_log_entries) = StratusStorage::mock_new_rocksdb(); let direct_peers = Vec::new(); let importer_config = None; let jsonrpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); @@ -178,9 +179,11 @@ mod tests { let (tx_pending_txs, _) = broadcast::channel(10); let (tx_blocks, _) = broadcast::channel(10); + let tmpdir_log_entries_path = tmpdir_log_entries.path().to_str().map(|s| s.to_owned()); + Consensus::new( storage.into(), - None, + tmpdir_log_entries_path, direct_peers, importer_config, jsonrpc_address, From 8bd833da11d55b467e46f073f2b3858aa533e09e Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 15:20:23 -0300 Subject: [PATCH 11/65] temp fix --- src/eth/consensus/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index dccd190aa..c82e68343 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -174,6 +174,7 @@ pub struct Consensus { } impl Consensus { + #[allow(clippy::too_many_arguments)] pub async fn new( storage: Arc, log_storage_path: Option, From 14f65ad71b08c4166d0d143c08b8cb45cce67a9d Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 15:31:05 -0300 Subject: [PATCH 12/65] revert --- src/eth/consensus/mod.rs | 40 ++++++++++------------------------------ 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index c82e68343..af91aa199 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -418,17 +418,11 @@ impl Consensus { tracing::debug!(hash = %tx.hash(), "Skipping local transaction because only external transactions are supported for now"); continue; } - - tracing::debug!("Fetching last index from log entries storage"); - let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); - tracing::debug!(last_index, "Last index fetched"); - - tracing::debug!("Loading current term"); - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::debug!(current_term, "Current term loaded"); - - if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction").is_ok() { - Self::broadcast_log_entry(&consensus, LogEntryData::TransactionExecutionEntries(vec![tx.to_append_entry_transaction()]), "transaction"); + + let transaction = vec![tx.to_append_entry_transaction()]; + let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); + if consensus.broadcast_sender.send(transaction_entry).is_err() { + tracing::error!("failed to broadcast transaction"); } } } @@ -437,16 +431,11 @@ impl Consensus { if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); - tracing::debug!("Fetching last index from log entries storage for block"); - let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); - tracing::debug!(last_index, "Last index for block fetched"); - - tracing::debug!("Loading current term for block"); - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::debug!(current_term, "Current term for block loaded"); - - if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())),"block").is_ok() { - Self::broadcast_log_entry(&consensus, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block"); + //TODO save block to appendEntries log + //TODO before saving check if all transaction_hashes are already in the log + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("failed to broadcast block"); } } } @@ -503,15 +492,6 @@ impl Consensus { .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) } - fn broadcast_log_entry(consensus: &Arc, data: LogEntryData, entry_type: &str) { - tracing::debug!("Broadcasting {}", entry_type); - if consensus.broadcast_sender.send(data.clone()).is_err() { - tracing::error!("Failed to broadcast {}", entry_type); - } else { - tracing::debug!("{} broadcasted successfully", entry_type); - } - } - fn set_role(&self, role: Role) { self.role.store(role as u8, Ordering::SeqCst); } From 928453e806e3a2ab581a3d6fba4bcb35569a14c9 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 15:38:35 -0300 Subject: [PATCH 13/65] add todo --- src/eth/consensus/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index af91aa199..606ee80f0 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -174,7 +174,7 @@ pub struct Consensus { } impl Consensus { - #[allow(clippy::too_many_arguments)] + #[allow(clippy::too_many_arguments)] //TODO refactor into consensus config pub async fn new( storage: Arc, log_storage_path: Option, From 4851e16bb0e83e6b6b7dd2d0812b8e7654c3ac83 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 16:29:34 -0300 Subject: [PATCH 14/65] implement tx execution save log --- src/eth/consensus/mod.rs | 41 +++++++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 606ee80f0..aaa026c10 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -376,19 +376,48 @@ impl Consensus { } fn initialize_transaction_execution_queue(consensus: Arc) { - //TODO add data to consensus-log-transactions - //TODO rediscover followers on comunication error - //XXX FIXME deal with the scenario where a transactionHash arrives after the block, in this case before saving the block LogEntry, it should ALWAYS wait for all transaction hashes - //TODO maybe check if I'm currently the leader? + // TODO: add data to consensus-log-transactions + // TODO: rediscover followers on communication error + // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; + // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes + // TODO: maybe check if I'm currently the leader? + spawn_named("consensus::transaction_execution_queue", async move { let interval = Duration::from_millis(40); loop { tokio::time::sleep(interval).await; + if consensus.is_leader() { let mut queue = consensus.transaction_execution_queue.lock().await; let executions = queue.drain(..).collect::>(); drop(queue); - + + tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); + if !executions.is_empty() { + tracing::debug!("Fetching last index from log entries storage"); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index fetched"); + + tracing::debug!("Loading current term"); + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term loaded"); + + match Self::save_log_entry( + &consensus, + last_index + 1, + current_term, + LogEntryData::TransactionExecutionEntries(executions.clone()), + "transaction" + ) { + Ok(_) => { + tracing::debug!("Log entry saved successfully"); + }, + Err(e) => { + tracing::error!("Failed to save log entry: {:?}", e); //TODO: handle error + } + } + } + let peers = consensus.peers.read().await; for (_, (peer, _)) in peers.iter() { let mut peer_clone = peer.clone(); @@ -399,6 +428,8 @@ impl Consensus { }); } + + /// This channel broadcasts blocks and transactons executions to followers. /// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation. //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one From 1a410133064113534955da84463047765455d9bf Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 16:30:31 -0300 Subject: [PATCH 15/65] fmt --- src/eth/consensus/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index aaa026c10..d833d7c17 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -429,7 +429,6 @@ impl Consensus { } - /// This channel broadcasts blocks and transactons executions to followers. /// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation. //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one From 0ca93ecb546637dee558c3f9472a81621f7730bf Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 16:42:14 -0300 Subject: [PATCH 16/65] lint --- src/eth/consensus/mod.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index d833d7c17..39659687f 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -378,46 +378,47 @@ impl Consensus { fn initialize_transaction_execution_queue(consensus: Arc) { // TODO: add data to consensus-log-transactions // TODO: rediscover followers on communication error - // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; + // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes // TODO: maybe check if I'm currently the leader? - + spawn_named("consensus::transaction_execution_queue", async move { let interval = Duration::from_millis(40); loop { tokio::time::sleep(interval).await; - + if consensus.is_leader() { let mut queue = consensus.transaction_execution_queue.lock().await; let executions = queue.drain(..).collect::>(); drop(queue); - + tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); if !executions.is_empty() { tracing::debug!("Fetching last index from log entries storage"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); - + tracing::debug!("Loading current term"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); - + match Self::save_log_entry( - &consensus, - last_index + 1, - current_term, - LogEntryData::TransactionExecutionEntries(executions.clone()), - "transaction" + &consensus, + last_index + 1, + current_term, + LogEntryData::TransactionExecutionEntries(executions.clone()), + "transaction", ) { Ok(_) => { tracing::debug!("Log entry saved successfully"); - }, + } Err(e) => { - tracing::error!("Failed to save log entry: {:?}", e); //TODO: handle error + tracing::error!("Failed to save log entry: {:?}", e); + //TODO: handle error } } } - + let peers = consensus.peers.read().await; for (_, (peer, _)) in peers.iter() { let mut peer_clone = peer.clone(); @@ -428,7 +429,6 @@ impl Consensus { }); } - /// This channel broadcasts blocks and transactons executions to followers. /// Each follower has a queue of blocks and transactions to be sent at handle_peer_propagation. //TODO this broadcast needs to wait for majority of followers to confirm the log before sending the next one @@ -448,7 +448,7 @@ impl Consensus { tracing::debug!(hash = %tx.hash(), "Skipping local transaction because only external transactions are supported for now"); continue; } - + let transaction = vec![tx.to_append_entry_transaction()]; let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); if consensus.broadcast_sender.send(transaction_entry).is_err() { From 8c713e4560200969bd8c4ab388ad8391e5b2c89c Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 16:56:32 -0300 Subject: [PATCH 17/65] add block back --- src/eth/consensus/mod.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 39659687f..a96395bf2 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -174,7 +174,7 @@ pub struct Consensus { } impl Consensus { - #[allow(clippy::too_many_arguments)] //TODO refactor into consensus config + #[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config pub async fn new( storage: Arc, log_storage_path: Option, @@ -461,11 +461,20 @@ impl Consensus { if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); - //TODO save block to appendEntries log - //TODO before saving check if all transaction_hashes are already in the log - let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); - if consensus.broadcast_sender.send(block_entry).is_err() { - tracing::error!("failed to broadcast block"); + //TODO: before saving check if all transaction_hashes are already in the log + tracing::debug!("Fetching last index from log entries storage for block"); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index for block fetched"); + + tracing::debug!("Loading current term for block"); + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term for block loaded"); + + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())),"block").is_ok() { + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("failed to broadcast block"); + } } } } From d5ddc45c454bc97e2ede6daaf566395c2414e23d Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:10:21 -0300 Subject: [PATCH 18/65] test --- src/eth/consensus/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index a96395bf2..f1e57ed36 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "rocks")] -#[allow(dead_code)] mod append_log_entries_storage; mod discovery; pub mod forward_to; From d057de0529492b05d68599fb7090f8753131d41b Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:12:23 -0300 Subject: [PATCH 19/65] fix --- src/eth/consensus/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index f1e57ed36..9d64e5538 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,3 +1,4 @@ +#[allow(dead_code)] mod append_log_entries_storage; mod discovery; pub mod forward_to; From f4a4097d1522ad9a1a6fe73cd7576224a8bc7962 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:30:17 -0300 Subject: [PATCH 20/65] fix rocks flag --- src/eth/consensus/mod.rs | 79 ++++++++++++++++++++++++---------------- 1 file changed, 47 insertions(+), 32 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 9d64e5538..1d813c0fd 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "rocks")] #[allow(dead_code)] mod append_log_entries_storage; mod discovery; @@ -62,6 +63,7 @@ use append_entry::RequestVoteRequest; use append_entry::StatusCode; use append_entry::TransactionExecutionEntry; +#[cfg(feature = "rocks")] use self::append_log_entries_storage::AppendLogEntriesStorage; use self::log_entry::LogEntry; use self::log_entry::LogEntryData; @@ -391,29 +393,32 @@ impl Consensus { let executions = queue.drain(..).collect::>(); drop(queue); - tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); - if !executions.is_empty() { - tracing::debug!("Fetching last index from log entries storage"); - let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); - tracing::debug!(last_index, "Last index fetched"); + #[cfg(feature = "rocks")] + { + tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); + if !executions.is_empty() { + tracing::debug!("Fetching last index from log entries storage"); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index fetched"); - tracing::debug!("Loading current term"); - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::debug!(current_term, "Current term loaded"); - - match Self::save_log_entry( - &consensus, - last_index + 1, - current_term, - LogEntryData::TransactionExecutionEntries(executions.clone()), - "transaction", - ) { - Ok(_) => { - tracing::debug!("Log entry saved successfully"); - } - Err(e) => { - tracing::error!("Failed to save log entry: {:?}", e); - //TODO: handle error + tracing::debug!("Loading current term"); + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term loaded"); + + match Self::save_log_entry( + &consensus, + last_index + 1, + current_term, + LogEntryData::TransactionExecutionEntries(executions.clone()), + "transaction", + ) { + Ok(_) => { + tracing::debug!("Log entry saved successfully"); + } + Err(e) => { + tracing::error!("Failed to save log entry: {:?}", e); + //TODO: handle error + } } } } @@ -460,16 +465,26 @@ impl Consensus { if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); - //TODO: before saving check if all transaction_hashes are already in the log - tracing::debug!("Fetching last index from log entries storage for block"); - let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); - tracing::debug!(last_index, "Last index for block fetched"); - - tracing::debug!("Loading current term for block"); - let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::debug!(current_term, "Current term for block loaded"); - - if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())),"block").is_ok() { + #[cfg(feature = "rocks")] + { + //TODO: before saving check if all transaction_hashes are already in the log + tracing::debug!("Fetching last index from log entries storage for block"); + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index for block fetched"); + + tracing::debug!("Loading current term for block"); + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term for block loaded"); + + if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("failed to broadcast block"); + } + } + } + #[cfg(not(feature = "rocks"))] + { let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("failed to broadcast block"); From 16128ad34ff00a281c29c5d6b7da000849fbb015 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:39:34 -0300 Subject: [PATCH 21/65] fix --- src/eth/consensus/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 1d813c0fd..504433e69 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -159,6 +159,7 @@ pub struct Consensus { broadcast_sender: broadcast::Sender, //propagates the blocks importer_config: Option, //HACK this is used with sync online only storage: Arc, + #[cfg(feature = "rocks")] log_entries_storage: Arc, peers: Arc>>, direct_peers: Vec, @@ -194,6 +195,7 @@ impl Consensus { let consensus = Self { broadcast_sender, storage, + #[cfg(feature = "rocks")] log_entries_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()), peers, direct_peers, @@ -522,6 +524,7 @@ impl Consensus { }); } + #[cfg(feature = "rocks")] fn save_log_entry(consensus: &Arc, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { tracing::debug!(index, term, "Creating {} log entry", entry_type); let log_entry = LogEntry { term, index, data }; From f11340ae50d7232c0df60538258e6e0a164c15c2 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:52:38 -0300 Subject: [PATCH 22/65] refactor --- .../consensus/append_log_entries_storage.rs | 26 +++++++++++++++++ src/eth/consensus/mod.rs | 29 ++----------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 1b9657537..a8af3f1a2 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -1,4 +1,5 @@ use std::path::Path; +use std::sync::Arc; use anyhow::Context; use anyhow::Result; @@ -7,6 +8,8 @@ use rocksdb::Options; use rocksdb::DB; use super::log_entry::LogEntry; +use super::log_entry::LogEntryData; +use super::Consensus; pub struct AppendLogEntriesStorage { db: DB, @@ -97,4 +100,27 @@ impl AppendLogEntriesStorage { None => Ok(0), // Default to 0 if not set } } + + pub fn save_log_entry(&self, consensus: &Arc, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { + tracing::debug!(index, term, "Creating {} log entry", entry_type); + let log_entry = LogEntry { term, index, data }; + tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); + + tracing::debug!("Checking for existing {} entry at new index", entry_type); + if let Some(existing_entry) = consensus.log_entries_storage.get_entry(log_entry.index).unwrap_or(None) { + if existing_entry.term != log_entry.term { + tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); + consensus + .log_entries_storage + .delete_entries_from(log_entry.index) + .map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; + } + } + + tracing::debug!("Saving new {} log entry", entry_type); + consensus + .log_entries_storage + .save_entry(&log_entry) + .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) + } } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 504433e69..914d5235c 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -65,7 +65,6 @@ use append_entry::TransactionExecutionEntry; #[cfg(feature = "rocks")] use self::append_log_entries_storage::AppendLogEntriesStorage; -use self::log_entry::LogEntry; use self::log_entry::LogEntryData; use super::primitives::TransactionExecution; use super::primitives::TransactionInput; @@ -407,7 +406,7 @@ impl Consensus { let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); - match Self::save_log_entry( + match consensus.log_entries_storage.save_log_entry( &consensus, last_index + 1, current_term, @@ -478,7 +477,7 @@ impl Consensus { let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term for block loaded"); - if Self::save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { + if consensus.log_entries_storage.save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("failed to broadcast block"); @@ -524,30 +523,6 @@ impl Consensus { }); } - #[cfg(feature = "rocks")] - fn save_log_entry(consensus: &Arc, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { - tracing::debug!(index, term, "Creating {} log entry", entry_type); - let log_entry = LogEntry { term, index, data }; - tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); - - tracing::debug!("Checking for existing {} entry at new index", entry_type); - if let Some(existing_entry) = consensus.log_entries_storage.get_entry(log_entry.index).unwrap_or(None) { - if existing_entry.term != log_entry.term { - tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); - consensus - .log_entries_storage - .delete_entries_from(log_entry.index) - .map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; - } - } - - tracing::debug!("Saving new {} log entry", entry_type); - consensus - .log_entries_storage - .save_entry(&log_entry) - .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) - } - fn set_role(&self, role: Role) { self.role.store(role as u8, Ordering::SeqCst); } From 15d7182297269a3a6d9b109834214dffd41e6a59 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Fri, 21 Jun 2024 17:58:02 -0300 Subject: [PATCH 23/65] lint --- src/eth/consensus/append_log_entries_storage.rs | 14 ++++---------- src/eth/consensus/mod.rs | 3 +-- 2 files changed, 5 insertions(+), 12 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index a8af3f1a2..2dddb62d3 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -1,5 +1,4 @@ use std::path::Path; -use std::sync::Arc; use anyhow::Context; use anyhow::Result; @@ -9,7 +8,6 @@ use rocksdb::DB; use super::log_entry::LogEntry; use super::log_entry::LogEntryData; -use super::Consensus; pub struct AppendLogEntriesStorage { db: DB, @@ -101,26 +99,22 @@ impl AppendLogEntriesStorage { } } - pub fn save_log_entry(&self, consensus: &Arc, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { + pub fn save_log_entry(&self, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { tracing::debug!(index, term, "Creating {} log entry", entry_type); let log_entry = LogEntry { term, index, data }; tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); tracing::debug!("Checking for existing {} entry at new index", entry_type); - if let Some(existing_entry) = consensus.log_entries_storage.get_entry(log_entry.index).unwrap_or(None) { + if let Some(existing_entry) = self.get_entry(log_entry.index).unwrap_or(None) { if existing_entry.term != log_entry.term { tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); - consensus - .log_entries_storage - .delete_entries_from(log_entry.index) + self.delete_entries_from(log_entry.index) .map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; } } tracing::debug!("Saving new {} log entry", entry_type); - consensus - .log_entries_storage - .save_entry(&log_entry) + self.save_entry(&log_entry) .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) } } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 914d5235c..b59cc590a 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -407,7 +407,6 @@ impl Consensus { tracing::debug!(current_term, "Current term loaded"); match consensus.log_entries_storage.save_log_entry( - &consensus, last_index + 1, current_term, LogEntryData::TransactionExecutionEntries(executions.clone()), @@ -477,7 +476,7 @@ impl Consensus { let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term for block loaded"); - if consensus.log_entries_storage.save_log_entry(&consensus, last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { + if consensus.log_entries_storage.save_log_entry(last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("failed to broadcast block"); From 775dfe294e456c2e5f645a613c5489e33ae65cc5 Mon Sep 17 00:00:00 2001 From: renancloudwalk <53792026+renancloudwalk@users.noreply.github.com> Date: Mon, 24 Jun 2024 07:21:46 -0300 Subject: [PATCH 24/65] chore: return error if we have a conflict appending entries (#1218) --- src/eth/consensus/append_log_entries_storage.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 2dddb62d3..edf948a3d 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -99,7 +99,7 @@ impl AppendLogEntriesStorage { } } - pub fn save_log_entry(&self, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<(), String> { + pub fn save_log_entry(&self, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<()> { tracing::debug!(index, term, "Creating {} log entry", entry_type); let log_entry = LogEntry { term, index, data }; tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); @@ -107,14 +107,13 @@ impl AppendLogEntriesStorage { tracing::debug!("Checking for existing {} entry at new index", entry_type); if let Some(existing_entry) = self.get_entry(log_entry.index).unwrap_or(None) { if existing_entry.term != log_entry.term { - tracing::debug!(index = log_entry.index, "Deleting {} entries from index due to term mismatch", entry_type); - self.delete_entries_from(log_entry.index) - .map_err(|e| format!("Failed to delete existing {} entries: {:?}", entry_type, e))?; + tracing::error!(index = log_entry.index, "duplicated entries from index due to term mismatch"); + return Err(anyhow::anyhow!("Duplicated entries from index due to term mismatch")); } } tracing::debug!("Saving new {} log entry", entry_type); self.save_entry(&log_entry) - .map_err(|e| format!("Failed to save {} log entry: {:?}", entry_type, e)) + .map_err(|_| anyhow::anyhow!("Failed to save {}", entry_type)) } } From 470c72266445784b88c330b788b7f19a2165dfad Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Mon, 24 Jun 2024 07:30:55 -0300 Subject: [PATCH 25/65] chore: save log entries at executions --- src/eth/consensus/server.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 819cb552e..67f33a030 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -1,3 +1,4 @@ +use crate::eth::consensus::LogEntryData; use core::sync::atomic::Ordering; use std::sync::Arc; @@ -40,7 +41,15 @@ impl AppendEntryService for AppendEntryServiceImpl { } let executions = request_inner.executions; - //TODO save the transaction executions here + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::TransactionExecutionEntries(executions.clone()); + + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "transaction") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } + //TODO send the executions to the Storage tracing::info!(executions = executions.len(), "appending executions"); From b566763bcb140f7b8899a30a92ed6b1777a91585 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Mon, 24 Jun 2024 07:41:02 -0300 Subject: [PATCH 26/65] lint --- src/eth/consensus/append_log_entries_storage.rs | 5 ++--- src/eth/consensus/server.rs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index edf948a3d..2d671386a 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -108,12 +108,11 @@ impl AppendLogEntriesStorage { if let Some(existing_entry) = self.get_entry(log_entry.index).unwrap_or(None) { if existing_entry.term != log_entry.term { tracing::error!(index = log_entry.index, "duplicated entries from index due to term mismatch"); - return Err(anyhow::anyhow!("Duplicated entries from index due to term mismatch")); + return Err(anyhow::anyhow!("Duplicated entries from index due to term mismatch")); } } tracing::debug!("Saving new {} log entry", entry_type); - self.save_entry(&log_entry) - .map_err(|_| anyhow::anyhow!("Failed to save {}", entry_type)) + self.save_entry(&log_entry).map_err(|_| anyhow::anyhow!("Failed to save {}", entry_type)) } } diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 67f33a030..c3d358c5b 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -1,4 +1,3 @@ -use crate::eth::consensus::LogEntryData; use core::sync::atomic::Ordering; use std::sync::Arc; @@ -15,6 +14,7 @@ use super::append_entry::RequestVoteRequest; use super::append_entry::RequestVoteResponse; use super::append_entry::StatusCode; use crate::eth::consensus::AppendEntryService; +use crate::eth::consensus::LogEntryData; use crate::eth::consensus::PeerAddress; use crate::eth::consensus::Role; use crate::eth::Consensus; From e308fe7b5a768046821a8b728db11ee877ae67d1 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Mon, 24 Jun 2024 08:14:44 -0300 Subject: [PATCH 27/65] chore: test server --- src/eth/consensus/server.rs | 65 +++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index c3d358c5b..988e30368 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -168,6 +168,7 @@ impl AppendEntryService for AppendEntryServiceImpl { mod tests { use std::net::Ipv4Addr; use std::net::SocketAddr; + use ethereum_types::H256; use tokio::sync::broadcast; use tokio::sync::Mutex; @@ -175,6 +176,7 @@ mod tests { use super::*; use crate::eth::consensus::BlockEntry; + use crate::eth::consensus::TransactionExecutionEntry; use crate::eth::storage::StratusStorage; // Helper function to create a mock consensus instance @@ -203,6 +205,69 @@ mod tests { .await } + fn create_mock_transaction_execution_entry() -> TransactionExecutionEntry { + TransactionExecutionEntry { + hash: H256::zero().to_string(), + nonce: 0, + value: vec![0u8; 32], + gas_price: vec![0u8; 32], + input: vec![0u8; 32], + v: 27, + r: vec![0u8; 32], + s: vec![0u8; 32], + chain_id: 1, + result: vec![0u8; 32], + output: vec![0u8; 32], + from: "0x0000000000000000000000000000000000000000".to_string(), + to: "0x0000000000000000000000000000000000000000".to_string(), + block_hash: H256::zero().to_string(), + block_number: 1, + transaction_index: 0, + logs: vec![], + gas: vec![0u8; 32], + receipt_cumulative_gas_used: vec![0u8; 32], + receipt_gas_used: vec![0u8; 32], + receipt_contract_address: vec![], + receipt_status: 1, + receipt_logs_bloom: vec![0u8; 256], + receipt_effective_gas_price: vec![0u8; 32], + } + } + + #[tokio::test] + async fn test_append_transaction_executions_insert() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + consensus.set_role(Role::Follower); + + let executions = vec![create_mock_transaction_execution_entry()]; + + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: executions.clone(), + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_ok()); + + // Check if the log entry was inserted correctly + let log_entries_storage = &consensus.log_entries_storage; + let last_index = log_entries_storage.get_last_index().unwrap(); + let saved_entry = log_entries_storage.get_entry(last_index).unwrap().unwrap(); + + if let LogEntryData::TransactionExecutionEntries(saved_executions) = saved_entry.data { + assert_eq!(saved_executions, executions); + } else { + panic!("Expected transaction execution entries in the log entry"); + } + } + #[tokio::test] async fn test_append_transaction_executions_not_leader() { let consensus = create_mock_consensus().await; From 5d6c62852dd7c66e5898b79d8bf0113d43e0aaf7 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Mon, 24 Jun 2024 09:08:18 -0300 Subject: [PATCH 28/65] fix: append log entries test --- src/eth/consensus/append_log_entries_storage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 203f9e906..0c5e7f5ac 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -127,8 +127,8 @@ mod tests { fn setup_storage() -> AppendLogEntriesStorage { let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path(); - AppendLogEntriesStorage::new(temp_path).unwrap() + let temp_path = temp_dir.path().to_str().expect("Failed to get temp path").to_string(); + AppendLogEntriesStorage::new(Some(temp_path)).unwrap() } #[test] From d5c49d9692244df3f4cc1ab16bb74b47bed77564 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Mon, 24 Jun 2024 09:42:24 -0300 Subject: [PATCH 29/65] lint --- src/eth/consensus/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 988e30368..54c844f03 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -168,8 +168,8 @@ impl AppendEntryService for AppendEntryServiceImpl { mod tests { use std::net::Ipv4Addr; use std::net::SocketAddr; - use ethereum_types::H256; + use ethereum_types::H256; use tokio::sync::broadcast; use tokio::sync::Mutex; use tonic::Request; From b6bd324af7bbc29dfce7aa030e12daa5f7e36965 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 09:55:17 -0300 Subject: [PATCH 30/65] add conditional rocks save entry --- src/eth/consensus/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 54c844f03..f2575f4ca 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -45,6 +45,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let term = request_inner.prev_log_term; let data = LogEntryData::TransactionExecutionEntries(executions.clone()); + #[cfg(feature = "rocks")] if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "transaction") { tracing::error!("Failed to save log entry: {:?}", e); return Err(Status::internal("Failed to save log entry")); From 328eaf2f607b13ed7c5dbde8f2c4a601174e0de7 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:13:07 -0300 Subject: [PATCH 31/65] improving resiliency --- .../consensus/append_log_entries_storage.rs | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 0c5e7f5ac..d6b231f33 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -103,18 +103,29 @@ impl AppendLogEntriesStorage { tracing::debug!(index, term, "Creating {} log entry", entry_type); let log_entry = LogEntry { term, index, data }; tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); - + tracing::debug!("Checking for existing {} entry at new index", entry_type); - if let Some(existing_entry) = self.get_entry(log_entry.index).unwrap_or(None) { - if existing_entry.term != log_entry.term { - tracing::error!(index = log_entry.index, "duplicated entries from index due to term mismatch"); - return Err(anyhow::anyhow!("Duplicated entries from index due to term mismatch")); + match self.get_entry(log_entry.index) { + Ok(Some(existing_entry)) => { + if existing_entry.term != log_entry.term { + tracing::warn!(index = log_entry.index, "Conflicting entry found, deleting existing entry and all that follow it"); + self.delete_entries_from(log_entry.index)?; + } + } + Ok(None) => { + // No existing entry at this index, proceed to save the new entry + } + Err(e) => { + tracing::error!(index = log_entry.index, "Error retrieving entry: {}", e); + return Err(anyhow::anyhow!("Error retrieving entry: {}", e)); } } - - tracing::debug!("Saving new {} log entry", entry_type); - self.save_entry(&log_entry).map_err(|_| anyhow::anyhow!("Failed to save {}", entry_type)) + + tracing::debug!("Appending new {} log entry", entry_type); + self.save_entry(&log_entry) + .map_err(|e| anyhow::anyhow!("Failed to append {} log entry: {}", entry_type, e)) } + } #[cfg(test)] From 39547e9b5ad44d678a2527382e45005167448f9e Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:14:24 -0300 Subject: [PATCH 32/65] remove debug logs --- src/eth/consensus/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index fec709994..e32a1c0ed 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -396,11 +396,9 @@ impl Consensus { { tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); if !executions.is_empty() { - tracing::debug!("Fetching last index from log entries storage"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); tracing::debug!(last_index, "Last index fetched"); - tracing::debug!("Loading current term"); let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); From 5134ce65c8ca24f0d73af6e149ccc5ac27080759 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:30:50 -0300 Subject: [PATCH 33/65] improve --- src/eth/consensus/mod.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index e32a1c0ed..19dd29d82 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -464,18 +464,30 @@ impl Consensus { #[cfg(feature = "rocks")] { //TODO: before saving check if all transaction_hashes are already in the log - tracing::debug!("Fetching last index from log entries storage for block"); let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); - tracing::debug!(last_index, "Last index for block fetched"); + tracing::debug!(last_index, "Last index fetched"); - tracing::debug!("Loading current term for block"); let current_term = consensus.current_term.load(Ordering::SeqCst); - tracing::debug!(current_term, "Current term for block loaded"); - - if consensus.log_entries_storage.save_log_entry(last_index + 1, current_term, LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())), "block").is_ok() { - let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); - if consensus.broadcast_sender.send(block_entry).is_err() { - tracing::error!("failed to broadcast block"); + tracing::debug!(current_term, "Current term loaded"); + + let transaction_hashes: Vec = block.transactions.iter().map(|tx| tx.input.hash.to_string()).collect(); + + match consensus.log_entries_storage.save_log_entry( + last_index + 1, + current_term, + LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes.clone())), + "block", + ) { + Ok(_) => { + tracing::debug!("Block log entry saved successfully"); + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("Failed to broadcast block"); + } + } + Err(e) => { + tracing::error!("Failed to save block log entry: {:?}", e); + // TODO: handle error } } } From 27027b0bfe1b253a3332ad673070ae98032dec76 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:47:36 -0300 Subject: [PATCH 34/65] improve --- src/eth/consensus/mod.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 19dd29d82..40d1b43b0 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -409,11 +409,10 @@ impl Consensus { "transaction", ) { Ok(_) => { - tracing::debug!("Log entry saved successfully"); + tracing::debug!("Transaction execution entry saved successfully"); } Err(e) => { - tracing::error!("Failed to save log entry: {:?}", e); - //TODO: handle error + tracing::error!("Failed to save transaction execution entry: {:?}", e); } } } @@ -457,7 +456,6 @@ impl Consensus { } } Ok(block) = rx_blocks.recv() => { - tracing::debug!("Attempting to receive block"); if consensus.is_leader() { tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); @@ -479,15 +477,14 @@ impl Consensus { "block", ) { Ok(_) => { - tracing::debug!("Block log entry saved successfully"); + tracing::debug!("Block entry saved successfully"); let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("Failed to broadcast block"); } } Err(e) => { - tracing::error!("Failed to save block log entry: {:?}", e); - // TODO: handle error + tracing::error!("Failed to save block entry: {:?}", e); } } } From f2412fe962f317ffc47b56b4b5579f1a3660695e Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:50:20 -0300 Subject: [PATCH 35/65] fmt --- src/eth/consensus/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 40d1b43b0..565d95bdd 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -492,7 +492,7 @@ impl Consensus { { let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); if consensus.broadcast_sender.send(block_entry).is_err() { - tracing::error!("failed to broadcast block"); + tracing::error!("Failed to broadcast block"); } } } From b980f0b37f247dfc27e79d7a25bc38db8f038eed Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:51:46 -0300 Subject: [PATCH 36/65] fmt --- src/eth/consensus/append_log_entries_storage.rs | 10 ++++++---- src/eth/consensus/log_entry.rs | 15 +++++++++------ src/eth/consensus/mod.rs | 7 ++++--- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index d6b231f33..0cb2c7f9a 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -103,12 +103,15 @@ impl AppendLogEntriesStorage { tracing::debug!(index, term, "Creating {} log entry", entry_type); let log_entry = LogEntry { term, index, data }; tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); - + tracing::debug!("Checking for existing {} entry at new index", entry_type); match self.get_entry(log_entry.index) { Ok(Some(existing_entry)) => { if existing_entry.term != log_entry.term { - tracing::warn!(index = log_entry.index, "Conflicting entry found, deleting existing entry and all that follow it"); + tracing::warn!( + index = log_entry.index, + "Conflicting entry found, deleting existing entry and all that follow it" + ); self.delete_entries_from(log_entry.index)?; } } @@ -120,12 +123,11 @@ impl AppendLogEntriesStorage { return Err(anyhow::anyhow!("Error retrieving entry: {}", e)); } } - + tracing::debug!("Appending new {} log entry", entry_type); self.save_entry(&log_entry) .map_err(|e| anyhow::anyhow!("Failed to append {} log entry: {}", entry_type, e)) } - } #[cfg(test)] diff --git a/src/eth/consensus/log_entry.rs b/src/eth/consensus/log_entry.rs index 9e2adf44a..68dabb553 100644 --- a/src/eth/consensus/log_entry.rs +++ b/src/eth/consensus/log_entry.rs @@ -32,10 +32,11 @@ impl Message for LogEntryData { LogEntryData::BlockEntry(header) => { prost::encoding::message::encode(1, header, buf); } - LogEntryData::TransactionExecutionEntries(executions) => + LogEntryData::TransactionExecutionEntries(executions) => { for execution in executions { prost::encoding::message::encode(2, execution, buf); - }, + } + } LogEntryData::EmptyData => {} } } @@ -51,7 +52,7 @@ impl Message for LogEntryData { B: bytes::Buf, { match tag { - 1 => + 1 => { if let LogEntryData::BlockEntry(ref mut header) = self { prost::encoding::message::merge(wire_type, header, buf, ctx) } else { @@ -59,7 +60,8 @@ impl Message for LogEntryData { prost::encoding::message::merge(wire_type, &mut header, buf, ctx)?; *self = LogEntryData::BlockEntry(header); Ok(()) - }, + } + } 2 => { let mut execution = TransactionExecutionEntry::default(); prost::encoding::message::merge(wire_type, &mut execution, buf, ctx)?; @@ -77,8 +79,9 @@ impl Message for LogEntryData { fn encoded_len(&self) -> usize { match self { LogEntryData::BlockEntry(header) => prost::encoding::message::encoded_len(1, header), - LogEntryData::TransactionExecutionEntries(executions) => - executions.iter().map(|execution| prost::encoding::message::encoded_len(2, execution)).sum(), + LogEntryData::TransactionExecutionEntries(executions) => { + executions.iter().map(|execution| prost::encoding::message::encoded_len(2, execution)).sum() + } LogEntryData::EmptyData => 0, } } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 565d95bdd..f769dd442 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -469,7 +469,7 @@ impl Consensus { tracing::debug!(current_term, "Current term loaded"); let transaction_hashes: Vec = block.transactions.iter().map(|tx| tx.input.hash.to_string()).collect(); - + match consensus.log_entries_storage.save_log_entry( last_index + 1, current_term, @@ -726,12 +726,13 @@ impl Consensus { }); match peer.client.append_transaction_executions(request).await { - Ok(response) => + Ok(response) => { if response.into_inner().status == StatusCode::AppendSuccess as i32 { tracing::info!("Successfully appended transaction executions to peer: {:?}", peer.client); } else { tracing::warn!("Failed to append transaction executions to peer: {:?}", peer.client); - }, + } + } Err(e) => { tracing::warn!("Error appending transaction executions to peer {:?}: {:?}", peer.client, e); } From 5a31ffc74424e4caf56909d5414aad4e74a2dafa Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:54:49 -0300 Subject: [PATCH 37/65] fix broadcast --- src/eth/consensus/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index f769dd442..0b429687e 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -490,7 +490,8 @@ impl Consensus { } #[cfg(not(feature = "rocks"))] { - let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); + let transaction_hashes: Vec = block.transactions.iter().map(|tx| tx.input.hash.to_string()).collect(); + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("Failed to broadcast block"); } From 8fce3a35f5bf4a33dc64272ac3625c9417d5a1c9 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:55:45 -0300 Subject: [PATCH 38/65] remove log --- src/eth/consensus/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 0b429687e..29f14cc74 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -499,7 +499,6 @@ impl Consensus { } } else => { - tracing::debug!("No transactions or blocks received, yielding"); tokio::task::yield_now().await; } } From 03079a2bde739964ec8dcf8ced95de30d130d110 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 10:58:45 -0300 Subject: [PATCH 39/65] lint --- src/eth/consensus/append_log_entries_storage.rs | 5 ++--- src/eth/consensus/log_entry.rs | 15 ++++++--------- src/eth/consensus/mod.rs | 5 ++--- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 0cb2c7f9a..2cf039bca 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -106,15 +106,14 @@ impl AppendLogEntriesStorage { tracing::debug!("Checking for existing {} entry at new index", entry_type); match self.get_entry(log_entry.index) { - Ok(Some(existing_entry)) => { + Ok(Some(existing_entry)) => if existing_entry.term != log_entry.term { tracing::warn!( index = log_entry.index, "Conflicting entry found, deleting existing entry and all that follow it" ); self.delete_entries_from(log_entry.index)?; - } - } + }, Ok(None) => { // No existing entry at this index, proceed to save the new entry } diff --git a/src/eth/consensus/log_entry.rs b/src/eth/consensus/log_entry.rs index 68dabb553..9e2adf44a 100644 --- a/src/eth/consensus/log_entry.rs +++ b/src/eth/consensus/log_entry.rs @@ -32,11 +32,10 @@ impl Message for LogEntryData { LogEntryData::BlockEntry(header) => { prost::encoding::message::encode(1, header, buf); } - LogEntryData::TransactionExecutionEntries(executions) => { + LogEntryData::TransactionExecutionEntries(executions) => for execution in executions { prost::encoding::message::encode(2, execution, buf); - } - } + }, LogEntryData::EmptyData => {} } } @@ -52,7 +51,7 @@ impl Message for LogEntryData { B: bytes::Buf, { match tag { - 1 => { + 1 => if let LogEntryData::BlockEntry(ref mut header) = self { prost::encoding::message::merge(wire_type, header, buf, ctx) } else { @@ -60,8 +59,7 @@ impl Message for LogEntryData { prost::encoding::message::merge(wire_type, &mut header, buf, ctx)?; *self = LogEntryData::BlockEntry(header); Ok(()) - } - } + }, 2 => { let mut execution = TransactionExecutionEntry::default(); prost::encoding::message::merge(wire_type, &mut execution, buf, ctx)?; @@ -79,9 +77,8 @@ impl Message for LogEntryData { fn encoded_len(&self) -> usize { match self { LogEntryData::BlockEntry(header) => prost::encoding::message::encoded_len(1, header), - LogEntryData::TransactionExecutionEntries(executions) => { - executions.iter().map(|execution| prost::encoding::message::encoded_len(2, execution)).sum() - } + LogEntryData::TransactionExecutionEntries(executions) => + executions.iter().map(|execution| prost::encoding::message::encoded_len(2, execution)).sum(), LogEntryData::EmptyData => 0, } } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 29f14cc74..666d3df17 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -726,13 +726,12 @@ impl Consensus { }); match peer.client.append_transaction_executions(request).await { - Ok(response) => { + Ok(response) => if response.into_inner().status == StatusCode::AppendSuccess as i32 { tracing::info!("Successfully appended transaction executions to peer: {:?}", peer.client); } else { tracing::warn!("Failed to append transaction executions to peer: {:?}", peer.client); - } - } + }, Err(e) => { tracing::warn!("Error appending transaction executions to peer {:?}: {:?}", peer.client, e); } From 56eb8833fc3e78afff99d2662c0356ff94fcc431 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 14:35:21 -0300 Subject: [PATCH 40/65] add save_log_entry tests --- .../consensus/append_log_entries_storage.rs | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 2cf039bca..417cc261a 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -244,4 +244,97 @@ mod tests { panic!("Expected TransactionExecutionEntries"); } } + + #[test] + fn test_save_log_entry_no_conflict() { + let storage = setup_storage(); + let index = 1; + let term = 1; + let log_entry_data = create_mock_log_entry_data_block(); + + storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap(); + let retrieved_entry = storage.get_entry(index).unwrap().unwrap(); + + assert_eq!(retrieved_entry.index, index); + assert_eq!(retrieved_entry.term, term); + + if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data { + if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data { + assert_eq!(block.hash, expected_block.hash); + assert_eq!(block.number, expected_block.number); + assert_eq!(block.parent_hash, expected_block.parent_hash); + assert_eq!(block.nonce, expected_block.nonce); + assert_eq!(block.uncle_hash, expected_block.uncle_hash); + assert_eq!(block.transactions_root, expected_block.transactions_root); + assert_eq!(block.state_root, expected_block.state_root); + assert_eq!(block.receipts_root, expected_block.receipts_root); + assert_eq!(block.miner, expected_block.miner); + assert_eq!(block.difficulty, expected_block.difficulty); + assert_eq!(block.total_difficulty, expected_block.total_difficulty); + assert_eq!(block.extra_data, expected_block.extra_data); + assert_eq!(block.size, expected_block.size); + assert_eq!(block.gas_limit, expected_block.gas_limit); + assert_eq!(block.gas_used, expected_block.gas_used); + assert_eq!(block.timestamp, expected_block.timestamp); + assert_eq!(block.bloom, expected_block.bloom); + assert_eq!(block.author, expected_block.author); + assert_eq!(block.transaction_hashes, expected_block.transaction_hashes); + } else { + panic!("Expected BlockEntry"); + } + } else { + panic!("Expected BlockEntry"); + } + } + + #[test] + fn test_save_log_entry_with_conflict() { + let storage = setup_storage(); + let index = 1; + let term = 1; + let conflicting_term = 2; + let log_entry_data = create_mock_log_entry_data_block(); + + // Save initial log entry + storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap(); + + // Save conflicting log entry at the same index but with a different term + storage.save_log_entry(index, conflicting_term, log_entry_data.clone(), "block").unwrap(); + + // Assert no entries exist after the conflicting entry's index + assert!(storage.get_entry(index + 1).unwrap().is_none()); + + // Retrieve the entry at the index and assert it matches the conflicting term entry + let retrieved_entry = storage.get_entry(index).unwrap().unwrap(); + assert_eq!(retrieved_entry.index, index); + assert_eq!(retrieved_entry.term, conflicting_term); + + if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data { + if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data { + assert_eq!(block.hash, expected_block.hash); + assert_eq!(block.number, expected_block.number); + assert_eq!(block.parent_hash, expected_block.parent_hash); + assert_eq!(block.nonce, expected_block.nonce); + assert_eq!(block.uncle_hash, expected_block.uncle_hash); + assert_eq!(block.transactions_root, expected_block.transactions_root); + assert_eq!(block.state_root, expected_block.state_root); + assert_eq!(block.receipts_root, expected_block.receipts_root); + assert_eq!(block.miner, expected_block.miner); + assert_eq!(block.difficulty, expected_block.difficulty); + assert_eq!(block.total_difficulty, expected_block.total_difficulty); + assert_eq!(block.extra_data, expected_block.extra_data); + assert_eq!(block.size, expected_block.size); + assert_eq!(block.gas_limit, expected_block.gas_limit); + assert_eq!(block.gas_used, expected_block.gas_used); + assert_eq!(block.timestamp, expected_block.timestamp); + assert_eq!(block.bloom, expected_block.bloom); + assert_eq!(block.author, expected_block.author); + assert_eq!(block.transaction_hashes, expected_block.transaction_hashes); + } else { + panic!("Expected BlockEntry"); + } + } else { + panic!("Expected BlockEntry"); + } + } } From 0907d716f87d5fa5cb7822f9085a839d41c958d1 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Mon, 24 Jun 2024 14:36:52 -0300 Subject: [PATCH 41/65] comment --- src/eth/consensus/append_log_entries_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 417cc261a..ce8937be0 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -301,7 +301,7 @@ mod tests { // Save conflicting log entry at the same index but with a different term storage.save_log_entry(index, conflicting_term, log_entry_data.clone(), "block").unwrap(); - // Assert no entries exist after the conflicting entry's index + // Assert no entries exist after the conflicting entry's index, confirming that the conflicting entry and all that follow it were deleted assert!(storage.get_entry(index + 1).unwrap().is_none()); // Retrieve the entry at the index and assert it matches the conflicting term entry From 50cf10400d0c3994ea45b4e0f72ad42da8a89241 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Tue, 25 Jun 2024 09:26:08 -0300 Subject: [PATCH 42/65] fix lint --- src/eth/consensus/append_log_entries_storage.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index ce8937be0..435898604 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -135,7 +135,6 @@ mod tests { use super::*; use crate::eth::consensus::tests::factories::*; - use crate::eth::consensus::LogEntryData; fn setup_storage() -> AppendLogEntriesStorage { let temp_dir = TempDir::new().unwrap(); From 62e5fe28beb63db4b90887ceea8600c11b2436a2 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Tue, 25 Jun 2024 13:43:42 -0300 Subject: [PATCH 43/65] fix conflicts --- src/eth/consensus/tests/factories.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/eth/consensus/tests/factories.rs b/src/eth/consensus/tests/factories.rs index a792bae1d..fcb3276c8 100644 --- a/src/eth/consensus/tests/factories.rs +++ b/src/eth/consensus/tests/factories.rs @@ -96,6 +96,7 @@ pub fn create_mock_log_entry(index: u64, term: u64, data: LogEntryData) -> LogEn pub async fn create_mock_consensus() -> Arc { let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); + let (_log_entries_storage, tmpdir_log_entries) = StratusStorage::mock_new_rocksdb(); let direct_peers = Vec::new(); let importer_config = None; let jsonrpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); @@ -103,8 +104,11 @@ pub async fn create_mock_consensus() -> Arc { let (tx_pending_txs, _) = broadcast::channel(10); let (tx_blocks, _) = broadcast::channel(10); + let tmpdir_log_entries_path = tmpdir_log_entries.path().to_str().map(|s| s.to_owned()); + Consensus::new( storage.into(), + tmpdir_log_entries_path, direct_peers, importer_config, jsonrpc_address, From a94b6ac52cbb892c05fa613c075df849b4073c7c Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:35:17 -0300 Subject: [PATCH 44/65] add initial append block --- src/eth/consensus/server.rs | 49 +++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..0c1f64866 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,8 +87,16 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + if consensus.is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( @@ -103,7 +111,48 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); + // Return error if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm + if let Some(prev_entry) = consensus.log_entries_storage.get_entry(request_inner.prev_log_index).ok().flatten() { + if prev_entry.term != request_inner.prev_log_term { + let error_message = format!( + "prevLogIndex term mismatch: expected {}, found {} at index {}", + request_inner.prev_log_term, prev_entry.term, request_inner.prev_log_index + ); + tracing::error!( + prev_log_index = request_inner.prev_log_index, + expected_term = request_inner.prev_log_term, + actual_term = prev_entry.term, + "{}", + &error_message + ); + return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); + } + } else { + let error_message = format!("No entry found at prevLogIndex {}", request_inner.prev_log_index); + tracing::error!(prev_log_index = request_inner.prev_log_index, "{}", &error_message); + return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); + } + + // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + if request_inner.prev_log_index != last_last_arrived_block_number { + tracing::error!( + "prevLogIndex mismatch: expected {}, got {}", + last_last_arrived_block_number, + request_inner.prev_log_index + ); + return Err(Status::invalid_argument("empty block entry")); + } + + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From 9dd82e1aee7828387c2c821c244a3c7b71c58ac0 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:47:58 -0300 Subject: [PATCH 45/65] fmt --- src/eth/consensus/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 0c1f64866..174ed7796 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -136,12 +136,12 @@ impl AppendEntryService for AppendEntryServiceImpl { // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); if request_inner.prev_log_index != last_last_arrived_block_number { - tracing::error!( + let error_message = format!( "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, - request_inner.prev_log_index + last_last_arrived_block_number, request_inner.prev_log_index ); - return Err(Status::invalid_argument("empty block entry")); + tracing::error!("{}", &error_message); + return Err(Status::invalid_argument(error_message)); } let index = request_inner.prev_log_index + 1; From d3fd65d72866fb524f90a5d41607b92396a86df1 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 14:56:04 -0300 Subject: [PATCH 46/65] fix --- src/eth/consensus/server.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 174ed7796..f24661226 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -90,13 +90,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - if consensus.is_leader() { tracing::error!(sender = request_inner.leader_id, "append_block_commit called on leader node"); return Err(Status::new( @@ -105,6 +98,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From 278f4bb4d600cb9cda1386907e2d59d1485454d6 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:04:26 -0300 Subject: [PATCH 47/65] revert --- src/eth/consensus/server.rs | 49 ------------------------------------- 1 file changed, 49 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index f24661226..382392dc0 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,7 +87,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -98,61 +97,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); - // Return error if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm - if let Some(prev_entry) = consensus.log_entries_storage.get_entry(request_inner.prev_log_index).ok().flatten() { - if prev_entry.term != request_inner.prev_log_term { - let error_message = format!( - "prevLogIndex term mismatch: expected {}, found {} at index {}", - request_inner.prev_log_term, prev_entry.term, request_inner.prev_log_index - ); - tracing::error!( - prev_log_index = request_inner.prev_log_index, - expected_term = request_inner.prev_log_term, - actual_term = prev_entry.term, - "{}", - &error_message - ); - return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); - } - } else { - let error_message = format!("No entry found at prevLogIndex {}", request_inner.prev_log_index); - tracing::error!(prev_log_index = request_inner.prev_log_index, "{}", &error_message); - return Err(Status::new((StatusCode::LogMismatch as i32).into(), error_message)); - } - - // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - if request_inner.prev_log_index != last_last_arrived_block_number { - let error_message = format!( - "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, request_inner.prev_log_index - ); - tracing::error!("{}", &error_message); - return Err(Status::invalid_argument(error_message)); - } - - let index = request_inner.prev_log_index + 1; - let term = request_inner.prev_log_term; - let data = LogEntryData::BlockEntry(block_entry.clone()); - - #[cfg(feature = "rocks")] - if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - tracing::error!("Failed to save log entry: {:?}", e); - return Err(Status::internal("Failed to save log entry")); - } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From 1787959882f9d90697b4679376ad550b889752b9 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:11:14 -0300 Subject: [PATCH 48/65] test --- src/eth/consensus/server.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..02cbcb0f2 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,6 +87,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -97,13 +98,39 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); + // TODO: resolve log inconsistency instead? let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + if request_inner.prev_log_index != last_last_arrived_block_number { + tracing::error!( + "prevLogIndex mismatch: expected {}, got {}", + last_last_arrived_block_number, + request_inner.prev_log_index + ); + return Err(Status::invalid_argument("empty block entry")); + } + + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From e63816cccdd33bba0466404d1af04548367e704b Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:14:17 -0300 Subject: [PATCH 49/65] comment out --- src/eth/consensus/server.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 02cbcb0f2..7ff0eca79 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -112,15 +112,15 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); // TODO: resolve log inconsistency instead? - let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - if request_inner.prev_log_index != last_last_arrived_block_number { - tracing::error!( - "prevLogIndex mismatch: expected {}, got {}", - last_last_arrived_block_number, - request_inner.prev_log_index - ); - return Err(Status::invalid_argument("empty block entry")); - } + //let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); + //if request_inner.prev_log_index != last_last_arrived_block_number { + // tracing::error!( + // "prevLogIndex mismatch: expected {}, got {}", + // last_last_arrived_block_number, + // request_inner.prev_log_index + // ); + // return Err(Status::invalid_argument("empty block entry")); + //} let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; From 2be41fc375cfe366e6225e00d0a2143851620fbd Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:17:25 -0300 Subject: [PATCH 50/65] comment out --- src/eth/consensus/server.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 7ff0eca79..0e3c404ea 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -122,15 +122,15 @@ impl AppendEntryService for AppendEntryServiceImpl { // return Err(Status::invalid_argument("empty block entry")); //} - let index = request_inner.prev_log_index + 1; - let term = request_inner.prev_log_term; - let data = LogEntryData::BlockEntry(block_entry.clone()); - - #[cfg(feature = "rocks")] - if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - tracing::error!("Failed to save log entry: {:?}", e); - return Err(Status::internal("Failed to save log entry")); - } + //let index = request_inner.prev_log_index + 1; + //let term = request_inner.prev_log_term; + //let data = LogEntryData::BlockEntry(block_entry.clone()); + + //#[cfg(feature = "rocks")] + //if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + // tracing::error!("Failed to save log entry: {:?}", e); + // return Err(Status::internal("Failed to save log entry")); + //} //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From b9141aae35a5ba25c24bd99da560f3d00a673e19 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:21:29 -0300 Subject: [PATCH 51/65] revert --- src/eth/consensus/server.rs | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 0e3c404ea..382392dc0 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -87,7 +87,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -98,39 +97,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); - // TODO: resolve log inconsistency instead? - //let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); - //if request_inner.prev_log_index != last_last_arrived_block_number { - // tracing::error!( - // "prevLogIndex mismatch: expected {}, got {}", - // last_last_arrived_block_number, - // request_inner.prev_log_index - // ); - // return Err(Status::invalid_argument("empty block entry")); - //} - - //let index = request_inner.prev_log_index + 1; - //let term = request_inner.prev_log_term; - //let data = LogEntryData::BlockEntry(block_entry.clone()); - - //#[cfg(feature = "rocks")] - //if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { - // tracing::error!("Failed to save log entry: {:?}", e); - // return Err(Status::internal("Failed to save log entry")); - //} + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { //TODO FIXME move this code back when we have propagation: tracing::error!( From f7de3eb7829a356a75558bcf5bc065be7d181b67 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:36:57 -0300 Subject: [PATCH 52/65] test --- src/eth/consensus/server.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 382392dc0..5869428f3 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -103,6 +103,16 @@ impl AppendEntryService for AppendEntryServiceImpl { tracing::info!(number = block_entry.number, "appending new block"); + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { From 53a3458a3a1bc658f0776e7a66c58907f8fa2d3d Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 15:56:10 -0300 Subject: [PATCH 53/65] test term validation --- src/eth/consensus/server.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 5869428f3..7b7eb3850 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -42,6 +42,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -52,6 +53,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let executions = request_inner.executions; let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; @@ -87,6 +95,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -97,6 +106,13 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + // Return error if request term < current term + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From f3645767c2e3272d37cca9d8f7ac938db961e8cc Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:01:06 -0300 Subject: [PATCH 54/65] update e2e check --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index 6fbe8fcfa..c873a0dd7 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - elif [[ "$response" == *"APPEND_SUCCESS"* ]]; then + elif [[ "$response" == *"TERM_MISMATCH"* ]]; then return 1 # Failure exit code for non-leader fi } From b434dbc9b8ded070d692c082cc44a7e4de3166ba Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:18:51 -0300 Subject: [PATCH 55/65] test --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index c873a0dd7..99208bf91 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - elif [[ "$response" == *"TERM_MISMATCH"* ]]; then + else return 1 # Failure exit code for non-leader fi } From f3e8927dec5d038af0f43914de9b1f9845e88e02 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Wed, 26 Jun 2024 16:26:19 -0300 Subject: [PATCH 56/65] revert --- chaos/experiments/leader-election.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index 99208bf91..6fbe8fcfa 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -91,7 +91,7 @@ check_leader() { # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then return 0 # Success exit code for leader - else + elif [[ "$response" == *"APPEND_SUCCESS"* ]]; then return 1 # Failure exit code for non-leader fi } From 286d7195737f0e2cea7055e737a00e4e76ca6bda Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 10:48:07 -0300 Subject: [PATCH 57/65] revert --- src/eth/consensus/server.rs | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 7b7eb3850..cc446bb3d 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -53,13 +53,6 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } - let executions = request_inner.executions; let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; @@ -107,11 +100,11 @@ impl AppendEntryService for AppendEntryServiceImpl { } // Return error if request term < current term - if request_inner.term < current_term { - let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - } + //if request_inner.term < current_term { + // let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + // tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + // return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + //} let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); From 5e78af1137b9cedf1d8928e277c7d7ae2dccfacc Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 10:50:57 -0300 Subject: [PATCH 58/65] fix main merge --- src/eth/consensus/append_log_entries_storage.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index bcc57e148..7dcea39b2 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -256,14 +256,11 @@ mod tests { assert_eq!(block.hash, expected_block.hash); assert_eq!(block.number, expected_block.number); assert_eq!(block.parent_hash, expected_block.parent_hash); - assert_eq!(block.nonce, expected_block.nonce); assert_eq!(block.uncle_hash, expected_block.uncle_hash); assert_eq!(block.transactions_root, expected_block.transactions_root); assert_eq!(block.state_root, expected_block.state_root); assert_eq!(block.receipts_root, expected_block.receipts_root); assert_eq!(block.miner, expected_block.miner); - assert_eq!(block.difficulty, expected_block.difficulty); - assert_eq!(block.total_difficulty, expected_block.total_difficulty); assert_eq!(block.extra_data, expected_block.extra_data); assert_eq!(block.size, expected_block.size); assert_eq!(block.gas_limit, expected_block.gas_limit); @@ -307,14 +304,11 @@ mod tests { assert_eq!(block.hash, expected_block.hash); assert_eq!(block.number, expected_block.number); assert_eq!(block.parent_hash, expected_block.parent_hash); - assert_eq!(block.nonce, expected_block.nonce); assert_eq!(block.uncle_hash, expected_block.uncle_hash); assert_eq!(block.transactions_root, expected_block.transactions_root); assert_eq!(block.state_root, expected_block.state_root); assert_eq!(block.receipts_root, expected_block.receipts_root); assert_eq!(block.miner, expected_block.miner); - assert_eq!(block.difficulty, expected_block.difficulty); - assert_eq!(block.total_difficulty, expected_block.total_difficulty); assert_eq!(block.extra_data, expected_block.extra_data); assert_eq!(block.size, expected_block.size); assert_eq!(block.gas_limit, expected_block.gas_limit); From a811fac0cbc11a662bb1d600c0a0f1cf9d37b24f Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 10:54:19 -0300 Subject: [PATCH 59/65] type --- src/eth/consensus/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 63a09a4c5..4cd1626c8 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -485,7 +485,7 @@ impl Consensus { let current_term = consensus.current_term.load(Ordering::SeqCst); tracing::debug!(current_term, "Current term loaded"); - let transaction_hashes: Vec = block.transactions.iter().map(|tx| tx.input.hash.to_string()).collect(); + let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect(); match consensus.log_entries_storage.save_log_entry( last_index + 1, From 1298d07a4ba6f52f7c8587f97ec53be33695d555 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 11:00:17 -0300 Subject: [PATCH 60/65] fix --- src/eth/consensus/server.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index cc446bb3d..5869428f3 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -42,7 +42,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -88,7 +87,6 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; - let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -99,13 +97,6 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } - // Return error if request term < current term - //if request_inner.term < current_term { - // let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); - // tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); - // return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); - //} - let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From 158b95afd381f4d36ef8a8858a8f0228626c28d0 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 11:17:19 -0300 Subject: [PATCH 61/65] fix --- src/eth/consensus/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index 4cd1626c8..a5a5f02fa 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -507,7 +507,7 @@ impl Consensus { } #[cfg(not(feature = "rocks"))] { - let transaction_hashes: Vec = block.transactions.iter().map(|tx| tx.input.hash.to_string()).collect(); + let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect(); let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); if consensus.broadcast_sender.send(block_entry).is_err() { tracing::error!("Failed to broadcast block"); From 174d5f7a040e5fcddbdc82ffced181939b7c74ff Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 13:35:56 -0300 Subject: [PATCH 62/65] add term check --- chaos/experiments/leader-election.sh | 2 +- src/eth/consensus/server.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index 8ee7f8e11..ae1e6564c 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -86,7 +86,7 @@ check_leader() { local grpc_address=$1 # Send the gRPC request using grpcurl and capture both stdout and stderr - response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 5, "prevLogIndex": 0, "prevLogTerm": 4, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1) + response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 999999999, "prevLogIndex": 0, "prevLogTerm": 0, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1) # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 5869428f3..e0efa3e9c 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -42,6 +42,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -52,6 +53,12 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let executions = request_inner.executions; let index = request_inner.prev_log_index + 1; let term = request_inner.prev_log_term; @@ -87,6 +94,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -97,6 +105,12 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; From 525a95a15da52e3ba9e4202f3fdf3eb1835f9b94 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 14:47:22 -0300 Subject: [PATCH 63/65] test unit --- src/eth/consensus/server.rs | 51 ++++++++++++++++++- src/eth/consensus/tests/factories.rs | 7 ++- src/eth/consensus/tests/test_simple_blocks.rs | 2 +- 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index e0efa3e9c..4ba95a0bc 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -315,7 +315,7 @@ mod tests { #[tokio::test] async fn test_append_block_commit_not_leader() { - let consensus = create_follower_consensus_with_leader().await; + let consensus = create_follower_consensus_with_leader(None).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; @@ -397,4 +397,53 @@ mod tests { assert_eq!(response.term, 1); assert!(response.vote_granted); } + + #[tokio::test] + async fn test_append_transaction_executions_not_leader_term_mismatch() { + let consensus = create_follower_consensus_with_leader(Some(2)).await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: vec![], + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.message(), "Request term 1 is less than current term 2"); + } + + #[tokio::test] + async fn test_append_block_commit_not_leader_term_mismatch() { + let consensus = create_follower_consensus_with_leader(Some(2)).await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + let request = Request::new(AppendBlockCommitRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + block_entry: Some(BlockEntry { + number: 1, + ..Default::default() + }), + }); + + let response = service.append_block_commit(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.message(), "Request term 1 is less than current term 2"); + } } diff --git a/src/eth/consensus/tests/factories.rs b/src/eth/consensus/tests/factories.rs index fa76fc417..2112754a7 100644 --- a/src/eth/consensus/tests/factories.rs +++ b/src/eth/consensus/tests/factories.rs @@ -1,3 +1,4 @@ +use core::sync::atomic::Ordering; use std::net::Ipv4Addr; use std::net::SocketAddr; use std::sync::Arc; @@ -141,10 +142,14 @@ async fn create_mock_leader_peer(consensus: Arc) -> (PeerAddress, Pee (leader_address, leader_peer) } -pub async fn create_follower_consensus_with_leader() -> Arc { +pub async fn create_follower_consensus_with_leader(term: Option) -> Arc { let consensus = create_mock_consensus().await; consensus.set_role(Role::Follower); + if let Some(term) = term { + consensus.current_term.store(term, Ordering::SeqCst); + } + let (leader_address, leader_peer) = create_mock_leader_peer(Arc::clone(&consensus)).await; let mut peers = consensus.peers.write().await; diff --git a/src/eth/consensus/tests/test_simple_blocks.rs b/src/eth/consensus/tests/test_simple_blocks.rs index 35074196e..1012969d2 100644 --- a/src/eth/consensus/tests/test_simple_blocks.rs +++ b/src/eth/consensus/tests/test_simple_blocks.rs @@ -15,7 +15,7 @@ use crate::eth::consensus::TransactionExecutionEntry; #[tokio::test] async fn test_append_entries_transaction_executions_and_block() { - let consensus = create_follower_consensus_with_leader().await; + let consensus = create_follower_consensus_with_leader(None).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; From ca394a8877bbb94c0f92a1c52a4d030b6a231052 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 14:58:38 -0300 Subject: [PATCH 64/65] fix --- src/eth/consensus/server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 4ba95a0bc..3dfa89160 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -417,7 +417,7 @@ mod tests { assert!(response.is_err()); let status = response.unwrap_err(); - assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.code(), tonic::Code::NotFound); assert_eq!(status.message(), "Request term 1 is less than current term 2"); } @@ -443,7 +443,7 @@ mod tests { assert!(response.is_err()); let status = response.unwrap_err(); - assert_eq!(status.code(), tonic::Code::Unknown); + assert_eq!(status.code(), tonic::Code::NotFound); assert_eq!(status.message(), "Request term 1 is less than current term 2"); } } From 054e7361e83b73b3531188815d321b85c60ddd01 Mon Sep 17 00:00:00 2001 From: gabriel-aranha-cw Date: Thu, 27 Jun 2024 15:00:00 -0300 Subject: [PATCH 65/65] doc --- src/eth/consensus/server.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 3dfa89160..c6d208513 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -400,11 +400,13 @@ mod tests { #[tokio::test] async fn test_append_transaction_executions_not_leader_term_mismatch() { + // Create follower with term 2 let consensus = create_follower_consensus_with_leader(Some(2)).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; + // Send gRPC with term 1, which is less than the current term to force an error response let request = Request::new(AppendTransactionExecutionsRequest { term: 1, leader_id: "leader_id".to_string(), @@ -423,11 +425,13 @@ mod tests { #[tokio::test] async fn test_append_block_commit_not_leader_term_mismatch() { + // Create follower with term 2 let consensus = create_follower_consensus_with_leader(Some(2)).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; + // Send gRPC with term 1, which is less than the current term to force an error response let request = Request::new(AppendBlockCommitRequest { term: 1, leader_id: "leader_id".to_string(),