From 85993d3deacc212b4d6b00ac33004100dd02dc37 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Fri, 22 Nov 2024 11:19:49 +0400 Subject: [PATCH] Test Client::clear --- Makefile | 6 +++ tests/real/community.rs | 87 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 34d11cca..b2a3d318 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,12 @@ test/e2e: cargo test --locked --all-features --all-targets -- \ --nocapture $(pattern) +.PHONY: test/e2e/ignored +test/e2e/ignored: + FAKTORY_URL=tcp://${FAKTORY_HOST}:${FAKTORY_PORT} \ + cargo test --locked --all-features --all-targets -- \ + --nocapture --include-ignored queue_control_actions_wildcard + .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ diff --git a/tests/real/community.rs b/tests/real/community.rs index b16a380e..116cb49e 100644 --- a/tests/real/community.rs +++ b/tests/real/community.rs @@ -1,11 +1,13 @@ use crate::{assert_gte, skip_check}; +use chrono::Utc; use faktory::{ Client, Job, JobBuilder, JobId, MutationFilter, MutationTarget, StopReason, Worker, WorkerBuilder, WorkerId, }; use serde_json::Value; -use std::{io, sync, time::Duration}; -use tokio::time as tokio_time; +use std::time::Duration; +use std::{io, sync}; +use tokio::time::{self as tokio_time}; use tokio_util::sync::CancellationToken; #[tokio::test(flavor = "multi_thread")] @@ -383,6 +385,26 @@ async fn queue_control_actions_wildcard() { let local_1 = "queue_control_wildcard_1"; let local_2 = "queue_control_wildcard_2"; + // prepare a client and remove any left-overs + // from the previous test run + let mut client = Client::connect().await.unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_1).build(), + ) + .await + .unwrap(); + client + .requeue( + MutationTarget::Retries, + MutationFilter::builder().kind(local_2).build(), + ) + .await + .unwrap(); + client.queue_remove(&[local_1]).await.unwrap(); + client.queue_remove(&[local_2]).await.unwrap(); + let (tx, rx) = sync::mpsc::channel(); let tx_1 = sync::Arc::new(sync::Mutex::new(tx)); let tx_2 = sync::Arc::clone(&tx_1); @@ -402,8 +424,6 @@ async fn queue_control_actions_wildcard() { .await .unwrap(); - let mut client = Client::connect().await.unwrap(); - // enqueue two jobs on each queue client .enqueue_many([ @@ -453,6 +473,55 @@ async fn queue_control_actions_wildcard() { // our queue are not even mentioned in the server report: assert!(queues.get(local_1).is_none()); assert!(queues.get(local_2).is_none()); + + // let's also test here one bit from the Faktory MUTATION API, + // which affects the entire target set; + // for this, let's enqueue a few jobs that are not supposed to be + // consumed immediately, rather in a few minutes; this they these + // jobs will get into the `scheduled` set + let soon = Utc::now() + Duration::from_secs(2); + client + .enqueue_many([ + Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + .build(), + Job::builder(local_1) + .args(vec![Value::from(1)]) + .queue(local_1) + .at(soon) + .build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + Job::builder(local_2) + .args(vec![Value::from(1)]) + .queue(local_2) + .at(soon) + .build(), + ]) + .await + .unwrap(); + + // now, let's just clear all the scheduled jobs + client.clear(MutationTarget::Scheduled).await.unwrap(); + + tokio_time::sleep(Duration::from_secs(2)).await; + + // the queue is empty + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); + + // even if we force-schedule those jobs + client + .requeue(MutationTarget::Scheduled, MutationFilter::empty()) + .await + .unwrap(); + + // still empty, meaing the jobs have been purged for good + assert!(!worker.run_one(0, &[local_1]).await.unwrap()); } #[tokio::test(flavor = "multi_thread")] @@ -819,6 +888,16 @@ async fn mutation_requeue_jobs() { // TODO: Examine the job's failure (will need a dedicated PR) } +#[tokio::test(flavor = "multi_thread")] +async fn mutation_kill_vs_discard() { + // Plan: + // 1. Create and push a few jobs with Job::at(..) populated + // 2. Kill them: `scheduled` -> `dead` + // 3. Re-enqueue them: `dead` -> `enqueued` + // 4. Consume-fail them: `enqueued` -> `retries` + // 5. Discard them: `retries` -> void +} + #[tokio::test(flavor = "multi_thread")] async fn mutation_requeue_specific_jobs_only() { skip_check!();