diff --git a/Cargo.lock b/Cargo.lock index 70bc0593..a538093e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,15 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "arbitrary" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dde20b3d026af13f561bdd0f15edf01fc734f0dafcedbaf42bba506a9517f223" +dependencies = [ + "derive_arbitrary", +] + [[package]] name = "arc-swap" version = "1.7.1" @@ -748,9 +757,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", - "axum-macros", - "base64 0.21.7", + "axum-core 0.3.4", + "axum-macros 0.3.8", "bitflags 1.3.2", "bytes", "futures-util", @@ -769,13 +777,49 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sha1", "sync_wrapper 0.1.2", "tokio", + "tower 0.4.13", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "axum-macros 0.4.2", + "base64 0.22.1", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper 1.0.1", + "tokio", "tokio-tungstenite", - "tower", + "tower 0.5.1", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -795,6 +839,27 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "axum-macros" version = "0.3.8" @@ -807,6 +872,35 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "axum-tracing-opentelemetry" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7355023f282d1ef4e97cf8ce948a500f989736108e4cfde38363ce71770e774" +dependencies = [ + "axum 0.7.9", + "futures-core", + "futures-util", + "http 1.1.0", + "opentelemetry 0.25.0", + "pin-project-lite", + "tower 0.5.1", + "tracing", + "tracing-opentelemetry 0.26.0", + "tracing-opentelemetry-instrumentation-sdk", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1029,9 +1123,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" [[package]] name = "bytes-utils" @@ -1274,6 +1368,33 @@ dependencies = [ "cc", ] +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -1543,6 +1664,17 @@ dependencies = [ "serde", ] +[[package]] +name = "derive_arbitrary" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30542c1ad912e0e3d22a1935c290e12e8a29d704a420177a31faad4a601a0800" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "diesel" version = "2.1.6" @@ -1684,6 +1816,17 @@ dependencies = [ "winapi", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "docker_credential" version = "1.3.1" @@ -1874,6 +2017,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eyre" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd915d99f24784cdc19fd37ef22b97e3ff0ae756c7e492e9fbfe897d61e2aec" +dependencies = [ + "indenter", + "once_cell", +] + [[package]] name = "fallible-iterator" version = "0.2.0" @@ -2367,9 +2520,9 @@ dependencies = [ [[package]] name = "http-range-header" -version = "0.3.1" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" +checksum = "08a397c49fec283e3d6211adbe480be95aae5f304cfb923e9970e08956d5168a" [[package]] name = "httparse" @@ -2516,14 +2669,15 @@ dependencies = [ [[package]] name = "hyper-timeout" -version = "0.4.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper 0.14.29", + "hyper 1.4.1", + "hyper-util", "pin-project-lite", "tokio", - "tokio-io-timeout", + "tower-service", ] [[package]] @@ -2557,9 +2711,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.6" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ab92f4f49ee4fb4f997c784b7a2e0fa70050211e0b6a287f898c3c9785ca956" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -2570,7 +2724,6 @@ dependencies = [ "pin-project-lite", "socket2 0.5.7", "tokio", - "tower", "tower-service", "tracing", ] @@ -2677,6 +2830,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "indenter" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + [[package]] name = "indexmap" version = "1.9.3" @@ -2699,6 +2858,22 @@ dependencies = [ "serde", ] +[[package]] +name = "init-tracing-opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd320e65afe3181853835785cf02c43c3609fb8eff0193b69be150855e48e88b" +dependencies = [ + "opentelemetry 0.25.0", + "opentelemetry-otlp 0.25.0", + "opentelemetry-semantic-conventions 0.25.0", + "opentelemetry_sdk 0.25.0", + "thiserror", + "tracing", + "tracing-opentelemetry 0.26.0", + "tracing-subscriber", +] + [[package]] name = "inout" version = "0.1.3" @@ -3003,6 +3178,12 @@ dependencies = [ "scopeguard", ] +[[package]] +name = "lockfree-object-pool" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e" + [[package]] name = "log" version = "0.4.21" @@ -3350,9 +3531,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.22.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +checksum = "803801d3d3b71cd026851a53f974ea03df3d179cb758b260136a6c9e22e196af" dependencies = [ "futures-core", "futures-sink", @@ -3360,38 +3541,80 @@ dependencies = [ "once_cell", "pin-project-lite", "thiserror", - "urlencoding", +] + +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-appender-tracing" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14f62d9a23c680ab91c74605f5006110768eb67600bb654937fef5c852fb8ec7" +dependencies = [ + "opentelemetry 0.26.0", + "tracing", + "tracing-core", + "tracing-subscriber", ] [[package]] name = "opentelemetry-http" -version = "0.11.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94" +checksum = "88d8c2b76e5f7848a289aa9666dbe56b16f8a22a4c5246ef37a14941818d2913" dependencies = [ "async-trait", "bytes", - "http 0.2.12", - "opentelemetry", - "reqwest 0.11.27", + "http 1.1.0", + "opentelemetry 0.25.0", + "reqwest 0.12.5", ] [[package]] name = "opentelemetry-otlp" -version = "0.15.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +checksum = "596b1719b3cab83addb20bcbffdf21575279d9436d9ccccfe651a3bf0ab5ab06" dependencies = [ "async-trait", "futures-core", - "http 0.2.12", - "opentelemetry", + "http 1.1.0", + "opentelemetry 0.25.0", "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry-semantic-conventions 0.14.0", - "opentelemetry_sdk", + "opentelemetry-proto 0.25.0", + "opentelemetry_sdk 0.25.0", + "prost", + "reqwest 0.12.5", + "thiserror", + "tokio", + "tonic", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +dependencies = [ + "async-trait", + "futures-core", + "http 1.1.0", + "opentelemetry 0.26.0", + "opentelemetry-proto 0.26.1", + "opentelemetry_sdk 0.26.0", "prost", - "reqwest 0.11.27", "thiserror", "tokio", "tonic", @@ -3399,43 +3622,53 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.5.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +checksum = "2c43620e8f93359eb7e627a3b16ee92d8585774986f24f2ab010817426c5ce61" dependencies = [ - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.25.0", + "opentelemetry_sdk 0.25.0", + "prost", + "tonic", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +dependencies = [ + "opentelemetry 0.26.0", + "opentelemetry_sdk 0.26.0", "prost", "tonic", ] [[package]] name = "opentelemetry-semantic-conventions" -version = "0.14.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" +checksum = "9b8e442487022a943e2315740e443dc5ee95fd541c18f509a5a6251b408a9f95" [[package]] name = "opentelemetry-semantic-conventions" -version = "0.15.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1869fb4bb9b35c5ba8a1e40c9b128a7b4c010d07091e864a29da19e4fe2ca4d7" +checksum = "db945c1eaea8ac6a9677185357480d215bb6999faa9f691d0c4d4d641eab7a09" [[package]] name = "opentelemetry_sdk" -version = "0.22.1" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +checksum = "e0da0d6b47a3dbc6e9c9e36a0520e25cf943e046843818faaa3f87365a548c82" dependencies = [ "async-trait", - "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", "glob", "once_cell", - "opentelemetry", - "ordered-float", + "opentelemetry 0.25.0", "percent-encoding", "rand", "thiserror", @@ -3444,19 +3677,31 @@ dependencies = [ ] [[package]] -name = "option-ext" -version = "0.2.0" +name = "opentelemetry_sdk" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.26.0", + "percent-encoding", + "rand", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] [[package]] -name = "ordered-float" -version = "4.2.2" +name = "option-ext" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" -dependencies = [ - "num-traits", -] +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "outref" @@ -3470,6 +3715,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parity-scale-codec" version = "3.6.12" @@ -3780,13 +4031,20 @@ version = "0.1.0" dependencies = [ "bigdecimal", "chrono", + "color-eyre", + "init-tracing-opentelemetry", + "opentelemetry 0.26.0", + "opentelemetry-appender-tracing", + "opentelemetry-otlp 0.26.0", + "opentelemetry-semantic-conventions 0.26.0", + "opentelemetry_sdk 0.26.0", "rstest", "serde", "starknet 0.12.0", "strum 0.26.3", "thiserror", "tracing", - "tracing-axiom", + "tracing-opentelemetry 0.27.0", "tracing-subscriber", "utoipa", ] @@ -3809,7 +4067,7 @@ dependencies = [ name = "pragma-entities" version = "0.1.0" dependencies = [ - "axum", + "axum 0.7.9", "bigdecimal", "chrono", "deadpool-diesel", @@ -3848,8 +4106,8 @@ version = "0.1.0" source = "git+https://github.com/astraly-labs/pragma-monitoring#9963730469d3858788f6726081f77c7e51a2f7d8" dependencies = [ "arc-swap", - "axum", - "axum-macros", + "axum 0.6.20", + "axum-macros 0.3.8", "bigdecimal", "chrono", "deadpool", @@ -3881,8 +4139,9 @@ version = "0.1.0" dependencies = [ "aws-config", "aws-sdk-secretsmanager", - "axum", - "axum-macros", + "axum 0.7.9", + "axum-macros 0.3.8", + "axum-tracing-opentelemetry", "bigdecimal", "cainome", "chrono", @@ -3896,11 +4155,10 @@ dependencies = [ "lazy_static", "moka", "nonzero_ext", - "opentelemetry", + "opentelemetry 0.26.0", "pragma-common", "pragma-entities", "pragma-monitoring", - "prometheus", "rdkafka", "redis", "rstest", @@ -4027,9 +4285,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.12.6" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" dependencies = [ "bytes", "prost-derive", @@ -4037,9 +4295,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.6" +version = "0.13.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", "itertools", @@ -4521,7 +4779,6 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "shellexpand", "syn 2.0.87", "walkdir", ] @@ -5005,15 +5262,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "shellexpand" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da03fa3b94cc19e3ebfc88c4229c49d8f08cdbd1228870a45f0ffdf84988e14b" -dependencies = [ - "dirs", -] - [[package]] name = "shlex" version = "1.3.0" @@ -5029,6 +5277,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + [[package]] name = "similar" version = "2.6.0" @@ -5728,16 +5982,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "tokio-io-timeout" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-macros" version = "2.3.0" @@ -5808,9 +6052,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -5819,9 +6063,9 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.20.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", "log", @@ -5889,26 +6133,29 @@ dependencies = [ [[package]] name = "tonic" -version = "0.11.0" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", - "axum", - "base64 0.21.7", + "axum 0.7.9", + "base64 0.22.1", "bytes", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.29", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.4.1", "hyper-timeout", + "hyper-util", "percent-encoding", "pin-project", "prost", + "socket2 0.5.7", "tokio", "tokio-stream", - "tower", + "tower 0.4.13", "tower-layer", "tower-service", "tracing", @@ -5934,18 +6181,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-http" -version = "0.4.4" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "403fa3b783d4b626a8ad51d766ab03cb6d2dbfc46b1c5d4448395e6628dc9697" dependencies = [ "bitflags 2.5.0", "bytes", - "futures-core", "futures-util", - "http 0.2.12", - "http-body 0.4.6", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", "http-range-header", "httpdate", "mime", @@ -5961,15 +6224,15 @@ dependencies = [ [[package]] name = "tower-layer" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" [[package]] name = "tower-service" -version = "0.3.2" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" +checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" @@ -5994,24 +6257,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "tracing-axiom" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e60db0243130d086d30618bc441be6376521b3361e2befae828cbcfaa276dc" -dependencies = [ - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry-semantic-conventions 0.15.0", - "opentelemetry_sdk", - "reqwest 0.11.27", - "thiserror", - "tracing-core", - "tracing-opentelemetry", - "tracing-subscriber", - "url", -] - [[package]] name = "tracing-core" version = "0.1.32" @@ -6022,6 +6267,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-futures" version = "0.2.5" @@ -6045,20 +6300,52 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.23.0" +version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.25.0", + "opentelemetry_sdk 0.25.0", + "smallvec", "tracing", "tracing-core", + "tracing-log", "tracing-subscriber", "web-time", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc58af5d3f6c5811462cabb3289aec0093f7338e367e5a33d28c0433b3c7360b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.26.0", + "opentelemetry_sdk 0.26.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry-instrumentation-sdk" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612243becba145498d15c8ff9d41f333ca27b343ee874909d964cc2aed3a013f" +dependencies = [ + "http 1.1.0", + "opentelemetry 0.25.0", + "tracing", + "tracing-opentelemetry 0.26.0", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -6125,20 +6412,19 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.20.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" dependencies = [ "byteorder", "bytes", "data-encoding", - "http 0.2.12", + "http 1.1.0", "httparse", "log", "rand", "sha1", "thiserror", - "url", "utf-8", ] @@ -6257,9 +6543,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "utoipa" -version = "4.2.3" +version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5afb1a60e207dca502682537fefcfd9921e71d0b83e9576060f09abc6efab23" +checksum = "514a48569e4e21c86d0b84b5612b5e73c0b2cf09db63260134ba426d4e8ea714" dependencies = [ "indexmap 2.2.6", "serde", @@ -6269,11 +6555,10 @@ dependencies = [ [[package]] name = "utoipa-gen" -version = "4.3.0" +version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bf0e16c02bc4bf5322ab65f10ab1149bdbcaa782cba66dc7057370a3f8190be" +checksum = "5629efe65599d0ccd5d493688cbf6e03aa7c1da07fe59ff97cf5977ed0637f66" dependencies = [ - "proc-macro-error", "proc-macro2", "quote", "regex", @@ -6283,16 +6568,17 @@ dependencies = [ [[package]] name = "utoipa-swagger-ui" -version = "4.0.0" +version = "8.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "154517adf0d0b6e22e8e1f385628f14fcaa3db43531dc74303d3edef89d6dfe5" +checksum = "a5c80b4dd79ea382e8374d67dcce22b5c6663fa13a82ad3886441d1bbede5e35" dependencies = [ - "axum", + "axum 0.7.9", "mime_guess", "regex", "rust-embed", "serde", "serde_json", + "url", "utoipa", "zip", ] @@ -6308,9 +6594,9 @@ dependencies = [ [[package]] name = "utoipauto-core" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17e82ab96c5a55263b5bed151b8426410d93aa909a453acdbd4b6792b5af7d64" +checksum = "39449c1c0079e06bca01fd954736a9cd8a1c999540c9c2c404eb74ce63e8eb73" dependencies = [ "proc-macro2", "quote", @@ -6319,9 +6605,9 @@ dependencies = [ [[package]] name = "utoipauto-macro" -version = "0.1.12" +version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86b8338dc3c9526011ffaa2aa6bd60ddfda9d49d2123108690755c6e34844212" +checksum = "d1521871bcd9cb5024e0ec86437e014f8ac8cf36c15a177d4b8f97560a1699fa" dependencies = [ "proc-macro2", "quote", @@ -6798,12 +7084,31 @@ dependencies = [ [[package]] name = "zip" -version = "0.6.6" +version = "2.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "760394e246e4c28189f19d488c058bf16f564016aefac5d32bb1f3b51d5e9261" +checksum = "40dd8c92efc296286ce1fbd16657c5dbefff44f1b4ca01cc5f517d8b7b3d3e2e" dependencies = [ - "byteorder", + "arbitrary", "crc32fast", "crossbeam-utils", + "displaydoc", "flate2", + "indexmap 2.2.6", + "memchr", + "thiserror", + "zopfli", +] + +[[package]] +name = "zopfli" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5019f391bac5cf252e93bbcc53d039ffd62c7bfb7c150414d61369afe57e946" +dependencies = [ + "bumpalo", + "crc32fast", + "lockfree-object-pool", + "log", + "once_cell", + "simd-adler32", ] diff --git a/Cargo.toml b/Cargo.toml index d87fae67..f55f2b10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,10 +19,10 @@ authors = ["Pragma Labs "] color-eyre = "0.6" aws-config = { version = "1.5.1", features = ["behavior-version-latest"] } aws-sdk-secretsmanager = "1.32.0" -axum = { version = "0.6", features = ["macros", "ws", "tokio"] } +axum = { version = "0.7.7", features = ["macros", "ws", "tokio"] } axum-macros = "0.3" cainome = { git = "https://github.com/cartridge-gg/cainome", tag = "v0.4.5", features = [ - "abigen-rs", + "abigen-rs", ] } diesel = { version = "2.1", features = [ "postgres", @@ -42,8 +42,7 @@ chrono = { version = "0.4.26", features = ["serde"] } lazy_static = "1.4.0" serde = { version = "1.0.204", features = ["derive"] } moka = { version = "0.12", features = ["future"] } -opentelemetry = { version = "0.22" } -prometheus = "0.13.4" +opentelemetry = { version = "0.26.0", features = ["metrics", "logs"] } nonzero_ext = { version = "0.3.0" } serde_json = { version = "1.0.122", features = ["arbitrary_precision"] } starknet = "0.12.0" @@ -58,14 +57,33 @@ strum = { version = "0.26", features = ["derive"] } tracing = "0.1.4" tracing-test = "0.2.5" url = "2.5.0" -tower-http = { version = "0.4.0", features = ["fs", "trace", "cors"] } -tracing-subscriber = { version = "0.3", features = ["env-filter"] } -tokio = { version = "1.11.0", features = ["full"] } +tower-http = { version = "0.6.2", features = ["fs", "trace", "cors"] } +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +tokio = { version = "~1.38.0", features = ["full"] } toml = "0.8.8" -utoipa = { version = "4", features = ["axum_extras", "chrono", "uuid"] } +utoipa = { version = "5.0.0", features = ["axum_extras", "chrono", "uuid"] } utoipauto = "0.1.14" -utoipa-swagger-ui = { version = "4", features = ["axum"] } +utoipa-swagger-ui = { version = "8.0.3", features = ["axum"] } uuid = { version = "1.4", features = ["fast-rng", "v4", "serde"] } +init-tracing-opentelemetry = { version = "0.22.0", features = [ + "otlp", + "tracing_subscriber_ext", +] } +axum-tracing-opentelemetry = "0.21.1" +opentelemetry-otlp = { version = "0.26.0", features = [ + "metrics", + "tonic", + "logs", +] } +tracing-opentelemetry-instrumentation-sdk = "0.21.0" +opentelemetry_sdk = { version = "0.26.0", features = [ + "metrics", + "rt-tokio", + "logs", +] } +tracing-opentelemetry = "0.27.0" +opentelemetry-appender-tracing = "0.26.0" +opentelemetry-semantic-conventions = "0.26.0" pragma-monitoring = { git = "https://github.com/astraly-labs/pragma-monitoring" } @@ -80,6 +98,3 @@ testcontainers-modules = { version = "0.9.0", features = [ "http_wait", ] } pretty_assertions = "1.4.0" - -[workspace.patch.crates-io] -quote = { version = "1.0.37" } \ No newline at end of file diff --git a/README.md b/README.md index 0079be4a..fe9f7073 100644 --- a/README.md +++ b/README.md @@ -174,6 +174,7 @@ export PORT=3000 export METRICS_PORT=8080 export KAFKA_BROKERS=localhost:29092 export AXIOM_TOKEN=xaat- # OPTIONAL +export OTEL_EXPORTER_OTLP_ENDPOINT=localhost:4317 ``` ### 4. Start the Pragma Node service: diff --git a/compose.dev.yaml b/compose.dev.yaml index 25abf54d..ac799545 100644 --- a/compose.dev.yaml +++ b/compose.dev.yaml @@ -166,6 +166,7 @@ services: pragma-ingestor-1: container_name: "pragma-ingestor-1" environment: + - OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 - DATABASE_MAX_CONN=25 - BROKERS=pragma-kafka:9092 - TOPIC=pragma-data diff --git a/infra/pragma-ingestor/config/.env.example b/infra/pragma-ingestor/config/.env.example index d43d076e..86c3a176 100644 --- a/infra/pragma-ingestor/config/.env.example +++ b/infra/pragma-ingestor/config/.env.example @@ -2,4 +2,5 @@ OFFCHAIN_DATABASE_URL="postgres://postgres:test-password@timescale-db:5432/pragm ONCHAIN_DATABASE_URL="postgres://postgres:test-password@postgre-db:5432/pragma" BROKERS="pragma-kafka:29092" TOPIC="pragma-data" -GROUP_ID="pragma-data" \ No newline at end of file +GROUP_ID="pragma-data" +OTEL_EXPORTER_OTLP_ENDPOINT=http://signoz.dev.pragma.build:4317 diff --git a/infra/pragma-node/config/.env.example b/infra/pragma-node/config/.env.example index fe5bcf12..559badbd 100644 --- a/infra/pragma-node/config/.env.example +++ b/infra/pragma-node/config/.env.example @@ -8,3 +8,4 @@ METRICS_PORT=8080 KAFKA_BROKERS="pragma-kafka:9092" REDIS_HOST="0.0.0.0" REDIS_PORT=6379 +OTEL_EXPORTER_OTLP_ENDPOINT=http://signoz.dev.pragma.build:4317 diff --git a/pragma-common/Cargo.toml b/pragma-common/Cargo.toml index 54643755..fe823eb7 100644 --- a/pragma-common/Cargo.toml +++ b/pragma-common/Cargo.toml @@ -14,12 +14,19 @@ keywords = ["pragma", "sdk", "consumer", "data", "feeds"] [dependencies] bigdecimal = { workspace = true, features = ["serde"] } chrono = { workspace = true } +color-eyre = { workspace = true } +init-tracing-opentelemetry = { workspace = true } +opentelemetry = { workspace = true } +opentelemetry-appender-tracing = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true } serde = { workspace = true, features = ["derive"] } starknet = { workspace = true } strum = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tracing = { workspace = true } -tracing-axiom = "0.7" +tracing-opentelemetry = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter"] } utoipa = { workspace = true } diff --git a/pragma-common/src/lib.rs b/pragma-common/src/lib.rs index 2492f72f..ddca6af3 100644 --- a/pragma-common/src/lib.rs +++ b/pragma-common/src/lib.rs @@ -1,5 +1,5 @@ pub mod errors; pub mod hash; -pub mod tracing; +pub mod telemetry; pub mod types; pub mod utils; diff --git a/pragma-common/src/telemetry.rs b/pragma-common/src/telemetry.rs new file mode 100644 index 00000000..6d171579 --- /dev/null +++ b/pragma-common/src/telemetry.rs @@ -0,0 +1,112 @@ +use color_eyre::eyre::Result; +use init_tracing_opentelemetry::tracing_subscriber_ext::build_otel_layer; +use opentelemetry::trace::TracerProvider; +use opentelemetry::{global, KeyValue}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::logs::{BatchConfig, LoggerProvider}; +use opentelemetry_sdk::metrics::reader::DefaultTemporalitySelector; +use opentelemetry_sdk::metrics::{MeterProviderBuilder, PeriodicReader}; +use opentelemetry_sdk::{runtime, trace::BatchConfigBuilder}; +use opentelemetry_sdk::{ + trace::{Config, Tracer}, + Resource, +}; +use opentelemetry_semantic_conventions::resource::SERVICE_NAME; +use tracing::level_filters::LevelFilter; +use tracing::Level; +use tracing_opentelemetry::OpenTelemetryLayer; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub fn init_telemetry( + app_name: String, + collection_endpoint: String, + log_level: Option, +) -> Result<()> { + let tracing_subscriber = tracing_subscriber::registry() + .with(build_otel_layer()?) + .with(LevelFilter::from_level(log_level.unwrap_or(Level::INFO))) + .with( + tracing_subscriber::fmt::layer() + .with_target(false) + .with_file(false) + .with_line_number(false) + .pretty(), + ); + + let tracer_provider = init_tracer_provider(&app_name, &collection_endpoint)?; + let logger_provider = init_logs_provider(&app_name, &collection_endpoint)?; + init_meter_provider(&app_name, &collection_endpoint)?; + + tracing_subscriber + .with(OpenTelemetryLayer::new(tracer_provider)) + .with(OpenTelemetryTracingBridge::new(&logger_provider)) + .init(); + + Ok(()) +} + +fn init_tracer_provider(app_name: &str, collection_endpoint: &str) -> Result { + let provider = opentelemetry_otlp::new_pipeline() + .tracing() + .with_batch_config(BatchConfigBuilder::default().build()) + .with_trace_config( + Config::default().with_resource(Resource::new(vec![KeyValue::new( + SERVICE_NAME, + format!("{app_name}-trace-service"), + )])), + ) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(collection_endpoint), + ) + .install_batch(runtime::Tokio) + .expect("Failed to install tracer provider"); + + global::set_tracer_provider(provider.clone()); + Ok(provider.tracer(format!("{app_name}-subscriber"))) +} + +fn init_logs_provider(app_name: &str, collection_endpoint: &str) -> Result { + let logger = opentelemetry_otlp::new_pipeline() + .logging() + .with_batch_config(BatchConfig::default()) + .with_resource(Resource::new(vec![KeyValue::new( + SERVICE_NAME, + format!("{app_name}-logs-service"), + )])) + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(collection_endpoint), + ) + .install_batch(runtime::Tokio)?; + + Ok(logger) +} + +pub fn init_meter_provider(app_name: &str, collection_endpoint: &str) -> Result<()> { + let exporter = opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint(collection_endpoint) + .build_metrics_exporter(Box::new(DefaultTemporalitySelector::new()))?; + + let reader = PeriodicReader::builder(exporter, runtime::Tokio) + .with_interval(std::time::Duration::from_secs(5)) + .build(); + + let metrics_provider = MeterProviderBuilder::default() + .with_reader(reader) + .with_resource(Resource::new(vec![KeyValue::new( + SERVICE_NAME, + format!("{app_name}-meter-service"), + )])) + .build(); + + // Set the global meter provider + global::set_meter_provider(metrics_provider); + + Ok(()) +} diff --git a/pragma-common/src/tracing.rs b/pragma-common/src/tracing.rs deleted file mode 100644 index 097753df..00000000 --- a/pragma-common/src/tracing.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::env; -use tracing_subscriber::filter::filter_fn; -use tracing_subscriber::prelude::*; -use tracing_subscriber::Layer; - -pub fn init_tracing(service_name: &str) -> Result<(), Box> { - let axum_layer = tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { - "example_tracing_aka_logging=debug,tower_http=debug,axum::rejection=trace".into() - }); - - let fmt_layer = tracing_subscriber::fmt::layer() - .compact() - .with_file(true) - .with_line_number(true) - .with_thread_ids(true) - .with_target(false) - .pretty(); - - let filter = filter_fn(|metadata| { - metadata.target() != "hyper" && metadata.level() <= &tracing::Level::DEBUG - }); - - let mut layers: Vec + Send + Sync>> = vec![ - Box::new(fmt_layer.with_filter(filter.clone())), - Box::new(axum_layer.with_filter(filter.clone())), - ]; - - // Check if the Axiom token is set - if env::var("AXIOM_TOKEN").is_ok() { - if let Ok(axiom_layer) = tracing_axiom::builder_with_env(service_name)? - .with_dataset("pragma-node")? - .build() - { - layers.push(Box::new(axiom_layer.with_filter(filter))); - } - } - - tracing_subscriber::registry().with(layers).try_init()?; - - Ok(()) -} diff --git a/pragma-entities/src/models/entries/entry_error.rs b/pragma-entities/src/models/entries/entry_error.rs index f4448485..f8e56de0 100644 --- a/pragma-entities/src/models/entries/entry_error.rs +++ b/pragma-entities/src/models/entries/entry_error.rs @@ -13,7 +13,7 @@ pub enum VolatilityError { InvalidTimestampsRange(u64, u64), } -#[derive(Debug, thiserror::Error)] +#[derive(Debug, thiserror::Error, ToSchema)] pub enum SigningError { #[error("Invalid message: {0}")] InvalidMessageError(String), diff --git a/pragma-ingestor/src/main.rs b/pragma-ingestor/src/main.rs index 65d0e51e..5ec92357 100644 --- a/pragma-ingestor/src/main.rs +++ b/pragma-ingestor/src/main.rs @@ -14,7 +14,11 @@ mod error; #[tracing::instrument] async fn main() -> Result<(), Box> { let _ = dotenv(); // .env file is not present in prod - pragma_common::tracing::init_tracing("pragma-ingestor")?; + + let otel_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .unwrap_or_else(|_| "http://signoz.dev.pragma.build:4317".to_string()); + pragma_common::telemetry::init_telemetry("pragma-ingestor".into(), otel_endpoint, None)?; + info!( "kafka configuration : hostname={:?}, group_id={}, topic={}", config::CONFIG.brokers, diff --git a/pragma-node/Cargo.toml b/pragma-node/Cargo.toml index 41dd5de6..d9439b89 100644 --- a/pragma-node/Cargo.toml +++ b/pragma-node/Cargo.toml @@ -9,6 +9,7 @@ aws-config = { workspace = true, features = ["behavior-version-latest"] } aws-sdk-secretsmanager = { workspace = true } axum = { workspace = true, features = ["macros", "ws", "tokio"] } axum-macros = { workspace = true } +axum-tracing-opentelemetry = { workspace = true } bigdecimal = { workspace = true, features = ["serde"] } cainome = { workspace = true, features = ["abigen-rs"] } chrono = { workspace = true, features = ["serde"] } @@ -28,8 +29,9 @@ lazy_static = { workspace = true } moka = { workspace = true, features = ["future"] } nonzero_ext = { workspace = true } opentelemetry = { workspace = true } +pragma-common = { path = "../pragma-common" } +pragma-entities = { path = "../pragma-entities" } pragma-monitoring = { workspace = true } -prometheus = { workspace = true } rdkafka = { workspace = true } redis = { workspace = true, features = ["tokio-comp", "json"] } serde = { workspace = true, features = ["derive"] } @@ -43,11 +45,8 @@ tower-http = { workspace = true, features = ["fs", "trace", "cors"] } tracing = { workspace = true } utoipa = { workspace = true } utoipa-swagger-ui = { workspace = true, features = ["axum"] } -uuid = { workspace = true, features = ["fast-rng", "v4", "serde"] } - -pragma-common = { path = "../pragma-common" } -pragma-entities = { path = "../pragma-entities" } utoipauto = { workspace = true } +uuid = { workspace = true, features = ["fast-rng", "v4", "serde"] } [dev-dependencies] rstest = { workspace = true } diff --git a/pragma-node/src/caches.rs b/pragma-node/src/caches.rs index f46996fe..1f8584d2 100644 --- a/pragma-node/src/caches.rs +++ b/pragma-node/src/caches.rs @@ -14,7 +14,7 @@ use crate::infra::repositories::onchain_repository::publisher::RawPublisherUpdat /// Structure responsible of holding our Databases caches. /// All the caches are initialized empty with their associated time to live in the /// constants module. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct CacheRegistry { onchain_publishers_updates: Cache>, merkle_feed_tree: Cache, diff --git a/pragma-node/src/config.rs b/pragma-node/src/config.rs index 350d9330..f9fb3e6c 100644 --- a/pragma-node/src/config.rs +++ b/pragma-node/src/config.rs @@ -5,7 +5,6 @@ use tokio::sync::OnceCell; pub struct ServerConfig { host: String, port: u16, - metrics_port: u16, } impl Default for ServerConfig { @@ -13,7 +12,6 @@ impl Default for ServerConfig { Self { host: "0.0.0.0".to_string(), port: 3000, - metrics_port: 8080, } } } @@ -80,10 +78,6 @@ impl Config { self.server.port } - pub fn metrics_port(&self) -> u16 { - self.server.metrics_port - } - pub fn kafka_topic(&self) -> &str { &self.kafka.topic } diff --git a/pragma-node/src/handlers/create_entry.rs b/pragma-node/src/handlers/create_entry.rs index a0c62d62..f3b35678 100644 --- a/pragma-node/src/handlers/create_entry.rs +++ b/pragma-node/src/handlers/create_entry.rs @@ -1,4 +1,4 @@ -use axum::extract::State; +use axum::extract::{self, State}; use axum::Json; use chrono::{DateTime, Utc}; use pragma_entities::{EntryError, NewEntry, PublisherError}; @@ -10,7 +10,7 @@ use crate::config::config; use crate::infra::kafka; use crate::infra::repositories::publisher_repository; use crate::types::entries::Entry; -use crate::utils::{assert_request_signature_is_valid, felt_from_decimal, JsonExtractor}; +use crate::utils::{assert_request_signature_is_valid, felt_from_decimal}; use crate::AppState; #[derive(Debug, Serialize, Deserialize, ToSchema)] @@ -47,9 +47,10 @@ pub struct CreateEntryResponse { (status = 401, description = "Unauthorized Publisher", body = EntryError) ) )] +#[tracing::instrument] pub async fn create_entries( State(state): State, - JsonExtractor(new_entries): JsonExtractor, + extract::Json(new_entries): extract::Json, ) -> Result, EntryError> { tracing::info!("Received new entries: {:?}", new_entries); diff --git a/pragma-node/src/handlers/create_future_entry.rs b/pragma-node/src/handlers/create_future_entry.rs index baba7903..5477b3c9 100644 --- a/pragma-node/src/handlers/create_future_entry.rs +++ b/pragma-node/src/handlers/create_future_entry.rs @@ -1,4 +1,4 @@ -use axum::extract::State; +use axum::extract::{self, State}; use axum::Json; use chrono::{DateTime, Utc}; use pragma_entities::{EntryError, NewFutureEntry, PublisherError}; @@ -10,7 +10,7 @@ use crate::config::config; use crate::infra::kafka; use crate::infra::repositories::publisher_repository; use crate::types::entries::FutureEntry; -use crate::utils::{assert_request_signature_is_valid, felt_from_decimal, JsonExtractor}; +use crate::utils::{assert_request_signature_is_valid, felt_from_decimal}; use crate::AppState; #[derive(Debug, Serialize, Deserialize, ToSchema)] @@ -47,9 +47,10 @@ pub struct CreateFutureEntryResponse { (status = 401, description = "Unauthorized Publisher", body = EntryError) ) )] +#[tracing::instrument] pub async fn create_future_entries( State(state): State, - JsonExtractor(new_entries): JsonExtractor, + extract::Json(new_entries): extract::Json, ) -> Result, EntryError> { tracing::info!("Received new future entries: {:?}", new_entries); diff --git a/pragma-node/src/handlers/get_entry.rs b/pragma-node/src/handlers/get_entry.rs index 0a80e037..9d33e37e 100644 --- a/pragma-node/src/handlers/get_entry.rs +++ b/pragma-node/src/handlers/get_entry.rs @@ -102,6 +102,7 @@ pub struct GetEntryResponse { GetEntryParams, ), )] +#[tracing::instrument] pub async fn get_entry( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/get_expiries.rs b/pragma-node/src/handlers/get_expiries.rs index 5b95e674..20ba322b 100644 --- a/pragma-node/src/handlers/get_expiries.rs +++ b/pragma-node/src/handlers/get_expiries.rs @@ -14,13 +14,14 @@ use crate::utils::currency_pair_to_pair_id; get, path = "/node/v1/data/{base}/{quote}/future_expiries", responses( - (status = 200, description = "Get available future expiries for a pair", body = [GetEntryResponse]) + (status = 200, description = "Get available future expiries for a pair", body = [Vec]) ), params( ("base" = String, Path, description = "Base Asset"), ("quote" = String, Path, description = "Quote Asset"), ), )] +#[tracing::instrument] pub async fn get_expiries( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/get_ohlc.rs b/pragma-node/src/handlers/get_ohlc.rs index 635361a1..24a21851 100644 --- a/pragma-node/src/handlers/get_ohlc.rs +++ b/pragma-node/src/handlers/get_ohlc.rs @@ -30,6 +30,7 @@ pub struct GetOHLCResponse { GetEntryParams, ), )] +#[tracing::instrument] pub async fn get_ohlc( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/get_volatility.rs b/pragma-node/src/handlers/get_volatility.rs index 5efa9a3c..647590ab 100644 --- a/pragma-node/src/handlers/get_volatility.rs +++ b/pragma-node/src/handlers/get_volatility.rs @@ -11,7 +11,7 @@ use pragma_entities::{EntryError, VolatilityError}; use crate::utils::{compute_volatility, currency_pair_to_pair_id}; /// Volatility query -#[derive(Deserialize, IntoParams)] +#[derive(Deserialize, IntoParams, Debug)] pub struct VolatilityQuery { /// Initial timestamp, combined with final_timestamp, it helps define the period over which the mean is computed start: u64, @@ -38,6 +38,7 @@ pub struct GetVolatilityResponse { VolatilityQuery ), )] +#[tracing::instrument] pub async fn get_volatility( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs b/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs index 92ee9b8b..b7443ea9 100644 --- a/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs +++ b/pragma-node/src/handlers/merkle_feeds/get_merkle_proof.rs @@ -15,7 +15,7 @@ use crate::types::hex_hash::HexHash; use crate::utils::PathExtractor; use crate::AppState; -#[derive(Default, Deserialize, IntoParams, ToSchema)] +#[derive(Default, Deserialize, IntoParams, ToSchema, Debug)] pub struct GetMerkleProofQuery { pub network: Option, pub block_id: Option, @@ -35,6 +35,7 @@ pub struct GetMerkleProofResponse(pub MerkleProof); GetMerkleProofQuery ), )] +#[tracing::instrument] pub async fn get_merkle_feeds_proof( State(state): State, PathExtractor(option_hex_hash): PathExtractor, diff --git a/pragma-node/src/handlers/merkle_feeds/get_option.rs b/pragma-node/src/handlers/merkle_feeds/get_option.rs index 78c8224c..e4ffdc2b 100644 --- a/pragma-node/src/handlers/merkle_feeds/get_option.rs +++ b/pragma-node/src/handlers/merkle_feeds/get_option.rs @@ -13,7 +13,7 @@ use crate::infra::redis; use crate::utils::PathExtractor; use crate::AppState; -#[derive(Default, Deserialize, IntoParams, ToSchema)] +#[derive(Default, Deserialize, IntoParams, ToSchema, Debug)] pub struct GetOptionQuery { pub network: Option, #[serde(rename = "block_id")] @@ -38,6 +38,7 @@ pub struct GetOptionResponse { GetOptionQuery ), )] +#[tracing::instrument] pub async fn get_merkle_feeds_option( State(state): State, PathExtractor(instrument): PathExtractor, diff --git a/pragma-node/src/handlers/onchain/get_checkpoints.rs b/pragma-node/src/handlers/onchain/get_checkpoints.rs index 87219862..c0f4ac62 100644 --- a/pragma-node/src/handlers/onchain/get_checkpoints.rs +++ b/pragma-node/src/handlers/onchain/get_checkpoints.rs @@ -53,6 +53,7 @@ pub struct GetOnchainCheckpointsResponse(pub Vec); GetOnchainCheckpointsParams ), )] +#[tracing::instrument] pub async fn get_onchain_checkpoints( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/onchain/get_entry.rs b/pragma-node/src/handlers/onchain/get_entry.rs index d5351ed6..706a6751 100644 --- a/pragma-node/src/handlers/onchain/get_entry.rs +++ b/pragma-node/src/handlers/onchain/get_entry.rs @@ -59,6 +59,7 @@ pub struct GetOnchainEntryResponse { GetOnchainEntryParams ), )] +#[tracing::instrument] pub async fn get_onchain_entry( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/onchain/get_history.rs b/pragma-node/src/handlers/onchain/get_history.rs index 92ac5259..4b299401 100644 --- a/pragma-node/src/handlers/onchain/get_history.rs +++ b/pragma-node/src/handlers/onchain/get_history.rs @@ -46,6 +46,7 @@ pub struct GetOnchainHistoryResponse(pub Vec); GetOnchainHistoryParams ), )] +#[tracing::instrument] pub async fn get_onchain_history( State(state): State, PathExtractor(pair): PathExtractor<(String, String)>, diff --git a/pragma-node/src/handlers/onchain/get_publishers.rs b/pragma-node/src/handlers/onchain/get_publishers.rs index 9bb55b0b..787117a1 100644 --- a/pragma-node/src/handlers/onchain/get_publishers.rs +++ b/pragma-node/src/handlers/onchain/get_publishers.rs @@ -53,6 +53,7 @@ pub struct GetOnchainPublishersResponse(pub Vec); GetOnchainPublishersParams ), )] +#[tracing::instrument] pub async fn get_onchain_publishers( State(state): State, Query(params): Query, diff --git a/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs b/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs index a9bed39d..b1fcb2e7 100644 --- a/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs +++ b/pragma-node/src/handlers/onchain/subscribe_to_ohlc.rs @@ -13,10 +13,9 @@ use utoipa::{ToResponse, ToSchema}; use crate::infra::repositories::entry_repository::OHLCEntry; use crate::infra::repositories::onchain_repository; -use crate::types::ws::metrics::{Interaction, Status}; use crate::types::ws::{ChannelHandler, Subscriber, SubscriptionType}; use crate::utils::is_onchain_existing_pair; -use crate::AppState; +use crate::{metrics, AppState}; use axum::extract::ws::{WebSocket, WebSocketUpgrade}; @@ -26,17 +25,6 @@ pub struct GetOnchainOHLCResponse { pub data: Vec, } -#[utoipa::path( - get, - path = "/node/v1/onchain/ohlc/subscribe", - responses( - ( - status = 200, - description = "Subscribe to a list of OHLC entries", - body = [SubscribeToEntryResponse] - ) - ) -)] pub async fn subscribe_to_onchain_ohlc( ws: WebSocketUpgrade, State(state): State, @@ -50,6 +38,7 @@ const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 30000; // 30 seconds async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( + "subscribe_to_ohlc".into(), socket, client_addr.ip(), Arc::new(app_state), @@ -209,7 +198,7 @@ impl WsOHLCHandler { "Rate limit exceeded. Closing connection.", ); - subscriber.record_metric(Interaction::RateLimit, Status::Error); + subscriber.record_metric(metrics::Interaction::RateLimit, metrics::Status::Error); subscriber.send_err("Rate limit exceeded.").await; subscriber.sender.close().await?; diff --git a/pragma-node/src/handlers/optimistic_oracle/get_assertion_details.rs b/pragma-node/src/handlers/optimistic_oracle/get_assertion_details.rs index 7f57b5b8..4e3988fd 100644 --- a/pragma-node/src/handlers/optimistic_oracle/get_assertion_details.rs +++ b/pragma-node/src/handlers/optimistic_oracle/get_assertion_details.rs @@ -16,6 +16,7 @@ use crate::handlers::optimistic_oracle::types::AssertionDetails; ("assertion_id" = String, Path, description = "Unique identifier of the assertion"), ), )] +#[tracing::instrument] pub async fn get_assertion_details( State(state): State, Path(assertion_id): Path, diff --git a/pragma-node/src/handlers/optimistic_oracle/get_assertions.rs b/pragma-node/src/handlers/optimistic_oracle/get_assertions.rs index dc3a26c1..86753914 100644 --- a/pragma-node/src/handlers/optimistic_oracle/get_assertions.rs +++ b/pragma-node/src/handlers/optimistic_oracle/get_assertions.rs @@ -19,6 +19,7 @@ pub const DEFAULT_LIMIT: u32 = 100; ("limit" = Option, Query, description = "Number of items per page"), ), )] +#[tracing::instrument] pub async fn get_assertions( State(state): State, Query(params): Query, diff --git a/pragma-node/src/handlers/optimistic_oracle/get_disputed_assertions.rs b/pragma-node/src/handlers/optimistic_oracle/get_disputed_assertions.rs index dd280ec1..0a0d953f 100644 --- a/pragma-node/src/handlers/optimistic_oracle/get_disputed_assertions.rs +++ b/pragma-node/src/handlers/optimistic_oracle/get_disputed_assertions.rs @@ -20,6 +20,7 @@ pub const DEFAULT_LIMIT: u32 = 100; ("limit" = Option, Query, description = "Number of items per page"), ), )] +#[tracing::instrument] pub async fn get_disputed_assertions( State(state): State, Query(params): Query, diff --git a/pragma-node/src/handlers/optimistic_oracle/get_resolved_assertions.rs b/pragma-node/src/handlers/optimistic_oracle/get_resolved_assertions.rs index cba8c69a..4c14ffaa 100644 --- a/pragma-node/src/handlers/optimistic_oracle/get_resolved_assertions.rs +++ b/pragma-node/src/handlers/optimistic_oracle/get_resolved_assertions.rs @@ -21,6 +21,7 @@ pub const DEFAULT_LIMIT: u32 = 100; ("limit" = Option, Query, description = "Number of items per page"), ), )] +#[tracing::instrument] pub async fn get_resolved_assertions( State(state): State, Query(params): Query, diff --git a/pragma-node/src/handlers/optimistic_oracle/types.rs b/pragma-node/src/handlers/optimistic_oracle/types.rs index e4550baa..5b80eff3 100644 --- a/pragma-node/src/handlers/optimistic_oracle/types.rs +++ b/pragma-node/src/handlers/optimistic_oracle/types.rs @@ -48,6 +48,7 @@ pub struct GetAssertionsParams { pub struct Assertion { pub assertion_id: String, pub claim: String, + #[schema(value_type = String)] pub bond: BigDecimal, pub expiration_time: NaiveDateTime, pub identifier: String, diff --git a/pragma-node/src/handlers/subscribe_to_entry.rs b/pragma-node/src/handlers/subscribe_to_entry.rs index 0701f6f2..582a7af0 100644 --- a/pragma-node/src/handlers/subscribe_to_entry.rs +++ b/pragma-node/src/handlers/subscribe_to_entry.rs @@ -44,17 +44,7 @@ pub struct SubscribeToEntryResponse { pub timestamp: UnixTimestamp, } -#[utoipa::path( - get, - path = "/node/v1/data/subscribe", - responses( - ( - status = 200, - description = "Subscribe to a list of entries", - body = [SubscribeToEntryResponse] - ) - ) -)] +#[tracing::instrument] pub async fn subscribe_to_entry( ws: WebSocketUpgrade, State(state): State, @@ -71,6 +61,7 @@ const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 500; async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( + "subscribe_to_entry".into(), socket, client_addr.ip(), Arc::new(app_state), diff --git a/pragma-node/src/handlers/subscribe_to_price.rs b/pragma-node/src/handlers/subscribe_to_price.rs index 469bd536..71377a1c 100644 --- a/pragma-node/src/handlers/subscribe_to_price.rs +++ b/pragma-node/src/handlers/subscribe_to_price.rs @@ -32,17 +32,7 @@ pub struct SubscribeToPriceResponse { pub timestamp: UnixTimestamp, } -#[utoipa::path( - get, - path = "/node/v1/data/price/subscribe", - responses( - ( - status = 200, - description = "Subscribe to a list of pairs' prices", - body = [SubscribeToPriceResponse] - ) - ) -)] +#[tracing::instrument] pub async fn subscribe_to_price( ws: WebSocketUpgrade, State(state): State, @@ -56,6 +46,7 @@ const CHANNEL_UPDATE_INTERVAL_IN_MS: u64 = 500; async fn create_new_subscriber(socket: WebSocket, app_state: AppState, client_addr: SocketAddr) { let (mut subscriber, _) = match Subscriber::::new( + "subscribe_to_price".into(), socket, client_addr.ip(), Arc::new(app_state), diff --git a/pragma-node/src/main.rs b/pragma-node/src/main.rs index 26803c04..f99bdc7f 100644 --- a/pragma-node/src/main.rs +++ b/pragma-node/src/main.rs @@ -5,11 +5,13 @@ mod errors; mod handlers; mod infra; mod metrics; -mod servers; +mod server; mod types; mod utils; use dotenvy::dotenv; +use metrics::MetricsRegistry; +use std::fmt; use std::sync::Arc; use caches::CacheRegistry; @@ -19,9 +21,7 @@ use starknet::signers::SigningKey; use pragma_entities::connection::{ENV_OFFCHAIN_DATABASE_URL, ENV_ONCHAIN_DATABASE_URL}; use crate::config::config; -use crate::metrics::MetricsRegistry; use crate::utils::PragmaSignerBuilder; -use types::ws::metrics::WsMetrics; #[derive(Clone)] pub struct AppState { @@ -36,7 +36,18 @@ pub struct AppState { // Pragma Signer used for StarkEx signing pragma_signer: Option, // Metrics - ws_metrics: Arc, + metrics: Arc, +} + +impl fmt::Debug for AppState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AppState") + .field("redis_client", &self.redis_client) + .field("caches", &self.caches) + .field("pragma_signer", &self.pragma_signer) + .field("metrics", &self.metrics) + .finish_non_exhaustive() + } } #[tokio::main] @@ -44,7 +55,9 @@ pub struct AppState { async fn main() -> Result<(), Box> { dotenv().ok(); - pragma_common::tracing::init_tracing("pragma-node")?; + let otel_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .unwrap_or_else(|_| "http://signoz.dev.pragma.build:4317".to_string()); + pragma_common::telemetry::init_telemetry("pragma-node".into(), otel_endpoint, None)?; let config = config().await; @@ -84,23 +97,16 @@ async fn main() -> Result<(), Box> { } }; - // Create the Metrics registry - let metrics_registry = MetricsRegistry::new(); - let ws_metrics = WsMetrics::new(&metrics_registry).expect("Failed to create WsMetrics"); - let state = AppState { offchain_pool, onchain_pool, redis_client, caches: Arc::new(caches), pragma_signer, - ws_metrics: Arc::new(ws_metrics), + metrics: MetricsRegistry::new(), }; - tokio::join!( - servers::app::run_app_server(config, state), - servers::metrics::run_metrics_server(config, metrics_registry) - ); + server::run_api_server(config, state).await; // Ensure that the tracing provider is shutdown correctly opentelemetry::global::shutdown_tracer_provider(); diff --git a/pragma-node/src/metrics.rs b/pragma-node/src/metrics.rs index f831e0ea..4a8b316b 100644 --- a/pragma-node/src/metrics.rs +++ b/pragma-node/src/metrics.rs @@ -1,23 +1,101 @@ -use prometheus::{Error as PrometheusError, Registry}; use std::sync::Arc; -#[derive(Clone, Debug)] -pub struct MetricsRegistry(Arc); +use opentelemetry::{metrics::Counter, KeyValue}; +use strum::Display; + +#[derive(Debug)] +pub struct MetricsRegistry { + /// TODO(akhercha): See which additional metrics we want here? + pub ws_metrics: WsMetricsRegistry, +} impl MetricsRegistry { - pub fn new() -> Self { - MetricsRegistry(Arc::new(Registry::new())) + pub fn new() -> Arc { + Arc::new(Self { + ws_metrics: Arc::try_unwrap(WsMetricsRegistry::new()) + .unwrap_or_else(|arc| (*arc).clone()), + }) + } +} + +#[derive(Debug, Clone)] +pub struct WsMetricsRegistry { + metrics: std::collections::HashMap, +} + +impl WsMetricsRegistry { + pub fn new() -> Arc { + let mut metrics = std::collections::HashMap::new(); + + let endpoints = [ + "subscribe_to_entry", + "subscribe_to_price", + "subscribe_to_ohlc", + ]; + for endpoint in endpoints.iter() { + metrics.insert(endpoint.to_string(), WsMetrics::new(endpoint)); + } + + Arc::new(Self { metrics }) } - pub fn register( + pub fn record_ws_interaction( &self, - metric: T, - ) -> Result { - self.0.register(Box::new(metric.clone()))?; - Ok(metric) + endpoint_name: &str, + interaction: Interaction, + status: Status, + ) { + if let Some(metrics) = self.metrics.get(endpoint_name) { + metrics.record_interaction(interaction, status); + } else { + tracing::warn!("No metrics registered for WS endpoint: {}", endpoint_name); + } + } +} + +#[derive(Display, Clone, Debug)] +pub enum Interaction { + NewConnection, + CloseConnection, + ClientMessageDecode, + ClientMessageProcess, + ChannelUpdate, + RateLimit, +} + +#[derive(Display, Clone, Debug)] +pub enum Status { + Success, + Error, +} + +#[derive(Debug, Clone)] +pub struct WsMetrics { + interactions: Counter, +} + +impl WsMetrics { + fn new(endpoint_name: &str) -> Self { + let meter = opentelemetry::global::meter("pragma-node-meter"); + let interactions = meter + .u64_counter(format!("{}_ws_interactions_total", endpoint_name)) + .with_description(format!( + "Number of WebSocket interactions for {}", + endpoint_name + )) + .with_unit("count") + .init(); + + Self { interactions } } - pub fn registry(&self) -> Arc { - self.0.clone() + fn record_interaction(&self, interaction: Interaction, status: Status) { + self.interactions.add( + 1, + &[ + KeyValue::new("interaction", interaction.to_string()), + KeyValue::new("status", status.to_string()), + ], + ); } } diff --git a/pragma-node/src/servers/app/mod.rs b/pragma-node/src/server/mod.rs similarity index 78% rename from pragma-node/src/servers/app/mod.rs rename to pragma-node/src/server/mod.rs index 37b89543..dc49d13b 100644 --- a/pragma-node/src/servers/app/mod.rs +++ b/pragma-node/src/server/mod.rs @@ -1,10 +1,8 @@ pub(crate) mod routes; +use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer}; use std::net::SocketAddr; -use tower_http::{ - cors::CorsLayer, - trace::{DefaultMakeSpan, TraceLayer}, -}; +use tower_http::cors::CorsLayer; use utoipa::{ openapi::{ security::{ApiKey, ApiKeyValue, SecurityScheme}, @@ -15,7 +13,7 @@ use utoipa::{ use utoipauto::utoipauto; use crate::errors::internal_error; -use crate::{config::Config, servers::app::routes::app_router, AppState}; +use crate::{config::Config, server::routes::app_router, AppState}; struct SecurityAddon; @@ -46,7 +44,7 @@ impl Modify for ServerAddon { } #[tracing::instrument(skip(state))] -pub async fn run_app_server(config: &Config, state: AppState) { +pub async fn run_api_server(config: &Config, state: AppState) { #[utoipauto( paths = "./pragma-node/src, ./pragma-common/src from pragma_common, ./pragma-entities/src from pragma_entities" )] @@ -67,10 +65,8 @@ pub async fn run_app_server(config: &Config, state: AppState) { let app = app_router::(state.clone()) .with_state(state) // Logging so we can see whats going on - .layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::default().include_headers(true)), - ) + .layer(OtelAxumLayer::default()) + .layer(OtelInResponseLayer) // Permissive CORS layer to allow all origins .layer(CorsLayer::permissive()); @@ -78,13 +74,17 @@ pub async fn run_app_server(config: &Config, state: AppState) { let port = config.server_port(); let address = format!("{}:{}", host, port); let socket_addr: SocketAddr = address.parse().unwrap(); + let listener = tokio::net::TcpListener::bind(socket_addr) + .await + .expect("Invalid API server address."); tracing::info!("πŸš€ API started at http://{}", socket_addr); - tokio::spawn(async move { - axum::Server::bind(&socket_addr) - .serve(app.into_make_service_with_connect_info::()) - .await - .map_err(internal_error) - .unwrap() - }); + + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .await + .map_err(internal_error) + .unwrap(); } diff --git a/pragma-node/src/servers/app/routes.rs b/pragma-node/src/server/routes.rs similarity index 100% rename from pragma-node/src/servers/app/routes.rs rename to pragma-node/src/server/routes.rs diff --git a/pragma-node/src/servers/metrics.rs b/pragma-node/src/servers/metrics.rs deleted file mode 100644 index c50180fa..00000000 --- a/pragma-node/src/servers/metrics.rs +++ /dev/null @@ -1,50 +0,0 @@ -use axum::body::Body; -use axum::http::{Response, StatusCode}; -use axum::response::IntoResponse; -use axum::Server; -use axum::{routing::get, Router}; -use prometheus::{Encoder, TextEncoder}; -use std::net::SocketAddr; - -use crate::config::Config; -use crate::metrics::MetricsRegistry; - -pub async fn run_metrics_server(config: &Config, metrics_registry: MetricsRegistry) { - let app = Router::new() - .route("/", get(root_handler)) - .route("/metrics", get(move || metrics_handler(metrics_registry))); - - let host = config.server_host(); - let port = config.metrics_port(); - - let address = format!("{}:{}", host, port); - let socket_addr: SocketAddr = address.parse().unwrap(); - - tracing::info!("πŸ–¨ Metrics available at http://{}/metrics", socket_addr); - Server::bind(&socket_addr) - .serve(app.into_make_service()) - .await - .unwrap(); -} - -async fn root_handler() -> impl IntoResponse { - let html_content = "/metrics"; - Response::builder() - .header("Content-Type", "text/html") - .body(Body::from(html_content)) - .unwrap() -} - -async fn metrics_handler(metrics_registry: MetricsRegistry) -> impl IntoResponse { - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let metric_families = metrics_registry.registry().gather(); - encoder.encode(&metric_families, &mut buffer).unwrap(); - - Response::builder() - .status(StatusCode::OK) - .header("Content-Type", encoder.format_type()) - .body(Body::from(buffer)) - .unwrap() -} diff --git a/pragma-node/src/servers/mod.rs b/pragma-node/src/servers/mod.rs deleted file mode 100644 index 3ecd5dc8..00000000 --- a/pragma-node/src/servers/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod app; -pub mod metrics; diff --git a/pragma-node/src/types/ws/mod.rs b/pragma-node/src/types/ws.rs similarity index 92% rename from pragma-node/src/types/ws/mod.rs rename to pragma-node/src/types/ws.rs index 0d1b19cf..b8e8e3f7 100644 --- a/pragma-node/src/types/ws/mod.rs +++ b/pragma-node/src/types/ws.rs @@ -1,7 +1,4 @@ -pub mod metrics; - use governor::{DefaultKeyedRateLimiter, Quota, RateLimiter}; -use metrics::{Interaction, Status}; use nonzero_ext::nonzero; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -11,6 +8,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::mpsc::{self, Receiver, Sender}; +use crate::metrics::{Interaction, Status}; use crate::AppState; use axum::extract::ws::{Message, WebSocket}; use futures_util::stream::{SplitSink, SplitStream}; @@ -44,6 +42,7 @@ pub enum WebSocketError { #[allow(dead_code)] pub struct Subscriber { pub id: Uuid, + pub endpoint_name: String, pub ip_address: IpAddr, pub closed: bool, pub state: Arc>, @@ -82,6 +81,7 @@ where { /// Create a new subscriber tied to a websocket connection. pub async fn new( + endpoint_name: String, socket: WebSocket, ip_address: IpAddr, app_state: Arc, @@ -94,6 +94,7 @@ where let mut subscriber = Subscriber { id, + endpoint_name, ip_address, closed: false, state: Arc::new(Mutex::new(state.unwrap_or_default())), @@ -132,20 +133,15 @@ where H: ChannelHandler, CM: for<'a> Deserialize<'a>, { - let tracing_span = tracing::span!(tracing::Level::INFO, "subscriber", id = %self.id); - let _tracing_guard = tracing_span.enter(); loop { tokio::select! { // Messages from the client maybe_client_msg = self.receiver.next() => { match maybe_client_msg { Some(Ok(client_msg)) => { - tracing::info!("πŸ‘€ [CLIENT -> SERVER]"); - tracing::info!("{:?}", client_msg); handler = self.decode_and_handle(handler, client_msg).await?; } Some(Err(_)) => { - tracing::info!("πŸ˜Άβ€πŸŒ«οΈ Client disconnected/error occurred. Closing the channel."); return Ok(()); }, None => {} @@ -168,8 +164,6 @@ where // Messages from the server to the client maybe_server_msg = self.notify_receiver.recv() => { if let Some(server_msg) = maybe_server_msg { - tracing::info!("πŸ₯‘ [SERVER -> CLIENT]"); - tracing::info!("{:?}", server_msg); let _ = self.sender.send(server_msg).await; } }, @@ -179,7 +173,6 @@ where self.sender.close().await.ok(); self.closed = true; self.record_metric(Interaction::CloseConnection, Status::Success); - tracing::info!("β›” [CLOSING SIGNAL]"); return Ok(()); } }, @@ -231,7 +224,6 @@ where ) -> Result, WebSocketError> { match msg { Message::Close(_) => { - tracing::info!("πŸ“¨ [CLOSE]"); if self.exit.0.send(true).is_ok() { self.sender .close() @@ -243,7 +235,6 @@ where } } Message::Text(text) => { - tracing::info!("πŸ“¨ [TEXT]"); let msg = serde_json::from_str::(&text); if let Ok(msg) = msg { return Ok(Some(msg)); @@ -253,7 +244,6 @@ where } } Message::Binary(payload) => { - tracing::info!("πŸ“¨ [BINARY]"); let maybe_msg = serde_json::from_slice::(&payload); if let Ok(msg) = maybe_msg { return Ok(Some(msg)); @@ -281,8 +271,10 @@ where /// Records a web socket metric. pub fn record_metric(&self, interaction: Interaction, status: Status) { - self.app_state - .ws_metrics - .record_interaction(interaction, status); + self.app_state.metrics.ws_metrics.record_ws_interaction( + &self.endpoint_name, + interaction, + status, + ); } } diff --git a/pragma-node/src/types/ws/metrics.rs b/pragma-node/src/types/ws/metrics.rs deleted file mode 100644 index a83dde3a..00000000 --- a/pragma-node/src/types/ws/metrics.rs +++ /dev/null @@ -1,44 +0,0 @@ -use prometheus::{CounterVec, Opts}; -use strum::Display; - -use crate::metrics::MetricsRegistry; - -#[derive(Display, Clone, Debug)] -pub enum Interaction { - NewConnection, - CloseConnection, - ClientMessageDecode, - ClientMessageProcess, - ChannelUpdate, - RateLimit, -} - -#[derive(Display, Clone, Debug)] -pub enum Status { - Success, - Error, -} - -pub struct WsMetrics { - interactions: CounterVec, -} - -impl WsMetrics { - pub fn new(registry: &MetricsRegistry) -> Result { - let interactions = registry.register(CounterVec::new( - Opts::new("ws_interactions", "Number of WebSocket interactions"), - &["interaction", "status"], - )?)?; - - Ok(Self { interactions }) - } - - pub fn record_interaction(&self, interaction: Interaction, status: Status) { - self.interactions - .with_label_values(&[ - interaction.to_string().as_str(), - status.to_string().as_str(), - ]) - .inc(); - } -} diff --git a/pragma-node/src/utils/custom_extractors/json_extractor.rs b/pragma-node/src/utils/custom_extractors/json_extractor.rs deleted file mode 100644 index b65efbf9..00000000 --- a/pragma-node/src/utils/custom_extractors/json_extractor.rs +++ /dev/null @@ -1,14 +0,0 @@ -use axum::extract::rejection::JsonRejection; -use axum_macros::FromRequest; - -use crate::errors::AppError; - -#[derive(FromRequest)] -#[from_request(via(axum::Json), rejection(AppError))] -pub struct JsonExtractor(pub T); - -impl From for AppError { - fn from(rejection: JsonRejection) -> Self { - AppError::BodyParsingError(rejection.to_string()) - } -} diff --git a/pragma-node/src/utils/custom_extractors/mod.rs b/pragma-node/src/utils/custom_extractors/mod.rs index eaec8484..d463c47b 100644 --- a/pragma-node/src/utils/custom_extractors/mod.rs +++ b/pragma-node/src/utils/custom_extractors/mod.rs @@ -1,2 +1 @@ -pub mod json_extractor; pub mod path_extractor; diff --git a/pragma-node/src/utils/mod.rs b/pragma-node/src/utils/mod.rs index d912b1dd..6a3d0b0d 100644 --- a/pragma-node/src/utils/mod.rs +++ b/pragma-node/src/utils/mod.rs @@ -2,7 +2,6 @@ pub use aws::PragmaSignerBuilder; pub use conversion::{ convert_via_quote, felt_from_decimal, format_bigdecimal_price, normalize_to_decimals, }; -pub use custom_extractors::json_extractor::JsonExtractor; pub use custom_extractors::path_extractor::PathExtractor; pub use signing::starkex::StarkexPrice; pub use signing::typed_data::TypedData; diff --git a/rust-toolchain.toml b/rust-toolchain.toml index cc34bc04..7d857086 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.80.0" +channel = "1.82.0" components = ["rustfmt", "clippy", "rust-analyzer"] targets = ["wasm32-unknown-unknown"] profile = "minimal"