diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index 7ba437ea5..27a33b19f 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -1,6 +1,4 @@ use std::env; -use std::fs::File; -use std::io::Read; use std::time::Duration; use anyhow::anyhow; @@ -28,7 +26,6 @@ pub struct Entry { pub struct Consensus { pub sender: Sender, - node_name: String, leader_name: String, //XXX current_index: AtomicU64, } @@ -37,7 +34,7 @@ impl Consensus { //XXX for now we pick the leader name from the environment // the correct is to have a leader election algorithm pub fn new(leader_name: Option) -> Self { - let Some(node_name) = Self::current_node() else { + let Some(_node_name) = Self::current_node() else { tracing::info!("No consensus module available, running in standalone mode"); return Self::new_stand_alone(); }; @@ -47,36 +44,41 @@ impl Consensus { return Self::new_stand_alone(); }; + tracing::info!("Starting consensus module with leader: {}", leader_name); + let (sender, mut receiver) = mpsc::channel::(32); + let leader_name_clone = leader_name.clone(); tokio::spawn(async move { let followers = Self::discover_followers().await.expect("Failed to discover followers"); - while let Some(data) = receiver.recv().await { - //TODO add data to consensus-log-transactions - //TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear - tracing::info!("Received data: {}", data); + tracing::info!( + "Discovered followers: {}", + followers.iter().map(|f| f.to_string()).collect::>().join(", ") + ); - //TODO use gRPC instead of jsonrpc - //FIXME for now, this has no colateral efects, but it will have in the future - match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await { - Ok(_) => { - tracing::info!("Data sent to followers: {}", data); - } - Err(e) => { - tracing::error!("Failed to send data to followers: {}", e); + while let Some(data) = receiver.recv().await { + if Self::is_leader(leader_name_clone.clone()) { + //TODO add data to consensus-log-transactions + //TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear + tracing::info!("Received data to append: {}", data); + + //TODO use gRPC instead of jsonrpc + //FIXME for now, this has no colateral efects, but it will have in the future + match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await { + Ok(_) => { + tracing::info!("Data sent to followers: {}", data); + } + Err(e) => { + //TODO rediscover followers on comunication error + tracing::error!("Failed to send data to followers: {}", e); + } } } - //TODO rediscover followers on comunication error - //TODO this is where we will send the data to the followers } }); - Self { - node_name, - leader_name, - sender, - } + Self { leader_name, sender } } fn new_stand_alone() -> Self { @@ -89,25 +91,23 @@ impl Consensus { }); Self { - node_name: "standalone".to_string(), leader_name: "standalone".to_string(), sender, } } - pub fn is_leader(&self) -> bool { - self.node_name == self.leader_name + //FIXME TODO automate the way we gather the leader, instead of using a env var + pub fn is_leader(leader_name: String) -> bool { + Self::current_node().unwrap_or("".to_string()) == leader_name } - pub fn is_follower(&self) -> bool { - !self.is_leader() + pub fn is_follower(leader_name: String) -> bool { + !Self::is_leader(leader_name) } fn current_node() -> Option { - let mut file = File::open("/etc/hostname").ok()?; - let mut contents = String::new(); - file.read_to_string(&mut contents).ok()?; - Some(contents.trim().to_string()) + let pod_name = env::var("MY_POD_NAME").ok()?; + Some(pod_name.trim().to_string()) } fn current_namespace() -> Option { @@ -119,7 +119,7 @@ impl Consensus { // later we want the leader to GENERATE blocks // and even later we want this sync to be replaced by a gossip protocol or raft pub fn get_chain_url(&self, config: RunWithImporterConfig) -> (String, Option) { - if self.is_follower() { + if Self::is_follower(self.leader_name.clone()) { if let Some(namespace) = Self::current_namespace() { return (format!("http://{}.stratus-api.{}.svc.cluster.local:3000", self.leader_name, namespace), None); }