diff --git a/src/bin/run_with_importer.rs b/src/bin/run_with_importer.rs index 24be624f2..c3e8350bc 100644 --- a/src/bin/run_with_importer.rs +++ b/src/bin/run_with_importer.rs @@ -22,7 +22,14 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> { // init services let storage = config.storage.init().await?; - let consensus = Consensus::new(Arc::clone(&storage), config.clone().candidate_peers.clone(), Some(config.clone())).await; // in development, with no leader configured, the current node ends up being the leader + let consensus = Consensus::new( + Arc::clone(&storage), + config.clone().candidate_peers.clone(), + Some(config.clone()), + config.address, + config.grpc_server_address, + ) + .await; // in development, with no leader configured, the current node ends up being the leader let Some((http_url, ws_url)) = consensus.get_chain_url().await else { return Err(anyhow!("No chain url found")); }; diff --git a/src/config.rs b/src/config.rs index 98ef90e09..d1d21743f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -110,6 +110,10 @@ pub struct CommonConfig { #[arg(long = "candidate-peers", env = "CANDIDATE_PEERS", value_delimiter = ',')] pub candidate_peers: Vec, + // Address for the GRPC Server + #[arg(long = "grpc-server-address", env = "GRPC_SERVER_ADDRESS", default_value = "0.0.0.0:3777")] + pub grpc_server_address: SocketAddr, + /// Prevents clap from breaking when passing `nocapture` options in tests. #[arg(long = "nocapture")] pub nocapture: bool, @@ -837,6 +841,11 @@ pub struct PermanentStorageConfig { /// Permamenent storage timeout when opening a connection (in millis). #[arg(long = "perm-storage-timeout", value_parser=parse_duration, env = "PERM_STORAGE_TIMEOUT")] pub perm_storage_timeout: Duration, + + #[cfg(feature = "rocks")] + /// RocksDB storage path prefix to execute multiple local Stratus instances. + #[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX", default_value = "")] + pub rocks_path_prefix: Option, } #[derive(DebugAsJson, Clone, serde::Serialize)] @@ -857,7 +866,7 @@ impl PermanentStorageConfig { let perm: Arc = match self.perm_storage_kind { PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()), #[cfg(feature = "rocks")] - PermanentStorageKind::Rocks => Arc::new(RocksPermanentStorage::new().await?), + PermanentStorageKind::Rocks => Arc::new(RocksPermanentStorage::new(self.rocks_path_prefix.clone()).await?), PermanentStorageKind::Postgres { ref url } => { let config = PostgresPermanentStorageConfig { url: url.to_owned(), diff --git a/src/eth/consensus/mod.rs b/src/eth/consensus/mod.rs index a6dacbf2c..c2f86cbe7 100644 --- a/src/eth/consensus/mod.rs +++ b/src/eth/consensus/mod.rs @@ -3,6 +3,7 @@ pub mod forward_to; use std::collections::HashMap; #[cfg(feature = "kubernetes")] use std::env; +use std::net::SocketAddr; use std::net::UdpSocket; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -91,7 +92,7 @@ impl PeerAddress { } fn full_grpc_address(&self) -> String { - format!("http://{}:{}", self.address, self.grpc_port) + format!("{}:{}", self.address, self.grpc_port) } fn full_jsonrpc_address(&self) -> String { @@ -158,17 +159,24 @@ pub struct Consensus { role: RwLock, heartbeat_timeout: Duration, my_address: PeerAddress, + grpc_address: SocketAddr, reset_heartbeat_signal: tokio::sync::Notify, } impl Consensus { - pub async fn new(storage: Arc, direct_peers: Vec, importer_config: Option) -> Arc { + pub async fn new( + storage: Arc, + direct_peers: Vec, + importer_config: Option, + jsonrpc_address: SocketAddr, + grpc_address: SocketAddr, + ) -> Arc { let (sender, receiver) = mpsc::channel::(32); let receiver = Arc::new(Mutex::new(receiver)); let (broadcast_sender, _) = broadcast::channel(32); let last_arrived_block_number = AtomicU64::new(storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into()); let peers = Arc::new(RwLock::new(HashMap::new())); - let my_address = Self::discover_my_address(); + let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port()); let consensus = Self { sender, @@ -183,6 +191,7 @@ impl Consensus { role: RwLock::new(Role::Follower), heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(1200..1500)), // Adjust as needed my_address: my_address.clone(), + grpc_address, reset_heartbeat_signal: tokio::sync::Notify::new(), }; let consensus = Arc::new(consensus); @@ -196,12 +205,12 @@ impl Consensus { consensus } - fn discover_my_address() -> PeerAddress { + fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.connect("8.8.8.8:80").ok().unwrap(); let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap(); - PeerAddress::new(format!("http://{}", my_ip), 3000, 3777) //FIXME TODO pick ports from config + PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port) } /// Initializes the heartbeat and election timers. @@ -378,8 +387,8 @@ impl Consensus { fn initialize_server(consensus: Arc) { named_spawn("consensus::server", async move { - tracing::info!("starting append entry service at port 3777"); - let addr = "0.0.0.0:3777".parse().unwrap(); + tracing::info!("Starting append entry service at address: {}", consensus.grpc_address); + let addr = consensus.grpc_address; let append_entry_service = AppendEntryServiceImpl { consensus: Mutex::new(consensus), @@ -577,12 +586,13 @@ impl Consensus { let mut peers: Vec<(PeerAddress, Peer)> = Vec::new(); for address in addresses { - // Parse the address format using from_string method match PeerAddress::from_string(address.to_string()) { Ok(peer_address) => { - let full_grpc_address = peer_address.full_grpc_address(); - match AppendEntryServiceClient::connect(full_grpc_address.clone()).await { + let grpc_address = peer_address.full_grpc_address(); + tracing::info!("Attempting to connect to peer gRPC address: {}", grpc_address); + match AppendEntryServiceClient::connect(grpc_address.clone()).await { Ok(client) => { + tracing::info!("Successfully connected to peer gRPC address: {}", grpc_address); let peer = Peer { client, match_index: 0, @@ -593,17 +603,18 @@ impl Consensus { peers.push((peer_address.clone(), peer)); tracing::info!(peer = peer_address.to_string(), "peer is available"); } - Err(_) => { - tracing::warn!(peer = peer_address.to_string(), "peer is not available"); + Err(e) => { + tracing::warn!(peer = peer_address.to_string(), "peer is not available. Error: {:?}", e); } } } Err(e) => { - tracing::warn!("Invalid address format: {}. Error: {:?}", address, e); + tracing::error!("Invalid address format: {}. Error: {:?}", address, e); } } } + tracing::info!("Completed peer discovery with {} peers found", peers.len()); Ok(peers) } @@ -622,8 +633,8 @@ impl Consensus { if pod_name != Self::current_node().unwrap() { if let Some(pod_ip) = p.status.and_then(|status| status.pod_ip) { let address = pod_ip; - let jsonrpc_port = 3000; //TODO use kubernetes env config - let grpc_port = 3777; //TODO use kubernetes env config + let jsonrpc_port = consensus.my_address.jsonrpc_port; + let grpc_port = consensus.my_address.grpc_port; let full_grpc_address = format!("http://{}:{}", address, grpc_port); let client = AppendEntryServiceClient::connect(full_grpc_address.clone()).await?; @@ -762,8 +773,22 @@ impl AppendEntryService for AppendEntryServiceImpl { "last arrived block number set", ); - #[cfg(feature = "metrics")] - metrics::set_append_entries_block_number_diff(last_last_arrived_block_number - header.number); + if let Some(diff) = last_last_arrived_block_number.checked_sub(header.number) { + #[cfg(feature = "metrics")] + { + metrics::set_append_entries_block_number_diff(diff); + } + } else { + tracing::error!( + "leader is behind follower: arrived_block: {}, header_block: {}", + last_last_arrived_block_number, + header.number + ); + return Err(Status::new( + (StatusCode::EntryAlreadyExists as i32).into(), + "Leader is behind follower".to_string(), + )); + } Ok(Response::new(AppendBlockCommitResponse { status: StatusCode::AppendSuccess as i32, @@ -821,7 +846,7 @@ mod tests { use super::*; #[test] - fn test_peer_address_from_string_valid() { + fn test_peer_address_from_string_valid_http() { let input = "http://127.0.0.1:3000;3777".to_string(); let result = PeerAddress::from_string(input); @@ -833,7 +858,7 @@ mod tests { } #[test] - fn test_another_peer_address_from_string_valid() { + fn test_peer_address_from_string_valid_https() { let input = "https://127.0.0.1:3000;3777".to_string(); let result = PeerAddress::from_string(input); @@ -852,4 +877,25 @@ mod tests { assert!(result.is_err()); assert_eq!(result.err().unwrap().to_string(), "invalid format"); } + + #[test] + fn test_peer_address_from_string_missing_scheme() { + let input = "127.0.0.1:3000;3777".to_string(); + let result = PeerAddress::from_string(input); + + assert!(result.is_err()); + assert_eq!(result.err().unwrap().to_string(), "invalid scheme"); + } + + #[test] + fn test_peer_address_full_grpc_address() { + let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); + assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777"); + } + + #[test] + fn test_peer_address_full_jsonrpc_address() { + let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777); + assert_eq!(peer_address.full_jsonrpc_address(), "http://127.0.0.1:3000"); + } } diff --git a/src/eth/storage/rocks/rocks_permanent.rs b/src/eth/storage/rocks/rocks_permanent.rs index 74dc152d3..bdbefc483 100644 --- a/src/eth/storage/rocks/rocks_permanent.rs +++ b/src/eth/storage/rocks/rocks_permanent.rs @@ -41,10 +41,10 @@ pub struct RocksPermanentStorage { } impl RocksPermanentStorage { - pub async fn new() -> anyhow::Result { + pub async fn new(rocks_path_prefix: Option) -> anyhow::Result { tracing::info!("creating rocksdb storage"); - let state = RocksStorageState::new(); + let state = RocksStorageState::new(rocks_path_prefix); state.sync_data().await?; let block_number = state.preload_block_number()?; Ok(Self { state, block_number }) diff --git a/src/eth/storage/rocks/rocks_state.rs b/src/eth/storage/rocks/rocks_state.rs index b7cf34a44..04f406731 100644 --- a/src/eth/storage/rocks/rocks_state.rs +++ b/src/eth/storage/rocks/rocks_state.rs @@ -53,20 +53,23 @@ pub struct RocksStorageState { pub backup_trigger: Arc>, } -impl Default for RocksStorageState { - fn default() -> Self { +impl RocksStorageState { + pub fn new(rocks_path_prefix: Option) -> Self { let (tx, rx) = mpsc::channel::<()>(1); //XXX TODO while repair/restore from backup, make sure to sync online and only when its in sync with other nodes, receive requests + + let path_prefix = rocks_path_prefix.unwrap_or_default(); + let state = Self { - accounts: RocksDb::new("./data/accounts.rocksdb", DbConfig::Default).unwrap(), - accounts_history: RocksDb::new("./data/accounts_history.rocksdb", DbConfig::FastWriteSST).unwrap(), - account_slots: RocksDb::new("./data/account_slots.rocksdb", DbConfig::Default).unwrap(), - account_slots_history: RocksDb::new("./data/account_slots_history.rocksdb", DbConfig::FastWriteSST).unwrap(), - transactions: RocksDb::new("./data/transactions.rocksdb", DbConfig::LargeSSTFiles).unwrap(), - blocks_by_number: RocksDb::new("./data/blocks_by_number.rocksdb", DbConfig::LargeSSTFiles).unwrap(), - blocks_by_hash: RocksDb::new("./data/blocks_by_hash.rocksdb", DbConfig::LargeSSTFiles).unwrap(), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number - logs: RocksDb::new("./data/logs.rocksdb", DbConfig::LargeSSTFiles).unwrap(), + accounts: RocksDb::new(&format!("{}./data/accounts.rocksdb", path_prefix), DbConfig::Default).unwrap(), + accounts_history: RocksDb::new(&format!("{}./data/accounts_history.rocksdb", path_prefix), DbConfig::FastWriteSST).unwrap(), + account_slots: RocksDb::new(&format!("{}./data/account_slots.rocksdb", path_prefix), DbConfig::Default).unwrap(), + account_slots_history: RocksDb::new(&format!("{}./data/account_slots_history.rocksdb", path_prefix), DbConfig::FastWriteSST).unwrap(), + transactions: RocksDb::new(&format!("{}./data/transactions.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(), + blocks_by_number: RocksDb::new(&format!("{}./data/blocks_by_number.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(), + blocks_by_hash: RocksDb::new(&format!("{}./data/blocks_by_hash.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number + logs: RocksDb::new(&format!("{}./data/logs.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(), backup_trigger: Arc::new(tx), }; @@ -74,12 +77,6 @@ impl Default for RocksStorageState { state } -} - -impl RocksStorageState { - pub fn new() -> Self { - Self::default() - } pub fn listen_for_backup_trigger(&self, mut rx: mpsc::Receiver<()>) -> anyhow::Result<()> { tracing::info!("creating backup trigger listener"); diff --git a/src/main.rs b/src/main.rs index f580b9ea1..e3a0c48d3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,14 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> { }; let miner = config.miner.init(Arc::clone(&storage), None, external_relayer).await?; let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)).await; - let consensus = Consensus::new(Arc::clone(&storage), config.clone().candidate_peers.clone(), None).await; // for now, we force None to initiate with the current node being the leader + let consensus = Consensus::new( + Arc::clone(&storage), + config.clone().candidate_peers.clone(), + None, + config.address, + config.grpc_server_address, + ) + .await; // for now, we force None to initiate with the current node being the leader // start rpc server serve_rpc(storage, executor, miner, consensus, config.address, config.executor.chain_id.into()).await?; diff --git a/tests/test_import_external_snapshot_rocksdb.rs b/tests/test_import_external_snapshot_rocksdb.rs index 3276a2c50..d69b960d3 100644 --- a/tests/test_import_external_snapshot_rocksdb.rs +++ b/tests/test_import_external_snapshot_rocksdb.rs @@ -18,7 +18,8 @@ pub mod rocks_test { let (accounts, slots) = common::filter_accounts_and_slots(snapshot); - let rocks = RocksPermanentStorage::new().await.unwrap(); + let rocks_path_prefix: Option = Some(String::new()); + let rocks = RocksPermanentStorage::new(rocks_path_prefix).await.unwrap(); rocks.save_accounts(accounts).await.unwrap(); rocks.state.write_slots(slots, BlockNumber::ZERO);