Skip to content

Commit

Permalink
Enable multiple local Stratus instances (#1030)
Browse files Browse the repository at this point in the history
* initial commit

* lint

* fix metrics bug

* path override

* lint

* comment

* commment

* fix test

* add return with status code

* enha

* fix

* remove log

* more logs

* tests

* readd test

* remove duplicate

* fmt

* lint

* simplify

* fix
  • Loading branch information
gabriel-aranha-cw authored Jun 10, 2024
1 parent 171422f commit 0d4c5e9
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 41 deletions.
9 changes: 8 additions & 1 deletion src/bin/run_with_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,14 @@ async fn run(config: RunWithImporterConfig) -> anyhow::Result<()> {

// init services
let storage = config.storage.init().await?;
let consensus = Consensus::new(Arc::clone(&storage), config.clone().candidate_peers.clone(), Some(config.clone())).await; // in development, with no leader configured, the current node ends up being the leader
let consensus = Consensus::new(
Arc::clone(&storage),
config.clone().candidate_peers.clone(),
Some(config.clone()),
config.address,
config.grpc_server_address,
)
.await; // in development, with no leader configured, the current node ends up being the leader
let Some((http_url, ws_url)) = consensus.get_chain_url().await else {
return Err(anyhow!("No chain url found"));
};
Expand Down
11 changes: 10 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ pub struct CommonConfig {
#[arg(long = "candidate-peers", env = "CANDIDATE_PEERS", value_delimiter = ',')]
pub candidate_peers: Vec<String>,

// Address for the GRPC Server
#[arg(long = "grpc-server-address", env = "GRPC_SERVER_ADDRESS", default_value = "0.0.0.0:3777")]
pub grpc_server_address: SocketAddr,

/// Prevents clap from breaking when passing `nocapture` options in tests.
#[arg(long = "nocapture")]
pub nocapture: bool,
Expand Down Expand Up @@ -837,6 +841,11 @@ pub struct PermanentStorageConfig {
/// Permamenent storage timeout when opening a connection (in millis).
#[arg(long = "perm-storage-timeout", value_parser=parse_duration, env = "PERM_STORAGE_TIMEOUT")]
pub perm_storage_timeout: Duration,

#[cfg(feature = "rocks")]
/// RocksDB storage path prefix to execute multiple local Stratus instances.
#[arg(long = "rocks-path-prefix", env = "ROCKS_PATH_PREFIX", default_value = "")]
pub rocks_path_prefix: Option<String>,
}

#[derive(DebugAsJson, Clone, serde::Serialize)]
Expand All @@ -857,7 +866,7 @@ impl PermanentStorageConfig {
let perm: Arc<dyn PermanentStorage> = match self.perm_storage_kind {
PermanentStorageKind::InMemory => Arc::new(InMemoryPermanentStorage::default()),
#[cfg(feature = "rocks")]
PermanentStorageKind::Rocks => Arc::new(RocksPermanentStorage::new().await?),
PermanentStorageKind::Rocks => Arc::new(RocksPermanentStorage::new(self.rocks_path_prefix.clone()).await?),
PermanentStorageKind::Postgres { ref url } => {
let config = PostgresPermanentStorageConfig {
url: url.to_owned(),
Expand Down
84 changes: 65 additions & 19 deletions src/eth/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod forward_to;
use std::collections::HashMap;
#[cfg(feature = "kubernetes")]
use std::env;
use std::net::SocketAddr;
use std::net::UdpSocket;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
Expand Down Expand Up @@ -91,7 +92,7 @@ impl PeerAddress {
}

fn full_grpc_address(&self) -> String {
format!("http://{}:{}", self.address, self.grpc_port)
format!("{}:{}", self.address, self.grpc_port)
}

fn full_jsonrpc_address(&self) -> String {
Expand Down Expand Up @@ -158,17 +159,24 @@ pub struct Consensus {
role: RwLock<Role>,
heartbeat_timeout: Duration,
my_address: PeerAddress,
grpc_address: SocketAddr,
reset_heartbeat_signal: tokio::sync::Notify,
}

impl Consensus {
pub async fn new(storage: Arc<StratusStorage>, direct_peers: Vec<String>, importer_config: Option<RunWithImporterConfig>) -> Arc<Self> {
pub async fn new(
storage: Arc<StratusStorage>,
direct_peers: Vec<String>,
importer_config: Option<RunWithImporterConfig>,
jsonrpc_address: SocketAddr,
grpc_address: SocketAddr,
) -> Arc<Self> {
let (sender, receiver) = mpsc::channel::<Block>(32);
let receiver = Arc::new(Mutex::new(receiver));
let (broadcast_sender, _) = broadcast::channel(32);
let last_arrived_block_number = AtomicU64::new(storage.read_mined_block_number().await.unwrap_or(BlockNumber::from(0)).into());
let peers = Arc::new(RwLock::new(HashMap::new()));
let my_address = Self::discover_my_address();
let my_address = Self::discover_my_address(jsonrpc_address.port(), grpc_address.port());

let consensus = Self {
sender,
Expand All @@ -183,6 +191,7 @@ impl Consensus {
role: RwLock::new(Role::Follower),
heartbeat_timeout: Duration::from_millis(rand::thread_rng().gen_range(1200..1500)), // Adjust as needed
my_address: my_address.clone(),
grpc_address,
reset_heartbeat_signal: tokio::sync::Notify::new(),
};
let consensus = Arc::new(consensus);
Expand All @@ -196,12 +205,12 @@ impl Consensus {
consensus
}

fn discover_my_address() -> PeerAddress {
fn discover_my_address(jsonrpc_port: u16, grpc_port: u16) -> PeerAddress {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.connect("8.8.8.8:80").ok().unwrap();
let my_ip = socket.local_addr().ok().map(|addr| addr.ip().to_string()).unwrap();

PeerAddress::new(format!("http://{}", my_ip), 3000, 3777) //FIXME TODO pick ports from config
PeerAddress::new(format!("http://{}", my_ip), jsonrpc_port, grpc_port)
}

/// Initializes the heartbeat and election timers.
Expand Down Expand Up @@ -377,8 +386,8 @@ impl Consensus {

fn initialize_server(consensus: Arc<Consensus>) {
named_spawn("consensus::server", async move {
tracing::info!("starting append entry service at port 3777");
let addr = "0.0.0.0:3777".parse().unwrap();
tracing::info!("Starting append entry service at address: {}", consensus.grpc_address);
let addr = consensus.grpc_address;

let append_entry_service = AppendEntryServiceImpl {
consensus: Mutex::new(consensus),
Expand Down Expand Up @@ -577,12 +586,13 @@ impl Consensus {
let mut peers: Vec<(PeerAddress, Peer)> = Vec::new();

for address in addresses {
// Parse the address format using from_string method
match PeerAddress::from_string(address.to_string()) {
Ok(peer_address) => {
let full_grpc_address = peer_address.full_grpc_address();
match AppendEntryServiceClient::connect(full_grpc_address.clone()).await {
let grpc_address = peer_address.full_grpc_address();
tracing::info!("Attempting to connect to peer gRPC address: {}", grpc_address);
match AppendEntryServiceClient::connect(grpc_address.clone()).await {
Ok(client) => {
tracing::info!("Successfully connected to peer gRPC address: {}", grpc_address);
let peer = Peer {
client,
match_index: 0,
Expand All @@ -593,17 +603,18 @@ impl Consensus {
peers.push((peer_address.clone(), peer));
tracing::info!(peer = peer_address.to_string(), "peer is available");
}
Err(_) => {
tracing::warn!(peer = peer_address.to_string(), "peer is not available");
Err(e) => {
tracing::warn!(peer = peer_address.to_string(), "peer is not available. Error: {:?}", e);
}
}
}
Err(e) => {
tracing::warn!("Invalid address format: {}. Error: {:?}", address, e);
tracing::error!("Invalid address format: {}. Error: {:?}", address, e);
}
}
}

tracing::info!("Completed peer discovery with {} peers found", peers.len());
Ok(peers)
}

Expand All @@ -622,8 +633,8 @@ impl Consensus {
if pod_name != Self::current_node().unwrap() {
if let Some(pod_ip) = p.status.and_then(|status| status.pod_ip) {
let address = pod_ip;
let jsonrpc_port = 3000; //TODO use kubernetes env config
let grpc_port = 3777; //TODO use kubernetes env config
let jsonrpc_port = consensus.my_address.jsonrpc_port;
let grpc_port = consensus.my_address.grpc_port;
let full_grpc_address = format!("http://{}:{}", address, grpc_port);
let client = AppendEntryServiceClient::connect(full_grpc_address.clone()).await?;

Expand Down Expand Up @@ -762,8 +773,22 @@ impl AppendEntryService for AppendEntryServiceImpl {
"last arrived block number set",
);

#[cfg(feature = "metrics")]
metrics::set_append_entries_block_number_diff(last_last_arrived_block_number - header.number);
if let Some(diff) = last_last_arrived_block_number.checked_sub(header.number) {
#[cfg(feature = "metrics")]
{
metrics::set_append_entries_block_number_diff(diff);
}
} else {
tracing::error!(
"leader is behind follower: arrived_block: {}, header_block: {}",
last_last_arrived_block_number,
header.number
);
return Err(Status::new(
(StatusCode::EntryAlreadyExists as i32).into(),
"Leader is behind follower".to_string(),
));
}

Ok(Response::new(AppendBlockCommitResponse {
status: StatusCode::AppendSuccess as i32,
Expand Down Expand Up @@ -821,7 +846,7 @@ mod tests {
use super::*;

#[test]
fn test_peer_address_from_string_valid() {
fn test_peer_address_from_string_valid_http() {
let input = "http://127.0.0.1:3000;3777".to_string();
let result = PeerAddress::from_string(input);

Expand All @@ -833,7 +858,7 @@ mod tests {
}

#[test]
fn test_another_peer_address_from_string_valid() {
fn test_peer_address_from_string_valid_https() {
let input = "https://127.0.0.1:3000;3777".to_string();
let result = PeerAddress::from_string(input);

Expand All @@ -852,4 +877,25 @@ mod tests {
assert!(result.is_err());
assert_eq!(result.err().unwrap().to_string(), "invalid format");
}

#[test]
fn test_peer_address_from_string_missing_scheme() {
let input = "127.0.0.1:3000;3777".to_string();
let result = PeerAddress::from_string(input);

assert!(result.is_err());
assert_eq!(result.err().unwrap().to_string(), "invalid scheme");
}

#[test]
fn test_peer_address_full_grpc_address() {
let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777);
assert_eq!(peer_address.full_grpc_address(), "127.0.0.1:3777");
}

#[test]
fn test_peer_address_full_jsonrpc_address() {
let peer_address = PeerAddress::new("127.0.0.1".to_string(), 3000, 3777);
assert_eq!(peer_address.full_jsonrpc_address(), "http://127.0.0.1:3000");
}
}
4 changes: 2 additions & 2 deletions src/eth/storage/rocks/rocks_permanent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ pub struct RocksPermanentStorage {
}

impl RocksPermanentStorage {
pub async fn new() -> anyhow::Result<Self> {
pub async fn new(rocks_path_prefix: Option<String>) -> anyhow::Result<Self> {
tracing::info!("creating rocksdb storage");

let state = RocksStorageState::new();
let state = RocksStorageState::new(rocks_path_prefix);
state.sync_data().await?;
let block_number = state.preload_block_number()?;
Ok(Self { state, block_number })
Expand Down
29 changes: 13 additions & 16 deletions src/eth/storage/rocks/rocks_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,30 @@ pub struct RocksStorageState {
pub backup_trigger: Arc<mpsc::Sender<()>>,
}

impl Default for RocksStorageState {
fn default() -> Self {
impl RocksStorageState {
pub fn new(rocks_path_prefix: Option<String>) -> Self {
let (tx, rx) = mpsc::channel::<()>(1);

//XXX TODO while repair/restore from backup, make sure to sync online and only when its in sync with other nodes, receive requests

let path_prefix = rocks_path_prefix.unwrap_or_default();

let state = Self {
accounts: RocksDb::new("./data/accounts.rocksdb", DbConfig::Default).unwrap(),
accounts_history: RocksDb::new("./data/accounts_history.rocksdb", DbConfig::FastWriteSST).unwrap(),
account_slots: RocksDb::new("./data/account_slots.rocksdb", DbConfig::Default).unwrap(),
account_slots_history: RocksDb::new("./data/account_slots_history.rocksdb", DbConfig::FastWriteSST).unwrap(),
transactions: RocksDb::new("./data/transactions.rocksdb", DbConfig::LargeSSTFiles).unwrap(),
blocks_by_number: RocksDb::new("./data/blocks_by_number.rocksdb", DbConfig::LargeSSTFiles).unwrap(),
blocks_by_hash: RocksDb::new("./data/blocks_by_hash.rocksdb", DbConfig::LargeSSTFiles).unwrap(), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number
logs: RocksDb::new("./data/logs.rocksdb", DbConfig::LargeSSTFiles).unwrap(),
accounts: RocksDb::new(&format!("{}./data/accounts.rocksdb", path_prefix), DbConfig::Default).unwrap(),
accounts_history: RocksDb::new(&format!("{}./data/accounts_history.rocksdb", path_prefix), DbConfig::FastWriteSST).unwrap(),
account_slots: RocksDb::new(&format!("{}./data/account_slots.rocksdb", path_prefix), DbConfig::Default).unwrap(),
account_slots_history: RocksDb::new(&format!("{}./data/account_slots_history.rocksdb", path_prefix), DbConfig::FastWriteSST).unwrap(),
transactions: RocksDb::new(&format!("{}./data/transactions.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(),
blocks_by_number: RocksDb::new(&format!("{}./data/blocks_by_number.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(),
blocks_by_hash: RocksDb::new(&format!("{}./data/blocks_by_hash.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(), //XXX this is not needed we can afford to have blocks_by_hash pointing into blocks_by_number
logs: RocksDb::new(&format!("{}./data/logs.rocksdb", path_prefix), DbConfig::LargeSSTFiles).unwrap(),
backup_trigger: Arc::new(tx),
};

state.listen_for_backup_trigger(rx).unwrap();

state
}
}

impl RocksStorageState {
pub fn new() -> Self {
Self::default()
}

pub fn listen_for_backup_trigger(&self, mut rx: mpsc::Receiver<()>) -> anyhow::Result<()> {
tracing::info!("creating backup trigger listener");
Expand Down
9 changes: 8 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ async fn run(config: StratusConfig) -> anyhow::Result<()> {
};
let miner = config.miner.init(Arc::clone(&storage), None, external_relayer).await?;
let executor = config.executor.init(Arc::clone(&storage), Arc::clone(&miner)).await;
let consensus = Consensus::new(Arc::clone(&storage), config.clone().candidate_peers.clone(), None).await; // for now, we force None to initiate with the current node being the leader
let consensus = Consensus::new(
Arc::clone(&storage),
config.clone().candidate_peers.clone(),
None,
config.address,
config.grpc_server_address,
)
.await; // for now, we force None to initiate with the current node being the leader

// start rpc server
serve_rpc(storage, executor, miner, consensus, config.address, config.executor.chain_id.into()).await?;
Expand Down
3 changes: 2 additions & 1 deletion tests/test_import_external_snapshot_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ pub mod rocks_test {

let (accounts, slots) = common::filter_accounts_and_slots(snapshot);

let rocks = RocksPermanentStorage::new().await.unwrap();
let rocks_path_prefix: Option<String> = Some(String::new());
let rocks = RocksPermanentStorage::new(rocks_path_prefix).await.unwrap();
rocks.save_accounts(accounts).await.unwrap();
rocks.state.write_slots(slots, BlockNumber::ZERO);

Expand Down

0 comments on commit 0d4c5e9

Please sign in to comment.