From 95ecc4e01b7fd06ec0b71c6486cb2cdd962e5040 Mon Sep 17 00:00:00 2001 From: Martin Beckmann Date: Thu, 8 Aug 2024 21:35:22 +0200 Subject: [PATCH] Tokio console and log filter reloading (#2868) # Description https://github.com/cowprotocol/services/pull/2815 reverted 2 changes (https://github.com/cowprotocol/services/pull/2791, https://github.com/cowprotocol/services/pull/2792) because we again saw issues with logs that were suddenly no longer emitted. This problem also happened the last time we merged support for `tokio-console` but I hoped that: 1. the issue was resolved in one of the related dependencies somehow 2. making the feature opt-in with a CLI flag would allow normal operations when disabled However, even with the feature disabled we saw that logs were again missing. I tried to debug the underlying issue and it appears that for some reason a log like `::tracing::event!(target: "sqlx::query", tracing::Level::DEBUG, summary);` would not get logged with the original implementation (not a huge deal since we weren't using them) but these logs would also cause the NEXT log that would normally be emitted to be skipped. This can be seen in this [code](https://github.com/cowprotocol/services/commit/5ba466668781d60bdf13531187f6baa1eac9fd89) which reproduced the error. Unfortunately `sqlx` is using `tracing::event!()` when executing an SQL query which means with the described problem it could happen randomly that some logs are missing. Luckily there are also scenarios where some SQL query always gets executed before a log we issue ourselves (e.g. `saving fee policies` log in the autopilot run loop would always be skipped) which made it easier to find the underlying cause. # Changes 1. Add `tokio-console` and `filter reloading` feature again 2. Fiddled around with how the tracing subscriber gets initialized until the test that reproduced the issue no longer skipped any logs. 
This was not straightforward since initializing the tokio console and our regular log filters requires you to jump through a few type system hoops. 3. removed CLI flag since this did not have the originally intended effect anyway # How to review The only important change happened in https://github.com/cowprotocol/services/pull/2868/commits/41c47ae6da040952ddb4d50ea03ba0f94636e35c. The other stuff is just the revert commit. ## How to test Run the test that reproduced the issue and check that the targeted log does not get skipped. This doesn't necessarily mean that all is well now. To get more confidence on the change I would just run the CI and compare the number of logs with CI runs that don't change the tracing setup. If both have roughly the same number of logs (I assume some variance will always be there) the change should be good. --- .cargo/config.toml | 2 + Cargo.lock | 147 +++++++++++++++++++ Cargo.toml | 2 +- README.md | 21 +++ crates/observe/Cargo.toml | 3 +- crates/observe/build.rs | 4 + crates/observe/src/lib.rs | 3 + crates/observe/src/tracing.rs | 86 ++++++++--- crates/observe/src/tracing_reload_handler.rs | 113 ++++++++++++++ crates/orderbook/src/run.rs | 3 +- 10 files changed, 363 insertions(+), 21 deletions(-) create mode 100644 .cargo/config.toml create mode 100644 crates/observe/build.rs create mode 100644 crates/observe/src/tracing_reload_handler.rs diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000000..bff29e6e17 --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,2 @@ +[build] +rustflags = ["--cfg", "tokio_unstable"] diff --git a/Cargo.lock b/Cargo.lock index 457c5215cc..1a9eaab365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1330,6 +1330,44 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +[[package]] +name = "console-api" +version = "0.7.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "a257c22cd7e487dd4a13d413beabc512c5052f0bc048db0da6a84c3d8a6142fd" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1480,6 +1518,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-queue" version = "0.3.11" @@ -2311,6 +2358,19 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -2549,6 +2609,18 @@ dependencies = [ "tokio-rustls 0.24.1", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -3196,6 +3268,7 @@ name = "observe" 
version = "0.1.0" dependencies = [ "atty", + "console-subscriber", "futures", "once_cell", "pin-project-lite", @@ -3605,6 +3678,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "protobuf" version = "2.28.0" @@ -5002,9 +5107,20 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.3.0" @@ -5117,6 +5233,33 @@ dependencies = [ "winnow 0.6.13", ] +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", 
+] + [[package]] name = "tower" version = "0.4.13" @@ -5125,9 +5268,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ "futures-core", "futures-util", + "indexmap 1.9.3", "pin-project", "pin-project-lite", + "rand", + "slab", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", diff --git a/Cargo.toml b/Cargo.toml index eea3c60994..7e02f8dbf2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ tempfile = "3.10.1" time = { version = "0.3.36", features = ["macros"] } thiserror = "1.0.61" toml = "0.8.14" -tokio = "1.38.0" +tokio = { version = "1.38.0", features = ["tracing"] } tokio-stream = { version = "0.1.15", features = ["sync"] } tracing = "0.1.40" tracing-subscriber = "0.3.18" diff --git a/README.md b/README.md index 8f56685bc2..a379927358 100644 --- a/README.md +++ b/README.md @@ -117,6 +117,27 @@ ANVIL_IP_ADDR=0.0.0.0 anvil \ --timestamp 1577836800 ``` +### Profiling + +The most important binaries support [tokio-console](https://github.com/tokio-rs/console) to give you a good look inside the tokio runtime. + +Simply enable the feature by passing `--enable-tokio-console true` when running a binary and then in another shell, run + +``` +cargo install --locked tokio-console +tokio-console +``` + + +### Changing Log Filters + +It's possible to change the tracing log filter while the process is running. This can be useful to debug an error that requires more verbose logs but which might no longer appear after restarting the system. + +Each process opens a UNIX socket at `/tmp/log_filter_override_<program_name>_<pid>.sock`. To change the log filter connect to it with `nc -U <socket_path>` and enter a new log filter. +You can also reset the log filter to the filter the program was initially started with by entering `reset`. + +See [here](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives) for documentation on the supported log filter format. 
+ ## Running the Services Locally ### Prerequisites diff --git a/crates/observe/Cargo.toml b/crates/observe/Cargo.toml index e4db7decc4..3fb1588a39 100644 --- a/crates/observe/Cargo.toml +++ b/crates/observe/Cargo.toml @@ -6,7 +6,8 @@ edition = "2021" license = "MIT OR Apache-2.0" [dependencies] -atty = "0.2.14" +atty = "0.2" +console-subscriber = "0.3.0" futures = { workspace = true } once_cell = { workspace = true } pin-project-lite = "0.2.14" diff --git a/crates/observe/build.rs b/crates/observe/build.rs new file mode 100644 index 0000000000..b0ce3de9b9 --- /dev/null +++ b/crates/observe/build.rs @@ -0,0 +1,4 @@ +fn main() { + // Make build system aware of custom config flags to avoid clippy warnings + println!("cargo::rustc-check-cfg=cfg(tokio_unstable)"); +} diff --git a/crates/observe/src/lib.rs b/crates/observe/src/lib.rs index e54bd43dc0..2362e62415 100644 --- a/crates/observe/src/lib.rs +++ b/crates/observe/src/lib.rs @@ -6,3 +6,6 @@ pub mod metrics; pub mod panic_hook; pub mod request_id; pub mod tracing; + +#[cfg(unix)] +mod tracing_reload_handler; diff --git a/crates/observe/src/tracing.rs b/crates/observe/src/tracing.rs index 606052e34c..6efe47f414 100644 --- a/crates/observe/src/tracing.rs +++ b/crates/observe/src/tracing.rs @@ -1,8 +1,15 @@ use { + crate::tracing_reload_handler::spawn_reload_handler, std::{panic::PanicInfo, sync::Once}, time::macros::format_description, tracing::level_filters::LevelFilter, - tracing_subscriber::fmt::{time::UtcTime, writer::MakeWriterExt as _}, + tracing_subscriber::{ + fmt::{time::UtcTime, writer::MakeWriterExt as _}, + prelude::*, + util::SubscriberInitExt, + EnvFilter, + Layer, + }, }; /// Initializes tracing setup that is shared between the binaries. @@ -28,22 +35,67 @@ pub fn initialize_reentrant(env_filter: &str) { } fn set_tracing_subscriber(env_filter: &str, stderr_threshold: LevelFilter) { - // This is what kibana uses to separate multi line log messages. 
- let subscriber_builder = tracing_subscriber::fmt::fmt() - .with_timer(UtcTime::new(format_description!( - "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" - ))) - .with_env_filter(env_filter) - .with_ansi(atty::is(atty::Stream::Stdout)); - match stderr_threshold.into_level() { - Some(threshold) => subscriber_builder - .with_writer( - std::io::stderr - .with_max_level(threshold) - .or_else(std::io::stdout), - ) - .init(), - None => subscriber_builder.init(), + let initial_filter = env_filter.to_string(); + + // The `tracing` APIs are heavily generic to enable zero overhead. Unfortunately + // this leads to very annoying type constraints which can only be satisfied + // by literally copy and pasting the code so the compiler doesn't try to + // infer types that satisfy both the tokio-console and the regular case. + // It's tempting to resolve this mess by first configuring the `fmt_layer` and + // only then the `console_subscriber`. However, this setup was the only way + // I found that: + // 1. actually makes `tokio-console` work + // 2. prints logs if `tokio-console` is disabled + // 3. does NOT skip the next log following a `tracing::event!()`. These calls + // happen for example under the hood in `sqlx`. I don't understand what's + // actually causing that but at this point I'm just happy if all the features + // work correctly. + macro_rules! 
fmt_layer { + ($env_filter:expr, $stderr_threshold:expr) => {{ + tracing_subscriber::fmt::layer() + .with_writer( + std::io::stdout + .with_min_level( + $stderr_threshold + .into_level() + .unwrap_or(tracing::Level::ERROR), + ) + .or_else(std::io::stderr), + ) + .with_timer(UtcTime::new(format_description!( + "[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z" + ))) + .with_ansi(atty::is(atty::Stream::Stdout)) + .with_filter($env_filter) + }}; + } + + if cfg!(tokio_unstable) { + let (env_filter, reload_handle) = + tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); + + tracing_subscriber::registry() + .with(console_subscriber::spawn()) + .with(fmt_layer!(env_filter, stderr_threshold)) + .init(); + + if cfg!(unix) { + spawn_reload_handler(initial_filter, reload_handle); + } + } else { + let (env_filter, reload_handle) = + tracing_subscriber::reload::Layer::new(EnvFilter::new(&initial_filter)); + + tracing_subscriber::registry() + // Without this the subscriber ignores the next log after a `tracing::event!()` which + // `sqlx` uses under the hood. + .with(tracing::level_filters::LevelFilter::TRACE) + .with(fmt_layer!(env_filter, stderr_threshold)) + .init(); + + if cfg!(unix) { + spawn_reload_handler(initial_filter, reload_handle); + } } } diff --git a/crates/observe/src/tracing_reload_handler.rs b/crates/observe/src/tracing_reload_handler.rs new file mode 100644 index 0000000000..71d5f3c817 --- /dev/null +++ b/crates/observe/src/tracing_reload_handler.rs @@ -0,0 +1,113 @@ +use { + tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + net::{UnixListener, UnixStream}, + }, + tracing_subscriber::{reload, EnvFilter}, +}; + +/// Spawns a new tokio task that listens for connections to a UNIX socket +/// at "/tmp/log_filter_override_<program_name>_<pid>.sock". +/// Whenever a line gets written to that socket the reload handler +/// uses it as the new log filter. +/// To reset to the original log filter send the message "reset". 
+pub(crate) fn spawn_reload_handler( + initial_filter: String, + reload_handle: reload::Handle, +) { + tokio::spawn(async move { + let id = std::process::id(); + let name = binary_name().unwrap_or_default(); + + let socket_path = format!("/tmp/log_filter_override_{name}_{id}.sock"); + tracing::warn!(file = socket_path, "open log filter reload socket"); + let handle = SocketHandle { + listener: UnixListener::bind(&socket_path).expect("socket handle is unique"), + socket_path, + }; + + loop { + handle_connection(&handle.listener, &initial_filter, &reload_handle).await; + } + }); +} + +struct SocketHandle { + socket_path: String, + listener: UnixListener, +} + +impl Drop for SocketHandle { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.socket_path); + } +} + +fn binary_name() -> Option { + Some( + std::env::current_exe() + .ok()? + .file_name()? + .to_str()? + .to_string(), + ) +} + +async fn handle_connection( + listener: &UnixListener, + initial_filter: &str, + reload_handle: &reload::Handle, +) { + let Ok((mut socket, _addr)) = listener.accept().await else { + tracing::warn!("failed to accept UNIX socket connection"); + return; + }; + + let _ = socket + .write_all(format!("log filter on process startup was: {initial_filter:?}\n",).as_bytes()) + .await; + + loop { + let message = read_line(&mut socket).await; + + let filter = match message.as_deref() { + Some("") => { + log(&mut socket, "client terminated connection".into()).await; + break; + } + None => { + log(&mut socket, "failed to read message from socket".into()).await; + continue; + } + Some("reset") => initial_filter, + Some(message) => message, + }; + + let Ok(env_filter) = EnvFilter::try_new(filter) else { + log(&mut socket, format!("failed to parse filter: {filter:?}")).await; + continue; + }; + + match reload_handle.reload(env_filter) { + Ok(_) => log(&mut socket, format!("applied new filter: {filter:?}")).await, + Err(err) => log(&mut socket, format!("failed to apply filter: 
{err:?}")).await, + } + } +} + +async fn read_line(socket: &mut UnixStream) -> Option { + let mut reader = BufReader::new(socket); + let mut buffer = String::new(); + reader.read_line(&mut buffer).await.ok()?; + Some(buffer.trim().to_owned()) +} + +/// Logs the message in this process' logs and reports it back to the +/// connected socket. +async fn log(socket: &mut UnixStream, message: String) { + // Use a fairly high log level to improve chances that this actually gets logged + // when somebody messed with the log filter. + tracing::warn!(message); + let _ = socket.write_all(message.as_bytes()).await; + let _ = socket.write_all(b"\n").await; +} diff --git a/crates/orderbook/src/run.rs b/crates/orderbook/src/run.rs index 24b4e01e44..bf435e797b 100644 --- a/crates/orderbook/src/run.rs +++ b/crates/orderbook/src/run.rs @@ -412,9 +412,8 @@ pub async fn run(args: Arguments) { shutdown_sender.send(()).expect("failed to send shutdown signal"); match tokio::time::timeout(Duration::from_secs(10), serve_api).await { Ok(inner) => inner.expect("API failed during shutdown"), - Err(_) => tracing::error!("API shutdown exceeded timeout"), + Err(_) => panic!("API shutdown exceeded timeout"), } - std::process::exit(0); } }; }