Skip to content

Commit

Permalink
Rm WorkerBuilder::connect_with_buffered
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 18, 2024
1 parent 58d6231 commit 1227381
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 30 deletions.
31 changes: 10 additions & 21 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
};
use std::future::Future;
use std::time::Duration;
use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream};
use tokio::io::{AsyncBufRead, AsyncWrite, BufStream};
use tokio::net::TcpStream as TokioStream;

#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -280,23 +280,9 @@ impl<E: 'static> WorkerBuilder<E> {

/// Connect to a Faktory server with a non-standard stream.
///
/// Iternally, the `stream` will be buffered. In case you've got a `stream` that is _already_
/// buffered (and so it is `AsyncBufRead`), you will want to use [`WorkerBuilder::connect_with_buffered`]
/// in order to avoid buffering the stream twice.
pub async fn connect_with<S>(self, stream: S, pwd: Option<String>) -> Result<Worker<E>, Error>
where
S: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
BufStream<S>: Reconnect,
{
let stream = BufStream::new(stream);
WorkerBuilder::connect_with_buffered(self, stream, pwd).await
}

/// Connect to a Faktory server with a non-standard buffered stream.
///
/// In case you've got a `stream` that is _not_ buffered just yet, you may want to use
/// [`WorkerBuilder::connect_with`] that will do this buffering for you.
pub async fn connect_with_buffered<S>(
/// In case you've got a `stream` that doesn't already implement `AsyncBufRead`, you will
/// want to wrap it in `tokio::io::BufStream`.
pub async fn connect_with<S>(
mut self,
stream: S,
pwd: Option<String>,
Expand Down Expand Up @@ -337,17 +323,20 @@ impl<E: 'static> WorkerBuilder<E> {
TlsKind::None => {
let addr = utils::host_from_url(&parsed_url);
let stream = TokioStream::connect(addr).await?;
self.connect_with(stream, password).await
let buffered = BufStream::new(stream);
self.connect_with(buffered, password).await
}
#[cfg(feature = "rustls")]
TlsKind::Rust => {
let stream = crate::rustls::TlsStream::connect_with_native_certs(url).await?;
self.connect_with(stream, password).await
let buffered = BufStream::new(stream);
self.connect_with(buffered, password).await

Check warning on line 333 in src/worker/builder.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/builder.rs#L331-L333

Added lines #L331 - L333 were not covered by tests
}
#[cfg(feature = "native_tls")]
TlsKind::Native => {
let stream = crate::native_tls::TlsStream::connect(url).await?;
self.connect_with(stream, password).await
let buffered = BufStream::new(stream);
self.connect_with(buffered, password).await

Check warning on line 339 in src/worker/builder.rs

View check run for this annotation

Codecov / codecov/patch

src/worker/builder.rs#L337-L339

Added lines #L337 - L339 were not covered by tests
}
}
}
Expand Down
18 changes: 9 additions & 9 deletions tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async fn hello() {
.add_to_labels(["will".to_string()])
.add_to_labels(["be".to_string(), "added".to_string()])
.register_fn("never_called", |_j: Job| async move { unreachable!() })
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
let written = s.pop_bytes_written(0);
Expand All @@ -104,7 +104,7 @@ async fn hello_pwd() {
let mut s = mock::Stream::with_salt(1545, "55104dc76695721d");
let w: Worker<io::Error> = WorkerBuilder::default()
.register_fn("never_called", |_j: Job| async move { unreachable!() })
.connect_with_buffered(BufStream::new(s.clone()), Some("foobar".to_string()))
.connect_with(BufStream::new(s.clone()), Some("foobar".to_string()))
.await
.unwrap();
let written = s.pop_bytes_written(0);
Expand All @@ -126,7 +126,7 @@ async fn dequeue() {
assert_eq!(job.args(), &["z"]);
Ok::<(), io::Error>(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
s.ignore(0);
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn dequeue_first_empty() {
assert_eq!(job.args(), &["z"]);
Ok::<(), io::Error>(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
s.ignore(0);
Expand Down Expand Up @@ -226,7 +226,7 @@ async fn well_behaved() {
sleep(Duration::from_secs(7)).await;
Ok::<(), io::Error>(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
s.ignore(0);
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn no_first_job() {
sleep(Duration::from_secs(7)).await;
Ok::<(), io::Error>(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
s.ignore(0);
Expand Down Expand Up @@ -366,7 +366,7 @@ async fn well_behaved_many() {
sleep(Duration::from_secs(7)).await;
Ok::<(), io::Error>(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();
s.ignore(0);
Expand Down Expand Up @@ -451,7 +451,7 @@ async fn terminate() {
sleep(Duration::from_secs(5)).await;
}
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();

Expand Down Expand Up @@ -569,7 +569,7 @@ async fn heart_broken() {
sleep(Duration::from_secs(7)).await;
Ok(())
})
.connect_with(s.clone(), None)
.connect_with(BufStream::new(s.clone()), None)
.await
.unwrap();

Expand Down

0 comments on commit 1227381

Please sign in to comment.