From 22a4d1816d727cc1ddb7951a741f987792a6c5ca Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 23 Nov 2024 12:35:35 +0400 Subject: [PATCH] Test Client::discard --- tests/real/community.rs | 74 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/tests/real/community.rs b/tests/real/community.rs index f365d635..b5646db0 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -889,7 +889,7 @@ async fn mutation_requeue_jobs() { } #[tokio::test(flavor = "multi_thread")] -async fn mutation_kill_and_requeue() { +async fn mutation_kill_and_requeue_and_discard() { skip_check!(); // prepare a client and clean up the queue @@ -965,6 +965,76 @@ async fn mutation_kill_and_requeue() { .map(|v| *v) .unwrap_or_default(); assert_eq!(njobs, 2); + + // prepare a worker that will fail the job unconditionally + let mut worker = Worker::builder::() + .register_fn(local, move |_job| async move { + panic!("force fail this job"); + }) + .connect() + .await + .unwrap(); + + // cosume them (and immediately fail them) and make + // sure the queue is drained + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(worker.run_one(0, &[local]).await.unwrap()); + assert!(!worker.run_one(0, &[local]).await.unwrap()); + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); // sanity check + + // so the jobs have transitioned from being enqueued + // to the `retries` set, and we can now completely discard them + client + .discard( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Double-check + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Dead, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Scheduled, + MutationFilter::builder().kind(local).build(), + ) + .await + .unwrap(); + + // Gone for good + let njobs = client + .current_info() + .await + .unwrap() + .data + .queues + .get(local) + .map(|v| *v) + .unwrap_or_default(); + assert_eq!(njobs, 0); } #[tokio::test(flavor = "multi_thread")] @@ -987,7 +1057,7 @@ async fn mutation_requeue_specific_jobs_only() { // prepare a worker that will fail the job unconditionally let mut worker = Worker::builder::() .register_fn(local, move |_job| async move { - panic!("Force fail this job"); + panic!("force fail this job"); }) .connect() .await