Skip to content

Commit

Permalink
feat: OTEL telemetry + metrics (#144)
Browse files Browse the repository at this point in the history
* feat(otel_metrics): OTEL first draft

* feat(otel_metrics): Removed unused deps

* feat(otel_metrics): Taplo linters

* feat(otel_metrics): Added env for compose
  • Loading branch information
akhercha authored Nov 25, 2024
1 parent de9824e commit 5505b30
Show file tree
Hide file tree
Showing 44 changed files with 791 additions and 437 deletions.
603 changes: 454 additions & 149 deletions Cargo.lock

Large diffs are not rendered by default.

39 changes: 27 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ authors = ["Pragma Labs <[email protected]>"]
color-eyre = "0.6"
aws-config = { version = "1.5.1", features = ["behavior-version-latest"] }
aws-sdk-secretsmanager = "1.32.0"
axum = { version = "0.6", features = ["macros", "ws", "tokio"] }
axum = { version = "0.7.7", features = ["macros", "ws", "tokio"] }
axum-macros = "0.3"
cainome = { git = "https://github.com/cartridge-gg/cainome", tag = "v0.4.5", features = [
"abigen-rs",
"abigen-rs",
] }
diesel = { version = "2.1", features = [
"postgres",
Expand All @@ -42,8 +42,7 @@ chrono = { version = "0.4.26", features = ["serde"] }
lazy_static = "1.4.0"
serde = { version = "1.0.204", features = ["derive"] }
moka = { version = "0.12", features = ["future"] }
opentelemetry = { version = "0.22" }
prometheus = "0.13.4"
opentelemetry = { version = "0.26.0", features = ["metrics", "logs"] }
nonzero_ext = { version = "0.3.0" }
serde_json = { version = "1.0.122", features = ["arbitrary_precision"] }
starknet = "0.12.0"
Expand All @@ -58,14 +57,33 @@ strum = { version = "0.26", features = ["derive"] }
tracing = "0.1.4"
tracing-test = "0.2.5"
url = "2.5.0"
tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tokio = { version = "1.11.0", features = ["full"] }
tower-http = { version = "0.6.2", features = ["fs", "trace", "cors"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tokio = { version = "~1.38.0", features = ["full"] }
toml = "0.8.8"
utoipa = { version = "4", features = ["axum_extras", "chrono", "uuid"] }
utoipa = { version = "5.0.0", features = ["axum_extras", "chrono", "uuid"] }
utoipauto = "0.1.14"
utoipa-swagger-ui = { version = "4", features = ["axum"] }
utoipa-swagger-ui = { version = "8.0.3", features = ["axum"] }
uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] }
init-tracing-opentelemetry = { version = "0.22.0", features = [
"otlp",
"tracing_subscriber_ext",
] }
axum-tracing-opentelemetry = "0.21.1"
opentelemetry-otlp = { version = "0.26.0", features = [
"metrics",
"tonic",
"logs",
] }
tracing-opentelemetry-instrumentation-sdk = "0.21.0"
opentelemetry_sdk = { version = "0.26.0", features = [
"metrics",
"rt-tokio",
"logs",
] }
tracing-opentelemetry = "0.27.0"
opentelemetry-appender-tracing = "0.26.0"
opentelemetry-semantic-conventions = "0.26.0"

pragma-monitoring = { git = "https://github.com/astraly-labs/pragma-monitoring" }

Expand All @@ -80,6 +98,3 @@ testcontainers-modules = { version = "0.9.0", features = [
"http_wait",
] }
pretty_assertions = "1.4.0"

[workspace.patch.crates-io]
quote = { version = "1.0.37" }
1 change: 1 addition & 0 deletions compose.dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ services:
pragma-ingestor-1:
container_name: "pragma-ingestor-1"
environment:
- OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317
- DATABASE_MAX_CONN=25
- BROKERS=pragma-kafka:9092
- TOPIC=pragma-data
Expand Down
9 changes: 8 additions & 1 deletion pragma-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ keywords = ["pragma", "sdk", "consumer", "data", "feeds"]
[dependencies]
bigdecimal = { workspace = true, features = ["serde"] }
chrono = { workspace = true }
color-eyre = { workspace = true }
init-tracing-opentelemetry = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry-appender-tracing = { workspace = true }
opentelemetry-otlp = { workspace = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true }
serde = { workspace = true, features = ["derive"] }
starknet = { workspace = true }
strum = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tracing = { workspace = true }
tracing-axiom = "0.7"
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
utoipa = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion pragma-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod errors;
pub mod hash;
pub mod tracing;
pub mod telemetry;
pub mod types;
pub mod utils;
112 changes: 112 additions & 0 deletions pragma-common/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use color_eyre::eyre::Result;
use init_tracing_opentelemetry::tracing_subscriber_ext::build_otel_layer;
use opentelemetry::trace::TracerProvider;
use opentelemetry::{global, KeyValue};
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::logs::{BatchConfig, LoggerProvider};
use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector;
use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader};
use opentelemetry_sdk::{runtime, trace::BatchConfigBuilder};
use opentelemetry_sdk::{
trace::{Config, Tracer},
Resource,
};
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use tracing::level_filters::LevelFilter;
use tracing::Level;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

pub fn init_telemetry(
app_name: String,
collection_endpoint: String,
log_level: Option<Level>,
) -> Result<()> {
let tracing_subscriber = tracing_subscriber::registry()
.with(build_otel_layer()?)
.with(LevelFilter::from_level(log_level.unwrap_or(Level::INFO)))
.with(
tracing_subscriber::fmt::layer()
.with_target(false)
.with_file(false)
.with_line_number(false)
.pretty(),
);

let tracer_provider = init_tracer_provider(&app_name, &collection_endpoint)?;
let logger_provider = init_logs_provider(&app_name, &collection_endpoint)?;
init_meter_provider(&app_name, &collection_endpoint)?;

tracing_subscriber
.with(OpenTelemetryLayer::new(tracer_provider))
.with(OpenTelemetryTracingBridge::new(&logger_provider))
.init();

Ok(())
}

fn init_tracer_provider(app_name: &str, collection_endpoint: &str) -> Result<Tracer> {
let provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_batch_config(BatchConfigBuilder::default().build())
.with_trace_config(
Config::default().with_resource(Resource::new(vec![KeyValue::new(
SERVICE_NAME,
format!("{app_name}-trace-service"),
)])),
)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(collection_endpoint),
)
.install_batch(runtime::Tokio)
.expect("Failed to install tracer provider");

global::set_tracer_provider(provider.clone());
Ok(provider.tracer(format!("{app_name}-subscriber")))
}

fn init_logs_provider(app_name: &str, collection_endpoint: &str) -> Result<LoggerProvider> {
let logger = opentelemetry_otlp::new_pipeline()
.logging()
.with_batch_config(BatchConfig::default())
.with_resource(Resource::new(vec![KeyValue::new(
SERVICE_NAME,
format!("{app_name}-logs-service"),
)]))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(collection_endpoint),
)
.install_batch(runtime::Tokio)?;

Ok(logger)
}

pub fn init_meter_provider(app_name: &str, collection_endpoint: &str) -> Result<()> {
let exporter = opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(collection_endpoint)
.build_metrics_exporter(Box::new(DefaultTemporalitySelector::new()))?;

let reader = PeriodicReader::builder(exporter, runtime::Tokio)
.with_interval(std::time::Duration::from_secs(5))
.build();

let metrics_provider = MeterProviderBuilder::default()
.with_reader(reader)
.with_resource(Resource::new(vec![KeyValue::new(
SERVICE_NAME,
format!("{app_name}-meter-service"),
)]))
.build();

// Set the global meter provider
global::set_meter_provider(metrics_provider);

Ok(())
}
41 changes: 0 additions & 41 deletions pragma-common/src/tracing.rs

This file was deleted.

2 changes: 1 addition & 1 deletion pragma-entities/src/models/entries/entry_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum VolatilityError {
InvalidTimestampsRange(u64, u64),
}

#[derive(Debug, thiserror::Error)]
#[derive(Debug, thiserror::Error, ToSchema)]
pub enum SigningError {
#[error("Invalid message: {0}")]
InvalidMessageError(String),
Expand Down
9 changes: 8 additions & 1 deletion pragma-ingestor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ mod error;
#[tracing::instrument]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _ = dotenv(); // .env file is not present in prod
pragma_common::tracing::init_tracing("pragma-ingestor")?;

// TODO: OTEL_EXPORTER_OTLP_ENDPOINT should be read from env.
pragma_common::telemetry::init_telemetry(
"pragma-ingestor".into(),
"http://localhost:4317".into(),
None,
)?;

info!(
"kafka configuration : hostname={:?}, group_id={}, topic={}",
config::CONFIG.brokers,
Expand Down
9 changes: 4 additions & 5 deletions pragma-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] }
aws-sdk-secretsmanager = { workspace = true }
axum = { workspace = true, features = ["macros", "ws", "tokio"] }
axum-macros = { workspace = true }
axum-tracing-opentelemetry = { workspace = true }
bigdecimal = { workspace = true, features = ["serde"] }
cainome = { workspace = true, features = ["abigen-rs"] }
chrono = { workspace = true, features = ["serde"] }
Expand All @@ -28,8 +29,9 @@ lazy_static = { workspace = true }
moka = { workspace = true, features = ["future"] }
nonzero_ext = { workspace = true }
opentelemetry = { workspace = true }
pragma-common = { path = "../pragma-common" }
pragma-entities = { path = "../pragma-entities" }
pragma-monitoring = { workspace = true }
prometheus = { workspace = true }
rdkafka = { workspace = true }
redis = { workspace = true, features = ["tokio-comp", "json"] }
serde = { workspace = true, features = ["derive"] }
Expand All @@ -43,11 +45,8 @@ tower-http = { workspace = true, features = ["fs", "trace", "cors"] }
tracing = { workspace = true }
utoipa = { workspace = true }
utoipa-swagger-ui = { workspace = true, features = ["axum"] }
uuid = { workspace = true, features = ["fast-rng", "v4", "serde"] }

pragma-common = { path = "../pragma-common" }
pragma-entities = { path = "../pragma-entities" }
utoipauto = { workspace = true }
uuid = { workspace = true, features = ["fast-rng", "v4", "serde"] }

[dev-dependencies]
rstest = { workspace = true }
2 changes: 1 addition & 1 deletion pragma-node/src/caches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::infra::repositories::onchain_repository::publisher::RawPublisherUpdat
/// Structure responsible of holding our Databases caches.
/// All the caches are initialized empty with their associated time to live in the
/// constants module.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct CacheRegistry {
onchain_publishers_updates: Cache<String, HashMap<String, RawPublisherUpdates>>,
merkle_feed_tree: Cache<u64, MerkleTree>,
Expand Down
6 changes: 0 additions & 6 deletions pragma-node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use tokio::sync::OnceCell;
pub struct ServerConfig {
host: String,
port: u16,
metrics_port: u16,
}

impl Default for ServerConfig {
fn default() -> Self {
Self {
host: "0.0.0.0".to_string(),
port: 3000,
metrics_port: 8080,
}
}
}
Expand Down Expand Up @@ -80,10 +78,6 @@ impl Config {
self.server.port
}

pub fn metrics_port(&self) -> u16 {
self.server.metrics_port
}

pub fn kafka_topic(&self) -> &str {
&self.kafka.topic
}
Expand Down
7 changes: 4 additions & 3 deletions pragma-node/src/handlers/create_entry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use axum::extract::State;
use axum::extract::{self, State};
use axum::Json;
use chrono::{DateTime, Utc};
use pragma_entities::{EntryError, NewEntry, PublisherError};
Expand All @@ -10,7 +10,7 @@ use crate::config::config;
use crate::infra::kafka;
use crate::infra::repositories::publisher_repository;
use crate::types::entries::Entry;
use crate::utils::{assert_request_signature_is_valid, felt_from_decimal, JsonExtractor};
use crate::utils::{assert_request_signature_is_valid, felt_from_decimal};
use crate::AppState;

#[derive(Debug, Serialize, Deserialize, ToSchema)]
Expand Down Expand Up @@ -47,9 +47,10 @@ pub struct CreateEntryResponse {
(status = 401, description = "Unauthorized Publisher", body = EntryError)
)
)]
#[tracing::instrument]
pub async fn create_entries(
State(state): State<AppState>,
JsonExtractor(new_entries): JsonExtractor<CreateEntryRequest>,
extract::Json(new_entries): extract::Json<CreateEntryRequest>,
) -> Result<Json<CreateEntryResponse>, EntryError> {
tracing::info!("Received new entries: {:?}", new_entries);

Expand Down
Loading

0 comments on commit 5505b30

Please sign in to comment.