Skip to content

Commit

Permalink
fix: append entries is supposed to be done only by the leaders (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk authored May 23, 2024
1 parent f89b2fe commit d97f6a8
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions src/eth/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::env;
use std::fs::File;
use std::io::Read;
use std::time::Duration;

use anyhow::anyhow;
Expand Down Expand Up @@ -28,7 +26,6 @@ pub struct Entry {

pub struct Consensus {
pub sender: Sender<String>,
node_name: String,
leader_name: String,
//XXX current_index: AtomicU64,
}
Expand All @@ -37,7 +34,7 @@ impl Consensus {
//XXX for now we pick the leader name from the environment
// the correct is to have a leader election algorithm
pub fn new(leader_name: Option<String>) -> Self {
let Some(node_name) = Self::current_node() else {
let Some(_node_name) = Self::current_node() else {
tracing::info!("No consensus module available, running in standalone mode");
return Self::new_stand_alone();
};
Expand All @@ -47,36 +44,41 @@ impl Consensus {
return Self::new_stand_alone();
};

tracing::info!("Starting consensus module with leader: {}", leader_name);

let (sender, mut receiver) = mpsc::channel::<String>(32);

let leader_name_clone = leader_name.clone();
tokio::spawn(async move {
let followers = Self::discover_followers().await.expect("Failed to discover followers");

while let Some(data) = receiver.recv().await {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!("Received data: {}", data);
tracing::info!(
"Discovered followers: {}",
followers.iter().map(|f| f.to_string()).collect::<Vec<String>>().join(", ")
);

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await {
Ok(_) => {
tracing::info!("Data sent to followers: {}", data);
}
Err(e) => {
tracing::error!("Failed to send data to followers: {}", e);
while let Some(data) = receiver.recv().await {
if Self::is_leader(leader_name_clone.clone()) {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!("Received data to append: {}", data);

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await {
Ok(_) => {
tracing::info!("Data sent to followers: {}", data);
}
Err(e) => {
//TODO rediscover followers on comunication error
tracing::error!("Failed to send data to followers: {}", e);
}
}
}
//TODO rediscover followers on comunication error
//TODO this is where we will send the data to the followers
}
});

Self {
node_name,
leader_name,
sender,
}
Self { leader_name, sender }
}

fn new_stand_alone() -> Self {
Expand All @@ -89,25 +91,23 @@ impl Consensus {
});

Self {
node_name: "standalone".to_string(),
leader_name: "standalone".to_string(),
sender,
}
}

pub fn is_leader(&self) -> bool {
self.node_name == self.leader_name
//FIXME TODO automate the way we gather the leader, instead of using a env var
pub fn is_leader(leader_name: String) -> bool {
Self::current_node().unwrap_or("".to_string()) == leader_name
}

pub fn is_follower(&self) -> bool {
!self.is_leader()
pub fn is_follower(leader_name: String) -> bool {
!Self::is_leader(leader_name)
}

fn current_node() -> Option<String> {
let mut file = File::open("/etc/hostname").ok()?;
let mut contents = String::new();
file.read_to_string(&mut contents).ok()?;
Some(contents.trim().to_string())
let pod_name = env::var("MY_POD_NAME").ok()?;
Some(pod_name.trim().to_string())
}

fn current_namespace() -> Option<String> {
Expand All @@ -119,7 +119,7 @@ impl Consensus {
// later we want the leader to GENERATE blocks
// and even later we want this sync to be replaced by a gossip protocol or raft
pub fn get_chain_url(&self, config: RunWithImporterConfig) -> (String, Option<String>) {
if self.is_follower() {
if Self::is_follower(self.leader_name.clone()) {
if let Some(namespace) = Self::current_namespace() {
return (format!("http://{}.stratus-api.{}.svc.cluster.local:3000", self.leader_name, namespace), None);
}
Expand Down

0 comments on commit d97f6a8

Please sign in to comment.