Skip to content

Commit

Permalink
Merge branch 'main' into rocks_isolation_and_versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
marcospb19-cw authored Aug 1, 2024
2 parents cef40ae + 533e9f4 commit 9461253
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 80 deletions.
2 changes: 1 addition & 1 deletion config/stratus.env.local
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
RUST_LOG=info,stratus::eth::rpc::rpc_subscriptions::rx=off,stratus::eth::consensus::rx=off,stratus::eth::consensus=off
RUST_LOG=info,stratus::eth::rpc::rpc_subscriptions::rx=off,stratus::eth::consensus::rx=off,stratus::eth::consensus=off,jsonrpsee-server=debug

CHAIN_ID=2008
EVMS=1
Expand Down
76 changes: 1 addition & 75 deletions src/eth/rpc/rpc_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use jsonrpsee::types::error::INTERNAL_ERROR_CODE;
use jsonrpsee::types::Params;
use jsonrpsee::MethodResponse;
use pin_project::pin_project;
use pin_project::pinned_drop;
use tracing::field;
use tracing::info_span;
use tracing::Level;
Expand Down Expand Up @@ -41,68 +40,6 @@ use crate::infra::tracing::new_cid;
use crate::infra::tracing::SpanExt;
use crate::infra::tracing::TracingExt;

// -----------------------------------------------------------------------------
// Active requests tracking
// -----------------------------------------------------------------------------
#[cfg(feature = "metrics")]
mod active_requests {
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::RwLock;

use lazy_static::lazy_static;

use crate::eth::rpc::RpcClientApp;
use crate::infra::metrics;

lazy_static! {
pub static ref COUNTERS: ActiveRequests = ActiveRequests::default();
}

#[derive(Default)]
pub struct ActiveRequests {
inner: RwLock<HashMap<String, Arc<AtomicU64>>>,
}

impl ActiveRequests {
pub fn inc(&self, client: &RpcClientApp, method: &str) {
let active = self.counter_for(client, method).fetch_add(1, Ordering::Relaxed) + 1;
metrics::set_rpc_requests_active(active, client, method);
}

pub fn dec(&self, client: &RpcClientApp, method: &str) {
let active = self
.counter_for(client, method)
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
let new = current.saturating_sub(1);
Some(new)
})
.unwrap();
let active = active.saturating_sub(1);
metrics::set_rpc_requests_active(active, client, method);
}

fn counter_for(&self, client: &RpcClientApp, method: &str) -> Arc<AtomicU64> {
let id = format!("{}::{}", client, method);

// try to read counter
let active_requests_read = self.inner.read().unwrap();
if let Some(counter) = active_requests_read.get(&id) {
return Arc::clone(counter);
}
drop(active_requests_read);

// create a new counter
let mut active_requests_write = self.inner.write().unwrap();
let counter = Arc::new(AtomicU64::new(0));
active_requests_write.insert(id, Arc::clone(&counter));
counter
}
}
}

// -----------------------------------------------------------------------------
// Request handling
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -174,7 +111,6 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware {
#[cfg(feature = "metrics")]
{
let tx_ref = tx.as_ref();
active_requests::COUNTERS.inc(&client, &method);
metrics::inc_rpc_requests_started(&client, &method, tx_ref.map(|tx| tx.contract), tx_ref.map(|tx| tx.function));
}
drop(middleware_enter);
Expand All @@ -198,7 +134,7 @@ impl<'a> RpcServiceT<'a> for RpcMiddleware {
// -----------------------------------------------------------------------------

/// https://blog.adamchalmers.com/pin-unpin/
#[pin_project(PinnedDrop)]
#[pin_project]
pub struct RpcResponse<'a> {
// identifiers
client: RpcClientApp,
Expand Down Expand Up @@ -282,16 +218,6 @@ impl<'a> Future for RpcResponse<'a> {
}
}

#[pinned_drop]
impl PinnedDrop for RpcResponse<'_> {
fn drop(self: std::pin::Pin<&mut Self>) {
#[cfg(feature = "metrics")]
{
active_requests::COUNTERS.dec(&self.client, &self.method);
}
}
}

// -----------------------------------------------------------------------------
// Helpers
// -----------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/infra/metrics/metrics_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ metrics! {
group: json_rpc,

"Number of JSON-RPC requests active right now."
gauge rpc_requests_active{client, method},
gauge rpc_requests_active{},

"Number of JSON-RPC requests that started."
counter rpc_requests_started{client, method, contract, function},
Expand Down
30 changes: 27 additions & 3 deletions src/infra/tracing/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tonic::metadata::MetadataKey;
use tonic::metadata::MetadataMap;
use tracing::span;
use tracing::span::Attributes;
use tracing::Event;
use tracing::Span;
use tracing::Subscriber;
use tracing_serde::fields::AsMap;
Expand All @@ -52,6 +51,8 @@ use crate::ext::spawn_named;
use crate::ext::to_json_string;
use crate::ext::to_json_value;
use crate::infra::build_info;
#[cfg(feature = "metrics")]
use crate::infra::metrics;
use crate::infra::tracing::TracingConfig;
use crate::infra::tracing::TracingLogFormat;
use crate::infra::tracing::TracingProtocol;
Expand Down Expand Up @@ -310,27 +311,50 @@ where
}
});

// TODO: temporary metrics from events
let fields = to_json_value(event.field_map());
#[cfg(feature = "metrics")]
{
event_to_metrics(&fields);
}

// parse metadata and event
let log = TracingLog {
timestamp: Utc::now(),
level: meta.level().as_serde(),
target: meta.target(),
thread: std::thread::current(),
fields: event.field_map(),
fields,
context,
};

writeln!(writer, "{}", to_json_string(&log))
}
}

#[cfg(feature = "metrics")]
fn event_to_metrics(json: &JsonValue) {
let Some(message) = json.as_object().and_then(|obj| obj.get("message")).and_then(|msg| msg.as_str()) else {
return;
};

// jsonrpsee active connections
let Some(message) = message.strip_prefix("Accepting new connection ") else {
return;
};
let Some((current, _)) = message.split_once('/') else { return };
let Ok(current) = current.parse::<u64>() else { return };
metrics::set_rpc_requests_active(current);
}

#[derive(derive_new::new)]
struct TracingLog<'a> {
timestamp: DateTime<Utc>,
level: SerializeLevel<'a>,
target: &'a str,
thread: Thread,
fields: SerializeFieldMap<'a, Event<'a>>,
// fields: SerializeFieldMap<'a, Event<'a>>,
fields: JsonValue,
context: Option<TracingLogContextField<'a>>,
}

Expand Down

0 comments on commit 9461253

Please sign in to comment.