Skip to content

Commit

Permalink
Impl native_tls arm in WorkerBuilder::connect
Browse files Browse the repository at this point in the history
  • Loading branch information
rustworthy committed Aug 14, 2024
1 parent c6db906 commit 0e85a61
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 52 deletions.
18 changes: 18 additions & 0 deletions src/tls/native_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ impl TlsStream<TokioTcpStream> {
.await
}

/// Create a new TLS connection over TCP ***dangerously*** skipping TLS verification.
///
/// Similar to [`TlsStream::connect`], but accepting invalid server certificates and
/// invalid hostnames.
pub async fn connect_dangerously_skipping_verification(
url: Option<&str>,
) -> Result<Self, Error> {
TlsStream::with_connector(
TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(error::Stream::NativeTls)?,
url,
)
.await
}

/// Create a new TLS connection over TCP using a non-default TLS configuration.
///
/// See `connect` for details about the `url` parameter.
Expand Down
73 changes: 35 additions & 38 deletions src/tls/rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl TlsStream<TokioTcpStream> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
///
/// Internally creates a `ClientConfig` with an empty root certificates store and no client
/// authentication. Use [`with_client_config`](TlsStream::with_client_config)
/// Internally creates a `ClientConfig` with an _empty_ root certificates store and _no client
/// authentication_. Use [`with_client_config`](TlsStream::with_client_config)
/// or [`with_connector`](TlsStream::with_connector) for customized
/// `ClientConfig` and `TlsConnector` accordingly.
pub async fn connect(url: Option<&str>) -> Result<Self, Error> {
Expand All @@ -73,6 +73,39 @@ impl TlsStream<TokioTcpStream> {
TlsStream::with_connector(con, url).await
}

/// Create a new TLS connection over TCP using native certificates.
///
/// Unlike [`TlsStream::connect`], creates a root certificates store populated with the certificates
/// loaded from a platform-native certificate store. Thos method also allows to ***dangerously***
/// skip server certificates verification.
pub async fn connect_with_native_certs(
url: Option<&str>,
dangerously_skip_verify: bool,
) -> Result<Self, Error> {
let mut store = RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs()? {
store.add(cert).map_err(io::Error::other)?;
}

let config = if dangerously_skip_verify {
let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone()))
.build()
.expect("can construct standard verifier");
let mut config = ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth();
config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier)));
config
} else {
ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth()
};
TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await
}

/// Create a new TLS connection over TCP using a non-default TLS configuration.
///
/// See `connect` for details about the `url` parameter.
Expand Down Expand Up @@ -114,42 +147,6 @@ where
Self::new(stream, TlsConnector::from(Arc::new(conf)), hostname).await
}

/// Create a new TLS connection on an existing stream using native certificates.
///
/// Internally creates a `ClientConfig` with no client authenticatiom and a root certificates
/// store populated with the certificates loaded from a platform-native certificate store.
///
/// Use [`new`](TlsStream::new) for a customized `TlsConnector`.
pub async fn with_native_certs(
stream: S,
hostname: String,
skip_verify: bool,
) -> io::Result<Self> {
let mut store = RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs()? {
store.add(cert).map_err(io::Error::other)?;
}

let config = if skip_verify {
let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone()))
.build()
.expect("can construct standard verifier");
let mut config = ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth();
config
.dangerous()
.set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier)));
config
} else {
ClientConfig::builder()
.with_root_certificates(store)
.with_no_client_auth()
};

Self::new(stream, TlsConnector::from(Arc::new(config)), hostname).await
}

/// Create a new TLS connection on an existing stream with a non-default TLS configuration.
pub async fn new(stream: S, connector: TlsConnector, hostname: String) -> io::Result<Self> {
let server_name = hostname
Expand Down
29 changes: 17 additions & 12 deletions src/worker/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,28 +340,33 @@ impl<E: 'static> WorkerBuilder<E> {
///
/// If `url` is given, but does not specify a port, it defaults to 7419.
pub async fn connect(self, url: Option<&str>) -> Result<Worker<E>, Error> {
let url = utils::parse_provided_or_from_env(url)?;
let addr = utils::host_from_url(&url);
let stream = TokioStream::connect(addr).await?;
let parsed_url = utils::parse_provided_or_from_env(url)?;
let password = parsed_url.password().map(|p| p.to_string());
match self.tls_kind {
TlsKind::None => {
self.connect_with(stream, url.password().map(|p| p.to_string()))
.await
let addr = utils::host_from_url(&parsed_url);
let stream = TokioStream::connect(addr).await?;
self.connect_with(stream, password).await
}
#[cfg(feature = "rustls")]
TlsKind::Rust => {
let hostname = url.host_str().unwrap().to_string();
let tls_tream = crate::rustls::TlsStream::with_native_certs(
stream,
hostname,
let stream = crate::rustls::TlsStream::connect_with_native_certs(
url,
self.skip_verify_server_certs,
)
.await?;
self.connect_with(tls_tream, url.password().map(|p| p.to_string()))
.await
self.connect_with(stream, password).await
}
#[cfg(feature = "native_tls")]
TlsKind::Native => unimplemented!(),
TlsKind::Native => {
let stream = if self.skip_verify_server_certs {
crate::native_tls::TlsStream::connect_dangerously_skipping_verification(url)
.await?
} else {
crate::native_tls::TlsStream::connect(url).await?
};
self.connect_with(stream, password).await
}
}
}
}
37 changes: 35 additions & 2 deletions tests/tls/native_tls.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use faktory::native_tls::TlsStream;
use faktory::{Client, Job, WorkerBuilder, WorkerId};
use faktory::{Client, Job, Worker, WorkerId};
use serde_json::Value;
use std::{env, sync};
use url::Url;
Expand Down Expand Up @@ -37,7 +37,7 @@ async fn roundtrip_tls() {
.password()
.map(|p| p.to_string());

let mut worker = WorkerBuilder::default()
let mut worker = Worker::builder()
.hostname("tester".to_string())
.wid(WorkerId::new(local))
.register(local, fixtures::JobHandler::new(tx))
Expand All @@ -61,6 +61,39 @@ async fn roundtrip_tls() {
assert_eq!(job.args(), &[Value::from("z")]);
}

#[tokio::test(flavor = "multi_thread")]
async fn roundtrip_tls_with_worker_builder() {
if env::var_os("FAKTORY_URL_SECURE").is_none() {
return;
}

let local = "roundtrip_tls_with_worker_builder";
let (tx, rx) = sync::mpsc::channel();

let mut worker = Worker::builder()
.register(local, fixtures::JobHandler::new(tx))
.with_native_tls()
.dangerously_skip_verify_server_certs()
.connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap()))
.await
.unwrap();

// "one-shot" producer
Client::connect(Some(&env::var("FAKTORY_URL").unwrap()))
.await
.unwrap()
.enqueue(Job::new(local, vec!["z"]).on_queue(local))
.await
.unwrap();

worker.run_one(0, &[local]).await.unwrap();

let job = rx.recv().unwrap();
assert_eq!(job.queue, local);
assert_eq!(job.kind(), local);
assert_eq!(job.args(), &[Value::from("z")]);
}

mod fixtures {
pub use handler::JobHandler;

Expand Down

0 comments on commit 0e85a61

Please sign in to comment.