Support remove queues, as well as pause all / resume all #59

Merged: 131 commits, May 25, 2024
9fdcd85
Reconcile with latest main
rustworthy Feb 23, 2024
cbeb2ce
Use x509-parser for tls test. Add test verifier
rustworthy Feb 24, 2024
a3b99c4
Make minimal versions pass again
rustworthy Feb 24, 2024
7ab8ab3
Comment out Openssl installation on windows in CI
rustworthy Feb 24, 2024
e6b3a4c
Revert "Comment out Openssl installation on windows in CI"
rustworthy Feb 24, 2024
ab38bfc
Restore patch version. Comment out OpenSSL install on CI
rustworthy Feb 24, 2024
ce4c8f2
Update tls test to use job runner
rustworthy Feb 24, 2024
fb2ddf1
interim
rustworthy Feb 24, 2024
2d4ef10
Improve consumer builder api
rustworthy Feb 24, 2024
0582514
Run fmt
rustworthy Feb 24, 2024
7f44798
Split client mod
rustworthy Feb 25, 2024
84864da
Producer -> Client
rustworthy Feb 25, 2024
6d11ad9
Fix 'commit_batch' on Client
rustworthy Feb 25, 2024
a875a35
Consumer -> Worker
rustworthy Feb 25, 2024
a1c613a
Add JobId struct
rustworthy Feb 26, 2024
008126d
Use JobId in lib
rustworthy Feb 27, 2024
9a56dea
Use WorkerId new-type
rustworthy Feb 27, 2024
566d2a2
Split Batch mod into logic constructs
rustworthy Feb 27, 2024
b7560ba
Use BatchId for operations with Batch
rustworthy Feb 27, 2024
2fb7c63
re-export async trait
rustworthy Feb 28, 2024
755d452
Make 'jid', 'kind' and 'args' public on Job struct
rustworthy Feb 28, 2024
65110cf
Clean up in worker crate
rustworthy Mar 1, 2024
dcd6d97
Re-export rustls for convenience
rustworthy Mar 1, 2024
3c9ade4
re-export tokio main, clean up
rustworthy Mar 1, 2024
55890c8
Group re-exports in lib.rs
rustworthy Mar 2, 2024
00a6d1d
Fix copypasta in cmd.rs
rustworthy Mar 2, 2024
5382d5e
Rm redundant method on Worker
rustworthy Mar 20, 2024
dde77ce
Respect clippy on beta chan
rustworthy Mar 20, 2024
faeb436
Use try_seconds instead of seconds on Duration struct
rustworthy Mar 20, 2024
9ba6e85
Use try_seconds instead of seconds on Duration struct in tests
rustworthy Mar 20, 2024
fe65c67
Pin chrono at 0.4.32 to make minimal-versions pass
rustworthy Mar 20, 2024
ab59c97
Pin chrono at 0.4.32 to make minimal-versions pass
rustworthy Mar 20, 2024
8b979a4
Fix opts in Worker::connect method
rustworthy Mar 20, 2024
5077f2a
Use 'dep:' for feature deps
rustworthy Apr 2, 2024
5b041c9
Use tokio-native-tls for tls feature
rustworthy Apr 2, 2024
4efd5b6
Rm re-exports. Use tokio/macros for binaries feat only
rustworthy Apr 2, 2024
d657f69
Restore pin of min version for openssl
rustworthy Apr 3, 2024
53969f7
Use JoinSet in loadtest binary
rustworthy Apr 3, 2024
ea5fa8b
Rm left-over from proto/mod
rustworthy Apr 3, 2024
d3389ca
Keep 'CommitBatch' public in crate
rustworthy Apr 3, 2024
b3a272d
Keep 'GetBatchStatus' public in crate
rustworthy Apr 3, 2024
c70ff2c
Keep 'OpenBatch' public in crate
rustworthy Apr 3, 2024
a561377
Clean up in Client::init
rustworthy Apr 4, 2024
e2ccfcf
Split private and public methods for Client into blocks
rustworthy Apr 4, 2024
a1a701e
Derive Debug for ClientOptions
rustworthy Apr 4, 2024
9e55c24
Update self_to_cmd to not use format
rustworthy Apr 4, 2024
3aba8da
Add 'new' method for Ack cmd
rustworthy Apr 4, 2024
8270b7e
Add docs to 'generic' and 'generic_with_backtrace'
rustworthy Apr 4, 2024
b97071e
Only make commands pub(crate)
rustworthy Apr 4, 2024
6a32b4e
Add disclaimer on batch and job progress: ent only
rustworthy Apr 5, 2024
84897e9
Make jid private again
rustworthy Apr 5, 2024
fddd71f
Keep `args` private on `Job`
rustworthy Apr 5, 2024
11c9a87
Rm rudimentary extern crate syntax from test binaries
rustworthy Apr 5, 2024
bfc76fc
Rm left-over from testing
rustworthy Apr 5, 2024
5ee03bd
Use mem::take in pop_bytes_written on MockStream
rustworthy Apr 5, 2024
8da1e6c
Rm left-over print stmts from testing from tests/mock
rustworthy Apr 5, 2024
44a3b31
Restore hello_c test as hello_worker
rustworthy Apr 5, 2024
8b6e784
Clean up 'roundtrip' test
rustworthy Apr 5, 2024
b633b19
Rm 'extern' declaration from tests/real
rustworthy Apr 6, 2024
dd3a989
Make min-versions pass again
rustworthy Apr 7, 2024
802c1db
Support remove queues, as well as pause all / resume all
rustworthy Apr 9, 2024
4b2b819
Add test for wildcard queue control actions
rustworthy Apr 9, 2024
8ba888d
Run wildcard queue control actions test as dedicated step on CI
rustworthy Apr 9, 2024
7bbf826
Add test run command comment
rustworthy Apr 9, 2024
c49f972
Double-check queues are removed from Faktory
rustworthy Apr 10, 2024
8cb472e
Make `Client::info` return `ServerState` struct
rustworthy Apr 18, 2024
c1251b3
Adjust test: Faktory does not know about queue just yet
rustworthy Apr 18, 2024
9365b9d
Rm queue as test clean up step
rustworthy Apr 18, 2024
5a17498
Support both openssl and rustls
rustworthy Apr 21, 2024
13eaace
Support both openssl and rustls. Clean up
rustworthy Apr 21, 2024
8d54b0c
Support both openssl and rustls. Min versions fix
rustworthy Apr 21, 2024
f1e5966
Support both openssl and rustls. Add TlsStream error
rustworthy Apr 21, 2024
58cb247
Support both openssl and rustls. Rustls clean up
rustworthy Apr 22, 2024
6628efb
Rm `async` from loadtest binary name
rustworthy Apr 22, 2024
6af3dcc
Rm leading underscore in `ops_count` var in loadtest
rustworthy Apr 22, 2024
f8d3f32
Rm redundant 'asynchronously' from the docs in proto::client mod
rustworthy Apr 22, 2024
e9cd581
Update private docs for ClientOptions
rustworthy Apr 22, 2024
2df103d
Create BatchId, WorkerId, and JobId with `::new`
rustworthy Apr 22, 2024
a491964
Add `AsRef<str>` impl for BatchId, WorkerId, and JobId
rustworthy Apr 22, 2024
bb60f6b
Run formatter. Fix doc code
rustworthy Apr 22, 2024
14bcf64
Take 'Fail' in WorkerStatesRegistry::register_failure
rustworthy Apr 23, 2024
609950f
Impl IntoIterator for WorkerStatesRegistry
rustworthy Apr 23, 2024
db9b636
Add docs for Closure newtype
rustworthy Apr 23, 2024
757f92e
Restore JobRunner impl for &' mut F
rustworthy Apr 23, 2024
6469573
Update worker in community::roundtrip test to chain `register`
rustworthy Apr 23, 2024
803400d
Do not require *Ext in src::worker::mod
rustworthy Apr 23, 2024
4b29f9c
Restore docs for WorkerBuilder::default
rustworthy Apr 23, 2024
d869858
Rm redundant helper methods on Worker
rustworthy Apr 23, 2024
9329ea0
Restore docs on WorkerBuilder::connect
rustworthy Apr 23, 2024
3b05585
Place comments in worker::mod to original location
rustworthy Apr 23, 2024
14e06ad
Rm redundant .take on statuses iterator
rustworthy Apr 23, 2024
df8a4b1
Run cargo fmt
rustworthy Apr 23, 2024
2bd215d
Use JoinSet in Worker::run. Update consumer::terminate test.
rustworthy Apr 24, 2024
2d4da65
Update signature in batch ent methods to accept AsRef<BatchId>
rustworthy Apr 24, 2024
0703983
Add serde transparent for newtype ids
rustworthy Apr 24, 2024
55d9bea
Ask for String rather than &str in TlsStream::new
rustworthy Apr 24, 2024
8d91c26
Rm redundant TlsStream::create_new
rustworthy Apr 24, 2024
a44c63a
Pin serde at 1.0.186 to make min versions pass
rustworthy Apr 24, 2024
a088548
Add WorkerBuilder::add_to_labels method. Demonstrate in test
rustworthy Apr 26, 2024
ce4a65f
Rm empty line in WorkerBuilder::register
rustworthy Apr 26, 2024
da28e8d
Rename WorkerBuilder::register to WorkerBuilder::register_fn
rustworthy Apr 27, 2024
050a28d
Add to WorkerBuilder::register and ::register_fn docs
rustworthy Apr 27, 2024
6644a2f
Re-use WorkerBuilder::connect_with in ::connect
rustworthy Apr 27, 2024
6e5121a
Do not use *Ext as bounds
rustworthy Apr 27, 2024
0490637
Clean up worker::health module
rustworthy Apr 27, 2024
dfda0db
Rm excessive bounds
rustworthy Apr 27, 2024
56ccfb6
Store STATUS_TERMINATING in Faktory signalled so
rustworthy Apr 27, 2024
44b8cfb
Store STATUS_TERMINATING in Faktory signalled so
rustworthy Apr 27, 2024
f07d932
Add PartialEq<str> to proto::single::id module newtypes
rustworthy Apr 28, 2024
cdb8caf
Do not require static string in tls::rustls
rustworthy Apr 28, 2024
fd231d1
Use permalink in Worker::listen_for_heartbeats docs
rustworthy Apr 28, 2024
15d1711
Make WorkerBuilder return Self
rustworthy Apr 28, 2024
7724d24
Run cargo fmt on sources
rustworthy Apr 28, 2024
46e5457
Make min version pass
rustworthy Apr 28, 2024
57c958c
Make min version pass. Re-gen lockfile
rustworthy Apr 28, 2024
70d40ad
Make min version pass. Use rustls_pki_types dev dep
rustworthy Apr 28, 2024
88a62d5
Merge branch 'development' into feat/remove-queues
rustworthy Apr 29, 2024
3e592b0
Rename Client::info to Client::current_info
rustworthy Apr 29, 2024
3c262df
Annotate ServerState as non-exhaustive and DataSnapshot::tasks as dep…
rustworthy Apr 29, 2024
48032b0
Merge branch 'main' into feat/remove-queues
rustworthy May 12, 2024
ccd5707
Restore Duration::seconds in tests::real::enterprise
rustworthy May 12, 2024
7653dfd
Restore Duration::seconds in proto::single::ent
rustworthy May 12, 2024
0a6d1b5
Checkout lockfile to main
rustworthy May 12, 2024
910d47e
Refactor Client::may_bid using pattern matching
rustworthy May 12, 2024
bd64699
Do not require ownership in Client::set_progress and Client::get_prog…
rustworthy May 12, 2024
f947b60
Upd docs for ProgressUpdate
rustworthy May 12, 2024
d6c36e8
Remove ServerState and Client::current_info logic
rustworthy May 20, 2024
125eed1
Use 'faktory' as key to grab 'data' in tests::real::community
rustworthy May 20, 2024
6519036
Checkout Makefile and .gitignore to main
rustworthy May 20, 2024
1a83b8b
Use 'faktory' as key to grab 'data' in tests::real::community
rustworthy May 20, 2024
3b33790
Rm tech debt fixes that are now a separate PR
rustworthy May 20, 2024
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
@@ -42,6 +42,12 @@ jobs:
run: cargo test --locked --all-features --all-targets
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# commands executed during the following test affect all the queues on the Faktory server,
# so we perform this test in a dedicated - isolated - step, re-using the Faktory container
- name: cargo test --locked (queue control actions)
run: cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
env: # set this explicitly so integration tests will run
FAKTORY_URL: tcp://127.0.0.1:7419
# https://github.com/rust-lang/cargo/issues/6669
- name: cargo test --doc
run: cargo test --locked --all-features --doc
59 changes: 51 additions & 8 deletions src/proto/client/mod.rs
@@ -312,6 +312,20 @@
},
}
}

pub(crate) async fn perform_queue_action<Q>(
&mut self,
queues: &[Q],
action: QueueAction,
) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(action, queues))
.await?
.read_ok()
.await
}
}

impl<S> Client<S>
@@ -370,25 +384,54 @@
}

/// Pause the given queues.
///
/// Passing the wildcard `&["*"]` as the value of the `queues` parameter
/// will pause all the queues. To be more explicit, you may want to call the
/// [`Client::queue_pause_all`] shortcut method instead.
pub async fn queue_pause<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(QueueAction::Pause, queues))
.await?
.read_ok()
.await
self.perform_queue_action(queues, QueueAction::Pause).await
}

/// Pause all queues.
pub async fn queue_pause_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Pause).await
}

/// Resume the given queues.
///
/// Passing the wildcard `&["*"]` as the value of the `queues` parameter
/// will resume all the queues. To be more explicit, you may want to call the
/// [`Client::queue_resume_all`] shortcut method instead.
pub async fn queue_resume<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.issue(&QueueControl::new(QueueAction::Resume, queues))
.await?
.read_ok()
.await
self.perform_queue_action(queues, QueueAction::Resume).await
}

/// Resume all queues.
pub async fn queue_resume_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Resume).await
}

/// Remove the given queues.
///
/// Beware: passing the wildcard `&["*"]` as the value of the `queues` parameter
/// will **remove** all the queues. To be more explicit, you may want to call the
/// [`Client::queue_remove_all`] shortcut method instead.
pub async fn queue_remove<Q>(&mut self, queues: &[Q]) -> Result<(), Error>
where
Q: AsRef<str> + Sync,
{
self.perform_queue_action(queues, QueueAction::Remove).await
}

/// Remove all queues.
pub async fn queue_remove_all(&mut self) -> Result<(), Error> {
self.perform_queue_action(&["*"], QueueAction::Remove).await
}
}

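The six public methods above all funnel into the single `perform_queue_action` helper, so each public method is one line of dispatch. A hypothetical standalone sketch of that shape (not the crate's actual types — an in-memory action log stands in for the Faktory connection, and the async plumbing is omitted):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum QueueAction {
    Pause,
    Resume,
    Remove,
}

/// Stand-in for `Client`: records each issued action instead of talking TCP.
struct MockClient {
    issued: Vec<(QueueAction, Vec<String>)>,
}

impl MockClient {
    fn new() -> Self {
        Self { issued: Vec::new() }
    }

    // The single private helper every public queue method delegates to.
    fn perform_queue_action<Q: AsRef<str>>(&mut self, queues: &[Q], action: QueueAction) {
        let queues: Vec<String> = queues.iter().map(|q| q.as_ref().to_owned()).collect();
        self.issued.push((action, queues));
    }

    fn queue_pause<Q: AsRef<str>>(&mut self, queues: &[Q]) {
        self.perform_queue_action(queues, QueueAction::Pause)
    }

    fn queue_resume<Q: AsRef<str>>(&mut self, queues: &[Q]) {
        self.perform_queue_action(queues, QueueAction::Resume)
    }

    // The `*_all` shortcuts are just the wildcard spelled out for the caller.
    fn queue_pause_all(&mut self) {
        self.perform_queue_action(&["*"], QueueAction::Pause)
    }

    fn queue_remove_all(&mut self) {
        self.perform_queue_action(&["*"], QueueAction::Remove)
    }
}

fn main() {
    let mut client = MockClient::new();
    client.queue_pause(&["mailers", "billing"]);
    client.queue_resume(&["mailers", "billing"]);
    client.queue_pause_all();
    client.queue_remove_all();
    println!("{:?}", client.issued);
}
```

The design keeps the wildcard an explicit, documented escape hatch on the plural methods while the `*_all` variants make the destructive intent unmistakable at the call site.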
2 changes: 2 additions & 0 deletions src/proto/single/cmd.rs
@@ -279,6 +279,7 @@ impl FaktoryCommand for PushBulk {
pub(crate) enum QueueAction {
Pause,
Resume,
Remove,
}

pub(crate) struct QueueControl<'a, S>
@@ -298,6 +299,7 @@
let command = match self.action {
QueueAction::Pause => b"QUEUE PAUSE".as_ref(),
QueueAction::Resume => b"QUEUE RESUME".as_ref(),
QueueAction::Remove => b"QUEUE REMOVE".as_ref(),
};
w.write_all(command).await?;
write_queues(w, self.queues).await?;
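As the match above shows, the serialized command is the verb followed by the queue names. A pure sketch of the bytes that end up on the wire (hypothetical function, not the crate's actual writer code; the separator and the omitted line terminator are assumptions based on the `write_queues` call):

```rust
#[derive(Clone, Copy)]
enum QueueAction {
    Pause,
    Resume,
    Remove,
}

// Stand-in for `FaktoryCommand::issue`: returns the bytes a queue-control
// command would write — the verb, then each queue name space-separated.
fn queue_command<Q: AsRef<str>>(action: QueueAction, queues: &[Q]) -> Vec<u8> {
    let verb: &[u8] = match action {
        QueueAction::Pause => b"QUEUE PAUSE".as_ref(),
        QueueAction::Resume => b"QUEUE RESUME".as_ref(),
        QueueAction::Remove => b"QUEUE REMOVE".as_ref(),
    };
    let mut out = verb.to_vec();
    for q in queues {
        out.push(b' ');
        out.extend_from_slice(q.as_ref().as_bytes());
    }
    out
}

fn main() {
    let cmd = queue_command(QueueAction::Remove, &["mailers", "billing"]);
    println!("{}", String::from_utf8(cmd).unwrap());
}
```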
177 changes: 164 additions & 13 deletions tests/real/community.rs
@@ -152,41 +152,192 @@ async fn fail() {
}

#[tokio::test(flavor = "multi_thread")]
async fn queue() {
async fn queue_control_actions() {
skip_check!();
let local = "pause";

let local_1 = "queue_control_pause_and_resume_1";
let local_2 = "queue_control_pause_and_resume_2";

let (tx, rx) = sync::mpsc::channel();
let tx = sync::Arc::new(sync::Mutex::new(tx));
let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
let tx_2 = sync::Arc::clone(&tx_1);

let mut w = WorkerBuilder::default()
let mut worker = WorkerBuilder::default()
.hostname("tester".to_string())
.wid(WorkerId::new(local))
.register_fn(local, move |_job| {
let tx = sync::Arc::clone(&tx);
.wid(WorkerId::new(local_1))
.register_fn(local_1, move |_job| {
let tx = sync::Arc::clone(&tx_1);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.register_fn(local_2, move |_job| {
let tx = sync::Arc::clone(&tx_2);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.connect(None)
.await
.unwrap();

let mut p = Client::connect(None).await.unwrap();
p.enqueue(Job::new(local, vec![Value::from(1)]).on_queue(local))
let mut client = Client::connect(None).await.unwrap();

// enqueue three jobs
client
.enqueue_many([
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
])
.await
.unwrap();
p.queue_pause(&[local]).await.unwrap();

let had_job = w.run_one(0, &[local]).await.unwrap();
// pause the queue
client.queue_pause(&[local_1]).await.unwrap();

// try to consume from that queue
let had_job = worker.run_one(0, &[local_1]).await.unwrap();
assert!(!had_job);
let worker_executed = rx.try_recv().is_ok();
assert!(!worker_executed);

p.queue_resume(&[local]).await.unwrap();
// resume that queue and ...
client.queue_resume(&[local_1]).await.unwrap();

let had_job = w.run_one(0, &[local]).await.unwrap();
// ... be able to consume from it
let had_job = worker.run_one(0, &[local_1]).await.unwrap();
assert!(had_job);
let worker_executed = rx.try_recv().is_ok();
assert!(worker_executed);

// push two jobs on the other queue (reminder: we got two jobs
// remaining on the first queue):
client
.enqueue_many([
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
])
.await
.unwrap();

// pause both of the queues
client.queue_pause(&[local_1, local_2]).await.unwrap();

// try to consume from them
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// now, resume the queues and ...
client.queue_resume(&[local_1, local_2]).await.unwrap();

// ... be able to consume from both of them
assert!(worker.run_one(0, &[local_1]).await.unwrap());
assert!(rx.try_recv().is_ok());
assert!(worker.run_one(0, &[local_2]).await.unwrap());
assert!(rx.try_recv().is_ok());

// let's inspect the server state
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

// let's now remove the queues
client.queue_remove(&[local_1, local_2]).await.unwrap();

// though there _was_ a job in each queue, consuming from
// the removed queues will not yield anything
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// let's inspect the server state again
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
// our queues are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());
}

// Run the following test with:
// FAKTORY_URL=tcp://127.0.0.1:7419 cargo test --locked --all-features --all-targets queue_control_actions_wildcard -- --include-ignored
#[tokio::test(flavor = "multi_thread")]
#[ignore = "this test requires a dedicated test run since the commands being tested will affect all queues on the Faktory server"]
async fn queue_control_actions_wildcard() {
skip_check!();

let local_1 = "queue_control_wildcard_1";
let local_2 = "queue_control_wildcard_2";

let (tx, rx) = sync::mpsc::channel();
let tx_1 = sync::Arc::new(sync::Mutex::new(tx));
let tx_2 = sync::Arc::clone(&tx_1);

let mut worker = WorkerBuilder::default()
.hostname("tester".to_string())
.wid(WorkerId::new(local_1))
.register_fn(local_1, move |_job| {
let tx = sync::Arc::clone(&tx_1);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.register_fn(local_2, move |_job| {
let tx = sync::Arc::clone(&tx_2);
Box::pin(async move { tx.lock().unwrap().send(true) })
})
.connect(None)
.await
.unwrap();

let mut client = Client::connect(None).await.unwrap();

// enqueue two jobs on each queue
client
.enqueue_many([
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_1, vec![Value::from(1)]).on_queue(local_1),
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
Job::new(local_2, vec![Value::from(1)]).on_queue(local_2),
])
.await
.unwrap();

// pause all the queues
client.queue_pause_all().await.unwrap();

// try to consume from queues
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// now, resume all the queues and ...
client.queue_resume_all().await.unwrap();

// ... be able to consume from both of them
assert!(worker.run_one(0, &[local_1]).await.unwrap());
assert!(rx.try_recv().is_ok());
assert!(worker.run_one(0, &[local_2]).await.unwrap());
assert!(rx.try_recv().is_ok());

// let's inspect the server state
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();
assert_eq!(*queues.get(local_1).unwrap(), 1); // 1 job remaining
assert_eq!(*queues.get(local_2).unwrap(), 1); // also 1 job remaining

// let's now remove all the queues
client.queue_remove_all().await.unwrap();

// though there _was_ a job in each queue, consuming from
// the removed queues will not yield anything
assert!(!worker.run_one(0, &[local_1]).await.unwrap());
assert!(!worker.run_one(0, &[local_2]).await.unwrap());
assert!(!rx.try_recv().is_ok());

// let's inspect the server state again
let server_state = client.info().await.unwrap();
let queues = &server_state.get("faktory").unwrap().get("queues").unwrap();

// our queues are not even mentioned in the server report:
assert!(queues.get(local_1).is_none());
assert!(queues.get(local_2).is_none());
}

#[tokio::test(flavor = "multi_thread")]