From 01bc24651df57852ccb0c84e6c0cfd1ddb30f4d8 Mon Sep 17 00:00:00 2001 From: Tiago Carvalho Date: Fri, 19 Jul 2024 21:59:01 +0100 Subject: [PATCH] retry migrations --- Cargo.toml | 1 + block-index/Cargo.toml | 1 + block-index/src/config.rs | 12 +++++-- block-index/src/main.rs | 74 +++++++++++++++++++++++++++++---------- 4 files changed, 67 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 118dc4d..5428dbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ tokio-retry = "0.3" tower = { version = "0.4", features = [ "util", "timeout", "load-shed", "limit", "buffer" ] } tower-http = { version = "0.4.4", features = [ "compression-full", "limit", "trace", "cors" ] } tracing = "0.1" +tracing-appender = "0.2.0" tracing-subscriber = { version = "0.3", features = [ "env-filter" ] } validator = { version = "0.16.0", features = ["derive"] } xorf = { version = "0.11.0", features = ["serde"]} diff --git a/block-index/Cargo.toml b/block-index/Cargo.toml index d707956..bd5c8ed 100644 --- a/block-index/Cargo.toml +++ b/block-index/Cargo.toml @@ -23,6 +23,7 @@ diesel_migrations.workspace = true orm.workspace = true shared.workspace = true tokio.workspace = true +tracing-appender.workspace = true tracing-subscriber.workspace = true tracing.workspace = true xorf.workspace = true diff --git a/block-index/src/config.rs b/block-index/src/config.rs index 366c4e4..1c02a47 100644 --- a/block-index/src/config.rs +++ b/block-index/src/config.rs @@ -1,5 +1,6 @@ use clap_verbosity_flag::{InfoLevel, LevelFilter, Verbosity}; use tracing::Level; +use tracing_appender::non_blocking::NonBlocking; use tracing_subscriber::FmtSubscriber; #[derive(clap::Parser)] @@ -11,7 +12,10 @@ pub struct AppConfig { pub verbosity: Verbosity, } -pub fn install_tracing_subscriber(verbosity: Verbosity) { +pub fn install_tracing_subscriber( + verbosity: Verbosity, + non_blocking_logger: NonBlocking, +) { let log_level = match verbosity.log_level_filter() { LevelFilter::Off => None, LevelFilter::Error => Some(Level::ERROR), @@ -21,8 +25,10 @@ pub fn install_tracing_subscriber(verbosity: Verbosity) { LevelFilter::Trace => Some(Level::TRACE), }; if let Some(log_level) = log_level { - let subscriber = - FmtSubscriber::builder().with_max_level(log_level).finish(); + let subscriber = FmtSubscriber::builder() + .with_max_level(log_level) + .with_writer(non_blocking_logger) + .finish(); tracing::subscriber::set_global_default(subscriber).unwrap(); } } diff --git a/block-index/src/main.rs b/block-index/src/main.rs index 7df061e..36068f1 100644 --- a/block-index/src/main.rs +++ b/block-index/src/main.rs @@ -2,6 +2,7 @@ pub mod appstate; pub mod config; use std::future::{self, Future}; +use std::ops::ControlFlow; use std::sync::atomic::{self, AtomicBool}; use std::sync::{Arc, Mutex}; use std::task::{Poll, Waker}; @@ -23,6 +24,13 @@ const VERSION_STRING: &str = env!("VERGEN_GIT_SHA"); // TODO: add db migrations for block index +macro_rules! exit { + () => {{ + tracing::info!("Exiting..."); + return Ok(()); + }}; +} + #[tokio::main] async fn main() -> Result<(), MainError> { let AppConfig { @@ -30,15 +38,53 @@ async fn main() -> Result<(), MainError> { database_url, } = AppConfig::parse(); - config::install_tracing_subscriber(verbosity); + let (non_blocking_logger, _worker) = + tracing_appender::non_blocking(std::io::stdout()); + config::install_tracing_subscriber(verbosity, non_blocking_logger); tracing::info!(version = VERSION_STRING, "Started the block index builder"); let mut exit_handle = must_exit(); let app_state = AppState::new(database_url).into_db_error()?; - run_migrations(&app_state).await?; + if wait_for_migrations(&mut exit_handle, &app_state) + .await + .is_break() + { + exit!(); + } + build_block_indexes(&mut exit_handle, &app_state).await; + + exit!(); +} + +async fn wait_for_migrations( + mut exit_handle: F, + app_state: &AppState, +) -> ControlFlow<()> +where + F: Future + Unpin, +{ + while run_migrations(app_state).await.is_err() { + const SLEEP_AMOUNT: Duration = Duration::from_secs(5); + + tracing::info!(after = ?SLEEP_AMOUNT, "Retrying migrations"); + tokio::select! { + _ = &mut exit_handle => { + return ControlFlow::Break(()); + } + _ = sleep(SLEEP_AMOUNT) => {} + } + } + + ControlFlow::Continue(()) +} + +async fn build_block_indexes(mut exit_handle: F, app_state: &AppState) +where + F: Future + Unpin, +{ loop { const SLEEP_AMOUNT: Duration = Duration::from_secs(30 * 60); @@ -46,16 +92,13 @@ async fn main() -> Result<(), MainError> { tokio::select! { _ = &mut exit_handle => { - break; + return; } _ = sleep(SLEEP_AMOUNT) => { - _ = build_new_block_index(&app_state).await; + _ = build_new_block_index(app_state).await; } } } - - tracing::info!("Exiting..."); - Ok(()) } fn must_exit() -> impl Future { @@ -75,17 +118,12 @@ fn must_exit() -> impl Future { let mut quit = signal::unix::signal(signal::unix::SignalKind::quit()) .expect("Failed to install QUIT signal handler"); - tokio::select! { - _ = interrupt.recv() => { - tracing::info!("INT signal received"); - } - _ = term.recv() => { - tracing::info!("TERM signal received"); - } - _ = quit.recv() => { - tracing::info!("QUIT signal received"); - } - } + let signal_descriptor = tokio::select! { + _ = interrupt.recv() => "INT", + _ = term.recv() => "TERM", + _ = quit.recv() => "QUIT", + }; + tracing::info!(which = signal_descriptor, "Signal received"); task_flag.store(true, atomic::Ordering::Relaxed);