Skip to content

Commit

Permalink
retry migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
sug0 committed Jul 19, 2024
1 parent fad4aa8 commit 01bc246
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tokio-retry = "0.3"
tower = { version = "0.4", features = [ "util", "timeout", "load-shed", "limit", "buffer" ] }
tower-http = { version = "0.4.4", features = [ "compression-full", "limit", "trace", "cors" ] }
tracing = "0.1"
tracing-appender = "0.2.0"
tracing-subscriber = { version = "0.3", features = [ "env-filter" ] }
validator = { version = "0.16.0", features = ["derive"] }
xorf = { version = "0.11.0", features = ["serde"]}
1 change: 1 addition & 0 deletions block-index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ diesel_migrations.workspace = true
orm.workspace = true
shared.workspace = true
tokio.workspace = true
tracing-appender.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
xorf.workspace = true
Expand Down
12 changes: 9 additions & 3 deletions block-index/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use clap_verbosity_flag::{InfoLevel, LevelFilter, Verbosity};
use tracing::Level;
use tracing_appender::non_blocking::NonBlocking;
use tracing_subscriber::FmtSubscriber;

#[derive(clap::Parser)]
Expand All @@ -11,7 +12,10 @@ pub struct AppConfig {
pub verbosity: Verbosity<InfoLevel>,
}

pub fn install_tracing_subscriber(verbosity: Verbosity<InfoLevel>) {
pub fn install_tracing_subscriber(
verbosity: Verbosity<InfoLevel>,
non_blocking_logger: NonBlocking,
) {
let log_level = match verbosity.log_level_filter() {
LevelFilter::Off => None,
LevelFilter::Error => Some(Level::ERROR),
Expand All @@ -21,8 +25,10 @@ pub fn install_tracing_subscriber(verbosity: Verbosity<InfoLevel>) {
LevelFilter::Trace => Some(Level::TRACE),
};
if let Some(log_level) = log_level {
let subscriber =
FmtSubscriber::builder().with_max_level(log_level).finish();
let subscriber = FmtSubscriber::builder()
.with_max_level(log_level)
.with_writer(non_blocking_logger)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
}
74 changes: 56 additions & 18 deletions block-index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod appstate;
pub mod config;

use std::future::{self, Future};
use std::ops::ControlFlow;
use std::sync::atomic::{self, AtomicBool};
use std::sync::{Arc, Mutex};
use std::task::{Poll, Waker};
Expand All @@ -23,39 +24,81 @@ const VERSION_STRING: &str = env!("VERGEN_GIT_SHA");

// TODO: add db migrations for block index

macro_rules! exit {
() => {{
tracing::info!("Exiting...");
return Ok(());
}};
}

#[tokio::main]
async fn main() -> Result<(), MainError> {
let AppConfig {
verbosity,
database_url,
} = AppConfig::parse();

config::install_tracing_subscriber(verbosity);
let (non_blocking_logger, _worker) =
tracing_appender::non_blocking(std::io::stdout());
config::install_tracing_subscriber(verbosity, non_blocking_logger);

tracing::info!(version = VERSION_STRING, "Started the block index builder");
let mut exit_handle = must_exit();

let app_state = AppState::new(database_url).into_db_error()?;

run_migrations(&app_state).await?;
if wait_for_migrations(&mut exit_handle, &app_state)
.await
.is_break()
{
exit!();
}
build_block_indexes(&mut exit_handle, &app_state).await;

exit!();
}

async fn wait_for_migrations<F>(
mut exit_handle: F,
app_state: &AppState,
) -> ControlFlow<()>
where
F: Future<Output = ()> + Unpin,
{
while run_migrations(app_state).await.is_err() {
const SLEEP_AMOUNT: Duration = Duration::from_secs(5);

tracing::info!(after = ?SLEEP_AMOUNT, "Retrying migrations");

tokio::select! {
_ = &mut exit_handle => {
return ControlFlow::Break(());
}
_ = sleep(SLEEP_AMOUNT) => {}
}
}

ControlFlow::Continue(())
}

async fn build_block_indexes<F>(mut exit_handle: F, app_state: &AppState)
where
F: Future<Output = ()> + Unpin,
{
loop {
const SLEEP_AMOUNT: Duration = Duration::from_secs(30 * 60);

tracing::debug!(sleep = ?SLEEP_AMOUNT, "Waiting to build new block index...");

tokio::select! {
_ = &mut exit_handle => {
break;
return;
}
_ = sleep(SLEEP_AMOUNT) => {
_ = build_new_block_index(&app_state).await;
_ = build_new_block_index(app_state).await;
}
}
}

tracing::info!("Exiting...");
Ok(())
}

fn must_exit() -> impl Future<Output = ()> {
Expand All @@ -75,17 +118,12 @@ fn must_exit() -> impl Future<Output = ()> {
let mut quit = signal::unix::signal(signal::unix::SignalKind::quit())
.expect("Failed to install QUIT signal handler");

tokio::select! {
_ = interrupt.recv() => {
tracing::info!("INT signal received");
}
_ = term.recv() => {
tracing::info!("TERM signal received");
}
_ = quit.recv() => {
tracing::info!("QUIT signal received");
}
}
let signal_descriptor = tokio::select! {
_ = interrupt.recv() => "INT",
_ = term.recv() => "TERM",
_ = quit.recv() => "QUIT",
};
tracing::info!(which = signal_descriptor, "Signal received");

task_flag.store(true, atomic::Ordering::Relaxed);

Expand Down

0 comments on commit 01bc246

Please sign in to comment.