Skip to content

Commit

Permalink
Schedule jobs through the new queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Nov 19, 2024
1 parent 5a1cf22 commit 36921bd
Show file tree
Hide file tree
Showing 31 changed files with 182 additions and 699 deletions.
124 changes: 12 additions & 112 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 15 additions & 9 deletions crates/cli/src/commands/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ use mas_matrix::HomeserverConnection;
use mas_matrix_synapse::SynapseConnection;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository},
job::JobRepositoryExt,
oauth2::OAuth2SessionFilter,
queue::{DeactivateUserJob, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob},
queue::{
DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _, ReactivateUserJob,
SyncDevicesJob,
},
user::{BrowserSessionFilter, UserEmailRepository, UserPasswordRepository, UserRepository},
Clock, RepositoryAccess, SystemClock,
};
Expand Down Expand Up @@ -365,7 +367,7 @@ impl Options {
let id = id.into();
info!(user.id = %id, "Scheduling provisioning job");
let job = ProvisionUserJob::new_for_id(id);
repo.job().schedule_job(job).await?;
repo.queue_job().schedule_job(&mut rng, &clock, job).await?;
}

repo.into_inner().commit().await?;
Expand Down Expand Up @@ -428,7 +430,9 @@ impl Options {

// Schedule a job to sync the devices of the user with the homeserver
warn!("Scheduling job to sync devices for the user");
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;

let txn = repo.into_inner();
if dry_run {
Expand Down Expand Up @@ -466,8 +470,8 @@ impl Options {

if deactivate {
warn!(%user.id, "Scheduling user deactivation");
repo.job()
.schedule_job(DeactivateUserJob::new(&user, false))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, false))
.await?;
}

Expand All @@ -490,8 +494,8 @@ impl Options {
.context("User not found")?;

warn!(%user.id, "User scheduling user reactivation");
repo.job()
.schedule_job(ReactivateUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ReactivateUserJob::new(&user))
.await?;

repo.into_inner().commit().await?;
Expand Down Expand Up @@ -973,7 +977,9 @@ impl UserCreationRequest<'_> {
provision_job = provision_job.set_display_name(display_name);
}

repo.job().schedule_job(provision_job).await?;
repo.queue_job()
.schedule_job(rng, clock, provision_job)
.await?;

Ok(user)
}
Expand Down
28 changes: 2 additions & 26 deletions crates/cli/src/commands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use mas_storage::SystemClock;
use mas_storage_pg::MIGRATOR;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use sqlx::migrate::Migrate;
use tracing::{info, info_span, warn, Instrument};

Expand Down Expand Up @@ -161,13 +157,8 @@ impl Options {
let mailer = mailer_from_config(&config.email, &templates)?;
mailer.test_connection().await?;

#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
let worker_name = Alphanumeric.sample_string(&mut rng, 10);

info!(worker_name, "Starting task worker");
let monitor = mas_tasks::init(
&worker_name,
info!("Starting task worker");
mas_tasks::init(
&pool,
&mailer,
homeserver_connection.clone(),
Expand All @@ -176,21 +167,6 @@ impl Options {
shutdown.task_tracker(),
)
.await?;

// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
}

let listeners_config = config.http.listeners.clone();
Expand Down
Loading

0 comments on commit 36921bd

Please sign in to comment.