Skip to content

Commit

Permalink
Test Client::clear
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Nov 22, 2024
1 parent ac9da93 commit 85993d3
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ test/e2e:
cargo test --locked --all-features --all-targets -- \
--nocapture $(pattern)

.PHONY: test/e2e/ignored
test/e2e/ignored:
FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \
cargo test --locked --all-features --all-targets -- \
--nocapture --include-ignored queue_control_actions_wildcard

.PHONY: test/e2e/tls
test/e2e/tls:
FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \
Expand Down
87 changes: 83 additions & 4 deletions tests/real/community.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::{assert_gte, skip_check};
use chrono::Utc;
use faktory::{
Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker,
WorkerBuilder, WorkerId,
};
use serde_json::Value;
use std::{io, sync, time::Duration};
use tokio::time as tokio_time;
use std::time::Duration;
use std::{io, sync};
use tokio::time::{self as tokio_time};
use tokio_util::sync::CancellationToken;

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -383,6 +385,26 @@ async fn queue_control_actions_wildcard() {
let local_1 = "queue_control_wildcard_1";
let local_2 = "queue_control_wildcard_2";

// prepare a client and remove any left-overs
// from the previous test run
let mut client = Client::connect().await.unwrap();
client
.requeue(
MutationTarget::Retries,
MutationFilter::builder().kind(local_1).build(),
)
.await
.unwrap();
client
.requeue(
MutationTarget::Retries,
MutationFilter::builder().kind(local_2).build(),
)
.await
.unwrap();
client.queue_remove(&[local_1]).await.unwrap();
client.queue_remove(&[local_2]).await.unwrap();

let (tx, rx) = sync::mpsc::channel();
let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
let tx_2 = sync::Arc::clone(&tx_1);
Expand All @@ -402,8 +424,6 @@ async fn queue_control_actions_wildcard() {
.await
.unwrap();

let mut client = Client::connect().await.unwrap();

// enqueue two jobs on each queue
client
.enqueue_many([
Expand Down Expand Up @@ -453,6 +473,55 @@ async fn queue_control_actions_wildcard() {
// our queue are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());

// let's also test here one bit from the Faktory MUTATION API,
// which affects the entire target set;
// for this, let's enqueue a few jobs that are not supposed to be
// consumed immediately, rather in a few minutes; this they these
// jobs will get into the `scheduled` set
let soon = Utc::now() + Duration::from_secs(2);
client
.enqueue_many([
Job::builder(local_1)
.args(vec![Value::from(1)])
.queue(local_1)
.at(soon)
.build(),
Job::builder(local_1)
.args(vec![Value::from(1)])
.queue(local_1)
.at(soon)
.build(),
Job::builder(local_2)
.args(vec![Value::from(1)])
.queue(local_2)
.at(soon)
.build(),
Job::builder(local_2)
.args(vec![Value::from(1)])
.queue(local_2)
.at(soon)
.build(),
])
.await
.unwrap();

// now, let's just clear all the scheduled jobs
client.clear(MutationTarget::Scheduled).await.unwrap();

tokio_time::sleep(Duration::from_secs(2)).await;

// the queue is empty
assert!(!worker.run_one(0, &[local_1]).await.unwrap());

// even if we force-schedule those jobs
client
.requeue(MutationTarget::Scheduled, MutationFilter::empty())
.await
.unwrap();

// still empty, meaing the jobs have been purged for good
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -819,6 +888,16 @@ async fn mutation_requeue_jobs() {
// TODO: Examine the job's failure (will need a dedicated PR)
}

#[tokio::test(flavor = "multi_thread")]
async fn mutation_kill_vs_discard() {
// Plan:
// 1. Create and push a few jobs with Job::at(..) populated
// 2. Kill them: `scheduled` -> `dead`
// 3. Re-enqueue them: `dead` -> `enqueued`
// 4. Consume-fail them: `enqueued` -> `retries`
// 5. Discard them: `retries` -> void
}

#[tokio::test(flavor = "multi_thread")]
async fn mutation_requeue_specific_jobs_only() {
skip_check!();
Expand Down

0 comments on commit 85993d3

Please sign in to comment.