Skip to content

Commit

Permalink
Actually consume jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sandhose committed Oct 31, 2024
1 parent e0fd4d6 commit 6add451
Show file tree
Hide file tree
Showing 12 changed files with 848 additions and 1,084 deletions.
130 changes: 128 additions & 2 deletions crates/storage-pg/src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
//! [`QueueJobRepository`].

use async_trait::async_trait;
use mas_storage::{queue::QueueJobRepository, Clock};
use mas_storage::{
queue::{Job, QueueJobRepository, Worker},
Clock,
};
use rand::RngCore;
use sqlx::PgConnection;
use ulid::Ulid;
use uuid::Uuid;

use crate::{DatabaseError, ExecuteExt};
use crate::{DatabaseError, DatabaseInconsistencyError, ExecuteExt};

/// An implementation of [`QueueJobRepository`] for a PostgreSQL connection.
pub struct PgQueueJobRepository<'c> {
Expand All @@ -29,6 +32,37 @@ impl<'c> PgQueueJobRepository<'c> {
}
}

/// Intermediate row type for the job-reservation query.
///
/// `sqlx::query_as!` maps the `RETURNING` columns onto these fields by
/// name, so the field names must stay in sync with the columns selected
/// in `reserve` below. Converted into a domain [`Job`] via `TryFrom`.
struct JobReservationResult {
    // Raw ULID stored as a UUID in Postgres; converted to `Ulid` later.
    queue_job_id: Uuid,
    queue_name: String,
    // Opaque job payload; passed through to the worker untouched.
    payload: serde_json::Value,
    // Serialized `JobMetadata`; deserialized (fallibly) in the TryFrom impl.
    metadata: serde_json::Value,
}

impl TryFrom<JobReservationResult> for Job {
type Error = DatabaseInconsistencyError;

fn try_from(value: JobReservationResult) -> Result<Self, Self::Error> {
let id = value.queue_job_id.into();
let queue_name = value.queue_name;
let payload = value.payload;

let metadata = serde_json::from_value(value.metadata).map_err(|e| {
DatabaseInconsistencyError::on("queue_jobs")
.column("metadata")
.row(id)
.source(e)
})?;

Ok(Self {
id,
queue_name,
payload,
metadata,
})
}
}

#[async_trait]
impl<'c> QueueJobRepository for PgQueueJobRepository<'c> {
type Error = DatabaseError;
Expand Down Expand Up @@ -73,4 +107,96 @@ impl<'c> QueueJobRepository for PgQueueJobRepository<'c> {

Ok(())
}

#[tracing::instrument(
    name = "db.queue_job.reserve",
    skip_all,
    fields(
        db.query.text,
    ),
    err,
)]
async fn reserve(
    &mut self,
    clock: &dyn Clock,
    worker: &Worker,
    queues: &[&str],
    count: usize,
) -> Result<Vec<Job>, Self::Error> {
    let now = clock.now();
    // The SQL LIMIT parameter is an i64; a usize that doesn't fit is
    // clamped to i64::MAX instead of erroring ("give me everything").
    let max_count = i64::try_from(count).unwrap_or(i64::MAX);
    // sqlx needs owned Strings to bind the queue names as a text[] parameter.
    let queues: Vec<String> = queues.iter().map(|&s| s.to_owned()).collect();
    let results = sqlx::query_as!(
        JobReservationResult,
        r#"
            -- We first grab a few jobs that are available,
            -- using a FOR UPDATE SKIP LOCKED so that this can be run concurrently
            -- and we don't get multiple workers grabbing the same jobs
            WITH locked_jobs AS (
                SELECT queue_job_id
                FROM queue_jobs
                WHERE
                    status = 'available'
                    AND queue_name = ANY($1)
                ORDER BY queue_job_id ASC
                LIMIT $2
                FOR UPDATE
                SKIP LOCKED
            )
            -- then we update the status of those jobs to 'running', returning the job details
            UPDATE queue_jobs
            SET status = 'running', started_at = $3, started_by = $4
            FROM locked_jobs
            WHERE queue_jobs.queue_job_id = locked_jobs.queue_job_id
            RETURNING
                queue_jobs.queue_job_id,
                queue_jobs.queue_name,
                queue_jobs.payload,
                queue_jobs.metadata
        "#,
        &queues,
        max_count,
        now,
        Uuid::from(worker.id),
    )
    .traced()
    .fetch_all(&mut *self.conn)
    .await?;

    // Fallible conversion: any row with malformed metadata surfaces as a
    // DatabaseInconsistencyError and fails the whole reservation.
    let jobs = results
        .into_iter()
        .map(TryFrom::try_from)
        .collect::<Result<Vec<_>, _>>()?;

    Ok(jobs)
}

#[tracing::instrument(
    name = "db.queue_job.mark_as_completed",
    skip_all,
    fields(
        db.query.text,
        job.id = %id,
    ),
    err,
)]
async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error> {
    let now = clock.now();
    // Only transition jobs that are currently 'running'; a job in any
    // other state (or an unknown id) matches zero rows.
    let res = sqlx::query!(
        r#"
            UPDATE queue_jobs
            SET status = 'completed', completed_at = $1
            WHERE queue_job_id = $2 AND status = 'running'
        "#,
        now,
        Uuid::from(id),
    )
    .traced()
    .execute(&mut *self.conn)
    .await?;

    // Exactly one row must have been updated, otherwise the job was not
    // running (or doesn't exist) and we report an error to the caller.
    DatabaseError::ensure_affected_rows(&res, 1)?;

    Ok(())
}
}
48 changes: 47 additions & 1 deletion crates/storage/src/queue/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ use serde::{Deserialize, Serialize};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use ulid::Ulid;

use super::Worker;
use crate::{repository_impl, Clock};

/// Represents a job in the job queue
pub struct Job {
/// The ID of the job
pub id: Ulid,

/// The queue on which the job was placed
pub queue_name: String,

/// The payload of the job
pub payload: serde_json::Value,

Expand All @@ -27,7 +31,7 @@ pub struct Job {
}

/// Metadata stored alongside the job
#[derive(Serialize, Deserialize, Default)]
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct JobMetadata {
#[serde(default)]
trace_id: String,
Expand Down Expand Up @@ -89,6 +93,38 @@ pub trait QueueJobRepository: Send + Sync {
payload: serde_json::Value,
metadata: serde_json::Value,
) -> Result<(), Self::Error>;

/// Reserve multiple jobs from multiple queues
///
/// # Parameters
///
/// * `clock` - The clock used to generate timestamps
/// * `worker` - The worker that is reserving the jobs
/// * `queues` - The queues to reserve jobs from
/// * `count` - The number of jobs to reserve
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn reserve(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
count: usize,
) -> Result<Vec<Job>, Self::Error>;

/// Mark a job as completed
///
/// # Parameters
///
/// * `clock` - The clock used to generate timestamps
/// * `id` - The ID of the job to mark as completed
///
/// # Errors
///
/// Returns an error if the underlying repository fails.
async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
}

repository_impl!(QueueJobRepository:
Expand All @@ -100,6 +136,16 @@ repository_impl!(QueueJobRepository:
payload: serde_json::Value,
metadata: serde_json::Value,
) -> Result<(), Self::Error>;

async fn reserve(
&mut self,
clock: &dyn Clock,
worker: &Worker,
queues: &[&str],
count: usize,
) -> Result<Vec<Job>, Self::Error>;

async fn mark_as_completed(&mut self, clock: &dyn Clock, id: Ulid) -> Result<(), Self::Error>;
);

/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue
Expand Down
Loading

0 comments on commit 6add451

Please sign in to comment.