From fd019e7df2b882779d1a05db6a20c027e606dac2 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Thu, 30 May 2024 10:18:27 -0300 Subject: [PATCH 1/2] fix: followers gathering --- src/eth/consensus.rs | 69 ++++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index d41ad6854..8defb35ec 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -73,7 +73,8 @@ pub mod consensus_kube { tracing::info!("Starting consensus module with leader: {}", leader_name); - let (sender, mut receiver) = mpsc::channel::(32); + let (sender, receiver) = mpsc::channel::(32); + let receiver = Arc::new(Mutex::new(receiver)); let last_arrived_block_number = AtomicU64::new(0); //TODO load from consensus storage @@ -84,37 +85,9 @@ pub mod consensus_kube { }; let consensus = Arc::new(consensus); - let consensus_channel = Arc::clone(&consensus); - tokio::spawn(async move { - let followers = Self::discover_followers().await.expect("Failed to discover followers"); - - tracing::info!( - "Discovered followers: {}", - followers.iter().map(|f| f.address.to_string()).collect::>().join(", ") - ); - - while let Some(data) = channel_read!(receiver) { - if consensus_channel.is_leader() { - //TODO add data to consensus-log-transactions - //TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear - tracing::info!(number = data.header.number.as_u64(), "received block to send to followers"); - - //TODO use gRPC instead of jsonrpc - //FIXME for now, this has no colateral efects, but it will have in the future - match Self::append_block_commit_to_followers(data.clone(), followers.clone()).await { - Ok(_) => { - tracing::info!(number = data.header.number.as_u64(), "Data sent to followers"); - } - Err(e) => { - //TODO rediscover followers on comunication error - tracing::error!("Failed to send data to followers: {}", e); - } - } - } - } - }); - + Self::initialize_append_entries_channel(Arc::clone(&consensus), Arc::clone(&receiver)); Self::initialize_server(Arc::clone(&consensus)); + consensus } @@ -136,6 +109,40 @@ pub mod consensus_kube { }) } + fn initialize_append_entries_channel(consensus_channel: Arc, receiver: Arc>>) { + tokio::spawn(async move { + let followers = Self::discover_followers().await.unwrap_or_default(); + + tracing::info!( + "Discovered followers: {}", + followers.iter().map(|f| f.address.to_string()).collect::>().join(", ") + ); + + loop { + let mut lock = receiver.lock().await; + if let Some(data) = lock.recv().await { + if consensus_channel.is_leader() { + //TODO add data to consensus-log-transactions + //TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear + tracing::info!(number = data.header.number.as_u64(), "received block to send to followers"); + + //TODO use gRPC instead of jsonrpc + //FIXME for now, this has no colateral efects, but it will have in the future + match Self::append_block_commit_to_followers(data.clone(), followers.clone()).await { + Ok(_) => { + tracing::info!(number = data.header.number.as_u64(), "Data sent to followers"); + } + Err(e) => { + //TODO rediscover followers on comunication error + tracing::error!("Failed to send data to followers: {}", e); + } + } + } + } + } + }); + } + fn initialize_server(consensus: Arc) { tokio::spawn(async move { tracing::info!("Starting append entry service at port 3777"); From 4db096443a0bf9c0942c083c4cb4ac418fcc6b22 Mon Sep 17 00:00:00 2001 From: renancloudwalk Date: Thu, 30 May 2024 10:32:12 -0300 Subject: [PATCH 2/2] lint --- src/eth/consensus.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eth/consensus.rs b/src/eth/consensus.rs index 8defb35ec..4c97a51a0 100644 --- a/src/eth/consensus.rs +++ b/src/eth/consensus.rs @@ -119,8 +119,8 @@ pub mod consensus_kube { ); loop { - let mut lock = receiver.lock().await; - if let Some(data) = lock.recv().await { + let mut receiver_lock = receiver.lock().await; + if let Some(data) = receiver_lock.recv().await { if consensus_channel.is_leader() { //TODO add data to consensus-log-transactions //TODO at the begining of temp-storage, load the consensus-log-transactions so the index becomes clear