diff --git a/src/eth/consensus/server.rs b/src/eth/consensus/server.rs index 2c91d2e89..8a44453b0 100644 --- a/src/eth/consensus/server.rs +++ b/src/eth/consensus/server.rs @@ -19,6 +19,7 @@ use crate::eth::consensus::Role; use crate::eth::Consensus; #[cfg(feature = "metrics")] use crate::infra::metrics; +use crate::infra::metrics::timed_async; #[cfg(feature = "metrics")] mod label { @@ -37,9 +38,6 @@ impl AppendEntryService for AppendEntryServiceImpl { &self, request: Request, ) -> Result, Status> { - #[cfg(feature = "metrics")] - let start = std::time::Instant::now(); - let consensus = self.consensus.lock().await; let request_inner = request.into_inner(); @@ -62,20 +60,18 @@ impl AppendEntryService for AppendEntryServiceImpl { consensus.reset_heartbeat_signal.notify_waiters(); //TODO change cached index from consensus after the storage is implemented - #[cfg(feature = "metrics")] - metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::APPEND_TRANSACTION_EXECUTIONS); - - Ok(Response::new(AppendTransactionExecutionsResponse { - status: StatusCode::AppendSuccess as i32, - message: "transaction Executions appended successfully".into(), - last_committed_block_number: 0, - })) + timed_async(async { + Ok(Response::new(AppendTransactionExecutionsResponse { + status: StatusCode::AppendSuccess as i32, + message: "transaction Executions appended successfully".into(), + last_committed_block_number: 0, + })) + }).await.with(|m| { + metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::APPEND_TRANSACTION_EXECUTIONS, m.result.is_ok()); + }) } async fn append_block_commit(&self, request: Request) -> Result, Status> { - #[cfg(feature = "metrics")] - let start = std::time::Instant::now(); - let consensus = self.consensus.lock().await; let request_inner = request.into_inner(); @@ -121,20 +117,18 @@ impl AppendEntryService for AppendEntryServiceImpl { "last arrived block number set", ); - #[cfg(feature = "metrics")] - metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::APPEND_BLOCK_COMMIT); - - Ok(Response::new(AppendBlockCommitResponse { - status: StatusCode::AppendSuccess as i32, - message: "Block Commit appended successfully".into(), - last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst), - })) + timed_async(async { + Ok(Response::new(AppendBlockCommitResponse { + status: StatusCode::AppendSuccess as i32, + message: "Block Commit appended successfully".into(), + last_committed_block_number: consensus.last_arrived_block_number.load(Ordering::SeqCst), + })) + }).await.with(|m| { + metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::APPEND_BLOCK_COMMIT, m.result.is_ok()); + }) } async fn request_vote(&self, request: Request) -> Result, Status> { - #[cfg(feature = "metrics")] - let start = std::time::Instant::now(); - let request = request.into_inner(); let consensus = self.consensus.lock().await; let current_term = consensus.current_term.load(Ordering::SeqCst); @@ -171,18 +165,19 @@ impl AppendEntryService for AppendEntryServiceImpl { })); } - #[cfg(feature = "metrics")] - metrics::inc_consensus_grpc_requests_finished(start.elapsed(), label::REQUEST_VOTE); - - Ok(Response::new(RequestVoteResponse { - term: request.term, - vote_granted: false, - message: format!( - "index is bellow expectation: last_log_index {}, last_arrived_block_number {}", - request.last_log_index, - consensus.last_arrived_block_number.load(Ordering::SeqCst) - ), - })) + timed_async(async { + Ok(Response::new(RequestVoteResponse { + term: request.term, + vote_granted: false, + message: format!( + "index is bellow expectation: last_log_index {}, last_arrived_block_number {}", + request.last_log_index, + consensus.last_arrived_block_number.load(Ordering::SeqCst) + ), + })) + }).await.with(|m| { + metrics::inc_consensus_grpc_requests_finished(m.elapsed, label::REQUEST_VOTE, m.result.is_ok()); + }) } } diff --git a/src/infra/metrics/metrics_definitions.rs b/src/infra/metrics/metrics_definitions.rs index 62b9c8874..2fce29427 100644 --- a/src/infra/metrics/metrics_definitions.rs +++ b/src/infra/metrics/metrics_definitions.rs @@ -184,7 +184,7 @@ metrics! { counter consensus_leadership_change{}, "Time to run gRPC requests that finished." - histogram_duration consensus_grpc_requests_finished{method}, + histogram_duration consensus_grpc_requests_finished{method, success}, "The amount of available peers." gauge consensus_available_peers{} diff --git a/src/infra/metrics/metrics_types.rs b/src/infra/metrics/metrics_types.rs index 888e250b9..14764ea2c 100644 --- a/src/infra/metrics/metrics_types.rs +++ b/src/infra/metrics/metrics_types.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use std::future::Future; use std::time::Duration; use metrics::describe_counter; @@ -156,3 +157,63 @@ impl Timed { self.result } } + +// ----------------------------------------------------------------------------- +// TimedAsync +// ----------------------------------------------------------------------------- +pub struct TimedAsync { + pub elapsed: Duration, + pub result: T, +} + +impl TimedAsync { + #[cfg(feature = "metrics")] + #[inline(always)] + /// Applies the provided function to the current metrified execution. + pub fn with(self, f: F) -> T + where + F: FnOnce(&TimedAsync), + { + f(&self); + self.result + } + + #[cfg(not(feature = "metrics"))] + #[inline(always)] + /// Do nothing because the `metrics` function is disabled. + pub fn with(self, _: F) -> T + where + F: FnOnce(&TimedAsync), + { + self.result + } +} + +#[cfg(feature = "metrics")] +/// Measures how long the provided async function takes to execute. +/// +/// Returns a wrapper that allows using it to record metrics if the `metrics` feature is enabled. +pub async fn timed_async(future: Fut) -> TimedAsync +where + Fut: Future, +{ + let start = crate::infra::metrics::now(); + let result = future.await; + TimedAsync { + elapsed: start.elapsed(), + result, + } +} + +#[cfg(not(feature = "metrics"))] +/// Executes the provided async function +pub async fn timed_async(future: Fut) -> TimedAsync +where + Fut: Future, +{ + let result = future.await; + TimedAsync { + elapsed: Duration::default(), + result, + } +} \ No newline at end of file