Skip to content

Commit

Permalink
Reutilize connection (#931)
Browse files Browse the repository at this point in the history
* chore: reutilize http2 client

* lint
  • Loading branch information
renancloudwalk authored May 27, 2024
1 parent 5ca4584 commit defdae6
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions src/eth/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
use tokio::time::sleep;
use tonic::transport::Channel;
use tonic::transport::Server;
use tonic::Request;
use tonic::Response;
Expand All @@ -38,10 +39,16 @@ pub struct LogEntry {
data: String,
}

#[derive(Clone)]
struct Peer {
address: String,
client: RaftServiceClient<Channel>,
}

pub struct Consensus {
pub sender: Sender<String>,
leader_name: String,
//XXX current_index: AtomicU64,
leader_name: String, //XXX check the peers instead of using it
//XXX current_index: AtomicU64,
}

impl Consensus {
Expand All @@ -68,7 +75,7 @@ impl Consensus {

tracing::info!(
"Discovered followers: {}",
followers.iter().map(|f| f.to_string()).collect::<Vec<String>>().join(", ")
followers.iter().map(|f| f.address.to_string()).collect::<Vec<String>>().join(", ")
);

while let Some(data) = receiver.recv().await {
Expand Down Expand Up @@ -154,7 +161,7 @@ impl Consensus {
}

#[tracing::instrument(skip_all)]
pub async fn discover_followers() -> Result<Vec<String>, anyhow::Error> {
pub async fn discover_followers() -> Result<Vec<Peer>, anyhow::Error> {
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::namespaced(client, &Self::current_namespace().unwrap_or("default".to_string()));

Expand All @@ -166,7 +173,11 @@ impl Consensus {
if let Some(pod_name) = p.metadata.name {
if pod_name != Self::current_node().unwrap() {
if let Some(namespace) = Self::current_namespace() {
followers.push(format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace));
let address = format!("http://{}.stratus-api.{}.svc.cluster.local:3777", pod_name, namespace);
let client = RaftServiceClient::connect(address.clone()).await?;

let peer = Peer { address, client };
followers.push(peer);
}
}
}
Expand All @@ -175,12 +186,10 @@ impl Consensus {
Ok(followers)
}

async fn append_entries(follower: String, entries: Vec<LogEntry>) -> Result<(), anyhow::Error> {
async fn append_entries(mut follower: Peer, entries: Vec<LogEntry>) -> Result<(), anyhow::Error> {
#[cfg(feature = "metrics")]
let start = metrics::now();

let mut client = RaftServiceClient::connect(follower.clone()).await?;

for attempt in 1..=RETRY_ATTEMPTS {
let grpc_entries: Vec<Entry> = entries
.iter()
Expand All @@ -191,41 +200,41 @@ impl Consensus {
.collect();

let request = Request::new(AppendEntriesRequest { entries: grpc_entries });
let response = client.append_entries(request).await;
let response = follower.client.append_entries(request).await;

match response {
Ok(resp) =>
if resp.into_inner().success {
#[cfg(not(feature = "metrics"))]
tracing::debug!("Entries appended to follower {}: attempt {}: success", follower, attempt);
tracing::debug!("Entries appended to follower {}: attempt {}: success", follower.address, attempt);
#[cfg(feature = "metrics")]
tracing::debug!(
"Entries appended to follower {}: attempt {}: success time_elapsed: {:?}",
follower,
follower.address,
attempt,
start.elapsed()
);
return Ok(());
},
Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower, attempt, e),
Err(e) => tracing::error!("Error appending entries to follower {}: attempt {}: {:?}", follower.address, attempt, e),
}
sleep(RETRY_DELAY).await;
}

#[cfg(feature = "metrics")]
metrics::inc_append_entries(start.elapsed());

Err(anyhow!("Failed to append entries to {} after {} attempts", follower, RETRY_ATTEMPTS))
Err(anyhow!("Failed to append entries to {} after {} attempts", follower.address, RETRY_ATTEMPTS))
}

#[tracing::instrument(skip_all)]
pub async fn append_entries_to_followers(entries: Vec<LogEntry>, followers: Vec<String>) -> Result<(), anyhow::Error> {
pub async fn append_entries_to_followers(entries: Vec<LogEntry>, followers: Vec<Peer>) -> Result<(), anyhow::Error> {
#[cfg(feature = "metrics")]
let start = metrics::now();
for entry in entries {
for follower in &followers {
if let Err(e) = Self::append_entries(follower.clone(), vec![entry.clone()]).await {
tracing::debug!("Error appending entry to follower {}: {:?}", follower, e);
tracing::debug!("Error appending entry to follower {}: {:?}", follower.address, e);
}
}
}
Expand Down

0 comments on commit defdae6

Please sign in to comment.