From 69ced1f806aab5c2dacbddf4a47b75fd6c3dffe8 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 13 Jul 2024 19:49:12 +0400 Subject: [PATCH 01/14] Add tls option to worker builder --- Cargo.lock | 93 ++++++++++++++++++++++++++++++++++++------- Cargo.toml | 4 +- src/tls/mod.rs | 6 +++ src/tls/rustls.rs | 89 ++++++++++++++++++++++++++++++++++++++++- src/worker/builder.rs | 53 ++++++++++++++++++++++-- tests/tls/rustls.rs | 36 ++++++++++++++++- 6 files changed, 258 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4de9e5c..2c023b12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,11 +172,17 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "bitflags" -version = "1.0.0" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5cde24d1b2e2216a726368b2363a273739c91f4e3eb4e0dd12d672d396ad989" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" @@ -289,6 +295,16 @@ dependencies = [ "libc", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys 0.8.6", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.7.0" @@ -297,9 +313,9 @@ checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" [[package]] name = "core-foundation-sys" -version = "0.8.3" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" +checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" @@ -492,6 +508,7 @@ dependencies = [ "openssl", "pin-project", "rand 0.8.0", + "rustls-native-certs", "rustls-pki-types", "semver", "serde", @@ -534,7 +551,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b5365afd01fdf916e775a224e844f80b3b9710d0f4f00903e219e859474d7ae" dependencies = [ - "bitflags 1.0.0", + "bitflags 1.3.2", "fuchsia-zircon-sys", ] @@ -595,7 +612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c17cc76786e99f8d2f055c11159e7f0091c42474dcc3189fbab96072e873e6d" dependencies = [ "android_system_properties", - "core-foundation-sys 0.8.3", + "core-foundation-sys 0.8.6", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", @@ -740,8 +757,8 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", - "security-framework-sys", + "security-framework 0.4.1", + "security-framework-sys 0.4.1", "tempfile", ] @@ -859,9 +876,9 @@ dependencies = [ [[package]] name = "openssl-probe" -version = "0.1.0" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "756d49c8424483a3df3b5d735112b4da22109ced9a8294f1f5cdf80fb3810919" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" @@ -1045,11 +1062,34 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.10.0", +] + +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64", + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" -version = "1.0.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" @@ -1084,10 +1124,23 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97bbedbe81904398b6ebb054b3e912f99d55807125790f3198ac990d98def5b0" dependencies = [ - "bitflags 1.0.0", - "core-foundation", + "bitflags 1.3.2", + "core-foundation 0.7.0", "core-foundation-sys 0.7.0", - "security-framework-sys", + "security-framework-sys 0.4.1", +] + +[[package]] +name = "security-framework" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.9.4", + "core-foundation-sys 0.8.6", + "libc", + "security-framework-sys 2.11.0", ] [[package]] @@ -1100,6 +1153,16 @@ dependencies = [ "libc", ] +[[package]] +name = "security-framework-sys" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +dependencies = [ + "core-foundation-sys 0.8.6", + "libc", +] + [[package]] name = "semver" version = "1.0.23" diff --git a/Cargo.toml b/Cargo.toml index ee94fa0d..b9c40a09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ exclude = [".github", "docker", ".gitignore", "Makefile"] [features] default = [] native_tls = ["dep:pin-project", "dep:tokio-native-tls"] -rustls = ["dep:pin-project", "dep:tokio-rustls"] +rustls = ["dep:pin-project", "dep:tokio-rustls", "dep:rustls-pki-types"] binaries = ["dep:clap", "tokio/macros"] ent = [] @@ -48,6 +48,8 @@ tokio-native-tls = { version = "0.3.1", optional = true } tokio-rustls = { version = "0.25.0", optional = true } url = "2" semver = { version = "1.0.23", features = ["serde"] } +rustls-native-certs = "0.7.1" +rustls-pki-types = { version = "1.0.1", optional = true } [dev-dependencies] rustls-pki-types = "1.0.1" diff --git a/src/tls/mod.rs b/src/tls/mod.rs index 95cf94c7..de429b7d 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -10,3 +10,9 @@ pub mod native_tls; #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] /// Namespace for Rustls-powered [`TlsStream`](crate::rustls::TlsStream). pub mod rustls; + +#[derive(Debug, Clone)] +pub(crate) enum TlsKind { + Native, + Rust, +} diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 642b4d24..c082fccb 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -3,13 +3,20 @@ use crate::{Client, WorkerBuilder}; use crate::proto::{self, utils}; use crate::{Error, Reconnect}; +use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use std::io; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioTcpStream; use tokio_rustls::client::TlsStream as RustlsStream; -use tokio_rustls::rustls::{ClientConfig, RootCertStore}; +use tokio_rustls::rustls::client::danger::{ + HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier, +}; +use tokio_rustls::rustls::{ + client::WebPkiServerVerifier, ClientConfig, DigitallySignedStruct, Error as RustlsError, + RootCertStore, SignatureScheme, +}; use tokio_rustls::TlsConnector; /// A reconnectable stream encrypted with TLS. @@ -96,7 +103,9 @@ where /// Create a new TLS connection on an existing stream. /// /// Internally creates a `ClientConfig` with an empty root certificates store and no client - /// authentication. Use [`new`](TlsStream::new) for a customized `TlsConnector`. + /// authentication. + /// + /// Use [`new`](TlsStream::new) for a customized `TlsConnector`. pub async fn default(stream: S, hostname: String) -> io::Result { let conf = ClientConfig::builder() .with_root_certificates(RootCertStore::empty()) @@ -105,6 +114,42 @@ where Self::new(stream, TlsConnector::from(Arc::new(conf)), hostname).await } + /// Create a new TLS connection on an existing stream using native certificates. + /// + /// Internally creates a `ClientConfig` with no client authenticatiom and a root certificates + /// store populated with the certificates loaded from a platform-native certificate store. + /// + /// Use [`new`](TlsStream::new) for a customized `TlsConnector`. + pub async fn with_native_certs( + stream: S, + hostname: String, + skip_verify: bool, + ) -> io::Result { + let mut store = RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs()? { + store.add(cert).map_err(io::Error::other)?; + } + + let config = if skip_verify { + let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) + .build() + .expect("can construct standard verifier"); + let mut config = ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth(); + config + .dangerous() + .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); + config + } else { + ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth() + }; + + Self::new(stream, TlsConnector::from(Arc::new(config)), hostname).await + } + /// Create a new TLS connection on an existing stream with a non-default TLS configuration. pub async fn new(stream: S, connector: TlsConnector, hostname: String) -> io::Result { let server_name = hostname @@ -123,6 +168,46 @@ where } } +#[derive(Debug)] +struct NoCertVerification(Arc); + +impl ServerCertVerifier for NoCertVerification { + fn verify_server_cert( + &self, + _: &CertificateDer<'_>, + _: &[CertificateDer<'_>], + _: &ServerName<'_>, + _: &[u8], + _: UnixTime, + ) -> Result { + Ok(ServerCertVerified::assertion()) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &rustls_pki_types::CertificateDer<'_>, + dss: &DigitallySignedStruct, + ) -> Result { + self.0.verify_tls12_signature(message, cert, dss) + } + + fn verify_tls13_signature( + &self, + _message: &[u8], + _cert: &CertificateDer<'_>, + _dss: &DigitallySignedStruct, + ) -> Result { + // TODO: figure out what's wring with the test cert + Ok(HandshakeSignatureValid::assertion()) + //self.0.verify_tls13_signature(message, cert, dss) + } + + fn supported_verify_schemes(&self) -> Vec { + self.0.supported_verify_schemes() + } +} + #[async_trait::async_trait] impl Reconnect for BufStream> { async fn reconnect(&mut self) -> io::Result { diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 7f979c5f..2dd2b8eb 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -1,7 +1,7 @@ use super::{runner::Closure, CallbacksRegistry, Client, Worker}; use crate::{ proto::{utils, ClientOptions}, - Error, Job, JobRunner, Reconnect, WorkerId, + Error, Job, JobRunner, Reconnect, TlsKind, WorkerId, }; use std::future::Future; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; @@ -14,6 +14,8 @@ pub struct WorkerBuilder { opts: ClientOptions, workers_count: usize, callbacks: CallbacksRegistry, + tls_kind: Option, + skip_verify_server_certs: bool, } impl Default for WorkerBuilder { @@ -32,6 +34,8 @@ impl Default for WorkerBuilder { opts: ClientOptions::default(), workers_count: 1, callbacks: CallbacksRegistry::default(), + tls_kind: None, + skip_verify_server_certs: true, } } } @@ -122,6 +126,33 @@ impl WorkerBuilder { self } + /// Make the traffic between this worker and Faktory encrypted with native TLS. + /// + /// The underlying crate (`native-tls`) will use _SChannel_ on Windows, + /// _SecureTransport_ on OSX, and _OpenSSL_ on other platforms. + /// + /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] + /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker + /// will be connected to the Faktory server with the stream you've provided to `connect_with`. + #[cfg(feature = "native_tls")] + #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] + pub fn use_native_tls(mut self) -> Self { + self.tls_kind = Some(TlsKind::Native); + self + } + + /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). + /// + /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] + /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker + /// will be connected to the Faktory server with the stream you've provided to `connect_with`. + #[cfg(feature = "rustls")] + #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] + pub fn use_rustls(mut self) -> Self { + self.tls_kind = Some(TlsKind::Rust); + self + } + /// Connect to a Faktory server with a non-standard stream. pub async fn connect_with( mut self, @@ -154,7 +185,23 @@ impl WorkerBuilder { /// If `url` is given, but does not specify a port, it defaults to 7419. pub async fn connect(self, url: Option<&str>) -> Result, Error> { let url = utils::parse_provided_or_from_env(url)?; - let stream = TokioStream::connect(utils::host_from_url(&url)).await?; - self.connect_with(stream, None).await + let addr = utils::host_from_url(&url); + let stream = TokioStream::connect(addr).await?; + match self.tls_kind { + None => self.connect_with(stream, None).await, + + #[cfg(feature = "rustls")] + Some(TlsKind::Rust) => { + let hostname = url.host_str().unwrap().to_string(); + let tls_tream = crate::rustls::TlsStream::with_native_certs( + stream, + hostname, + self.skip_verify_server_certs, + ) + .await?; + self.connect_with(tls_tream, None).await + } + _ => unimplemented!(), + } } } diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 9bdc4301..764b7c94 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -1,5 +1,5 @@ use faktory::rustls::TlsStream; -use faktory::{Client, Job, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerBuilder, WorkerId}; use serde_json::Value; use std::{ env, @@ -53,7 +53,7 @@ async fn roundtrip_tls() { .await .unwrap(); - // "one-shot" client + // "one-shot" producer Client::connect_with(tls().await, None) .await .unwrap() @@ -69,6 +69,38 @@ async fn roundtrip_tls() { assert_eq!(job.args(), &[Value::from("z")]); } +#[tokio::test(flavor = "multi_thread")] +async fn roundtrip_tls_with_worker_builder() { + if env::var_os("FAKTORY_URL_SECURE").is_none() { + return; + } + + let local = "roundtrip_tls_with_worker_builder"; + let (tx, rx) = sync::mpsc::channel(); + + let mut worker = Worker::builder() + .register(local, fixtures::JobHandler::new(tx)) + .use_rustls() + .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) + .await + .unwrap(); + + // "one-shot" producer + Client::connect(None) + .await + .unwrap() + .enqueue(Job::new(local, vec!["z"]).on_queue(local)) + .await + .unwrap(); + + worker.run_one(0, &[local]).await.unwrap(); + + let job = rx.recv().unwrap(); + assert_eq!(job.queue, local); + assert_eq!(job.kind(), local); + assert_eq!(job.args(), &[Value::from("z")]); +} + mod fixtures { pub use handler::JobHandler; pub use tls::TestServerCertVerifier; From 2d59efc222701f2064c7380751ba558534905959 Mon Sep 17 00:00:00 2001 From: --show-origin Date: Sat, 13 Jul 2024 19:51:55 +0400 Subject: [PATCH 02/14] Make rustls-native-certs to optional dep --- Cargo.toml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b9c40a09..05d28760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,12 @@ exclude = [".github", "docker", ".gitignore", "Makefile"] [features] default = [] native_tls = ["dep:pin-project", "dep:tokio-native-tls"] -rustls = ["dep:pin-project", "dep:tokio-rustls", "dep:rustls-pki-types"] +rustls = [ + "dep:pin-project", + "dep:tokio-rustls", + "dep:rustls-pki-types", + "dep:rustls-native-certs", +] binaries = ["dep:clap", "tokio/macros"] ent = [] @@ -48,7 +53,7 @@ tokio-native-tls = { version = "0.3.1", optional = true } tokio-rustls = { version = "0.25.0", optional = true } url = "2" semver = { version = "1.0.23", features = ["serde"] } -rustls-native-certs = "0.7.1" +rustls-native-certs = { version = "0.7.1", optional = true } rustls-pki-types = { version = "1.0.1", optional = true } [dev-dependencies] From 0d9bbd7a7a8f0504a76d02f5dd19e497a5747b46 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 10 Aug 2024 15:58:47 +0400 Subject: [PATCH 03/14] Use FAKTORY_URL with password in tls tests --- .github/workflows/tls.yml | 1 + Makefile | 1 + src/tls/rustls.rs | 3 ++- tests/tls/rustls.rs | 2 +- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/tls.yml b/.github/workflows/tls.yml index 8c6fa276..8f27c9aa 100644 --- a/.github/workflows/tls.yml +++ b/.github/workflows/tls.yml @@ -33,4 +33,5 @@ jobs: - name: Run tests env: FAKTORY_URL_SECURE: tcp://:uredinales@localhost:17419 + FAKTORY_URL: tcp://:uredinales@localhost:7419 run: cargo test --locked --features native_tls,rustls --test tls diff --git a/Makefile b/Makefile index 79e49cfe..968e667c 100644 --- a/Makefile +++ b/Makefile @@ -59,6 +59,7 @@ test/e2e: .PHONY: test/e2e/tls test/e2e/tls: FAKTORY_URL_SECURE=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT_SECURE} \ + FAKTORY_URL=tcp://:${FAKTORY_PASSWORD}@${FAKTORY_HOST}:${FAKTORY_PORT} \ cargo test --locked --features native_tls,rustls --test tls -- --nocapture .PHONY: test/load diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index c082fccb..35d81562 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -199,8 +199,9 @@ impl ServerCertVerifier for NoCertVerification { _dss: &DigitallySignedStruct, ) -> Result { // TODO: figure out what's wring with the test cert + // IO(Custom { kind: ConnectionAborted, error: Custom { kind: InvalidData, error: InvalidCertificate(Other(OtherError(UnsupportedCertVersion))) } }) + // self.0.verify_tls13_signature(message, cert, dss) Ok(HandshakeSignatureValid::assertion()) - //self.0.verify_tls13_signature(message, cert, dss) } fn supported_verify_schemes(&self) -> Vec { diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 783838da..86fc7e63 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -93,7 +93,7 @@ async fn roundtrip_tls_with_worker_builder() { .unwrap(); // "one-shot" producer - Client::connect(None) + Client::connect(Some(&env::var("FAKTORY_URL").unwrap())) .await .unwrap() .enqueue(Job::new(local, vec!["z"]).on_queue(local)) From e4fb46fb8f049f085900558212ea24a1a33e7dd3 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sat, 10 Aug 2024 16:07:14 +0400 Subject: [PATCH 04/14] Make TlsKind visible --- src/tls/mod.rs | 6 ------ src/worker/builder.rs | 12 +++++++++--- tests/tls/rustls.rs | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/tls/mod.rs b/src/tls/mod.rs index de429b7d..95cf94c7 100644 --- a/src/tls/mod.rs +++ b/src/tls/mod.rs @@ -10,9 +10,3 @@ pub mod native_tls; #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] /// Namespace for Rustls-powered [`TlsStream`](crate::rustls::TlsStream). pub mod rustls; - -#[derive(Debug, Clone)] -pub(crate) enum TlsKind { - Native, - Rust, -} diff --git a/src/worker/builder.rs b/src/worker/builder.rs index f4adab40..06162e84 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -1,7 +1,7 @@ use super::{runner::Closure, CallbacksRegistry, Client, ShutdownSignal, Worker}; use crate::{ proto::{utils, ClientOptions}, - Error, Job, JobRunner, Reconnect, TlsKind, WorkerId, + Error, Job, JobRunner, Reconnect, WorkerId, }; use std::future::Future; use std::sync::Arc; @@ -9,6 +9,12 @@ use std::time::Duration; use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioStream; +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum TlsKind { + Native, + Rust, +} + /// Convenience wrapper for building a Faktory worker. /// /// See the [`Worker`] documentation for details. @@ -245,7 +251,7 @@ impl WorkerBuilder { /// will be connected to the Faktory server with the stream you've provided to `connect_with`. #[cfg(feature = "native_tls")] #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] - pub fn use_native_tls(mut self) -> Self { + pub fn with_native_tls(mut self) -> Self { self.tls_kind = Some(TlsKind::Native); self } @@ -257,7 +263,7 @@ impl WorkerBuilder { /// will be connected to the Faktory server with the stream you've provided to `connect_with`. #[cfg(feature = "rustls")] #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] - pub fn use_rustls(mut self) -> Self { + pub fn with_rustls(mut self) -> Self { self.tls_kind = Some(TlsKind::Rust); self } diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 86fc7e63..52cf422b 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -87,7 +87,7 @@ async fn roundtrip_tls_with_worker_builder() { let mut worker = Worker::builder() .register(local, fixtures::JobHandler::new(tx)) - .use_rustls() + .with_rustls() .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) .await .unwrap(); From 99c03a1b8a76b5d8f9e8f86e591a7702872feded Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 13 Aug 2024 10:53:14 +0400 Subject: [PATCH 05/14] Issue new certs with minica --- docker/certs/README.md | 11 +++++++++++ docker/certs/faktory.local.crt | 29 ++++++++++------------------- docker/certs/faktory.local.key | 30 ++++-------------------------- src/tls/rustls.rs | 11 ++++------- tests/tls/rustls.rs | 2 +- 5 files changed, 30 insertions(+), 53 deletions(-) create mode 100644 docker/certs/README.md diff --git a/docker/certs/README.md b/docker/certs/README.md new file mode 100644 index 00000000..074e19e3 --- /dev/null +++ b/docker/certs/README.md @@ -0,0 +1,11 @@ +## Important + +The certificate has been produced with [`minica`](https://github.com/jsha/minica): + +```sh +./minica -domains 'localhost' +``` + +The lib's version used was `1.1.0` and the default algorithm at the time of issuance was `ecdsa`. + +The certificate was issued on `August 13, 2024` and will be valid for `2 years and 30 days` (which is a limitation [imposed](https://github.com/jsha/minica/blob/c5ce70c9b524953b13628607abafd7a557c6f074/main.go#L277-L281) by certain platforms). diff --git a/docker/certs/faktory.local.crt b/docker/certs/faktory.local.crt index 1214e42b..677d2d0f 100644 --- a/docker/certs/faktory.local.crt +++ b/docker/certs/faktory.local.crt @@ -1,21 +1,12 @@ -----BEGIN CERTIFICATE----- -MIIDazCCAlMCFAxQwXkfT4M84/fevISct//qQskRMA0GCSqGSIb3DQEBCwUAMHEx -CzAJBgNVBAYTAlVaMREwDwYDVQQIDAhUYXNoa2VudDERMA8GA1UEBwwIVGFzaGtl -bnQxEzARBgNVBAoMCmZha3RvcnktcnMxEzARBgNVBAsMCmZha3RvcnktcnMxEjAQ -BgNVBAMMCWxvY2FsaG9zdDAgFw0yNDAyMDMyMDI1MDlaGA8zMDA0MDQwNjIwMjUw -OVowcTELMAkGA1UEBhMCVVoxETAPBgNVBAgMCFRhc2hrZW50MREwDwYDVQQHDAhU -YXNoa2VudDETMBEGA1UECgwKZmFrdG9yeS1yczETMBEGA1UECwwKZmFrdG9yeS1y -czESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB -CgKCAQEA4ektheqTRy+eHn9j22AxGHqtg/elEiZC0UCLX51ysEkhnLLvFlVFtzd7 -q+nx1PNiHdH5i/TjdAYrXAZhKU/k2YfrgCyOjm/XxSw7ujXPP+cWOmdRYTexT9o7 -Yrg3ZYMniJbbTl8j37dieXHaO7FHAvpww1q/nbQkwD/1WqK1ggQY/OZ38wpUvsws -9LA7shuXdGnjAXunnRGEzZ2EG6T5hYw0PFL+2CHwr0lqNbCur8wu99t4ED9/vfLG -0TWRQwSnApyjHy89rn5Ze3vOiNzcBW778oZxwvzriEmbQQg6RxKE19AlaiV4+n5S -woAi8Ji69BKRUSlxRhW6eX4ABV2eOwIDAQABMA0GCSqGSIb3DQEBCwUAA4IBAQDS -EXuIvVx27LyWlIhfY6vwSWqeUoRXmMFpiBNTTvvHQKlJzLlDyn1b+CqHvMdE9RZh -FI5shZkiqtRRTUGVHB4o0ntwCQmWyV/5FQQ6EYs/bHXUcN2vt1XuU7WK4fRafPPu -snYDgg0TmpGvm+J8W64TfJogWqpPsnT4pOF+aNqW88TTs1JUnNFDBQmw2QKBK+AH -+V4zhpCjVXpKtVMTnDWHQfJh4whelD18lU1jPCbzQrRs2hQWQvtzKWi0YCYc1IXl -4E6eIOHRuiUl/mE3p3f2CGJIwxgrMuxN07ncnwVXBPCaVzSLWJHy0G61mFKH5R/7 -42EC7S/POk5GtzkMJ5Du +MIIB3DCCAWOgAwIBAgIIH7dYNg66/2UwCgYIKoZIzj0EAwMwIDEeMBwGA1UEAxMV +bWluaWNhIHJvb3QgY2EgMDI1MWZmMB4XDTI0MDgxMzA1NTQyOFoXDTI2MDkxMjA1 +NTQyOFowFDESMBAGA1UEAxMJbG9jYWxob3N0MHYwEAYHKoZIzj0CAQYFK4EEACID +YgAENZuBDDayhB5EzmRfErEoIbfE5IjWChNzjO4CLTrECemPqcJbjzsk8MBwB5cb +bHGMeg1nqkqof0ZkgrM4sWZsWNI1H/LODKdXBIqMpbU12iEs7S3eo5RaGlq9CtE6 +dYyAo3YwdDAOBgNVHQ8BAf8EBAMCBaAwHQYDVR0lBBYwFAYIKwYBBQUHAwEGCCsG +AQUFBwMCMAwGA1UdEwEB/wQCMAAwHwYDVR0jBBgwFoAUdieiiQwm+V0FBPIukYU/ +udp7ScwwFAYDVR0RBA0wC4IJbG9jYWxob3N0MAoGCCqGSM49BAMDA2cAMGQCMGL7 +ge3qiN2B0P0bQvf9DNCblvuC7rx6NcZraYpAj9HgO9iUTqyMVxB04uWiOOjE9wIw +D0ciU7opj7CqwaoC3EQbLleMoEuK8LLdHj/JfMxO2I9AlAxzT4ksIg/VSErlUEcv -----END CERTIFICATE----- diff --git a/docker/certs/faktory.local.key b/docker/certs/faktory.local.key index 08e1f32c..d8d59515 100644 --- a/docker/certs/faktory.local.key +++ b/docker/certs/faktory.local.key @@ -1,28 +1,6 @@ -----BEGIN PRIVATE KEY----- -MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDh6S2F6pNHL54e -f2PbYDEYeq2D96USJkLRQItfnXKwSSGcsu8WVUW3N3ur6fHU82Id0fmL9ON0Bitc -BmEpT+TZh+uALI6Ob9fFLDu6Nc8/5xY6Z1FhN7FP2jtiuDdlgyeIlttOXyPft2J5 -cdo7sUcC+nDDWr+dtCTAP/VaorWCBBj85nfzClS+zCz0sDuyG5d0aeMBe6edEYTN -nYQbpPmFjDQ8Uv7YIfCvSWo1sK6vzC7323gQP3+98sbRNZFDBKcCnKMfLz2ufll7 -e86I3NwFbvvyhnHC/OuISZtBCDpHEoTX0CVqJXj6flLCgCLwmLr0EpFRKXFGFbp5 -fgAFXZ47AgMBAAECggEAJjyV4G86O1fDbw0HxUdMOAT3nnkJfv9r2sgObwISueS+ -5CtjDUgkkyS4cXoY3P7O0hZKoxYxc19h8mMACgKETQ9U3G5uOIyUnEJm35cg+4Ns -/ziijQ5knAvndkeQ1MU0qUlDWEoBI+oBqGWNVwIj70ydTmtrOFGX0NRiflNA3n7q -pJbdRZzKnTxXxRwIRuGA1y6SlBLQ740hVOm56iLtRJ+P0kNErSL8Uhws/X9/0MXH -W8r2JVikNumBZH18MK+wBGulwZBcLurFfv31hbeQ/FnckOJ1OE53rnV+tBrZN7Ap -6eR4IMcVPfunnGX+meEUnJfmC0HrdQXucDB8Ey/biQKBgQDygP0JeUKpSWX2uSfV -2c8N0opmC2uHswOhf+H9TOyA4DO5NmlbOqVv+uUwRQvIkoen8XNMCPOyoK7WZNAB -hfyU+ck3HDIBqHbGBisUXDNLgIQIhWVznYK0QC+YYr+rEmFun0sMriuhZsU1q2mW -VoAPSTJhaufRb0TKib9Tarzg4wKBgQDue8jk0tbK5xL9dcyn1CxHtDAbfyQfQnSd -G+GcQDDCamgbKI042A5lPSToYEOpSMTOn/n5CmezsSMFnwuwZAgQ1Pbd3YeknBCi -6jWzqYcC11u3EeX9YPJgEDZq0uSWNZg0phDBsu+PYq7vDAriCsMeQrLMvQb0Fs3n -Pp4vVzSEyQKBgQCb+h1G/6jBzAT6WYNmyE6mPFpqYkQKpzjZorCPxO+FwS9jnLzN -Qf5w9TZ/Apoeqyj3+5RGPqfIqBNssLEdmbmpdLRYbxk2+c1Td1o0IU2Y7ZN/C5YC -dDhCidpTMIjJluv2RBz4jfpgOQL1j0g9u2to6ZKvGBz9F41unITkOY49MwKBgEzk -1qqJHL6BcQsOT3WRoNFh1N0YyoHVwJnjooPp4o7dFkIjeh1o9INKCrtuRoKvtt1U -kZnt8+/pXnxygqdWKY+byxlQU2sM8wREdho+wAx3edf2Smy/NIcq0xDwfMm98ByR -qvd5hWp7DCKBhITLqYv5P4NqM3LCY5N7CjADcyiZAoGBALXXR5WSHLjtzaN4Eeti -pWur1VN30HiM2zRTXwTxx6X7y/FI5xzoCVAJb6tSpC/aXzFx05Xa/LyhDXI2sbhm -G3a4tjBRrief5z8XQ7gdBSiyRtLc1XFy3kmeN2HTPMWSIrbk56xyEOqbXov5S+41 -hWwNT3lodEZ2ymFWEZHHAvhb +MIG2AgEAMBAGByqGSM49AgEGBSuBBAAiBIGeMIGbAgEBBDCs4MYYR22MIL1rZitb +nZF25hbh9M1aI4uPdn+Vqzphuk+tjEMAmYbAZcSCCGGoUzKhZANiAAQ1m4EMNrKE +HkTOZF8SsSght8TkiNYKE3OM7gItOsQJ6Y+pwluPOyTwwHAHlxtscYx6DWeqSqh/ +RmSCszixZmxY0jUf8s4Mp1cEioyltTXaISztLd6jlFoaWr0K0Tp1jIA= -----END PRIVATE KEY----- diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 35d81562..708f1cfd 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -194,14 +194,11 @@ impl ServerCertVerifier for NoCertVerification { fn verify_tls13_signature( &self, - _message: &[u8], - _cert: &CertificateDer<'_>, - _dss: &DigitallySignedStruct, + message: &[u8], + cert: &CertificateDer<'_>, + dss: &DigitallySignedStruct, ) -> Result { - // TODO: figure out what's wring with the test cert - // IO(Custom { kind: ConnectionAborted, error: Custom { kind: InvalidData, error: InvalidCertificate(Other(OtherError(UnsupportedCertVersion))) } }) - // self.0.verify_tls13_signature(message, cert, dss) - Ok(HandshakeSignatureValid::assertion()) + self.0.verify_tls13_signature(message, cert, dss) } fn supported_verify_schemes(&self) -> Vec { diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 52cf422b..28399bf5 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -27,7 +27,7 @@ async fn roundtrip_tls() { let tls = || async { let verifier = fixtures::TestServerCertVerifier::new( - SignatureScheme::RSA_PSS_SHA512, + SignatureScheme::ECDSA_NISTP384_SHA384, env::current_dir() .unwrap() .join("docker") From 5b296a8ea1ca2410a35c07ca503464ea201953d3 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 13 Aug 2024 12:37:30 +0400 Subject: [PATCH 06/14] Allow to skip server certs verification --- src/worker/builder.rs | 36 ++++++++++++++++++++++++++++-------- tests/tls/rustls.rs | 1 + 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 06162e84..053b48be 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -11,7 +11,14 @@ use tokio::net::TcpStream as TokioStream; #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum TlsKind { + None, + + #[cfg(feature = "rustls")] + #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] Native, + + #[cfg(feature = "rustls")] + #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] Rust, } @@ -24,7 +31,9 @@ pub struct WorkerBuilder { callbacks: CallbacksRegistry, shutdown_timeout: Option, shutdown_signal: Option, - tls_kind: Option, + tls_kind: TlsKind, + #[cfg(any(feature = "native_tls", feature = "rustls"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] skip_verify_server_certs: bool, } @@ -46,8 +55,10 @@ impl Default for WorkerBuilder { callbacks: CallbacksRegistry::default(), shutdown_timeout: None, shutdown_signal: None, - tls_kind: None, - skip_verify_server_certs: true, + tls_kind: TlsKind::None, + #[cfg(any(feature = "native_tls", feature = "rustls"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] + skip_verify_server_certs: false, } } } @@ -252,7 +263,7 @@ impl WorkerBuilder { #[cfg(feature = "native_tls")] #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] pub fn with_native_tls(mut self) -> Self { - self.tls_kind = Some(TlsKind::Native); + self.tls_kind = TlsKind::Native; self } @@ -264,7 +275,15 @@ impl WorkerBuilder { #[cfg(feature = "rustls")] #[cfg_attr(docsrs, doc(cfg(feature = "rustls")))] pub fn with_rustls(mut self) -> Self { - self.tls_kind = Some(TlsKind::Rust); + self.tls_kind = TlsKind::Rust; + self + } + + /// Do not verify the server certificates. + #[cfg(any(feature = "native_tls", feature = "rustls"))] + #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] + pub fn dangerously_skip_verify_server_certs(mut self) -> Self { + self.skip_verify_server_certs = true; self } @@ -325,12 +344,12 @@ impl WorkerBuilder { let addr = utils::host_from_url(&url); let stream = TokioStream::connect(addr).await?; match self.tls_kind { - None => { + TlsKind::None => { self.connect_with(stream, url.password().map(|p| p.to_string())) .await } #[cfg(feature = "rustls")] - Some(TlsKind::Rust) => { + TlsKind::Rust => { let hostname = url.host_str().unwrap().to_string(); let tls_tream = crate::rustls::TlsStream::with_native_certs( stream, @@ -341,7 +360,8 @@ impl WorkerBuilder { self.connect_with(tls_tream, url.password().map(|p| p.to_string())) .await } - _ => unimplemented!(), + #[cfg(feature = "native_tls")] + TlsKind::Native => unimplemented!(), } } } diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 28399bf5..3f7af167 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -88,6 +88,7 @@ async fn roundtrip_tls_with_worker_builder() { let mut worker = Worker::builder() .register(local, fixtures::JobHandler::new(tx)) .with_rustls() + .dangerously_skip_verify_server_certs() .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) .await .unwrap(); From c6db906a55e9ec4d84c26f07405934a706203f6c Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Tue, 13 Aug 2024 12:40:25 +0400 Subject: [PATCH 07/14] Fix TlsKind::Native cfg attr --- src/worker/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 053b48be..6b023a02 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -13,7 +13,7 @@ use tokio::net::TcpStream as TokioStream; pub(crate) enum TlsKind { None, - #[cfg(feature = "rustls")] + #[cfg(feature = "native_tls")] #[cfg_attr(docsrs, doc(cfg(feature = "native_tls")))] Native, From 0e85a61987eb59a4f23d027d903c1f15bafa389f Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 14 Aug 2024 15:26:15 +0400 Subject: [PATCH 08/14] Impl native_tls arm in WorkerBuilder::connect --- src/tls/native_tls.rs | 18 ++++++++++ src/tls/rustls.rs | 73 ++++++++++++++++++++--------------------- src/worker/builder.rs | 29 +++++++++------- tests/tls/native_tls.rs | 37 +++++++++++++++++++-- 4 files changed, 105 insertions(+), 52 deletions(-) diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 16bf3eae..5304f2f1 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -62,6 +62,24 @@ impl TlsStream { .await } + /// Create a new TLS connection over TCP ***dangerously*** skipping TLS verification. + /// + /// Similar to [`TlsStream::connect`], but accepting invalid server certificates and + /// invalid hostnames. + pub async fn connect_dangerously_skipping_verification( + url: Option<&str>, + ) -> Result { + TlsStream::with_connector( + TlsConnector::builder() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) + .build() + .map_err(error::Stream::NativeTls)?, + url, + ) + .await + } + /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 708f1cfd..ff5755de 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -61,8 +61,8 @@ impl TlsStream { /// /// If `url` is given, but does not specify a port, it defaults to 7419. /// - /// Internally creates a `ClientConfig` with an empty root certificates store and no client - /// authentication. Use [`with_client_config`](TlsStream::with_client_config) + /// Internally creates a `ClientConfig` with an _empty_ root certificates store and _no client + /// authentication_. Use [`with_client_config`](TlsStream::with_client_config) /// or [`with_connector`](TlsStream::with_connector) for customized /// `ClientConfig` and `TlsConnector` accordingly. pub async fn connect(url: Option<&str>) -> Result { @@ -73,6 +73,39 @@ impl TlsStream { TlsStream::with_connector(con, url).await } + /// Create a new TLS connection over TCP using native certificates. + /// + /// Unlike [`TlsStream::connect`], creates a root certificates store populated with the certificates + /// loaded from a platform-native certificate store. Thos method also allows to ***dangerously*** + /// skip server certificates verification. + pub async fn connect_with_native_certs( + url: Option<&str>, + dangerously_skip_verify: bool, + ) -> Result { + let mut store = RootCertStore::empty(); + for cert in rustls_native_certs::load_native_certs()? { + store.add(cert).map_err(io::Error::other)?; + } + + let config = if dangerously_skip_verify { + let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) + .build() + .expect("can construct standard verifier"); + let mut config = ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth(); + config + .dangerous() + .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); + config + } else { + ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth() + }; + TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await + } + /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. @@ -114,42 +147,6 @@ where Self::new(stream, TlsConnector::from(Arc::new(conf)), hostname).await } - /// Create a new TLS connection on an existing stream using native certificates. - /// - /// Internally creates a `ClientConfig` with no client authenticatiom and a root certificates - /// store populated with the certificates loaded from a platform-native certificate store. - /// - /// Use [`new`](TlsStream::new) for a customized `TlsConnector`. - pub async fn with_native_certs( - stream: S, - hostname: String, - skip_verify: bool, - ) -> io::Result { - let mut store = RootCertStore::empty(); - for cert in rustls_native_certs::load_native_certs()? { - store.add(cert).map_err(io::Error::other)?; - } - - let config = if skip_verify { - let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) - .build() - .expect("can construct standard verifier"); - let mut config = ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth(); - config - .dangerous() - .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); - config - } else { - ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth() - }; - - Self::new(stream, TlsConnector::from(Arc::new(config)), hostname).await - } - /// Create a new TLS connection on an existing stream with a non-default TLS configuration. pub async fn new(stream: S, connector: TlsConnector, hostname: String) -> io::Result { let server_name = hostname diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 6b023a02..3bbfcd82 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -340,28 +340,33 @@ impl WorkerBuilder { /// /// If `url` is given, but does not specify a port, it defaults to 7419. pub async fn connect(self, url: Option<&str>) -> Result, Error> { - let url = utils::parse_provided_or_from_env(url)?; - let addr = utils::host_from_url(&url); - let stream = TokioStream::connect(addr).await?; + let parsed_url = utils::parse_provided_or_from_env(url)?; + let password = parsed_url.password().map(|p| p.to_string()); match self.tls_kind { TlsKind::None => { - self.connect_with(stream, url.password().map(|p| p.to_string())) - .await + let addr = utils::host_from_url(&parsed_url); + let stream = TokioStream::connect(addr).await?; + self.connect_with(stream, password).await } #[cfg(feature = "rustls")] TlsKind::Rust => { - let hostname = url.host_str().unwrap().to_string(); - let tls_tream = crate::rustls::TlsStream::with_native_certs( - stream, - hostname, + let stream = crate::rustls::TlsStream::connect_with_native_certs( + url, self.skip_verify_server_certs, ) .await?; - self.connect_with(tls_tream, url.password().map(|p| p.to_string())) - .await + self.connect_with(stream, password).await } #[cfg(feature = "native_tls")] - TlsKind::Native => unimplemented!(), + TlsKind::Native => { + let stream = if self.skip_verify_server_certs { + crate::native_tls::TlsStream::connect_dangerously_skipping_verification(url) + .await? + } else { + crate::native_tls::TlsStream::connect(url).await? + }; + self.connect_with(stream, password).await + } } } } diff --git a/tests/tls/native_tls.rs b/tests/tls/native_tls.rs index 4f4c5c20..0e399d9e 100644 --- a/tests/tls/native_tls.rs +++ b/tests/tls/native_tls.rs @@ -1,5 +1,5 @@ use faktory::native_tls::TlsStream; -use faktory::{Client, Job, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerId}; use serde_json::Value; use std::{env, sync}; use url::Url; @@ -37,7 +37,7 @@ async fn roundtrip_tls() { .password() .map(|p| p.to_string()); - let mut worker = WorkerBuilder::default() + let mut worker = Worker::builder() .hostname("tester".to_string()) .wid(WorkerId::new(local)) .register(local, fixtures::JobHandler::new(tx)) @@ -61,6 +61,39 @@ async fn roundtrip_tls() { assert_eq!(job.args(), &[Value::from("z")]); } +#[tokio::test(flavor = "multi_thread")] +async fn roundtrip_tls_with_worker_builder() { + if env::var_os("FAKTORY_URL_SECURE").is_none() { + return; + } + + let local = "roundtrip_tls_with_worker_builder"; + let (tx, rx) = sync::mpsc::channel(); + + let mut worker = Worker::builder() + .register(local, fixtures::JobHandler::new(tx)) + .with_native_tls() + .dangerously_skip_verify_server_certs() + .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) + .await + .unwrap(); + + // "one-shot" producer + Client::connect(Some(&env::var("FAKTORY_URL").unwrap())) + .await + .unwrap() + .enqueue(Job::new(local, vec!["z"]).on_queue(local)) + .await + .unwrap(); + + worker.run_one(0, &[local]).await.unwrap(); + + let job = rx.recv().unwrap(); + assert_eq!(job.queue, local); + assert_eq!(job.kind(), local); + assert_eq!(job.args(), &[Value::from("z")]); +} + mod fixtures { pub use handler::JobHandler; From 93fcdc0657e341543ee36b57bbbd37bb14d4e8df Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 14 Aug 2024 16:29:53 +0400 Subject: [PATCH 09/14] Update docs --- README.md | 4 +++- src/lib.rs | 33 +++++++++++++++++++++++++++++++-- src/worker/builder.rs | 10 ++++++++++ 3 files changed, 44 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 86ea74f4..0fc5ae72 100644 --- a/README.md +++ b/README.md @@ -78,11 +78,12 @@ let mut w = Worker::builder() println!("{:?}", job); Ok::<(), io::Error>(()) }) - .register_blocking_fn("fibo", |job| + .register_blocking_fn("fibo", |job| { std::thread::sleep(Duration::from_millis(1000)); println!("{:?}", job); Ok::<(), io::Error>(()) }) + .with_rustls() // available on `rustls` feature only .connect(None) .await .unwrap(); @@ -96,6 +97,7 @@ match w.run(&["default"]).await { stop_details.workers_still_running ); } +} ``` Also see some usage examples in `examples` directory in the project's root. You can run an example with: diff --git a/src/lib.rs b/src/lib.rs index 1f2c06c3..30ed5c10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,14 +48,43 @@ //! //! ```no_run //! # tokio_test::block_on(async { -//! use faktory::Worker; +//! use async_trait::async_trait; +//! use faktory::{Job, JobRunner, Worker}; //! use std::io; +//! +//! struct DomainEntity(i32); +//! +//! impl DomainEntity { +//! fn new(buzz: i32) -> Self { +//! DomainEntity(buzz) +//! } +//! } +//! +//! #[async_trait] +//! impl JobRunner for DomainEntity { +//! type Error = io::Error; +//! +//! async fn run(&self, job: Job) -> Result<(), Self::Error> { +//! println!("{:?}, buzz={}", job, self.0); +//! Ok(()) +//! } +//! } +//! //! let mut w = Worker::builder() +//! .register("fizz", DomainEntity::new(1)) //! .register_fn("foobar", |job| async move { //! println!("{:?}", job); //! Ok::<(), io::Error>(()) //! }) -//! .connect(None).await.unwrap(); +//! .register_blocking_fn("fibo", |job| { +//! std::thread::sleep(std::time::Duration::from_millis(1000)); +//! println!("{:?}", job); +//! Ok::<(), io::Error>(()) +//! }) +//! .with_rustls() // available on `rustls` feature only +//! .connect(None) +//! .await +//! .unwrap(); //! //! if let Err(e) = w.run(&["default"]).await { //! println!("worker failed: {}", e); diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 3bbfcd82..f81d5500 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -257,6 +257,11 @@ impl WorkerBuilder { /// The underlying crate (`native-tls`) will use _SChannel_ on Windows, /// _SecureTransport_ on OSX, and _OpenSSL_ on other platforms. /// + /// Internally, will use [`TlsStream::connect`](crate::native_tls::TlsStream::connect) to establish + /// a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_skip_verify_server_certs`] + /// has been called on this builder, [`TlsStream::connect_dangerously_skipping_verification`](crate::native_tls::TlsStream::connect_dangerously_skipping_verification) + /// will be used. + /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker /// will be connected to the Faktory server with the stream you've provided to `connect_with`. @@ -269,6 +274,11 @@ impl WorkerBuilder { /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). /// + /// Internally, will use [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) + /// to establish a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_skip_verify_server_certs`] + /// has been called on this builder, a `true` will provided to [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) + /// as an argument for `dangerously_skip_verify`. + /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker /// will be connected to the Faktory server with the stream you've provided to `connect_with`. From e727bfb7941c9002fdbb9195a7b38d71f981c740 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Wed, 14 Aug 2024 16:44:14 +0400 Subject: [PATCH 10/14] Clean up --- src/worker/builder.rs | 26 +++++++++++++------------- tests/tls/native_tls.rs | 2 +- tests/tls/rustls.rs | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/worker/builder.rs b/src/worker/builder.rs index f81d5500..10d616e2 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -34,7 +34,7 @@ pub struct WorkerBuilder { tls_kind: TlsKind, #[cfg(any(feature = "native_tls", feature = "rustls"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - skip_verify_server_certs: bool, + verify_server_cert: bool, } impl Default for WorkerBuilder { @@ -58,7 +58,7 @@ impl Default for WorkerBuilder { tls_kind: TlsKind::None, #[cfg(any(feature = "native_tls", feature = "rustls"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - skip_verify_server_certs: false, + verify_server_cert: true, } } } @@ -258,10 +258,10 @@ impl WorkerBuilder { /// _SecureTransport_ on OSX, and _OpenSSL_ on other platforms. /// /// Internally, will use [`TlsStream::connect`](crate::native_tls::TlsStream::connect) to establish - /// a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_skip_verify_server_certs`] + /// a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_without_cert_verification`] /// has been called on this builder, [`TlsStream::connect_dangerously_skipping_verification`](crate::native_tls::TlsStream::connect_dangerously_skipping_verification) /// will be used. - /// + /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker /// will be connected to the Faktory server with the stream you've provided to `connect_with`. @@ -275,10 +275,10 @@ impl WorkerBuilder { /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). /// /// Internally, will use [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) - /// to establish a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_skip_verify_server_certs`] + /// to establish a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_without_cert_verification`] /// has been called on this builder, a `true` will provided to [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) /// as an argument for `dangerously_skip_verify`. - /// + /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker /// will be connected to the Faktory server with the stream you've provided to `connect_with`. @@ -289,11 +289,11 @@ impl WorkerBuilder { self } - /// Do not verify the server certificates. + /// Do not verify the server certificate. #[cfg(any(feature = "native_tls", feature = "rustls"))] #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - pub fn dangerously_skip_verify_server_certs(mut self) -> Self { - self.skip_verify_server_certs = true; + pub fn dangerously_without_cert_verification(mut self) -> Self { + self.verify_server_cert = false; self } @@ -362,18 +362,18 @@ impl WorkerBuilder { TlsKind::Rust => { let stream = crate::rustls::TlsStream::connect_with_native_certs( url, - self.skip_verify_server_certs, + !self.verify_server_cert, ) .await?; self.connect_with(stream, password).await } #[cfg(feature = "native_tls")] TlsKind::Native => { - let stream = if self.skip_verify_server_certs { + let stream = if self.verify_server_cert { + crate::native_tls::TlsStream::connect(url).await? + } else { crate::native_tls::TlsStream::connect_dangerously_skipping_verification(url) .await? - } else { - crate::native_tls::TlsStream::connect(url).await? }; self.connect_with(stream, password).await } diff --git a/tests/tls/native_tls.rs b/tests/tls/native_tls.rs index 0e399d9e..54af2a58 100644 --- a/tests/tls/native_tls.rs +++ b/tests/tls/native_tls.rs @@ -73,7 +73,7 @@ async fn roundtrip_tls_with_worker_builder() { let mut worker = Worker::builder() .register(local, fixtures::JobHandler::new(tx)) .with_native_tls() - .dangerously_skip_verify_server_certs() + .dangerously_without_cert_verification() .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) .await .unwrap(); diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index 3f7af167..e7e9b4c6 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -88,7 +88,7 @@ async fn roundtrip_tls_with_worker_builder() { let mut worker = Worker::builder() .register(local, fixtures::JobHandler::new(tx)) .with_rustls() - .dangerously_skip_verify_server_certs() + .dangerously_without_cert_verification() .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) .await .unwrap(); From 65c31e814bede2b5efa2d41d43182059344e8a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= <117771945+rustworthy@users.noreply.github.com> Date: Sun, 18 Aug 2024 12:01:08 +0400 Subject: [PATCH 11/14] Update src/tls/native_tls.rs Co-authored-by: Jon Gjengset --- src/tls/native_tls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 5304f2f1..a85a97fd 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -62,7 +62,7 @@ impl TlsStream { .await } - /// Create a new TLS connection over TCP ***dangerously*** skipping TLS verification. + /// Create a new TLS connection over TCP **dangerously** skipping TLS verification. /// /// Similar to [`TlsStream::connect`], but accepting invalid server certificates and /// invalid hostnames. From 0826522008ed26f69e4e8f48f73fc6ba64bdc8ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pavie=C5=82=20Michalkievi=C4=8D?= <117771945+rustworthy@users.noreply.github.com> Date: Sun, 18 Aug 2024 12:01:18 +0400 Subject: [PATCH 12/14] Update src/tls/rustls.rs Co-authored-by: Jon Gjengset --- src/tls/rustls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index ff5755de..c130d9d1 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -76,7 +76,7 @@ impl TlsStream { /// Create a new TLS connection over TCP using native certificates. /// /// Unlike [`TlsStream::connect`], creates a root certificates store populated with the certificates - /// loaded from a platform-native certificate store. Thos method also allows to ***dangerously*** + /// loaded from a platform-native certificate store. Thos method also allows to **dangerously** /// skip server certificates verification. pub async fn connect_with_native_certs( url: Option<&str>, From 2c9435594bbb94073da871fb6df6915117d0e038 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 18 Aug 2024 14:14:16 +0400 Subject: [PATCH 13/14] Rm 'verify_server_cert' option from WorkerBuilder --- Cargo.toml | 1 - src/tls/native_tls.rs | 18 ---------- src/tls/rustls.rs | 77 ++++------------------------------------- src/worker/builder.rs | 35 +++---------------- tests/tls/native_tls.rs | 33 ------------------ tests/tls/rustls.rs | 37 ++------------------ 6 files changed, 13 insertions(+), 188 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ede25b43..db34e9d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,6 @@ tokio = { version = "1.35.1", features = [ tokio-native-tls = { version = "0.3.1", optional = true } tokio-rustls = { version = "0.25.0", optional = true } rustls-native-certs = { version = "0.7.1", optional = true } -rustls-pki-types = { version = "1.0.1", optional = true } tracing = "0.1" url = "2" semver = { version = "1.0.23", features = ["serde"] } diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index a85a97fd..16bf3eae 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -62,24 +62,6 @@ impl TlsStream { .await } - /// Create a new TLS connection over TCP **dangerously** skipping TLS verification. - /// - /// Similar to [`TlsStream::connect`], but accepting invalid server certificates and - /// invalid hostnames. - pub async fn connect_dangerously_skipping_verification( - url: Option<&str>, - ) -> Result { - TlsStream::with_connector( - TlsConnector::builder() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) - .build() - .map_err(error::Stream::NativeTls)?, - url, - ) - .await - } - /// Create a new TLS connection over TCP using a non-default TLS configuration. /// /// See `connect` for details about the `url` parameter. diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index c130d9d1..3496669e 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -3,20 +3,13 @@ use crate::{Client, WorkerBuilder}; use crate::proto::{self, utils}; use crate::{Error, Reconnect}; -use rustls_pki_types::{CertificateDer, ServerName, UnixTime}; use std::io; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use tokio::io::{AsyncRead, AsyncWrite, BufStream}; use tokio::net::TcpStream as TokioTcpStream; use tokio_rustls::client::TlsStream as RustlsStream; -use tokio_rustls::rustls::client::danger::{ - HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier, -}; -use tokio_rustls::rustls::{ - client::WebPkiServerVerifier, ClientConfig, DigitallySignedStruct, Error as RustlsError, - RootCertStore, SignatureScheme, -}; +use tokio_rustls::rustls::{ClientConfig, RootCertStore}; use tokio_rustls::TlsConnector; /// A reconnectable stream encrypted with TLS. @@ -75,34 +68,16 @@ impl TlsStream { /// Create a new TLS connection over TCP using native certificates. /// - /// Unlike [`TlsStream::connect`], creates a root certificates store populated with the certificates - /// loaded from a platform-native certificate store. Thos method also allows to **dangerously** - /// skip server certificates verification. - pub async fn connect_with_native_certs( - url: Option<&str>, - dangerously_skip_verify: bool, - ) -> Result { + /// Unlike [`TlsStream::connect`], creates a root certificates store populated + /// with the certificates loaded from a platform-native certificate store. + pub async fn connect_with_native_certs(url: Option<&str>) -> Result { let mut store = RootCertStore::empty(); for cert in rustls_native_certs::load_native_certs()? { store.add(cert).map_err(io::Error::other)?; } - - let config = if dangerously_skip_verify { - let cert_verifier = WebPkiServerVerifier::builder(Arc::new(store.clone())) - .build() - .expect("can construct standard verifier"); - let mut config = ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth(); - config - .dangerous() - .set_certificate_verifier(Arc::new(NoCertVerification(cert_verifier))); - config - } else { - ClientConfig::builder() - .with_root_certificates(store) - .with_no_client_auth() - }; + let config = ClientConfig::builder() + .with_root_certificates(store) + .with_no_client_auth(); TlsStream::with_connector(TlsConnector::from(Arc::new(config)), url).await } @@ -165,44 +140,6 @@ where } } -#[derive(Debug)] -struct NoCertVerification(Arc); - -impl ServerCertVerifier for NoCertVerification { - fn verify_server_cert( - &self, - _: &CertificateDer<'_>, - _: &[CertificateDer<'_>], - _: &ServerName<'_>, - _: &[u8], - _: UnixTime, - ) -> Result { - Ok(ServerCertVerified::assertion()) - } - - fn verify_tls12_signature( - &self, - message: &[u8], - cert: &rustls_pki_types::CertificateDer<'_>, - dss: &DigitallySignedStruct, - ) -> Result { - self.0.verify_tls12_signature(message, cert, dss) - } - - fn verify_tls13_signature( - &self, - message: &[u8], - cert: &CertificateDer<'_>, - dss: &DigitallySignedStruct, - ) -> Result { - self.0.verify_tls13_signature(message, cert, dss) - } - - fn supported_verify_schemes(&self) -> Vec { - self.0.supported_verify_schemes() - } -} - #[async_trait::async_trait] impl Reconnect for BufStream> { async fn reconnect(&mut self) -> io::Result { diff --git a/src/worker/builder.rs b/src/worker/builder.rs index 10d616e2..a947eb03 100644 --- a/src/worker/builder.rs +++ b/src/worker/builder.rs @@ -32,9 +32,6 @@ pub struct WorkerBuilder { shutdown_timeout: Option, shutdown_signal: Option, tls_kind: TlsKind, - #[cfg(any(feature = "native_tls", feature = "rustls"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - verify_server_cert: bool, } impl Default for WorkerBuilder { @@ -56,9 +53,6 @@ impl Default for WorkerBuilder { shutdown_timeout: None, shutdown_signal: None, tls_kind: TlsKind::None, - #[cfg(any(feature = "native_tls", feature = "rustls"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - verify_server_cert: true, } } } @@ -258,9 +252,7 @@ impl WorkerBuilder { /// _SecureTransport_ on OSX, and _OpenSSL_ on other platforms. /// /// Internally, will use [`TlsStream::connect`](crate::native_tls::TlsStream::connect) to establish - /// a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_without_cert_verification`] - /// has been called on this builder, [`TlsStream::connect_dangerously_skipping_verification`](crate::native_tls::TlsStream::connect_dangerously_skipping_verification) - /// will be used. + /// a TLS stream to the Faktory server. /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker @@ -275,9 +267,7 @@ impl WorkerBuilder { /// Make the traffic between this worker and Faktory encrypted with [`rustls`](https://github.com/rustls/rustls). /// /// Internally, will use [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) - /// to establish a TLS stream to the Faktory server. If [`WorkerBuilder::dangerously_without_cert_verification`] - /// has been called on this builder, a `true` will provided to [`TlsStream::connect_with_native_certs`](crate::rustls::TlsStream::connect_with_native_certs) - /// as an argument for `dangerously_skip_verify`. + /// to establish a TLS stream to the Faktory server. /// /// Note that if you use this method on the builder, but eventually use [`WorkerBuilder::connect_with`] /// (rather than [`WorkerBuilder::connect`]) to create an instance of [`Worker`], this worker @@ -289,14 +279,6 @@ impl WorkerBuilder { self } - /// Do not verify the server certificate. - #[cfg(any(feature = "native_tls", feature = "rustls"))] - #[cfg_attr(docsrs, doc(cfg(any(feature = "native_tls", feature = "rustls"))))] - pub fn dangerously_without_cert_verification(mut self) -> Self { - self.verify_server_cert = false; - self - } - /// Connect to a Faktory server with a non-standard stream. /// /// Iternally, the `stream` will be buffered. In case you've got a `stream` that is _already_ @@ -360,21 +342,12 @@ impl WorkerBuilder { } #[cfg(feature = "rustls")] TlsKind::Rust => { - let stream = crate::rustls::TlsStream::connect_with_native_certs( - url, - !self.verify_server_cert, - ) - .await?; + let stream = crate::rustls::TlsStream::connect_with_native_certs(url).await?; self.connect_with(stream, password).await } #[cfg(feature = "native_tls")] TlsKind::Native => { - let stream = if self.verify_server_cert { - crate::native_tls::TlsStream::connect(url).await? - } else { - crate::native_tls::TlsStream::connect_dangerously_skipping_verification(url) - .await? - }; + let stream = crate::native_tls::TlsStream::connect(url).await?; self.connect_with(stream, password).await } } diff --git a/tests/tls/native_tls.rs b/tests/tls/native_tls.rs index 54af2a58..c50ebb8d 100644 --- a/tests/tls/native_tls.rs +++ b/tests/tls/native_tls.rs @@ -61,39 +61,6 @@ async fn roundtrip_tls() { assert_eq!(job.args(), &[Value::from("z")]); } -#[tokio::test(flavor = "multi_thread")] -async fn roundtrip_tls_with_worker_builder() { - if env::var_os("FAKTORY_URL_SECURE").is_none() { - return; - } - - let local = "roundtrip_tls_with_worker_builder"; - let (tx, rx) = sync::mpsc::channel(); - - let mut worker = Worker::builder() - .register(local, fixtures::JobHandler::new(tx)) - .with_native_tls() - .dangerously_without_cert_verification() - .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) - .await - .unwrap(); - - // "one-shot" producer - Client::connect(Some(&env::var("FAKTORY_URL").unwrap())) - .await - .unwrap() - .enqueue(Job::new(local, vec!["z"]).on_queue(local)) - .await - .unwrap(); - - worker.run_one(0, &[local]).await.unwrap(); - - let job = rx.recv().unwrap(); - assert_eq!(job.queue, local); - assert_eq!(job.kind(), local); - assert_eq!(job.args(), &[Value::from("z")]); -} - mod fixtures { pub use handler::JobHandler; diff --git a/tests/tls/rustls.rs b/tests/tls/rustls.rs index e7e9b4c6..41f605a8 100644 --- a/tests/tls/rustls.rs +++ b/tests/tls/rustls.rs @@ -1,5 +1,5 @@ use faktory::rustls::TlsStream; -use faktory::{Client, Job, Worker, WorkerBuilder, WorkerId}; +use faktory::{Client, Job, Worker, WorkerId}; use serde_json::Value; use std::{ env, @@ -52,7 +52,7 @@ async fn roundtrip_tls() { .password() .map(|p| p.to_string()); - let mut worker = WorkerBuilder::default() + let mut worker = Worker::builder() .hostname("tester".to_string()) .wid(WorkerId::new(local)) .register(local, fixtures::JobHandler::new(tx)) @@ -76,39 +76,6 @@ async fn roundtrip_tls() { assert_eq!(job.args(), &[Value::from("z")]); } -#[tokio::test(flavor = "multi_thread")] -async fn roundtrip_tls_with_worker_builder() { - if env::var_os("FAKTORY_URL_SECURE").is_none() { - return; - } - - let local = "roundtrip_tls_with_worker_builder"; - let (tx, rx) = sync::mpsc::channel(); - - let mut worker = Worker::builder() - .register(local, fixtures::JobHandler::new(tx)) - .with_rustls() - .dangerously_without_cert_verification() - .connect(Some(&env::var("FAKTORY_URL_SECURE").unwrap())) - .await - .unwrap(); - - // "one-shot" producer - Client::connect(Some(&env::var("FAKTORY_URL").unwrap())) - .await - .unwrap() - .enqueue(Job::new(local, vec!["z"]).on_queue(local)) - .await - .unwrap(); - - worker.run_one(0, &[local]).await.unwrap(); - - let job = rx.recv().unwrap(); - assert_eq!(job.queue, local); - assert_eq!(job.kind(), local); - assert_eq!(job.args(), &[Value::from("z")]); -} - mod fixtures { pub use handler::JobHandler; pub use tls::TestServerCertVerifier; From af21d7d39fb09461b2646180d117cca0d9c9fe98 Mon Sep 17 00:00:00 2001 From: Pavel Mikhalkevich Date: Sun, 18 Aug 2024 14:22:38 +0400 Subject: [PATCH 14/14] Rm from pki-types rustls-deps --- Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index db34e9d1..b29c54cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ native_tls = ["dep:pin-project", "dep:tokio-native-tls"] rustls = [ "dep:pin-project", "dep:tokio-rustls", - "dep:rustls-pki-types", "dep:rustls-native-certs", ] binaries = ["dep:clap", "tokio/macros"]