Skip to content

Commit

Permalink
chore(main): reorganize background jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
corybuecker committed Oct 7, 2024
1 parent 108c011 commit 339d6f4
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 92 deletions.
1 change: 1 addition & 0 deletions src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod clear_sessions;
pub mod convert_goals;
26 changes: 26 additions & 0 deletions src/jobs/clear_sessions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::{models::user::User, mongo_client};
use bson::doc;
use chrono::Utc;
use tracing::{error, info};

pub async fn clear_sessions() {
info!("clearing old sessions at {}", Utc::now());
let mongo = mongo_client().await.unwrap();

let update_result = mongo
.default_database()
.unwrap()
.collection::<User>("users")
.update_many(
doc! {},
doc! {"$pull": doc! {"sessions": doc! {"expiration": doc! { "$lte": Utc::now()}}}},
)
.await;

match update_result {
Ok(_) => {}
Err(err) => {
error!("{}", err);
}
}
}
18 changes: 12 additions & 6 deletions src/jobs/convert_goals.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
use crate::models::{envelope::Envelope, goal::Goal};
use crate::{
models::{envelope::Envelope, goal::Goal},
mongo_client,
};
use bson::{doc, oid::ObjectId};
use chrono::Utc;
use mongodb::ClientSession;
use std::str::FromStr;
use tracing::info;

pub async fn convert_goals() -> Result<f64, mongodb::error::Error> {
info!("converting goals to envelopes at {}", Utc::now());

let mongo = mongo_client().await?;
let mut session = mongo.start_session().await?;

pub async fn convert_goals(mut session: ClientSession) -> Result<f64, mongodb::error::Error> {
session.start_transaction().await?;

let envelopes = session
Expand Down Expand Up @@ -101,9 +109,7 @@ mod tests {
.await
.unwrap();

let session = client.start_session().await.unwrap();

match convert_goals(session).await {
match convert_goals().await {
Ok(result) => println!("{}", result),
Err(error) => println!("conversion error: {}", error),
};
Expand Down
93 changes: 7 additions & 86 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@ use axum::{
};
use axum_extra::extract::cookie::Key;
use bson::doc;
use chrono::{Local, Utc};
use models::user::User;
use jobs::{
clear_sessions::{clear_sessions},
convert_goals::convert_goals,
};
use mongodb::Client;
use serde::Serialize;
use std::{
collections::HashMap,
env,
time::{Duration, SystemTime},
};
use std::{env, time::Duration};
use tera::{Context, Tera};
use tokio::{
select,
Expand All @@ -27,6 +25,7 @@ use tokio::{
sync::mpsc,
time::interval,
};
use utilities::tera::{digest_asset, extract_id};
mod authenticated;
use tower_http::{
services::ServeDir,
Expand Down Expand Up @@ -68,92 +67,14 @@ impl FromRef<SharedState> for Key {
}
}

pub fn extract_id() -> impl tera::Filter {
return |value: &tera::Value,
args: &HashMap<String, tera::Value>|
-> tera::Result<tera::Value> {
debug!("{}", value);
debug!("{:#?}", args);

let id = value.get("_id");

match id {
None => Err(tera::Error::msg("could not find id field".to_string())),
Some(id) => Ok(tera::Value::String(
id["$oid"].to_string().replace("\"", ""),
)),
}
};
}

pub fn digest_asset() -> impl tera::Function {
let key = SystemTime::now();
let key = key
.duration_since(SystemTime::UNIX_EPOCH)
.expect("could not generate asset timestamp");
let key = key.as_secs().to_string();

return move |args: &HashMap<String, tera::Value>| -> tera::Result<tera::Value> {
match args.get("file") {
Some(file) => {
let mut path = "/assets/".to_string();

let Some(file) = file.as_str() else {
return Err("".to_string().into());
};

path.push_str(file);
path.push_str("?v=");
path.push_str(&key);

Ok(path.into())
}
None => Err("".to_string().into()),
}
};
}

async fn current_time() {
println!("{}", Local::now())
}

async fn clear_sessions() {
let mongo = mongo_client().await.unwrap();

let update_result = mongo
.default_database()
.unwrap()
.collection::<User>("users")
.update_many(
doc! {},
doc! {"$pull": doc! {"sessions": doc! {"expiration": doc! { "$lte": Utc::now()}}}},
)
.await;

match update_result {
Ok(_) => {}
Err(err) => {
println!("{}", err);
}
}
}

async fn convert_goals_wrapper() {
let mongo = mongo_client().await.unwrap();
let _ = jobs::convert_goals::convert_goals(mongo.start_session().await.unwrap()).await;
}

fn start_background_jobs() -> tokio::task::JoinHandle<()> {
spawn(async {
let mut interval = interval(Duration::from_millis(60000));

loop {
let h1 = async { current_time().await };
let h2 = async { current_time().await };

interval.tick().await;

tokio::join!(h1, h2, clear_sessions(), convert_goals_wrapper());
let _result = tokio::join!(clear_sessions(), convert_goals());
}
})
}
Expand Down
1 change: 1 addition & 0 deletions src/utilities.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod dates;
pub mod tera;
46 changes: 46 additions & 0 deletions src/utilities/tera.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
use std::{collections::HashMap, time::SystemTime};
use tera::{Filter, Value};
use tracing::debug;

pub fn extract_id() -> impl Filter {
return |value: &Value, args: &HashMap<String, tera::Value>| -> tera::Result<tera::Value> {
debug!("{}", value);
debug!("{:#?}", args);

let id = value.get("_id");

match id {
None => Err(tera::Error::msg("could not find id field".to_string())),
Some(id) => Ok(tera::Value::String(
id["$oid"].to_string().replace("\"", ""),
)),
}
};
}

pub fn digest_asset() -> impl tera::Function {
let key = SystemTime::now();
let key = key
.duration_since(SystemTime::UNIX_EPOCH)
.expect("could not generate asset timestamp");
let key = key.as_secs().to_string();

return move |args: &HashMap<String, tera::Value>| -> tera::Result<tera::Value> {
match args.get("file") {
Some(file) => {
let mut path = "/assets/".to_string();

let Some(file) = file.as_str() else {
return Err("".to_string().into());
};

path.push_str(file);
path.push_str("?v=");
path.push_str(&key);

Ok(path.into())
}
None => Err("".to_string().into()),
}
};
}

0 comments on commit 339d6f4

Please sign in to comment.