Skip to content

Commit

Permalink
fix: append entries is supposed to be done only by the leaders
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk committed May 23, 2024
1 parent f89b2fe commit 9f3317b
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions src/eth/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,34 @@ impl Consensus {
return Self::new_stand_alone();
};

tracing::info!("Starting consensus module with leader: {}", leader_name);

let (sender, mut receiver) = mpsc::channel::<String>(32);

let leader_name_clone = leader_name.clone();
tokio::spawn(async move {
let followers = Self::discover_followers().await.expect("Failed to discover followers");

while let Some(data) = receiver.recv().await {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!("Received data: {}", data);
tracing::info!("Discovered followers: {}", followers.iter().map(|f| f.to_string()).collect::<Vec<String>>().join(", "));

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await {
Ok(_) => {
tracing::info!("Data sent to followers: {}", data);
}
Err(e) => {
tracing::error!("Failed to send data to followers: {}", e);
while let Some(data) = receiver.recv().await {
if Self::is_leader(leader_name_clone.clone()) {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!("Received data to append: {}", data);

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_entries_to_followers(vec![Entry { index: 0, data: data.clone() }], followers.clone()).await {
Ok(_) => {
tracing::info!("Data sent to followers: {}", data);
}
Err(e) => {
//TODO rediscover followers on comunication error
tracing::error!("Failed to send data to followers: {}", e);
}
}
}
//TODO rediscover followers on comunication error
//TODO this is where we will send the data to the followers
}
});

Expand All @@ -95,12 +101,13 @@ impl Consensus {
}
}

pub fn is_leader(&self) -> bool {
self.node_name == self.leader_name
//FIXME TODO automate the way we gather the leader, instead of using a env var
pub fn is_leader(leader_name: String) -> bool {
Self::current_node().unwrap_or("".to_string()) == leader_name
}

pub fn is_follower(&self) -> bool {
!self.is_leader()
pub fn is_follower(leader_name: String) -> bool {
!Self::is_leader(leader_name)
}

fn current_node() -> Option<String> {
Expand All @@ -119,7 +126,7 @@ impl Consensus {
// later we want the leader to GENERATE blocks
// and even later we want this sync to be replaced by a gossip protocol or raft
pub fn get_chain_url(&self, config: RunWithImporterConfig) -> (String, Option<String>) {
if self.is_follower() {
if Self::is_follower(self.leader_name.clone()) {
if let Some(namespace) = Self::current_namespace() {
return (format!("http://{}.stratus-api.{}.svc.cluster.local:3000", self.leader_name, namespace), None);
}
Expand Down

0 comments on commit 9f3317b

Please sign in to comment.