From 17d546c9b90b92bfb000567615ee21cf3fc0dcf1 Mon Sep 17 00:00:00 2001 From: adel Date: Tue, 26 Nov 2024 09:49:28 +0100 Subject: [PATCH] feat: Better logs integration with Signoz (#146) * dev(better_logs): Better tracing * dev(better_logs): better logs * dev(better_logs): logs * dev(better_logs): minor stuff * dev(better_logs): Removed AXIOM from readme --- README.md | 1 - pragma-ingestor/src/main.rs | 2 +- pragma-node/src/handlers/create_entry.rs | 15 +------- .../src/handlers/create_future_entry.rs | 15 +------- pragma-node/src/handlers/get_entry.rs | 4 +-- pragma-node/src/handlers/get_expiries.rs | 4 +-- pragma-node/src/handlers/get_ohlc.rs | 3 +- pragma-node/src/handlers/get_volatility.rs | 3 +- .../handlers/merkle_feeds/get_merkle_proof.rs | 3 +- .../src/handlers/merkle_feeds/get_option.rs | 6 +--- .../src/handlers/onchain/get_checkpoints.rs | 4 +-- pragma-node/src/handlers/onchain/get_entry.rs | 3 +- .../src/handlers/onchain/get_history.rs | 3 +- .../src/handlers/onchain/get_publishers.rs | 2 +- .../src/handlers/onchain/subscribe_to_ohlc.rs | 35 ++++++++++++++++++- .../src/handlers/subscribe_to_entry.rs | 31 +++++++++++++++- .../src/handlers/subscribe_to_price.rs | 30 +++++++++++++++- pragma-node/src/main.rs | 3 +- pragma-node/src/server/middlewares.rs | 29 +++++++++++++++ pragma-node/src/server/mod.rs | 3 ++ pragma-node/src/types/pricer.rs | 23 ++++++++++++ 21 files changed, 162 insertions(+), 60 deletions(-) create mode 100644 pragma-node/src/server/middlewares.rs diff --git a/README.md b/README.md index fe9f7073..0ed68271 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,6 @@ export HOST="0.0.0.0" export PORT=3000 export METRICS_PORT=8080 export KAFKA_BROKERS=localhost:29092 -export AXIOM_TOKEN=xaat- # OPTIONAL export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317 ``` diff --git a/pragma-ingestor/src/main.rs b/pragma-ingestor/src/main.rs index 5ec92357..6449b02c 100644 --- a/pragma-ingestor/src/main.rs +++ b/pragma-ingestor/src/main.rs @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box> { } } -#[tracing::instrument(skip(pool))] +#[tracing::instrument(skip(pool, payload))] async fn process_payload(pool: &Pool, payload: Vec) -> Result<(), Box> { let decoded_payload = String::from_utf8_lossy(&payload); let is_future_entries = decoded_payload.contains("expiration_timestamp"); diff --git a/pragma-node/src/handlers/create_entry.rs b/pragma-node/src/handlers/create_entry.rs index f3b35678..ee246b2f 100644 --- a/pragma-node/src/handlers/create_entry.rs +++ b/pragma-node/src/handlers/create_entry.rs @@ -47,13 +47,12 @@ pub struct CreateEntryResponse { (status = 401, description = "Unauthorized Publisher", body = EntryError) ) )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn create_entries( State(state): State, extract::Json(new_entries): extract::Json, ) -> Result, EntryError> { tracing::info!("Received new entries: {:?}", new_entries); - let config = config().await; if new_entries.entries.is_empty() { @@ -77,12 +76,6 @@ pub async fn create_entries( let public_key = Felt::from_hex(&public_key) .map_err(|_| EntryError::PublisherError(PublisherError::InvalidKey(public_key)))?; - tracing::info!( - "Retrieved {:?} public key: {:?}", - publisher_name, - public_key - ); - // Fetch account address from database // TODO: Cache it let account_address = publisher_repository::get(&state.offchain_pool, publisher_name.clone()) @@ -92,12 +85,6 @@ pub async fn create_entries( let account_address = Felt::from_hex(&account_address) .map_err(|_| EntryError::PublisherError(PublisherError::InvalidAddress(account_address)))?; - tracing::info!( - "Retrieved {:?} account address: {:?}", - publisher_name, - account_address - ); - let signature = assert_request_signature_is_valid::( &new_entries, &account_address, diff --git a/pragma-node/src/handlers/create_future_entry.rs b/pragma-node/src/handlers/create_future_entry.rs index 5477b3c9..080cec78 100644 --- a/pragma-node/src/handlers/create_future_entry.rs +++ b/pragma-node/src/handlers/create_future_entry.rs @@ -47,13 +47,12 @@ pub struct CreateFutureEntryResponse { (status = 401, description = "Unauthorized Publisher", body = EntryError) ) )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn create_future_entries( State(state): State, extract::Json(new_entries): extract::Json, ) -> Result, EntryError> { tracing::info!("Received new future entries: {:?}", new_entries); - let config = config().await; if new_entries.entries.is_empty() { @@ -77,12 +76,6 @@ pub async fn create_future_entries( let public_key = Felt::from_hex(&public_key) .map_err(|_| EntryError::PublisherError(PublisherError::InvalidKey(public_key)))?; - tracing::info!( - "Retrieved {:?} public key: {:?}", - publisher_name, - public_key - ); - // Fetch account address from database // TODO: Cache it let account_address = publisher_repository::get(&state.offchain_pool, publisher_name.clone()) @@ -92,12 +85,6 @@ pub async fn create_future_entries( let account_address = Felt::from_hex(&account_address) .map_err(|_| EntryError::PublisherError(PublisherError::InvalidAddress(account_address)))?; - tracing::info!( - "Retrieved {:?} account address: {:?}", - publisher_name, - account_address - ); - let signature = assert_request_signature_is_valid::( &new_entries, &account_address, diff --git a/pragma-node/src/handlers/get_entry.rs b/pragma-node/src/handlers/get_entry.rs index 9d33e37e..12e204d6 100644 --- a/pragma-node/src/handlers/get_entry.rs +++ b/pragma-node/src/handlers/get_entry.rs @@ -102,14 +102,12 @@ pub struct GetEntryResponse { GetEntryParams, ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_entry( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(params): Query, ) -> Result, EntryError> { - tracing::info!("Received get entry request for pair {:?}", pair); - let is_routing = params.routing.unwrap_or(false); let routing_params = RoutingParams::try_from(params)?; diff --git a/pragma-node/src/handlers/get_expiries.rs b/pragma-node/src/handlers/get_expiries.rs index 20ba322b..45c0cb97 100644 --- a/pragma-node/src/handlers/get_expiries.rs +++ b/pragma-node/src/handlers/get_expiries.rs @@ -21,13 +21,11 @@ use crate::utils::currency_pair_to_pair_id; ("quote" = String, Path, description = "Quote Asset"), ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_expiries( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, ) -> Result>, EntryError> { - tracing::info!("Received get expiries for pair {:?}", pair); - let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); let req_result = entry_repository::get_expiries_list(&state.offchain_pool, pair_id.clone()) diff --git a/pragma-node/src/handlers/get_ohlc.rs b/pragma-node/src/handlers/get_ohlc.rs index 24a21851..d125b143 100644 --- a/pragma-node/src/handlers/get_ohlc.rs +++ b/pragma-node/src/handlers/get_ohlc.rs @@ -30,13 +30,12 @@ pub struct GetOHLCResponse { GetEntryParams, ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_ohlc( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(params): Query, ) -> Result, EntryError> { - tracing::info!("Received get entry request for pair {:?}", pair); // Construct pair id let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); diff --git a/pragma-node/src/handlers/get_volatility.rs b/pragma-node/src/handlers/get_volatility.rs index 647590ab..660f8186 100644 --- a/pragma-node/src/handlers/get_volatility.rs +++ b/pragma-node/src/handlers/get_volatility.rs @@ -38,13 +38,12 @@ pub struct GetVolatilityResponse { VolatilityQuery ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_volatility( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(volatility_query): Query, ) -> Result, EntryError> { - tracing::info!("Received get volatility request for pair {:?}", pair); // Construct pair id let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1); diff --git a/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs b/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs index b7443ea9..0c688c3c 100644 --- a/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs +++ b/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs @@ -35,13 +35,12 @@ pub struct GetMerkleProofResponse(pub MerkleProof); GetMerkleProofQuery ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_merkle_feeds_proof( State(state): State, PathExtractor(option_hex_hash): PathExtractor, Query(params): Query, ) -> Result, MerkleFeedError> { - tracing::info!("Received get merkle proof request"); if state.redis_client.is_none() { return Err(MerkleFeedError::RedisConnection); } diff --git a/pragma-node/src/handlers/merkle_feeds/get_option.rs b/pragma-node/src/handlers/merkle_feeds/get_option.rs index e4ffdc2b..f1bcd6b2 100644 --- a/pragma-node/src/handlers/merkle_feeds/get_option.rs +++ b/pragma-node/src/handlers/merkle_feeds/get_option.rs @@ -38,16 +38,12 @@ pub struct GetOptionResponse { GetOptionQuery ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_merkle_feeds_option( State(state): State, PathExtractor(instrument): PathExtractor, Query(params): Query, ) -> Result, MerkleFeedError> { - tracing::info!( - "Received get option request for instrument {:?}", - instrument - ); if state.redis_client.is_none() { return Err(MerkleFeedError::RedisConnection); } diff --git a/pragma-node/src/handlers/onchain/get_checkpoints.rs b/pragma-node/src/handlers/onchain/get_checkpoints.rs index c0f4ac62..322870d9 100644 --- a/pragma-node/src/handlers/onchain/get_checkpoints.rs +++ b/pragma-node/src/handlers/onchain/get_checkpoints.rs @@ -53,14 +53,12 @@ pub struct GetOnchainCheckpointsResponse(pub Vec); GetOnchainCheckpointsParams ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_onchain_checkpoints( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(params): Query, ) -> Result, CheckpointError> { - tracing::info!("Received get onchain entry request for pair {:?}", pair); - let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1); let limit = params.limit.unwrap_or(DEFAULT_LIMIT); diff --git a/pragma-node/src/handlers/onchain/get_entry.rs b/pragma-node/src/handlers/onchain/get_entry.rs index 706a6751..ab72a605 100644 --- a/pragma-node/src/handlers/onchain/get_entry.rs +++ b/pragma-node/src/handlers/onchain/get_entry.rs @@ -59,13 +59,12 @@ pub struct GetOnchainEntryResponse { GetOnchainEntryParams ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_onchain_entry( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(params): Query, ) -> Result, EntryError> { - tracing::info!("Received get onchain entry request for pair {:?}", pair); let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1); let with_components = params.components.unwrap_or(true); let with_variations = params.variations.unwrap_or(true); diff --git a/pragma-node/src/handlers/onchain/get_history.rs b/pragma-node/src/handlers/onchain/get_history.rs index 4b299401..b81da4b1 100644 --- a/pragma-node/src/handlers/onchain/get_history.rs +++ b/pragma-node/src/handlers/onchain/get_history.rs @@ -46,13 +46,12 @@ pub struct GetOnchainHistoryResponse(pub Vec); GetOnchainHistoryParams ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_onchain_history( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, Query(params): Query, ) -> Result, EntryError> { - tracing::info!("Received get onchain history request for pair {:?}", pair); let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1); let network = params.network; let timestamp_range = params.timestamp.assert_time_is_valid()?; diff --git a/pragma-node/src/handlers/onchain/get_publishers.rs b/pragma-node/src/handlers/onchain/get_publishers.rs index 787117a1..bd26410d 100644 --- a/pragma-node/src/handlers/onchain/get_publishers.rs +++ b/pragma-node/src/handlers/onchain/get_publishers.rs @@ -53,7 +53,7 @@ pub struct GetOnchainPublishersResponse(pub Vec); GetOnchainPublishersParams ), )] -#[tracing::instrument] +#[tracing::instrument(skip(state))] pub async fn get_onchain_publishers( State(state): State, Query(params): Query, diff --git a/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs b/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs index b1fcb2e7..d9866c9f 100644 --- a/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs +++ b/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs @@ -25,6 +25,7 @@ pub struct GetOnchainOHLCResponse { pub data: Vec, } +#[tracing::instrument(skip(state, ws), fields(endpoint_name = "subscribe_to_onchain_ohlc"))] pub async fn subscribe_to_onchain_ohlc( ws: WebSocketUpgrade, State(state): State, @@ -36,6 +37,13 @@ pub async fn subscribe_to_onchain_ohlc( /// Interval in milliseconds that the channel will update the client with the latest prices. const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 30000; // 30 seconds +#[tracing::instrument( + skip(socket, app_state), + fields( + subscriber_id, + client_ip = %client_addr.ip() + ) +)] async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( "subscribe_to_ohlc".into(), @@ -69,6 +77,15 @@ async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_ad struct WsOHLCHandler; impl ChannelHandler for WsOHLCHandler { + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id, + network = ?subscription.network, + pair = %subscription.pair, + interval = ?subscription.interval + ) + )] async fn handle_client_msg( &mut self, subscriber: &mut Subscriber, @@ -107,6 +124,13 @@ impl ChannelHandler for WsOH Ok(()) } + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id + ), + err(Debug) + )] async fn periodic_interval( &mut self, subscriber: &mut Subscriber, @@ -180,6 +204,15 @@ impl WsOHLCHandler { Ok(()) } + #[tracing::instrument( + skip(self, subscriber, message), + fields( + subscriber_id = %subscriber.id, + ip = %subscriber.ip_address, + msg_len = message.len() + ) + )] + async fn check_rate_limit( &self, subscriber: &mut Subscriber, @@ -192,7 +225,7 @@ impl WsOHLCHandler { NonZeroU32::new(message.len().try_into()?).ok_or(InfraError::InternalServerError)?, ) != Ok(Ok(())) { - tracing::info!( + tracing::warn!( subscriber_id = %subscriber.id, ip = %ip_addr, "Rate limit exceeded. Closing connection.", diff --git a/pragma-node/src/handlers/subscribe_to_entry.rs b/pragma-node/src/handlers/subscribe_to_entry.rs index 582a7af0..a4a21d8b 100644 --- a/pragma-node/src/handlers/subscribe_to_entry.rs +++ b/pragma-node/src/handlers/subscribe_to_entry.rs @@ -44,7 +44,7 @@ pub struct SubscribeToEntryResponse { pub timestamp: UnixTimestamp, } -#[tracing::instrument] +#[tracing::instrument(skip(state, ws), fields(endpoint_name = "subscribe_to_entry"))] pub async fn subscribe_to_entry( ws: WebSocketUpgrade, State(state): State, @@ -59,6 +59,13 @@ pub async fn subscribe_to_entry( /// Interval in milliseconds that the channel will update the client with the latest prices. const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 500; +#[tracing::instrument( + skip(socket, app_state), + fields( + subscriber_id, + client_ip = %client_addr.ip() + ) +)] async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( "subscribe_to_entry".into(), @@ -92,6 +99,14 @@ async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_ad struct WsEntriesHandler; impl ChannelHandler for WsEntriesHandler { + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id, + msg_type = ?request.msg_type, + pairs = ?request.pairs + ) + )] async fn handle_client_msg( &mut self, subscriber: &mut Subscriber, @@ -129,6 +144,12 @@ impl ChannelHandler for WsEn Ok(()) } + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id + ) + )] async fn periodic_interval( &mut self, subscriber: &mut Subscriber, @@ -162,6 +183,13 @@ impl ChannelHandler for WsEn impl WsEntriesHandler { /// Get the current median entries for the subscribed pairs and sign them as Pragma. + #[tracing::instrument( + skip(self, state, subscription), + fields( + spot_pairs = ?subscription.get_subscribed_spot_pairs().len(), + perp_pairs = ?subscription.get_subscribed_perp_pairs().len() + ) + )] async fn get_subscribed_pairs_medians( &self, state: &AppState, @@ -202,6 +230,7 @@ impl WsEntriesHandler { } /// Get index & mark prices for the subscribed pairs. + #[tracing::instrument(skip(self, state, subscription))] async fn get_all_entries( &self, state: &AppState, diff --git a/pragma-node/src/handlers/subscribe_to_price.rs b/pragma-node/src/handlers/subscribe_to_price.rs index 71377a1c..5adc05d4 100644 --- a/pragma-node/src/handlers/subscribe_to_price.rs +++ b/pragma-node/src/handlers/subscribe_to_price.rs @@ -32,7 +32,7 @@ pub struct SubscribeToPriceResponse { pub timestamp: UnixTimestamp, } -#[tracing::instrument] +#[tracing::instrument(skip(state, ws), fields(endpoint_name = "subscribe_to_price"))] pub async fn subscribe_to_price( ws: WebSocketUpgrade, State(state): State, @@ -44,6 +44,13 @@ pub async fn subscribe_to_price( /// Interval in milliseconds that the channel will update the client with the latest prices. const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 500; +#[tracing::instrument( + skip(socket, app_state), + fields( + subscriber_id, + client_ip = %client_addr.ip() + ) +)] async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( "subscribe_to_price".into(), @@ -77,6 +84,14 @@ async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_ad struct WsEntriesHandler; impl ChannelHandler for WsEntriesHandler { + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id, + request_type = ?request.msg_type, + pairs_count = request.pairs.len() + ) + )] async fn handle_client_msg( &mut self, subscriber: &mut Subscriber, @@ -112,6 +127,12 @@ impl ChannelHandler for WsEn Ok(()) } + #[tracing::instrument( + skip(self, subscriber), + fields( + subscriber_id = %subscriber.id + ) + )] async fn periodic_interval( &mut self, subscriber: &mut Subscriber, @@ -145,6 +166,12 @@ impl ChannelHandler for WsEn impl WsEntriesHandler { /// Get the current median entries for the subscribed pairs and sign them as Pragma. + #[tracing::instrument( + skip(self, state, subscription), + fields( + subscribed_pairs = ?subscription.get_subscribed_spot_pairs().len() + ) + )] async fn get_subscribed_pairs_medians( &self, state: &AppState, @@ -170,6 +197,7 @@ impl WsEntriesHandler { } /// Get index & mark prices for the subscribed pairs. + #[tracing::instrument(skip(self, state, subscription))] async fn get_all_entries( &self, state: &AppState, diff --git a/pragma-node/src/main.rs b/pragma-node/src/main.rs index f99bdc7f..eb78fa09 100644 --- a/pragma-node/src/main.rs +++ b/pragma-node/src/main.rs @@ -29,7 +29,6 @@ pub struct AppState { offchain_pool: Pool, onchain_pool: Pool, // Redis connection - #[allow(dead_code)] redis_client: Option>, // Database caches caches: Arc, @@ -55,6 +54,7 @@ impl fmt::Debug for AppState { async fn main() -> Result<(), Box> { dotenv().ok(); + // We export our telemetry - so we can monitor the API through Signoz. let otel_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") .unwrap_or_else(|_| "http://signoz.dev.pragma.build:4317".to_string()); pragma_common::telemetry::init_telemetry("pragma-node".into(), otel_endpoint, None)?; @@ -83,7 +83,6 @@ async fn main() -> Result<(), Box> { // Init the redis client - Optionnal, only for endpoints that interact with Redis, // i.e just the Merkle Feeds endpoint for now. - // TODO(akhercha): See with Hithem for production mode let redis_client = match pragma_entities::connection::init_redis_client( config.redis_host(), config.redis_port(), diff --git a/pragma-node/src/server/middlewares.rs b/pragma-node/src/server/middlewares.rs new file mode 100644 index 00000000..47f1f6c6 --- /dev/null +++ b/pragma-node/src/server/middlewares.rs @@ -0,0 +1,29 @@ +use axum::{ + body::Body, + http::{Request, Response}, + middleware::Next, +}; +use std::time::Instant; + +pub async fn track_timing(req: Request, next: Next) -> Response { + let start = Instant::now(); + let route = req.uri().path().to_owned(); + + let response = next.run(req).await; + + let elapsed = start.elapsed(); + tracing::info!("🌐 {} - {:?}", route, elapsed); + + response +} + +#[allow(dead_code)] +pub trait TimingLayer { + fn with_timing(self) -> Self; +} + +impl TimingLayer for axum::Router { + fn with_timing(self) -> Self { + self.layer(axum::middleware::from_fn(track_timing)) + } +} diff --git a/pragma-node/src/server/mod.rs b/pragma-node/src/server/mod.rs index dc49d13b..116a9077 100644 --- a/pragma-node/src/server/mod.rs +++ b/pragma-node/src/server/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod middlewares; pub(crate) mod routes; use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; @@ -13,6 +14,7 @@ use utoipa::{ use utoipauto::utoipauto; use crate::errors::internal_error; +use crate::server::middlewares::TimingLayer; use crate::{config::Config, server::routes::app_router, AppState}; struct SecurityAddon; @@ -64,6 +66,7 @@ pub async fn run_api_server(config: &Config, state: AppState) { let app = app_router::(state.clone()) .with_state(state) + .with_timing() // Logging so we can see whats going on .layer(OtelAxumLayer::default()) .layer(OtelInResponseLayer) diff --git a/pragma-node/src/types/pricer.rs b/pragma-node/src/types/pricer.rs index 228e22ff..d60e4609 100644 --- a/pragma-node/src/types/pricer.rs +++ b/pragma-node/src/types/pricer.rs @@ -28,6 +28,10 @@ impl Pricer for IndexPricer { Self { pairs, pair_type } } + #[tracing::instrument(skip(self, db_pool), fields( + pairs_count = self.pairs.len(), + pair_type = ?self.pair_type + ))] async fn compute(&self, db_pool: &Pool) -> Result, EntryError> { if self.pairs.is_empty() { return Ok(vec![]); @@ -55,6 +59,7 @@ pub struct MarkPricer { impl MarkPricer { /// Builds the stablecoin/USD pairs from the non USD pairs. /// Example: ["BTC/USDT", "ETH/USDT"] -> ["USDT/USD"] + #[tracing::instrument] fn build_stable_to_usd_pairs(non_usd_pairs: &[String]) -> Vec { non_usd_pairs .iter() @@ -63,6 +68,7 @@ impl MarkPricer { } /// Computes the stablecoin/USD pairs median entries. + #[tracing::instrument(skip(db_pool))] async fn get_stablecoins_index_entries( db_pool: &Pool, stablecoin_pairs: &[String], @@ -74,6 +80,7 @@ impl MarkPricer { /// Retrieves the number of decimals for quote stablecoins. /// Example: ["BTC/USDT", "ETH/USDT"] -> {"USDT": 6} + #[tracing::instrument(skip(db_pool))] async fn get_stablecoins_decimals( db_pool: &Pool, stablecoin_pairs: Vec, @@ -98,6 +105,7 @@ impl MarkPricer { } /// Computes the non USD quoted pairs median entries. + #[tracing::instrument(skip(db_pool), fields(pairs_count = pairs.len()))] async fn get_pairs_entries( db_pool: &Pool, pairs: &[String], @@ -110,6 +118,7 @@ impl MarkPricer { /// Given the median price of a perp pair, the median price of the spot /// stablecoin/USD pair and the number of decimals of the stablecoin, computes /// the mark price. + #[tracing::instrument] fn compute_mark_price( perp_pair_price: &BigDecimal, spot_usd_price: &BigDecimal, @@ -126,6 +135,13 @@ impl MarkPricer { /// Builds the complete list of entries from the median price of the spot /// stablecoin/USD pairs and the median price of the perp pairs. + #[tracing::instrument( + skip(stablecoins_spot_entries, stablecoins_decimals, pairs_perp_entries), + fields( + spot_entries = stablecoins_spot_entries.len(), + perp_entries = pairs_perp_entries.len() + ) + )] pub fn merge_entries_from( stablecoins_spot_entries: Vec, stablecoins_decimals: HashMap, @@ -171,6 +187,13 @@ impl Pricer for MarkPricer { Self { pairs, pair_type } } + #[tracing::instrument( + skip(self, db_pool), + fields( + pairs_count = self.pairs.len(), + pair_type = ?self.pair_type + ) + )] async fn compute(&self, db_pool: &Pool) -> Result, EntryError> { if self.pairs.is_empty() { return Ok(vec![]);