run cleanup schedulers in HA mode on the leader only
sebadob committed Oct 9, 2023
1 parent d2d9271 commit 6046ae3
Showing 8 changed files with 100 additions and 45 deletions.
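In short: each periodic cleanup scheduler now receives a tokio watch channel carrying redhac's quorum health state and, in an HA cluster, only does its work on the current leader; a single instance (where the channel holds None) keeps running everything as before. A minimal sketch of the pattern, assuming the redhac 0.8 types used in the diff below (the run_cleanup task and its one-hour interval are illustrative, not part of this commit):

use std::time::Duration;

use redhac::{QuorumHealthState, QuorumState};
use tokio::sync::watch::Receiver;
use tokio::time;

// Some(true) / Some(false) in an HA deployment, None for a single instance.
fn is_ha_leader(rx_health: &Receiver<Option<QuorumHealthState>>) -> Option<bool> {
    let health_state = rx_health.borrow();
    health_state
        .as_ref()
        .map(|state| state.state == QuorumState::Leader)
}

// Illustrative skeleton of the gate every scheduler in this commit now uses.
async fn run_cleanup(rx_health: Receiver<Option<QuorumHealthState>>) {
    let mut interval = time::interval(Duration::from_secs(3600));
    loop {
        interval.tick().await;
        // In HA mode only the leader does the work; followers skip this tick.
        if let Some(is_leader) = is_ha_leader(&rx_health) {
            if !is_leader {
                continue;
            }
        }
        // ... the actual cleanup work goes here ...
    }
}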
35 changes: 8 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -21,3 +21,4 @@ panic = "abort"

[workspace.dependencies]
sqlx = { version = "0.7", features = ["macros", "migrate", "postgres", "runtime-tokio", "sqlite", "tls-rustls", "uuid"] }
redhac = "0.8.0"
2 changes: 1 addition & 1 deletion rauthy-common/Cargo.toml
@@ -19,7 +19,7 @@ lazy_static = "1"
once_cell = "1"
rand = "0.8"
rand_core = { version = "0.6", features = ["std"] }
redhac = "0.7"
redhac = { workspace = true }
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
2 changes: 1 addition & 1 deletion rauthy-handlers/Cargo.toml
@@ -20,7 +20,7 @@ num_cpus = "1"
rauthy-common = { path = "../rauthy-common" }
rauthy-models = { path = "../rauthy-models" }
rauthy-service = { path = "../rauthy-service" }
redhac = "0.7"
redhac = { workspace = true }
rust-embed = { version = "6.8", features = ["actix-web", "tokio"] }
time = { version = "0.3", features = ["formatting", "local-offset", "macros", "parsing", "serde"] }
tracing = { version = "0.1", features = ["attributes"] }
2 changes: 1 addition & 1 deletion rauthy-main/Cargo.toml
@@ -27,7 +27,7 @@ rauthy-common = { path = "../rauthy-common" }
rauthy-handlers = { path = "../rauthy-handlers" }
rauthy-models = { path = "../rauthy-models" }
rauthy-service = { path = "../rauthy-service" }
redhac = "0.7"
redhac = { workspace = true }
rustls = "0.21"
rustls-pemfile = "1"
serde_json = "1"
99 changes: 86 additions & 13 deletions rauthy-main/src/schedulers.rs
@@ -9,25 +9,28 @@ use rauthy_models::entity::refresh_tokens::RefreshToken;
use rauthy_models::entity::sessions::Session;
use rauthy_models::entity::users::User;
use rauthy_models::migration::backup_db;
-use redhac::cache_del;
+use redhac::{cache_del, QuorumHealthState, QuorumState};
use std::collections::HashSet;
use std::env;
use std::ops::{Add, Sub};
use std::str::FromStr;
use std::time::Duration;
+use tokio::sync::watch::Receiver;
use tokio::time;
use tracing::{debug, error, info};

pub async fn scheduler_main(data: web::Data<AppState>) {
info!("Starting schedulers");

+let rx_health = data.caches.ha_cache_config.rx_health_state.clone();

tokio::spawn(db_backup(data.db.clone()));
-tokio::spawn(magic_link_cleanup(data.db.clone()));
-tokio::spawn(refresh_tokens_cleanup(data.db.clone()));
-tokio::spawn(sessions_cleanup(data.db.clone()));
-tokio::spawn(jwks_cleanup(data.clone()));
-tokio::spawn(password_expiry_checker(data.clone()));
-tokio::spawn(user_expiry_checker(data));
+tokio::spawn(magic_link_cleanup(data.db.clone(), rx_health.clone()));
+tokio::spawn(refresh_tokens_cleanup(data.db.clone(), rx_health.clone()));
+tokio::spawn(sessions_cleanup(data.db.clone(), rx_health.clone()));
+tokio::spawn(jwks_cleanup(data.clone(), rx_health.clone()));
+tokio::spawn(password_expiry_checker(data.clone(), rx_health.clone()));
+tokio::spawn(user_expiry_checker(data, rx_health));
}

// TODO -> adapt to RDBMS
@@ -67,12 +70,22 @@ pub async fn db_backup(db: DbPool) {
// Cleans up old / expired magic links and deletes users that have never used their
// 'set first ever password' magic link, to keep the database clean in case of open user registration.
// Runs every 6 hours.
-pub async fn magic_link_cleanup(db: DbPool) {
+pub async fn magic_link_cleanup(db: DbPool, rx_health: Receiver<Option<QuorumHealthState>>) {
let mut interval = time::interval(Duration::from_secs(3600 * 6));

loop {
interval.tick().await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!("Running HA mode without being the leader - skipping magic_link_cleanup scheduler");
+        continue;
+    }
+}

+// if this channel has an error, just go on and ignore it -> no HA mode
+// if let Ok(cache_state) = rx_health.recv_async().await {}
debug!("Running magic_link_cleanup scheduler");

// allow 300 seconds of clock skew before cleaning up magic links
@@ -119,12 +132,24 @@ pub async fn magic_link_cleanup(db: DbPool) {
// Checks soon expiring passwords and notifies the user accordingly.
// Runs once every night at 04:30.
// TODO modify somehow to prevent multiple e-mails in an HA deployment
-pub async fn password_expiry_checker(data: web::Data<AppState>) {
+pub async fn password_expiry_checker(
+    data: web::Data<AppState>,
+    rx_health: Receiver<Option<QuorumHealthState>>,
+) {
// sec min hour day_of_month month day_of_week year
let schedule = cron::Schedule::from_str("0 30 4 * * * *").unwrap();

loop {
sleep_schedule_next(&schedule).await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!("Running HA mode without being the leader - skipping password_expiry_checker scheduler");
+        continue;
+    }
+}

debug!("Running password_expiry_checker scheduler");

// warns if the duration until the expiry is between 9 and 10 days, to only warn once
@@ -159,7 +184,10 @@ pub async fn password_expiry_checker(data: web::Data<AppState>) {
}

// Checks for expired users
-pub async fn user_expiry_checker(data: web::Data<AppState>) {
+pub async fn user_expiry_checker(
+    data: web::Data<AppState>,
+    rx_health: Receiver<Option<QuorumHealthState>>,
+) {
let secs = env::var("SCHED_USER_EXP_MINS")
.unwrap_or_else(|_| "60".to_string())
.parse::<u64>()
@@ -178,6 +206,15 @@ pub async fn user_expiry_checker(data: web::Data<AppState>) {

loop {
interval.tick().await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!("Running HA mode without being the leader - skipping user_expiry_checker scheduler");
+        continue;
+    }
+}

debug!("Running user_expiry_checker scheduler");

match User::find_expired(&data).await {
@@ -243,12 +280,20 @@ pub async fn user_expiry_checker(data: web::Data<AppState>) {
}

// Cleans up old / expired / already used Refresh Tokens
-pub async fn refresh_tokens_cleanup(db: DbPool) {
+pub async fn refresh_tokens_cleanup(db: DbPool, rx_health: Receiver<Option<QuorumHealthState>>) {
let mut interval = time::interval(Duration::from_secs(3600 * 3));

loop {
interval.tick().await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!("Running HA mode without being the leader - skipping refresh_tokens_cleanup scheduler");
+        continue;
+    }
+}

debug!("Running refresh_tokens_cleanup scheduler");

let now = OffsetDateTime::now_utc().unix_timestamp();
Expand All @@ -265,12 +310,20 @@ pub async fn refresh_tokens_cleanup(db: DbPool) {
}

// Cleans up old / expired Sessions
-pub async fn sessions_cleanup(db: DbPool) {
+pub async fn sessions_cleanup(db: DbPool, rx_health: Receiver<Option<QuorumHealthState>>) {
let mut interval = time::interval(Duration::from_secs(3595 * 2));

loop {
interval.tick().await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!("Running HA mode without being the leader - skipping sessions_cleanup scheduler");
+        continue;
+    }
+}

debug!("Running sessions_cleanup scheduler");

let thres = OffsetDateTime::now_utc()
@@ -290,12 +343,25 @@ pub async fn sessions_cleanup(db: DbPool) {
}

// Cleans up old / expired JWKSs
-pub async fn jwks_cleanup(data: web::Data<AppState>) {
+pub async fn jwks_cleanup(
+    data: web::Data<AppState>,
+    rx_health: Receiver<Option<QuorumHealthState>>,
+) {
let mut interval = time::interval(Duration::from_secs(3600 * 24));

loop {
interval.tick().await;

+// will return None in a non-HA deployment
+if let Some(is_ha_leader) = is_ha_leader(&rx_health) {
+    if !is_ha_leader {
+        debug!(
+            "Running HA mode without being the leader - skipping jwks_cleanup scheduler"
+        );
+        continue;
+    }
+}

debug!("Running jwks_cleanup scheduler");

let cleanup_threshold = OffsetDateTime::now_utc()
@@ -372,3 +438,10 @@ async fn sleep_schedule_next(schedule: &cron::Schedule) {
// we are adding a future date here --> safe to cast from i64 to u64
time::sleep(Duration::from_secs(until.num_seconds() as u64)).await;
}

+fn is_ha_leader(rx_health: &Receiver<Option<QuorumHealthState>>) -> Option<bool> {
+    let health_state = rx_health.borrow();
+    health_state
+        .as_ref()
+        .map(|state| state.state == QuorumState::Leader)
+}
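Design note: is_ha_leader reads the latest value with Receiver::borrow(), so the check never blocks or awaits. A non-HA deployment (channel value None) falls through and runs every scheduler as before, while an HA follower merely skips the current tick; if leadership moves, the new leader picks the job up on its next interval without any extra coordination.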
2 changes: 1 addition & 1 deletion rauthy-models/Cargo.toml
@@ -45,7 +45,7 @@ openssl-sys = { version = "0.9", features = ["vendored"] }
rand = "0.8"
rand_core = { version = "0.6", features = ["std"] }
rauthy-common = { path = "../rauthy-common" }
redhac = "0.7"
redhac = { workspace = true }
regex = "1"
ring = "0.16"
semver = { version = "1.0.19", features = ["serde"] }
2 changes: 1 addition & 1 deletion rauthy-service/Cargo.toml
@@ -21,7 +21,7 @@ rand = "0.8"
rand_core = { version = "0.6", features = ["std"] }
rauthy-common = { path = "../rauthy-common" }
rauthy-models = { path = "../rauthy-models" }
redhac = "0.7"
redhac = { workspace = true }
ring = "0.16"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
