Skip to content

Commit

Permalink
feat: Better logs integration with Signoz (#146)
Browse files Browse the repository at this point in the history
* dev(better_logs): Better tracing

* dev(better_logs): better logs

* dev(better_logs): logs

* dev(better_logs): minor stuff

* dev(better_logs): Removed AXIOM from readme
  • Loading branch information
akhercha authored Nov 26, 2024
1 parent 9ee34b6 commit 17d546c
Show file tree
Hide file tree
Showing 21 changed files with 162 additions and 60 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ export HOST="0.0.0.0"
export PORT=3000
export METRICS_PORT=8080
export KAFKA_BROKERS=localhost:29092
export AXIOM_TOKEN=xaat- # OPTIONAL
export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317
```

Expand Down
2 changes: 1 addition & 1 deletion pragma-ingestor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

#[tracing::instrument(skip(pool))]
#[tracing::instrument(skip(pool, payload))]
async fn process_payload(pool: &Pool, payload: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
let decoded_payload = String::from_utf8_lossy(&payload);
let is_future_entries = decoded_payload.contains("expiration_timestamp");
Expand Down
15 changes: 1 addition & 14 deletions pragma-node/src/handlers/create_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ pub struct CreateEntryResponse {
(status = 401, description = "Unauthorized Publisher", body = EntryError)
)
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn create_entries(
State(state): State<AppState>,
extract::Json(new_entries): extract::Json<CreateEntryRequest>,
) -> Result<Json<CreateEntryResponse>, EntryError> {
tracing::info!("Received new entries: {:?}", new_entries);

let config = config().await;

if new_entries.entries.is_empty() {
Expand All @@ -77,12 +76,6 @@ pub async fn create_entries(
let public_key = Felt::from_hex(&public_key)
.map_err(|_| EntryError::PublisherError(PublisherError::InvalidKey(public_key)))?;

tracing::info!(
"Retrieved {:?} public key: {:?}",
publisher_name,
public_key
);

// Fetch account address from database
// TODO: Cache it
let account_address = publisher_repository::get(&state.offchain_pool, publisher_name.clone())
Expand All @@ -92,12 +85,6 @@ pub async fn create_entries(
let account_address = Felt::from_hex(&account_address)
.map_err(|_| EntryError::PublisherError(PublisherError::InvalidAddress(account_address)))?;

tracing::info!(
"Retrieved {:?} account address: {:?}",
publisher_name,
account_address
);

let signature = assert_request_signature_is_valid::<CreateEntryRequest, Entry>(
&new_entries,
&account_address,
Expand Down
15 changes: 1 addition & 14 deletions pragma-node/src/handlers/create_future_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ pub struct CreateFutureEntryResponse {
(status = 401, description = "Unauthorized Publisher", body = EntryError)
)
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn create_future_entries(
State(state): State<AppState>,
extract::Json(new_entries): extract::Json<CreateFutureEntryRequest>,
) -> Result<Json<CreateFutureEntryResponse>, EntryError> {
tracing::info!("Received new future entries: {:?}", new_entries);

let config = config().await;

if new_entries.entries.is_empty() {
Expand All @@ -77,12 +76,6 @@ pub async fn create_future_entries(
let public_key = Felt::from_hex(&public_key)
.map_err(|_| EntryError::PublisherError(PublisherError::InvalidKey(public_key)))?;

tracing::info!(
"Retrieved {:?} public key: {:?}",
publisher_name,
public_key
);

// Fetch account address from database
// TODO: Cache it
let account_address = publisher_repository::get(&state.offchain_pool, publisher_name.clone())
Expand All @@ -92,12 +85,6 @@ pub async fn create_future_entries(
let account_address = Felt::from_hex(&account_address)
.map_err(|_| EntryError::PublisherError(PublisherError::InvalidAddress(account_address)))?;

tracing::info!(
"Retrieved {:?} account address: {:?}",
publisher_name,
account_address
);

let signature = assert_request_signature_is_valid::<CreateFutureEntryRequest, FutureEntry>(
&new_entries,
&account_address,
Expand Down
4 changes: 1 addition & 3 deletions pragma-node/src/handlers/get_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,12 @@ pub struct GetEntryResponse {
GetEntryParams,
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_entry(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(params): Query<GetEntryParams>,
) -> Result<Json<GetEntryResponse>, EntryError> {
tracing::info!("Received get entry request for pair {:?}", pair);

let is_routing = params.routing.unwrap_or(false);

let routing_params = RoutingParams::try_from(params)?;
Expand Down
4 changes: 1 addition & 3 deletions pragma-node/src/handlers/get_expiries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ use crate::utils::currency_pair_to_pair_id;
("quote" = String, Path, description = "Quote Asset"),
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_expiries(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
) -> Result<Json<Vec<NaiveDateTime>>, EntryError> {
tracing::info!("Received get expiries for pair {:?}", pair);

let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1);

let req_result = entry_repository::get_expiries_list(&state.offchain_pool, pair_id.clone())
Expand Down
3 changes: 1 addition & 2 deletions pragma-node/src/handlers/get_ohlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ pub struct GetOHLCResponse {
GetEntryParams,
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_ohlc(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(params): Query<GetEntryParams>,
) -> Result<Json<GetOHLCResponse>, EntryError> {
tracing::info!("Received get entry request for pair {:?}", pair);
// Construct pair id
let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1);

Expand Down
3 changes: 1 addition & 2 deletions pragma-node/src/handlers/get_volatility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ pub struct GetVolatilityResponse {
VolatilityQuery
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_volatility(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(volatility_query): Query<VolatilityQuery>,
) -> Result<Json<GetVolatilityResponse>, EntryError> {
tracing::info!("Received get volatility request for pair {:?}", pair);
// Construct pair id
let pair_id = currency_pair_to_pair_id(&pair.0, &pair.1);

Expand Down
3 changes: 1 addition & 2 deletions pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ pub struct GetMerkleProofResponse(pub MerkleProof);
GetMerkleProofQuery
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_merkle_feeds_proof(
State(state): State<AppState>,
PathExtractor(option_hex_hash): PathExtractor<HexHash>,
Query(params): Query<GetMerkleProofQuery>,
) -> Result<Json<GetMerkleProofResponse>, MerkleFeedError> {
tracing::info!("Received get merkle proof request");
if state.redis_client.is_none() {
return Err(MerkleFeedError::RedisConnection);
}
Expand Down
6 changes: 1 addition & 5 deletions pragma-node/src/handlers/merkle_feeds/get_option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,12 @@ pub struct GetOptionResponse {
GetOptionQuery
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_merkle_feeds_option(
State(state): State<AppState>,
PathExtractor(instrument): PathExtractor<String>,
Query(params): Query<GetOptionQuery>,
) -> Result<Json<GetOptionResponse>, MerkleFeedError> {
tracing::info!(
"Received get option request for instrument {:?}",
instrument
);
if state.redis_client.is_none() {
return Err(MerkleFeedError::RedisConnection);
}
Expand Down
4 changes: 1 addition & 3 deletions pragma-node/src/handlers/onchain/get_checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ pub struct GetOnchainCheckpointsResponse(pub Vec<Checkpoint>);
GetOnchainCheckpointsParams
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_onchain_checkpoints(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(params): Query<GetOnchainCheckpointsParams>,
) -> Result<Json<GetOnchainCheckpointsResponse>, CheckpointError> {
tracing::info!("Received get onchain entry request for pair {:?}", pair);

let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1);

let limit = params.limit.unwrap_or(DEFAULT_LIMIT);
Expand Down
3 changes: 1 addition & 2 deletions pragma-node/src/handlers/onchain/get_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,12 @@ pub struct GetOnchainEntryResponse {
GetOnchainEntryParams
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_onchain_entry(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(params): Query<GetOnchainEntryParams>,
) -> Result<Json<GetOnchainEntryResponse>, EntryError> {
tracing::info!("Received get onchain entry request for pair {:?}", pair);
let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1);
let with_components = params.components.unwrap_or(true);
let with_variations = params.variations.unwrap_or(true);
Expand Down
3 changes: 1 addition & 2 deletions pragma-node/src/handlers/onchain/get_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ pub struct GetOnchainHistoryResponse(pub Vec<GetOnchainHistoryEntry>);
GetOnchainHistoryParams
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_onchain_history(
State(state): State<AppState>,
PathExtractor(pair): PathExtractor<(String, String)>,
Query(params): Query<GetOnchainHistoryParams>,
) -> Result<Json<GetOnchainHistoryResponse>, EntryError> {
tracing::info!("Received get onchain history request for pair {:?}", pair);
let pair_id: String = currency_pair_to_pair_id(&pair.0, &pair.1);
let network = params.network;
let timestamp_range = params.timestamp.assert_time_is_valid()?;
Expand Down
2 changes: 1 addition & 1 deletion pragma-node/src/handlers/onchain/get_publishers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct GetOnchainPublishersResponse(pub Vec<Publisher>);
GetOnchainPublishersParams
),
)]
#[tracing::instrument]
#[tracing::instrument(skip(state))]
pub async fn get_onchain_publishers(
State(state): State<AppState>,
Query(params): Query<GetOnchainPublishersParams>,
Expand Down
35 changes: 34 additions & 1 deletion pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct GetOnchainOHLCResponse {
pub data: Vec<OHLCEntry>,
}

#[tracing::instrument(skip(state, ws), fields(endpoint_name = "subscribe_to_onchain_ohlc"))]
pub async fn subscribe_to_onchain_ohlc(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Expand All @@ -36,6 +37,13 @@ pub async fn subscribe_to_onchain_ohlc(
/// Interval in milliseconds that the channel will update the client with the latest prices.
const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 30000; // 30 seconds

#[tracing::instrument(
skip(socket, app_state),
fields(
subscriber_id,
client_ip = %client_addr.ip()
)
)]
async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) {
let (mut subscriber, _) = match Subscriber::<SubscriptionState>::new(
"subscribe_to_ohlc".into(),
Expand Down Expand Up @@ -69,6 +77,15 @@ async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_ad
struct WsOHLCHandler;

impl ChannelHandler<SubscriptionState, SubscriptionRequest, InfraError> for WsOHLCHandler {
#[tracing::instrument(
skip(self, subscriber),
fields(
subscriber_id = %subscriber.id,
network = ?subscription.network,
pair = %subscription.pair,
interval = ?subscription.interval
)
)]
async fn handle_client_msg(
&mut self,
subscriber: &mut Subscriber<SubscriptionState>,
Expand Down Expand Up @@ -107,6 +124,13 @@ impl ChannelHandler<SubscriptionState, SubscriptionRequest, InfraError> for WsOH
Ok(())
}

#[tracing::instrument(
skip(self, subscriber),
fields(
subscriber_id = %subscriber.id
),
err(Debug)
)]
async fn periodic_interval(
&mut self,
subscriber: &mut Subscriber<SubscriptionState>,
Expand Down Expand Up @@ -180,6 +204,15 @@ impl WsOHLCHandler {
Ok(())
}

#[tracing::instrument(
skip(self, subscriber, message),
fields(
subscriber_id = %subscriber.id,
ip = %subscriber.ip_address,
msg_len = message.len()
)
)]

async fn check_rate_limit(
&self,
subscriber: &mut Subscriber<SubscriptionState>,
Expand All @@ -192,7 +225,7 @@ impl WsOHLCHandler {
NonZeroU32::new(message.len().try_into()?).ok_or(InfraError::InternalServerError)?,
) != Ok(Ok(()))
{
tracing::info!(
tracing::warn!(
subscriber_id = %subscriber.id,
ip = %ip_addr,
"Rate limit exceeded. Closing connection.",
Expand Down
31 changes: 30 additions & 1 deletion pragma-node/src/handlers/subscribe_to_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct SubscribeToEntryResponse {
pub timestamp: UnixTimestamp,
}

#[tracing::instrument]
#[tracing::instrument(skip(state, ws), fields(endpoint_name = "subscribe_to_entry"))]
pub async fn subscribe_to_entry(
ws: WebSocketUpgrade,
State(state): State<AppState>,
Expand All @@ -59,6 +59,13 @@ pub async fn subscribe_to_entry(
/// Interval in milliseconds that the channel will update the client with the latest prices.
const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 500;

#[tracing::instrument(
skip(socket, app_state),
fields(
subscriber_id,
client_ip = %client_addr.ip()
)
)]
async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) {
let (mut subscriber, _) = match Subscriber::<SubscriptionState>::new(
"subscribe_to_entry".into(),
Expand Down Expand Up @@ -92,6 +99,14 @@ async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_ad
struct WsEntriesHandler;

impl ChannelHandler<SubscriptionState, SubscriptionRequest, EntryError> for WsEntriesHandler {
#[tracing::instrument(
skip(self, subscriber),
fields(
subscriber_id = %subscriber.id,
msg_type = ?request.msg_type,
pairs = ?request.pairs
)
)]
async fn handle_client_msg(
&mut self,
subscriber: &mut Subscriber<SubscriptionState>,
Expand Down Expand Up @@ -129,6 +144,12 @@ impl ChannelHandler<SubscriptionState, SubscriptionRequest, EntryError> for WsEn
Ok(())
}

#[tracing::instrument(
skip(self, subscriber),
fields(
subscriber_id = %subscriber.id
)
)]
async fn periodic_interval(
&mut self,
subscriber: &mut Subscriber<SubscriptionState>,
Expand Down Expand Up @@ -162,6 +183,13 @@ impl ChannelHandler<SubscriptionState, SubscriptionRequest, EntryError> for WsEn

impl WsEntriesHandler {
/// Get the current median entries for the subscribed pairs and sign them as Pragma.
#[tracing::instrument(
skip(self, state, subscription),
fields(
spot_pairs = ?subscription.get_subscribed_spot_pairs().len(),
perp_pairs = ?subscription.get_subscribed_perp_pairs().len()
)
)]
async fn get_subscribed_pairs_medians(
&self,
state: &AppState,
Expand Down Expand Up @@ -202,6 +230,7 @@ impl WsEntriesHandler {
}

/// Get index & mark prices for the subscribed pairs.
#[tracing::instrument(skip(self, state, subscription))]
async fn get_all_entries(
&self,
state: &AppState,
Expand Down
Loading

0 comments on commit 17d546c

Please sign in to comment.