From 12273816e216fac510b88509a0ad7df24d7e3aa1 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 18 Aug 2024 19:59:04 +0400 Subject: [PATCH] Rm WorkerBuilder::connect_with_buffered --- src/worker/builder.rs | 31 ++++++++++--------------------- tests/consumer.rs | 18 +++++++++--------- 2 files changed, 19 insertions(+), 30 deletions(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 169c1199..7ff18767 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -5,7 +5,7 @@ use crate::{ }; use std::future::Future; use std::time::Duration; -use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream}; +use tokio::io::{AsyncBufRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; #[derive(Debug, Clone, PartialEq, Eq)] @@ -280,23 +280,9 @@ impl WorkerBuilder { /// Connect to a Faktory server with a non-standard stream. /// - /// Iternally, the `stream` will be buffered. In case you've got a `stream` that is _already_ - /// buffered (and so it is `AsyncBufRead`), you will want to use [`WorkerBuilder::connect_with_buffered`] - /// in order to avoid buffering the stream twice. - pub async fn connect_with(self, stream: S, pwd: Option) -> Result, Error> - where - S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static, - BufStream: Reconnect, - { - let stream = BufStream::new(stream); - WorkerBuilder::connect_with_buffered(self, stream, pwd).await - } - - /// Connect to a Faktory server with a non-standard buffered stream. - /// - /// In case you've got a `stream` that is _not_ buffered just yet, you may want to use - /// [`WorkerBuilder::connect_with`] that will do this buffering for you. - pub async fn connect_with_buffered( + /// In case you've got a `stream` that doesn't already implement `AsyncBufRead`, you will + /// want to wrap it in `tokio::io::BufStream`. + pub async fn connect_with( mut self, stream: S, pwd: Option, @@ -337,17 +323,20 @@ impl WorkerBuilder { TlsKind::None => { let addr = utils::host_from_url(&parsed_url); let stream = TokioStream::connect(addr).await?; - self.connect_with(stream, password).await + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await } #[cfg(feature = "rustls")] TlsKind::Rust => { let stream = crate::rustls::TlsStream::connect_with_native_certs(url).await?; - self.connect_with(stream, password).await + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await } #[cfg(feature = "native_tls")] TlsKind::Native => { let stream = crate::native_tls::TlsStream::connect(url).await?; - self.connect_with(stream, password).await + let buffered = BufStream::new(stream); + self.connect_with(buffered, password).await } } } diff --git a/tests/consumer.rs b/tests/consumer.rs index 8d9cd29d..2d9037bf 100644 --- a/tests/consumer.rs +++ b/tests/consumer.rs @@ -77,7 +77,7 @@ async fn hello() { .add_to_labels(["will".to_string()]) .add_to_labels(["be".to_string(), "added".to_string()]) .register_fn("never_called", |_j: Job| async move { unreachable!() }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); let written = s.pop_bytes_written(0); @@ -104,7 +104,7 @@ async fn hello_pwd() { let mut s = mock::Stream::with_salt(1545, "55104dc76695721d"); let w: Worker = WorkerBuilder::default() .register_fn("never_called", |_j: Job| async move { unreachable!() }) - .connect_with_buffered(BufStream::new(s.clone()), Some("foobar".to_string())) + .connect_with(BufStream::new(s.clone()), Some("foobar".to_string())) .await .unwrap(); let written = s.pop_bytes_written(0); @@ -126,7 +126,7 @@ async fn dequeue() { assert_eq!(job.args(), &["z"]); Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -167,7 +167,7 @@ async fn dequeue_first_empty() { assert_eq!(job.args(), &["z"]); Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -226,7 +226,7 @@ async fn well_behaved() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -295,7 +295,7 @@ async fn no_first_job() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -366,7 +366,7 @@ async fn well_behaved_many() { sleep(Duration::from_secs(7)).await; Ok::<(), io::Error>(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); s.ignore(0); @@ -451,7 +451,7 @@ async fn terminate() { sleep(Duration::from_secs(5)).await; } }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap(); @@ -569,7 +569,7 @@ async fn heart_broken() { sleep(Duration::from_secs(7)).await; Ok(()) }) - .connect_with(s.clone(), None) + .connect_with(BufStream::new(s.clone()), None) .await .unwrap();