From 0e85a61987eb59a4f23d027d903c1f15bafa389f Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 14 Aug 2024 15:26:15 +0400 Subject: [PATCH] Impl native_tls arm in WorkerBuilder::connect --- src/tls/native_tls.rs | 18 ++++++++++ src/tls/rustls.rs | 73 ++++++++++++++++++++--------------------- src/worker/builder.rs | 29 +++++++++------- tests/tls/native_tls.rs | 37 +++++++++++++++++++-- 4 files changed, 105 insertions(+), 52 deletions(-) diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 16bf3eae..5304f2f1 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -62,6 +62,24 @@ impl TlsStream { .await } + /// Create a new TLS connection over TCP ***dangerously*** skipping TLS verification. + /// + /// Similar to [`TlsStream::connect`], but accepting invalid server certificates and + /// invalid hostnames. + pub async fn connect_dangerously_skipping_verification( + url: Option<&str>, + ) -> Result { + TlsStream::with_connector( + TlsConnector::builder() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) + .build() + .map_err(error::Stream::NativeTls)?, + url, + ) + .await + } + /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 708f1cfd..ff5755de 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -61,8 +61,8 @@ impl TlsStream { /// /// If `url` is given, but does not specify a port, it defaults to 7419. /// - /// Internally creates a `ClientConfig` with an empty root certificates store and no client - /// authentication. Use [`with_client_config`](TlsStream::with_client_config) + /// Internally creates a `ClientConfig` with an _empty_ root certificates store and _no client + /// authentication_. Use [`with_client_config`](TlsStream::with_client_config) /// or [`with_connector`](TlsStream::with_connector) for customized /// `ClientConfig` and `TlsConnector` accordingly. pub async fn connect(url: Option<&str>) -> Result { @@ -73,6 +73,39 @@ impl TlsStream { TlsStream::with_connector(con, url).await } + /// Create a new TLS connection over TCP using native certificates. + /// + /// Unlike [`TlsStream::connect`], creates a root certificates store populated with the certificates + /// loaded from a platform-native certificate store. Thos method also allows to ***dangerously*** + /// skip server certificates verification. + pub async fn connect_with_native_certs( + url: Option<&str>, + dangerously_skip_verify: bool, + ) -> Result { + let mut store = RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs()? { + store.add(cert).map_err(io::Error::other)?; + } + + let config = if dangerously_skip_verify { + let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) + .build() + .expect("can construct standard verifier"); + let mut config = ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth(); + config + .dangerous() + .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); + config + } else { + ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth() + }; + TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await + } + /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. @@ -114,42 +147,6 @@ where Self::new(stream, TlsConnector::from(Arc::new(conf)), hostname).await } - /// Create a new TLS connection on an existing stream using native certificates. - /// - /// Internally creates a `ClientConfig` with no client authenticatiom and a root certificates - /// store populated with the certificates loaded from a platform-native certificate store. - /// - /// Use [`new`](TlsStream::new) for a customized `TlsConnector`. - pub async fn with_native_certs( - stream: S, - hostname: String, - skip_verify: bool, - ) -> io::Result { - let mut store = RootCertStore::empty(); - for cert in rustls_native_certs::load_native_certs()? { - store.add(cert).map_err(io::Error::other)?; - } - - let config = if skip_verify { - let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) - .build() - .expect("can construct standard verifier"); - let mut config = ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth(); - config - .dangerous() - .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); - config - } else { - ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth() - }; - - Self::new(stream, TlsConnector::from(Arc::new(config)), hostname).await - } - /// Create a new TLS connection on an existing stream with a non-default TLS configuration. pub async fn new(stream: S, connector: TlsConnector, hostname: String) -> io::Result { let server_name = hostname diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 6b023a02..3bbfcd82 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -340,28 +340,33 @@ impl WorkerBuilder { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub async fn connect(self, url: Option<&str>) -> Result, Error> { - let url = utils::parse_provided_or_from_env(url)?; - let addr = utils::host_from_url(&url); - let stream = TokioStream::connect(addr).await?; + let parsed_url = utils::parse_provided_or_from_env(url)?; + let password = parsed_url.password().map(|p| p.to_string()); match self.tls_kind { TlsKind::None => { - self.connect_with(stream, url.password().map(|p| p.to_string())) - .await + let addr = utils::host_from_url(&parsed_url); + let stream = TokioStream::connect(addr).await?; + self.connect_with(stream, password).await } #[cfg(feature = "rustls")] TlsKind::Rust => { - let hostname = url.host_str().unwrap().to_string(); - let tls_tream = crate::rustls::TlsStream::with_native_certs( - stream, - hostname, + let stream = crate::rustls::TlsStream::connect_with_native_certs( + url, self.skip_verify_server_certs, ) .await?; - self.connect_with(tls_tream, url.password().map(|p| p.to_string())) - .await + self.connect_with(stream, password).await } #[cfg(feature = "native_tls")] - TlsKind::Native => unimplemented!(), + TlsKind::Native => { + let stream = if self.skip_verify_server_certs { + crate::native_tls::TlsStream::connect_dangerously_skipping_verification(url) + .await? + } else { + crate::native_tls::TlsStream::connect(url).await? + }; + self.connect_with(stream, password).await + } } } } diff --git a/tests/tls/native_tls.rs b/tests/tls/native_tls.rs index 4f4c5c20..0e399d9e 100644 --- a/tests/tls/native_tls.rs +++ b/tests/tls/native_tls.rs @@ -1,5 +1,5 @@ use faktory::native_tls::TlsStream; -use faktory::{Client, Job, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerId}; use serde_json::Value; use std::{env, sync}; use url::Url; @@ -37,7 +37,7 @@ async fn roundtrip_tls() { .password() .map(|p| p.to_string()); - let mut worker = WorkerBuilder::default() + let mut worker = Worker::builder() .hostname("tester".to_string()) .wid(WorkerId::new(local)) .register(local, fixtures::JobHandler::new(tx)) @@ -61,6 +61,39 @@ async fn roundtrip_tls() { assert_eq!(job.args(), &[Value::from("z")]); } +#[tokio::test(flavor = "multi_thread")] +async fn roundtrip_tls_with_worker_builder() { + if env::var_os("FAKTORY_URL_SECURE").is_none() { + return; + } + + let local = "roundtrip_tls_with_worker_builder"; + let (tx, rx) = sync::mpsc::channel(); + + let mut worker = Worker::builder() + .register(local, fixtures::JobHandler::new(tx)) + .with_native_tls() + .dangerously_skip_verify_server_certs() + .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) + .await + .unwrap(); + + // "one-shot" producer + Client::connect(Some(&env::var("FAKTORY_URL").unwrap())) + .await + .unwrap() + .enqueue(Job::new(local, vec!["z"]).on_queue(local)) + .await + .unwrap(); + + worker.run_one(0, &[local]).await.unwrap(); + + let job = rx.recv().unwrap(); + assert_eq!(job.queue, local); + assert_eq!(job.kind(), local); + assert_eq!(job.args(), &[Value::from("z")]); +} + mod fixtures { pub use handler::JobHandler;