Skip to content

Commit

Permalink
Move the jobs types in the queue module
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Nov 19, 2024
1 parent b23490d commit 5a1cf22
Show file tree
Hide file tree
Showing 25 changed files with 463 additions and 483 deletions.
5 changes: 2 additions & 3 deletions crates/cli/src/commands/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ use mas_matrix::HomeserverConnection;
use mas_matrix_synapse::SynapseConnection;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository},
job::{
DeactivateUserJob, JobRepositoryExt, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob,
},
job::JobRepositoryExt,
oauth2::OAuth2SessionFilter,
queue::{DeactivateUserJob, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob},
user::{BrowserSessionFilter, UserEmailRepository, UserPasswordRepository, UserRepository},
Clock, RepositoryAccess, SystemClock,
};
Expand Down
5 changes: 1 addition & 4 deletions crates/handlers/src/admin/v1/users/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use aide::{transform::TransformOperation, NoApi, OperationIo};
use axum::{extract::State, response::IntoResponse, Json};
use hyper::StatusCode;
use mas_matrix::BoxHomeserverConnection;
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob},
BoxRng,
};
use mas_storage::{job::JobRepositoryExt, queue::ProvisionUserJob, BoxRng};
use schemars::JsonSchema;
use serde::Deserialize;
use tracing::warn;
Expand Down
2 changes: 1 addition & 1 deletion crates/handlers/src/admin/v1/users/deactivate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use aide::{transform::TransformOperation, OperationIo};
use axum::{response::IntoResponse, Json};
use hyper::StatusCode;
use mas_storage::job::{DeactivateUserJob, JobRepositoryExt};
use mas_storage::{job::JobRepositoryExt, queue::DeactivateUserJob};
use tracing::info;
use ulid::Ulid;

Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/compat/logout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use mas_axum_utils::sentry::SentryEventID;
use mas_data_model::TokenType;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
job::{JobRepositoryExt, SyncDevicesJob},
job::JobRepositoryExt,
queue::SyncDevicesJob,
BoxClock, BoxRepository, Clock, RepositoryAccess,
};
use thiserror::Error;
Expand Down
4 changes: 1 addition & 3 deletions crates/handlers/src/graphql/mutations/compat_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
use anyhow::Context as _;
use async_graphql::{Context, Enum, InputObject, Object, ID};
use mas_storage::{
compat::CompatSessionRepository,
job::{JobRepositoryExt, SyncDevicesJob},
RepositoryAccess,
compat::CompatSessionRepository, job::JobRepositoryExt, queue::SyncDevicesJob, RepositoryAccess,
};

use crate::graphql::{
Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/graphql/mutations/oauth2_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use chrono::Duration;
use mas_data_model::{Device, TokenType};
use mas_storage::{
job::{JobRepositoryExt, SyncDevicesJob},
job::JobRepositoryExt,
oauth2::{
OAuth2AccessTokenRepository, OAuth2ClientRepository, OAuth2RefreshTokenRepository,
OAuth2SessionRepository,
},
queue::SyncDevicesJob,
user::UserRepository,
RepositoryAccess,
};
Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/graphql/mutations/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
use anyhow::Context as _;
use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use mas_storage::{
job::{DeactivateUserJob, JobRepositoryExt, ProvisionUserJob},
job::JobRepositoryExt,
queue::{DeactivateUserJob, ProvisionUserJob},
user::UserRepository,
};
use tracing::{info, warn};
Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/graphql/mutations/user_email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
use anyhow::Context as _;
use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob, VerifyEmailJob},
job::JobRepositoryExt,
queue::{ProvisionUserJob, VerifyEmailJob},
user::{UserEmailRepository, UserRepository},
RepositoryAccess,
};
Expand Down
3 changes: 1 addition & 2 deletions crates/handlers/src/oauth2/revoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use mas_data_model::TokenType;
use mas_iana::oauth::OAuthTokenTypeHint;
use mas_keystore::Encrypter;
use mas_storage::{
job::{JobRepositoryExt, SyncDevicesJob},
BoxClock, BoxRepository, RepositoryAccess,
job::JobRepositoryExt, queue::SyncDevicesJob, BoxClock, BoxRepository, RepositoryAccess,
};
use oauth2_types::{
errors::{ClientError, ClientErrorCode},
Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/upstream_oauth2/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use mas_matrix::BoxHomeserverConnection;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob},
job::JobRepositoryExt,
queue::ProvisionUserJob,
upstream_oauth2::{UpstreamOAuthLinkRepository, UpstreamOAuthSessionRepository},
user::{BrowserSessionRepository, UserEmailRepository, UserRepository},
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
Expand Down
5 changes: 2 additions & 3 deletions crates/handlers/src/views/account/emails/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use mas_data_model::SiteConfig;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, VerifyEmailJob},
user::UserEmailRepository,
BoxClock, BoxRepository, BoxRng,
job::JobRepositoryExt, queue::VerifyEmailJob, user::UserEmailRepository, BoxClock,
BoxRepository, BoxRng,
};
use mas_templates::{EmailAddContext, ErrorContext, TemplateContext, Templates};
use serde::Deserialize;
Expand Down
5 changes: 2 additions & 3 deletions crates/handlers/src/views/account/emails/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use mas_axum_utils::{
};
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob},
user::UserEmailRepository,
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
job::JobRepositoryExt, queue::ProvisionUserJob, user::UserEmailRepository, BoxClock,
BoxRepository, BoxRng, RepositoryAccess,
};
use mas_templates::{EmailVerificationPageContext, TemplateContext, Templates};
use serde::Deserialize;
Expand Down
3 changes: 1 addition & 2 deletions crates/handlers/src/views/recovery/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use mas_axum_utils::{
use mas_data_model::SiteConfig;
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, SendAccountRecoveryEmailsJob},
BoxClock, BoxRepository, BoxRng,
job::JobRepositoryExt, queue::SendAccountRecoveryEmailsJob, BoxClock, BoxRepository, BoxRng,
};
use mas_templates::{EmptyContext, RecoveryProgressContext, TemplateContext, Templates};
use ulid::Ulid;
Expand Down
3 changes: 1 addition & 2 deletions crates/handlers/src/views/recovery/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ use mas_axum_utils::{
use mas_data_model::{SiteConfig, UserAgent};
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, SendAccountRecoveryEmailsJob},
BoxClock, BoxRepository, BoxRng,
job::JobRepositoryExt, queue::SendAccountRecoveryEmailsJob, BoxClock, BoxRepository, BoxRng,
};
use mas_templates::{
EmptyContext, FieldError, FormError, FormState, RecoveryStartContext, RecoveryStartFormField,
Expand Down
3 changes: 2 additions & 1 deletion crates/handlers/src/views/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ use mas_matrix::BoxHomeserverConnection;
use mas_policy::Policy;
use mas_router::UrlBuilder;
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob, VerifyEmailJob},
job::JobRepositoryExt,
queue::{ProvisionUserJob, VerifyEmailJob},
user::{BrowserSessionRepository, UserEmailRepository, UserPasswordRepository, UserRepository},
BoxClock, BoxRepository, BoxRng, RepositoryAccess,
};
Expand Down

This file was deleted.

76 changes: 1 addition & 75 deletions crates/storage-pg/src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
//! [`QueueJobRepository`].

use async_trait::async_trait;
use mas_storage::{
queue::{Job, QueueJobRepository, Worker},
Clock,
};
use mas_storage::{queue::QueueJobRepository, Clock};
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
Expand Down Expand Up @@ -76,75 +73,4 @@ impl<'c> QueueJobRepository for PgQueueJobRepository<'c> {

Ok(())
}

#[tracing::instrument(
name = "db.queue_job.get_available",
fields(
db.query.text,
),
skip_all,
err,
)]
async fn get_available(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
max_count: usize,
) -> Result<Vec<Job>, Self::Error> {
let now = clock.now();
let max_count = i64::try_from(max_count).unwrap_or(i64::MAX);
let queues: Vec<String> = queues.iter().map(|&s| s.to_owned()).collect();
sqlx::query!(
r#"
-- We first grab a few jobs that are available,
-- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently
-- and we don't get multiple workers grabbing the same jobs
WITH locked_jobs AS (
SELECT queue_job_id
FROM queue_jobs
WHERE
status = 'available'
AND queue_name = ANY($1)
ORDER BY queue_job_id ASC
LIMIT $2
FOR UPDATE
SKIP LOCKED
)
-- then we update the status of those jobs to 'running', returning the job details
UPDATE queue_jobs
SET status = 'running', started_at = $3, started_by = $4
FROM locked_jobs
WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id
RETURNING
queue_jobs.queue_job_id,
queue_jobs.payload,
queue_jobs.metadata
"#,
&queues,
max_count,
now,
Uuid::from(worker.id),
)
.traced()
.fetch_all(&mut *self.conn)
.await?;

todo!()
}

#[tracing::instrument(
name = "db.queue_job.mark_completed",
fields(
queue_job.id = %job.id,
db.query.text,
),
skip_all,
err,
)]
async fn mark_completed(&mut self, clock: &dyn Clock, job: Job) -> Result<(), Self::Error> {
let _ = clock;
let _ = job;
todo!()
}
}
Loading

0 comments on commit 5a1cf22

Please sign in to comment.