Skip to content

Commit

Permalink
add timed async wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-aranha-cw committed Jun 25, 2024
1 parent aad26e5 commit 3374152
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 38 deletions.
69 changes: 32 additions & 37 deletions src/eth/consensus/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::eth::consensus::Role;
use crate::eth::Consensus;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::metrics::timed_async;

#[cfg(feature = "metrics")]
mod label {
Expand All @@ -37,9 +38,6 @@ impl AppendEntryService for AppendEntryServiceImpl {
&self,
request: Request<AppendTransactionExecutionsRequest>,
) -> Result<Response<AppendTransactionExecutionsResponse>, Status> {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();

let consensus = self.consensus.lock().await;
let request_inner = request.into_inner();

Expand All @@ -62,20 +60,18 @@ impl AppendEntryService for AppendEntryServiceImpl {
consensus.reset_heartbeat_signal.notify_waiters();
//TODO change cached index from consensus after the storage is implemented

#[cfg(feature = "metrics")]
metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::APPEND_TRANSACTION_EXECUTIONS);

Ok(Response::new(AppendTransactionExecutionsResponse {
status: StatusCode::AppendSuccess as i32,
message: "transaction Executions appended successfully".into(),
last_committed_block_number: 0,
}))
timed_async(async {
Ok(Response::new(AppendTransactionExecutionsResponse {
status: StatusCode::AppendSuccess as i32,
message: "transaction Executions appended successfully".into(),
last_committed_block_number: 0,
}))
}).await.with(|m| {
metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::APPEND_TRANSACTION_EXECUTIONS, m.result.is_ok());
})
}

async fn append_block_commit(&self, request: Request<AppendBlockCommitRequest>) -> Result<Response<AppendBlockCommitResponse>, Status> {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();

let consensus = self.consensus.lock().await;
let request_inner = request.into_inner();

Expand Down Expand Up @@ -121,20 +117,18 @@ impl AppendEntryService for AppendEntryServiceImpl {
"last arrived block number set",
);

#[cfg(feature = "metrics")]
metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::APPEND_BLOCK_COMMIT);

Ok(Response::new(AppendBlockCommitResponse {
status: StatusCode::AppendSuccess as i32,
message: "Block Commit appended successfully".into(),
last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst),
}))
timed_async(async {
Ok(Response::new(AppendBlockCommitResponse {
status: StatusCode::AppendSuccess as i32,
message: "Block Commit appended successfully".into(),
last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst),
}))
}).await.with(|m| {
metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::APPEND_BLOCK_COMMIT, m.result.is_ok());
})
}

async fn request_vote(&self, request: Request<RequestVoteRequest>) -> Result<Response<RequestVoteResponse>, Status> {
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();

let request = request.into_inner();
let consensus = self.consensus.lock().await;
let current_term = consensus.current_term.load(Ordering::SeqCst);
Expand Down Expand Up @@ -171,18 +165,19 @@ impl AppendEntryService for AppendEntryServiceImpl {
}));
}

#[cfg(feature = "metrics")]
metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::REQUEST_VOTE);

Ok(Response::new(RequestVoteResponse {
term: request.term,
vote_granted: false,
message: format!(
"index is bellow expectation: last_log_index {}, last_arrived_block_number {}",
request.last_log_index,
consensus.last_arrived_block_number.load(Ordering::SeqCst)
),
}))
timed_async(async {
Ok(Response::new(RequestVoteResponse {
term: request.term,
vote_granted: false,
message: format!(
"index is bellow expectation: last_log_index {}, last_arrived_block_number {}",
request.last_log_index,
consensus.last_arrived_block_number.load(Ordering::SeqCst)
),
}))
}).await.with(|m| {
metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::REQUEST_VOTE, m.result.is_ok());
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/infra/metrics/metrics_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ metrics! {
counter consensus_leadership_change{},

"Time to run gRPC requests that finished."
histogram_duration consensus_grpc_requests_finished{method},
histogram_duration consensus_grpc_requests_finished{method, success},

"The amount of available peers."
gauge consensus_available_peers{}
Expand Down
61 changes: 61 additions & 0 deletions src/infra/metrics/metrics_types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::borrow::Cow;
use std::future::Future;
use std::time::Duration;

use metrics::describe_counter;
Expand Down Expand Up @@ -156,3 +157,63 @@ impl<T> Timed<T> {
self.result
}
}

// -----------------------------------------------------------------------------
// TimedAsync
// -----------------------------------------------------------------------------
pub struct TimedAsync<T> {
pub elapsed: Duration,
pub result: T,
}

impl<T> TimedAsync<T> {
#[cfg(feature = "metrics")]
#[inline(always)]
/// Applies the provided function to the current metrified execution.
pub fn with<F>(self, f: F) -> T
where
F: FnOnce(&TimedAsync<T>),
{
f(&self);
self.result
}

#[cfg(not(feature = "metrics"))]
#[inline(always)]
/// Do nothing because the `metrics` function is disabled.
pub fn with<F>(self, _: F) -> T
where
F: FnOnce(&TimedAsync<T>),
{
self.result
}
}

#[cfg(feature = "metrics")]
/// Measures how long the provided async function takes to execute.
///
/// Returns a wrapper that allows using it to record metrics if the `metrics` feature is enabled.
pub async fn timed_async<Fut, T>(future: Fut) -> TimedAsync<T>
where
Fut: Future<Output = T>,
{
let start = crate::infra::metrics::now();
let result = future.await;
TimedAsync {
elapsed: start.elapsed(),
result,
}
}

#[cfg(not(feature = "metrics"))]
/// Executes the provided async function
pub async fn timed_async<Fut, T>(future: Fut) -> TimedAsync<T>
where
Fut: Future<Output = T>,
{
let result = future.await;
TimedAsync {
elapsed: Duration::default(),
result,
}
}

0 comments on commit 3374152

Please sign in to comment.