diff --git a/chaos/experiments/leader-election.sh b/chaos/experiments/leader-election.sh index c175146ab..ae1e6564c 100755 --- a/chaos/experiments/leader-election.sh +++ b/chaos/experiments/leader-election.sh @@ -86,7 +86,7 @@ check_leader() { local grpc_address=$1 # Send the gRPC request using grpcurl and capture both stdout and stderr - response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 5, "prevLogIndex": 0, "prevLogTerm": 4, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1) + response=$(grpcurl -import-path static/proto -proto append_entry.proto -plaintext -d '{"term": 999999999, "prevLogIndex": 0, "prevLogTerm": 0, "leader_id": "leader_id_value", "block_entry": {"number": 1, "hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transactions_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "gas_used": 999, "gas_limit": 999, "bloom": "ZXh0cmFfZGF0YV92YWx1ZQ==", "timestamp": 123456789, "parent_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "author": "ZXh0cmFfZGF0YV92YWx1ZQ==", "extra_data": "ZXh0cmFfZGF0YV92YWx1ZQ==", "miner": "ZXh0cmFfZGF0YV92YWx1ZQ==", "receipts_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "uncle_hash": "ZXh0cmFfZGF0YV92YWx1ZQ==", "size": 12345, "state_root": "ZXh0cmFfZGF0YV92YWx1ZQ==", "transaction_hashes": []}}' "$grpc_address" append_entry.AppendEntryService/AppendBlockCommit 2>&1) # Check the response for specific strings to determine the node status if [[ "$response" == *"append_transaction_executions called on leader node"* ]]; then @@ -113,7 +113,7 @@ find_leader() { # Function to remove rocks-path directory remove_rocks_path() { - rm -rf data/ + find . -type d -name "tmp_rocks_*" -print0 | xargs -0 rm -rf } # Function to run the election test diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index b97ad20c9..c177be894 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -24,6 +24,7 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { let miner = config.miner.init_external_mode(Arc::clone(&storage), None)?; let consensus = Consensus::new( Arc::clone(&storage), + config.storage.perm_storage.rocks_path_prefix.clone(), config.clone().candidate_peers.clone(), Some(config.clone()), config.address, diff --git a/src/config.rs b/src/config.rs index 37b8080fc..d014a093b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -846,7 +846,6 @@ pub struct PermanentStorageConfig { #[arg(long = "perm-storage", env = "PERM_STORAGE")] pub perm_storage_kind: PermanentStorageKind, - #[cfg(feature = "rocks")] /// RocksDB storage path prefix to execute multiple local Stratus instances. #[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX")] pub rocks_path_prefix: Option, diff --git a/src/eth/consensus/append_log_entries_storage.rs b/src/eth/consensus/append_log_entries_storage.rs index 26ee8430d..7dcea39b2 100644 --- a/src/eth/consensus/append_log_entries_storage.rs +++ b/src/eth/consensus/append_log_entries_storage.rs @@ -7,15 +7,32 @@ use rocksdb::Options; use rocksdb::DB; use super::log_entry::LogEntry; +use super::log_entry::LogEntryData; pub struct AppendLogEntriesStorage { db: DB, } impl AppendLogEntriesStorage { - pub fn new>(path: P) -> Result { + pub fn new(path: Option) -> Result { let mut opts = Options::default(); opts.create_if_missing(true); + + let path = if let Some(prefix) = path { + // run some checks on the given prefix + assert!(!prefix.is_empty(), "given prefix for RocksDB is empty, try not providing the flag"); + if Path::new(&prefix).is_dir() || Path::new(&prefix).iter().count() > 1 { + tracing::warn!(?prefix, "given prefix for RocksDB might put it in another folder"); + } + + let path = format!("{prefix}-log-entries-rocksdb"); + tracing::info!("starting rocksdb log entries storage - at custom path: '{:?}'", path); + path + } else { + tracing::info!("starting rocksdb log entries storage - at default path: 'data/log-entries-rocksdb'"); // TODO: keep inside data? + "data/log-entries-rocksdb".to_string() + }; + let db = DB::open(&opts, path).context("Failed to open RocksDB")?; Ok(Self { db }) } @@ -81,6 +98,35 @@ impl AppendLogEntriesStorage { None => Ok(0), // Default to 0 if not set } } + + pub fn save_log_entry(&self, index: u64, term: u64, data: LogEntryData, entry_type: &str) -> Result<()> { + tracing::debug!(index, term, "Creating {} log entry", entry_type); + let log_entry = LogEntry { term, index, data }; + tracing::debug!(index = log_entry.index, term = log_entry.term, "{} log entry created", entry_type); + + tracing::debug!("Checking for existing {} entry at new index", entry_type); + match self.get_entry(log_entry.index) { + Ok(Some(existing_entry)) => + if existing_entry.term != log_entry.term { + tracing::warn!( + index = log_entry.index, + "Conflicting entry found, deleting existing entry and all that follow it" + ); + self.delete_entries_from(log_entry.index)?; + }, + Ok(None) => { + // No existing entry at this index, proceed to save the new entry + } + Err(e) => { + tracing::error!(index = log_entry.index, "Error retrieving entry: {}", e); + return Err(anyhow::anyhow!("Error retrieving entry: {}", e)); + } + } + + tracing::debug!("Appending new {} log entry", entry_type); + self.save_entry(&log_entry) + .map_err(|e| anyhow::anyhow!("Failed to append {} log entry: {}", entry_type, e)) + } } #[cfg(test)] @@ -89,12 +135,11 @@ mod tests { use super::*; use crate::eth::consensus::tests::factories::*; - use crate::eth::consensus::LogEntryData; fn setup_storage() -> AppendLogEntriesStorage { let temp_dir = TempDir::new().unwrap(); - let temp_path = temp_dir.path(); - AppendLogEntriesStorage::new(temp_path).unwrap() + let temp_path = temp_dir.path().to_str().expect("Failed to get temp path").to_string(); + AppendLogEntriesStorage::new(Some(temp_path)).unwrap() } #[test] @@ -192,4 +237,91 @@ mod tests { panic!("Expected TransactionExecutionEntries"); } } + + #[test] + fn test_save_log_entry_no_conflict() { + let storage = setup_storage(); + let index = 1; + let term = 1; + let log_entry_data = create_mock_log_entry_data_block(); + + storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap(); + let retrieved_entry = storage.get_entry(index).unwrap().unwrap(); + + assert_eq!(retrieved_entry.index, index); + assert_eq!(retrieved_entry.term, term); + + if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data { + if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data { + assert_eq!(block.hash, expected_block.hash); + assert_eq!(block.number, expected_block.number); + assert_eq!(block.parent_hash, expected_block.parent_hash); + assert_eq!(block.uncle_hash, expected_block.uncle_hash); + assert_eq!(block.transactions_root, expected_block.transactions_root); + assert_eq!(block.state_root, expected_block.state_root); + assert_eq!(block.receipts_root, expected_block.receipts_root); + assert_eq!(block.miner, expected_block.miner); + assert_eq!(block.extra_data, expected_block.extra_data); + assert_eq!(block.size, expected_block.size); + assert_eq!(block.gas_limit, expected_block.gas_limit); + assert_eq!(block.gas_used, expected_block.gas_used); + assert_eq!(block.timestamp, expected_block.timestamp); + assert_eq!(block.bloom, expected_block.bloom); + assert_eq!(block.author, expected_block.author); + assert_eq!(block.transaction_hashes, expected_block.transaction_hashes); + } else { + panic!("Expected BlockEntry"); + } + } else { + panic!("Expected BlockEntry"); + } + } + + #[test] + fn test_save_log_entry_with_conflict() { + let storage = setup_storage(); + let index = 1; + let term = 1; + let conflicting_term = 2; + let log_entry_data = create_mock_log_entry_data_block(); + + // Save initial log entry + storage.save_log_entry(index, term, log_entry_data.clone(), "block").unwrap(); + + // Save conflicting log entry at the same index but with a different term + storage.save_log_entry(index, conflicting_term, log_entry_data.clone(), "block").unwrap(); + + // Assert no entries exist after the conflicting entry's index, confirming that the conflicting entry and all that follow it were deleted + assert!(storage.get_entry(index + 1).unwrap().is_none()); + + // Retrieve the entry at the index and assert it matches the conflicting term entry + let retrieved_entry = storage.get_entry(index).unwrap().unwrap(); + assert_eq!(retrieved_entry.index, index); + assert_eq!(retrieved_entry.term, conflicting_term); + + if let LogEntryData::BlockEntry(ref block) = retrieved_entry.data { + if let LogEntryData::BlockEntry(ref expected_block) = log_entry_data { + assert_eq!(block.hash, expected_block.hash); + assert_eq!(block.number, expected_block.number); + assert_eq!(block.parent_hash, expected_block.parent_hash); + assert_eq!(block.uncle_hash, expected_block.uncle_hash); + assert_eq!(block.transactions_root, expected_block.transactions_root); + assert_eq!(block.state_root, expected_block.state_root); + assert_eq!(block.receipts_root, expected_block.receipts_root); + assert_eq!(block.miner, expected_block.miner); + assert_eq!(block.extra_data, expected_block.extra_data); + assert_eq!(block.size, expected_block.size); + assert_eq!(block.gas_limit, expected_block.gas_limit); + assert_eq!(block.gas_used, expected_block.gas_used); + assert_eq!(block.timestamp, expected_block.timestamp); + assert_eq!(block.bloom, expected_block.bloom); + assert_eq!(block.author, expected_block.author); + assert_eq!(block.transaction_hashes, expected_block.transaction_hashes); + } else { + panic!("Expected BlockEntry"); + } + } else { + panic!("Expected BlockEntry"); + } + } } diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index fa210e339..405d1d245 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -63,6 +63,8 @@ use append_entry::RequestVoteRequest; use append_entry::StatusCode; use append_entry::TransactionExecutionEntry; +#[cfg(feature = "rocks")] +use self::append_log_entries_storage::AppendLogEntriesStorage; use self::log_entry::LogEntryData; use super::primitives::TransactionExecution; use super::primitives::TransactionInput; @@ -164,6 +166,8 @@ pub struct Consensus { broadcast_sender: broadcast::Sender, //propagates the blocks importer_config: Option, //HACK this is used with sync online only storage: Arc, + #[cfg(feature = "rocks")] + log_entries_storage: Arc, peers: Arc>>, #[allow(dead_code)] direct_peers: Vec, @@ -180,8 +184,10 @@ pub struct Consensus { } impl Consensus { + #[allow(clippy::too_many_arguments)] //TODO: refactor into consensus config pub async fn new( storage: Arc, + log_storage_path: Option, direct_peers: Vec, importer_config: Option, jsonrpc_address: SocketAddr, @@ -197,6 +203,8 @@ impl Consensus { let consensus = Self { broadcast_sender, storage, + #[cfg(feature = "rocks")] + log_entries_storage: Arc::new(AppendLogEntriesStorage::new(log_storage_path).unwrap()), peers, direct_peers, current_term: AtomicU64::new(0), @@ -385,19 +393,48 @@ impl Consensus { } fn initialize_transaction_execution_queue(consensus: Arc) { - //TODO add data to consensus-log-transactions - //TODO rediscover followers on comunication error - //XXX FIXME deal with the scenario where a transactionHash arrives after the block, in this case before saving the block LogEntry, it should ALWAYS wait for all transaction hashes - //TODO maybe check if I'm currently the leader? + // TODO: add data to consensus-log-transactions + // TODO: rediscover followers on communication error + // XXX FIXME: deal with the scenario where a transactionHash arrives after the block; + // in this case, before saving the block LogEntry, it should ALWAYS wait for all transaction hashes + // TODO: maybe check if I'm currently the leader? + spawn_named("consensus::transaction_execution_queue", async move { let interval = Duration::from_millis(40); loop { tokio::time::sleep(interval).await; + if consensus.is_leader() { let mut queue = consensus.transaction_execution_queue.lock().await; let executions = queue.drain(..).collect::>(); drop(queue); + #[cfg(feature = "rocks")] + { + tracing::debug!(executions_len = executions.len(), "Processing transaction executions"); + if !executions.is_empty() { + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index fetched"); + + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term loaded"); + + match consensus.log_entries_storage.save_log_entry( + last_index + 1, + current_term, + LogEntryData::TransactionExecutionEntries(executions.clone()), + "transaction", + ) { + Ok(_) => { + tracing::debug!("Transaction execution entry saved successfully"); + } + Err(e) => { + tracing::error!("Failed to save transaction execution entry: {:?}", e); + } + } + } + } + let peers = consensus.peers.read().await; for (_, (peer, _)) in peers.iter() { let mut peer_clone = peer.clone(); @@ -420,6 +457,7 @@ impl Consensus { loop { tokio::select! { Ok(tx) = rx_pending_txs.recv() => { + tracing::debug!("Attempting to receive transaction execution"); if consensus.is_leader() { tracing::info!(tx_hash = %tx.hash(), "received transaction execution to send to followers"); if tx.is_local() { @@ -427,7 +465,6 @@ impl Consensus { continue; } - //TODO XXX save transaction to appendEntries log let transaction = vec![tx.to_append_entry_transaction()]; let transaction_entry = LogEntryData::TransactionExecutionEntries(transaction); if consensus.broadcast_sender.send(transaction_entry).is_err() { @@ -437,13 +474,44 @@ impl Consensus { } Ok(block) = rx_blocks.recv() => { if consensus.is_leader() { - tracing::info!(number = block.header.number.as_u64(), "received block to send to followers"); - - //TODO save block to appendEntries log - //TODO before saving check if all transaction_hashes are already in the log - let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(Vec::new())); - if consensus.broadcast_sender.send(block_entry).is_err() { - tracing::error!("failed to broadcast block"); + tracing::info!(number = block.header.number.as_u64(), "Leader received block to send to followers"); + + #[cfg(feature = "rocks")] + { + //TODO: before saving check if all transaction_hashes are already in the log + let last_index = consensus.log_entries_storage.get_last_index().unwrap_or(0); + tracing::debug!(last_index, "Last index fetched"); + + let current_term = consensus.current_term.load(Ordering::SeqCst); + tracing::debug!(current_term, "Current term loaded"); + + let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect(); + + match consensus.log_entries_storage.save_log_entry( + last_index + 1, + current_term, + LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes.clone())), + "block", + ) { + Ok(_) => { + tracing::debug!("Block entry saved successfully"); + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("Failed to broadcast block"); + } + } + Err(e) => { + tracing::error!("Failed to save block entry: {:?}", e); + } + } + } + #[cfg(not(feature = "rocks"))] + { + let transaction_hashes: Vec> = block.transactions.iter().map(|tx| tx.input.hash.to_string().into_bytes()).collect(); + let block_entry = LogEntryData::BlockEntry(block.header.to_append_entry_block_header(transaction_hashes)); + if consensus.broadcast_sender.send(block_entry).is_err() { + tracing::error!("Failed to broadcast block"); + } } } } diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 2c91d2e89..c6d208513 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -14,6 +14,7 @@ use super::append_entry::RequestVoteRequest; use super::append_entry::RequestVoteResponse; use super::append_entry::StatusCode; use crate::eth::consensus::AppendEntryService; +use crate::eth::consensus::LogEntryData; use crate::eth::consensus::PeerAddress; use crate::eth::consensus::Role; use crate::eth::Consensus; @@ -41,6 +42,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -51,8 +53,23 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let executions = request_inner.executions; - //TODO save the transaction executions here + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::TransactionExecutionEntries(executions.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "transaction") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } + //TODO send the executions to the Storage tracing::info!(executions = executions.len(), "appending executions"); @@ -77,6 +94,7 @@ impl AppendEntryService for AppendEntryServiceImpl { let start = std::time::Instant::now(); let consensus = self.consensus.lock().await; + let current_term = consensus.current_term.load(Ordering::SeqCst); let request_inner = request.into_inner(); if consensus.is_leader() { @@ -87,12 +105,28 @@ impl AppendEntryService for AppendEntryServiceImpl { )); } + if request_inner.term < current_term { + let error_message = format!("Request term {} is less than current term {}", request_inner.term, current_term); + tracing::error!(request_term = request_inner.term, current_term = current_term, "{}", &error_message); + return Err(Status::new((StatusCode::TermMismatch as i32).into(), error_message)); + } + let Some(block_entry) = request_inner.block_entry else { return Err(Status::invalid_argument("empty block entry")); }; tracing::info!(number = block_entry.number, "appending new block"); + let index = request_inner.prev_log_index + 1; + let term = request_inner.prev_log_term; + let data = LogEntryData::BlockEntry(block_entry.clone()); + + #[cfg(feature = "rocks")] + if let Err(e) = consensus.log_entries_storage.save_log_entry(index, term, data, "block") { + tracing::error!("Failed to save log entry: {:?}", e); + return Err(Status::internal("Failed to save log entry")); + } + let last_last_arrived_block_number = consensus.last_arrived_block_number.load(Ordering::SeqCst); //TODO FIXME move this code back when we have propagation: let Some(diff) = last_last_arrived_block_number.checked_sub(block_entry.number) else { @@ -192,6 +226,40 @@ mod tests { use crate::eth::consensus::tests::factories::*; use crate::eth::consensus::BlockEntry; + #[tokio::test] + async fn test_append_transaction_executions_insert() { + let consensus = create_mock_consensus().await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + consensus.set_role(Role::Follower); + + let executions = vec![create_mock_transaction_execution_entry()]; + + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: executions.clone(), + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_ok()); + + // Check if the log entry was inserted correctly + let log_entries_storage = &consensus.log_entries_storage; + let last_index = log_entries_storage.get_last_index().unwrap(); + let saved_entry = log_entries_storage.get_entry(last_index).unwrap().unwrap(); + + if let LogEntryData::TransactionExecutionEntries(saved_executions) = saved_entry.data { + assert_eq!(saved_executions, executions); + } else { + panic!("Expected transaction execution entries in the log entry"); + } + } + #[tokio::test] async fn test_append_transaction_executions_not_leader() { let consensus = create_leader_consensus().await; @@ -247,7 +315,7 @@ mod tests { #[tokio::test] async fn test_append_block_commit_not_leader() { - let consensus = create_follower_consensus_with_leader().await; + let consensus = create_follower_consensus_with_leader(None).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; @@ -329,4 +397,57 @@ mod tests { assert_eq!(response.term, 1); assert!(response.vote_granted); } + + #[tokio::test] + async fn test_append_transaction_executions_not_leader_term_mismatch() { + // Create follower with term 2 + let consensus = create_follower_consensus_with_leader(Some(2)).await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Send gRPC with term 1, which is less than the current term to force an error response + let request = Request::new(AppendTransactionExecutionsRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + executions: vec![], + }); + + let response = service.append_transaction_executions(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::NotFound); + assert_eq!(status.message(), "Request term 1 is less than current term 2"); + } + + #[tokio::test] + async fn test_append_block_commit_not_leader_term_mismatch() { + // Create follower with term 2 + let consensus = create_follower_consensus_with_leader(Some(2)).await; + let service = AppendEntryServiceImpl { + consensus: Mutex::new(Arc::clone(&consensus)), + }; + + // Send gRPC with term 1, which is less than the current term to force an error response + let request = Request::new(AppendBlockCommitRequest { + term: 1, + leader_id: "leader_id".to_string(), + prev_log_index: 0, + prev_log_term: 0, + block_entry: Some(BlockEntry { + number: 1, + ..Default::default() + }), + }); + + let response = service.append_block_commit(request).await; + assert!(response.is_err()); + + let status = response.unwrap_err(); + assert_eq!(status.code(), tonic::Code::NotFound); + assert_eq!(status.message(), "Request term 1 is less than current term 2"); + } } diff --git a/src/eth/consensus/tests/factories.rs b/src/eth/consensus/tests/factories.rs index 86a5ce0f4..2112754a7 100644 --- a/src/eth/consensus/tests/factories.rs +++ b/src/eth/consensus/tests/factories.rs @@ -1,3 +1,4 @@ +use core::sync::atomic::Ordering; use std::net::Ipv4Addr; use std::net::SocketAddr; use std::sync::Arc; @@ -95,6 +96,7 @@ pub fn create_mock_log_entry(index: u64, term: u64, data: LogEntryData) -> LogEn pub async fn create_mock_consensus() -> Arc { let (storage, _tmpdir) = StratusStorage::mock_new_rocksdb(); + let (_log_entries_storage, tmpdir_log_entries) = StratusStorage::mock_new_rocksdb(); let direct_peers = Vec::new(); let importer_config = None; let jsonrpc_address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); @@ -102,8 +104,11 @@ pub async fn create_mock_consensus() -> Arc { let (tx_pending_txs, _) = broadcast::channel(10); let (tx_blocks, _) = broadcast::channel(10); + let tmpdir_log_entries_path = tmpdir_log_entries.path().to_str().map(|s| s.to_owned()); + Consensus::new( storage.into(), + tmpdir_log_entries_path, direct_peers, importer_config, jsonrpc_address, @@ -137,10 +142,14 @@ async fn create_mock_leader_peer(consensus: Arc) -> (PeerAddress, Pee (leader_address, leader_peer) } -pub async fn create_follower_consensus_with_leader() -> Arc { +pub async fn create_follower_consensus_with_leader(term: Option) -> Arc { let consensus = create_mock_consensus().await; consensus.set_role(Role::Follower); + if let Some(term) = term { + consensus.current_term.store(term, Ordering::SeqCst); + } + let (leader_address, leader_peer) = create_mock_leader_peer(Arc::clone(&consensus)).await; let mut peers = consensus.peers.write().await; diff --git a/src/eth/consensus/tests/test_simple_blocks.rs b/src/eth/consensus/tests/test_simple_blocks.rs index 35074196e..1012969d2 100644 --- a/src/eth/consensus/tests/test_simple_blocks.rs +++ b/src/eth/consensus/tests/test_simple_blocks.rs @@ -15,7 +15,7 @@ use crate::eth::consensus::TransactionExecutionEntry; #[tokio::test] async fn test_append_entries_transaction_executions_and_block() { - let consensus = create_follower_consensus_with_leader().await; + let consensus = create_follower_consensus_with_leader(None).await; let service = AppendEntryServiceImpl { consensus: Mutex::new(Arc::clone(&consensus)), }; diff --git a/src/main.rs b/src/main.rs index 5fb71b654..c71f97aed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,6 +22,7 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)); let consensus = Consensus::new( Arc::clone(&storage), + config.storage.perm_storage.rocks_path_prefix.clone(), config.clone().candidate_peers.clone(), None, config.address,