Skip to content

Commit

Permalink
fix: followers gathering (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
renancloudwalk authored May 30, 2024
1 parent 1d07175 commit 21ff711
Showing 1 changed file with 38 additions and 31 deletions.
69 changes: 38 additions & 31 deletions src/eth/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ pub mod consensus_kube {

tracing::info!("Starting consensus module with leader: {}", leader_name);

let (sender, mut receiver) = mpsc::channel::<Block>(32);
let (sender, receiver) = mpsc::channel::<Block>(32);
let receiver = Arc::new(Mutex::new(receiver));

let last_arrived_block_number = AtomicU64::new(0); //TODO load from consensus storage

Expand All @@ -84,37 +85,9 @@ pub mod consensus_kube {
};
let consensus = Arc::new(consensus);

let consensus_channel = Arc::clone(&consensus);
tokio::spawn(async move {
let followers = Self::discover_followers().await.expect("Failed to discover followers");

tracing::info!(
"Discovered followers: {}",
followers.iter().map(|f| f.address.to_string()).collect::<Vec<String>>().join(", ")
);

while let Some(data) = channel_read!(receiver) {
if consensus_channel.is_leader() {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!(number = data.header.number.as_u64(), "received block to send to followers");

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_block_commit_to_followers(data.clone(), followers.clone()).await {
Ok(_) => {
tracing::info!(number = data.header.number.as_u64(), "Data sent to followers");
}
Err(e) => {
//TODO rediscover followers on comunication error
tracing::error!("Failed to send data to followers: {}", e);
}
}
}
}
});

Self::initialize_append_entries_channel(Arc::clone(&consensus), Arc::clone(&receiver));
Self::initialize_server(Arc::clone(&consensus));

consensus
}

Expand All @@ -136,6 +109,40 @@ pub mod consensus_kube {
})
}

fn initialize_append_entries_channel(consensus_channel: Arc<Consensus>, receiver: Arc<Mutex<mpsc::Receiver<Block>>>) {
tokio::spawn(async move {
let followers = Self::discover_followers().await.unwrap_or_default();

tracing::info!(
"Discovered followers: {}",
followers.iter().map(|f| f.address.to_string()).collect::<Vec<String>>().join(", ")
);

loop {
let mut receiver_lock = receiver.lock().await;
if let Some(data) = receiver_lock.recv().await {
if consensus_channel.is_leader() {
//TODO add data to consensus-log-transactions
//TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear
tracing::info!(number = data.header.number.as_u64(), "received block to send to followers");

//TODO use gRPC instead of jsonrpc
//FIXME for now, this has no colateral efects, but it will have in the future
match Self::append_block_commit_to_followers(data.clone(), followers.clone()).await {
Ok(_) => {
tracing::info!(number = data.header.number.as_u64(), "Data sent to followers");
}
Err(e) => {
//TODO rediscover followers on comunication error
tracing::error!("Failed to send data to followers: {}", e);
}
}
}
}
}
});
}

fn initialize_server(consensus: Arc<Consensus>) {
tokio::spawn(async move {
tracing::info!("Starting append entry service at port 3777");
Expand Down

0 comments on commit 21ff711

Please sign in to comment.