From 39a0cba95b5b554b3ccddc493c736a2058118d4d Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 7 Sep 2021 18:54:31 +1000 Subject: [PATCH] Implement TLS support for incoming redis connections (#141) --- Cargo.lock | 24 ++++++ shotover-proxy/Cargo.toml | 2 + .../examples/redis-tls/docker-compose.yml | 6 ++ .../examples/redis-tls/redis-cli.sh | 3 + .../examples/redis-tls/tls_keys/ca.crt | 31 +++++++ .../examples/redis-tls/tls_keys/redis.crt | 23 +++++ .../examples/redis-tls/tls_keys/redis.key | 27 ++++++ .../examples/redis-tls/topology.yaml | 16 ++++ shotover-proxy/src/lib.rs | 1 + shotover-proxy/src/server.rs | 85 +++++++++++++------ .../src/sources/cassandra_source.rs | 1 + shotover-proxy/src/sources/redis_source.rs | 40 ++++----- shotover-proxy/src/tls.rs | 82 ++++++++++++++++++ shotover-proxy/tests/helpers/mod.rs | 43 ++++++++++ .../redis_int_tests/basic_driver_tests.rs | 38 +++++++++ 15 files changed, 376 insertions(+), 46 deletions(-) create mode 100644 shotover-proxy/examples/redis-tls/docker-compose.yml create mode 100755 shotover-proxy/examples/redis-tls/redis-cli.sh create mode 100644 shotover-proxy/examples/redis-tls/tls_keys/ca.crt create mode 100644 shotover-proxy/examples/redis-tls/tls_keys/redis.crt create mode 100644 shotover-proxy/examples/redis-tls/tls_keys/redis.key create mode 100644 shotover-proxy/examples/redis-tls/topology.yaml create mode 100644 shotover-proxy/src/tls.rs diff --git a/Cargo.lock b/Cargo.lock index f5c88568a..8ddd39526 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1637,6 +1637,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" +[[package]] +name = "openssl-src" +version = "111.16.0+1.1.1l" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ab2173f69416cf3ec12debb5823d244127d23a9b127d5a5189aa97c5fa2859f" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.66" @@ -1646,6 +1655,7 @@ dependencies = [ "autocfg", "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -2508,6 +2518,7 @@ dependencies = [ "metrics-exporter-prometheus", "ntest", "num_cpus", + "openssl", "pcap", "pin-project-lite", "pktparse", @@ -2529,6 +2540,7 @@ dependencies = [ "threadpool", "tls-parser", "tokio", + "tokio-openssl", "tokio-stream", "tokio-util", "tracing", @@ -2846,6 +2858,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-openssl" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f24cddc8445a4dc8359cdd9e91c19d544fc95f672e32afe8945852b9381a09fe" +dependencies = [ + "futures", + "openssl", + "openssl-sys", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.7" diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 24feda0b1..1666dacdb 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -24,6 +24,8 @@ rand = { version = "0.8.4", features = ["small_rng", "std"]} rand_distr = "0.4.1" cached = "0.25.0" pin-project-lite = "0.2" +tokio-openssl = "0.6.2" +openssl = { version = "0.10.36", features = ["vendored"] } # Error handling thiserror = "1.0" diff --git a/shotover-proxy/examples/redis-tls/docker-compose.yml b/shotover-proxy/examples/redis-tls/docker-compose.yml new file mode 100644 index 000000000..b8361b077 --- /dev/null +++ b/shotover-proxy/examples/redis-tls/docker-compose.yml @@ -0,0 +1,6 @@ +version: "3.3" +services: + redis-one: + image: library/redis:5.0.9 + ports: + - "1111:6379" diff --git a/shotover-proxy/examples/redis-tls/redis-cli.sh b/shotover-proxy/examples/redis-tls/redis-cli.sh new file mode 100755 index 000000000..8ebe289b7 --- /dev/null +++ b/shotover-proxy/examples/redis-tls/redis-cli.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +redis-cli --tls --cert tls_keys/redis.crt --key tls_keys/redis.key --cacert tls_keys/ca.crt "$@" diff --git a/shotover-proxy/examples/redis-tls/tls_keys/ca.crt b/shotover-proxy/examples/redis-tls/tls_keys/ca.crt new file mode 100644 index 000000000..4e7824bf7 --- /dev/null +++ b/shotover-proxy/examples/redis-tls/tls_keys/ca.crt @@ -0,0 +1,31 @@ +-----BEGIN CERTIFICATE----- +MIIFSzCCAzOgAwIBAgIUWPH3dRgC/YwYZZCXib8gwJ+O8xYwDQYJKoZIhvcNAQEL +BQAwNTETMBEGA1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUg +QXV0aG9yaXR5MB4XDTIxMDgxNjA1NTcxN1oXDTMxMDgxNDA1NTcxN1owNTETMBEG +A1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5 +MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAw+X26W/dGC+2sJHiJkbf +Hq39KSWoXrCvXFP6FXuGqdEIZzaUokOSrNR0aueasOyc3hQ+B+N1DyPlPXDYxcll +yZdcbRvUT85tprheke6P1ofA369fWpm6aZIQf1ahsBpZ5KLwavqo1MC/5eqvvloy +vwomLvmjONSjnbvbofgRkFH0mETo91qh9tAK6wLSEUwZhKFsBIajP5cQ+IlL8QVi +0sIc5fIrmAPUdK2NwI1ly1L48BuBT2n46nm3J9ZX5Px+9NvsDT7VA+g5o1Pfer62 +gfmEdQdWzdRBWRw6HLMuvN3w6mL/3++W5LzhF846szebzhue84XzuGSJT5i6s3LP +dgWTweJTLP7imb2JFaZDcAwDPMXUhYqhU6wst+mlsqUX9XHR9MajZluxIQLXAom/ +ZUSVYjlOvtTmFAOEqe/eKxM/ZX4+umnQDIMirvtxOV8jtO02/Yog2nSs3XLNQXL5 +tIwDX2A/HErzvSOsMMwKggUBChBorELaSzy1bMNoSw0t6CQ+74fT7iV7nWKKvpmf +Zfeb7WOu2XF/zjNyy9ghQkiARTupCzZY2G7kCMCtXy7Q/I3ip5LWhiywwDOgkibI +Elp0AZEy3MvLfM/7kFroWd7NFQ739o7nkU1G6hhgKIGzUqP83ozwzjHuDvrE2yXw +Pi41ze7Iy+GvlKPgkwsVR9MCAwEAAaNTMFEwHQYDVR0OBBYEFLw7OfSQI/lsTRe/ +4Pu/ctJDj/a6MB8GA1UdIwQYMBaAFLw7OfSQI/lsTRe/4Pu/ctJDj/a6MA8GA1Ud +EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIBACwGR6DnefMO3n52bfujvwan +qRnH87IlnFrGDW1yXNAfs/9hh8UHPRNgvOzU/OlJBMWF0r+T0QwChcgqwZ/kQ5RG +GbkZeAA69Ail+LYra6p9r6tuTyZtGkpOLI+w73Qb6GS4q5agYMJjz7OU3M7m7dsw +lkYfqSxHtXc5otdxG9dlblRW5jpEoD9ry5Oh7f1FIGMgncOvYGX2zgLUr2Qn65aL ++Ung0KTmZtwPjzfWzHQEPx0hFcnh/K3xilhJ4P+8lc71aWpjnffvrONlX0dJzSzm +3xkCkorHagnAFoJwIM5/boe77PaHb4A4ehmkzKYIzMxbfGnePkUwFmGhmxgTHgtu +67voFBfIuWoEZNdl44Mb28JY7zYTJ81fi/RSnxqMg+2oxaqZlF+Y/c81fHBzOaSM +/txPmt5WWqC6aId+Mmmnmx1IpEvooc0KI3c+XPikHEWx/Z+FPD595vUaq6aiuzSs +tAvk6Z1OiArBKQcurQoHBJqfOQiu9f3+VebZ7sG3f7aRLDWpfF3JB6sHKlGl1tXY +HWBg+46ezLT3JJn820YZ2j8zZY0fGx5OwMwM64WLyIaxIiHhh+TtLaOy5+97S4LI +syet2sEZHPc3IPoBxMzeWbAt+qi74baX1U91M2/eHVPArEm3SXs0I6MFn/aQJ3IN +yAUJEnGE62qXdU0pa/LQ +-----END CERTIFICATE----- diff --git a/shotover-proxy/examples/redis-tls/tls_keys/redis.crt b/shotover-proxy/examples/redis-tls/tls_keys/redis.crt new file mode 100644 index 000000000..ee6007a8e --- /dev/null +++ b/shotover-proxy/examples/redis-tls/tls_keys/redis.crt @@ -0,0 +1,23 @@ +-----BEGIN CERTIFICATE----- +MIID6DCCAdACFDSqN+WRPnbixT+8vJtDyZZnjxhaMA0GCSqGSIb3DQEBCwUAMDUx +EzARBgNVBAoMClJlZGlzIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhv +cml0eTAeFw0yMTA4MTYwNTU3MTdaFw0yMjA4MTYwNTU3MTdaMCwxEzARBgNVBAoM +ClJlZGlzIFRlc3QxFTATBgNVBAMMDEdlbmVyaWMtY2VydDCCASIwDQYJKoZIhvcN +AQEBBQADggEPADCCAQoCggEBAN7ySN3uxTS3ZzPewalkuJI0vZ9JD77ImvNvkgTR +CR83dnfgf9eY7y2PHgPKEit9Gh4e633QeZAiMjvjHTM9MZCkyhusp2/yDOX8WUWx +EZQbqf7UIzoxsXi3+TeKkpuxCUi1DYfMTd4EtJqFANZ7FM9j6/2AuWCj8s2hjXIs +iUSZWiYdqS1/BkGTxRKQs5SJD8ZtgCj2Yf8IdxpW87RTQPP2yGaHdcX9IY0S2BIY +FuZ8uQ8k+fWBWuRgh2CZmS96/4s8enyo32AYgODIJb+FG+gpfuCXGhqYsyCHetJ2 +6nDODoB/nMGGdGwQ65KTcwnie6M2Fuvbu8AyTH9UhURBjY8CAwEAATANBgkqhkiG +9w0BAQsFAAOCAgEAX2uLRTsYvnIWq13p0h3BwTZa2EHUC2XeTVRgMPxLqz7loUo6 +ty29aDaFuLgkn0Fc5jU4kChYtSRnZXbAanmVnwtk6uOYDPFLm8MHoH61+fipXib/ +IINoz4JindM8NiKQwA5xEcefnNpmQVt5TTx3H7cR5WBa0o/eCyGMyFWWWhbr5+Xg +Tye5y2uE2AQnUYLW1rb2K5VRLROvh8VN6O+T50vHLblUJ7YGhF17gDUPtO6qMtgQ +A4+yrR8JBMriweZlNULtOOsd50ubQTEGPyNCnXXw5/+g+/YG0WAgb2DccRavX4ux +sSMij9iP9pWTAosw110sYL/syS3xLTwk4TGZv1lw0gGJW85OI6TKGVP0d0Gs9NGg +e5nvOk48btMXqHfXMFO0AqkGcZd/cGNGsng06nYEnmaBkLXhTelphLZmTOmwWDTE +L9Y2/c31ltnWP1aO95QfDKQUWQN8j+VZJn5ZhFZtc3DqhUxFDX1q1GaicQGGE+4w +66bfO33XZwo9VvKAJ8SCmLWNMjyqGiWhNedfVbV73GGwlQEGuvfKipJrilcDyvII +oRRI+88gpjgzQAmhkQQVfv0Fed232JRB3moLR+H2GGBVLRUeVGJiYIGGTTiKpdkF +FMkekmq5U+olKjzNbD97oFujRf4kGjyvFv5iDUAWyA34aTvXG/ax3RfSSUc= +-----END CERTIFICATE----- diff --git a/shotover-proxy/examples/redis-tls/tls_keys/redis.key b/shotover-proxy/examples/redis-tls/tls_keys/redis.key new file mode 100644 index 000000000..5ccebf0e9 --- /dev/null +++ b/shotover-proxy/examples/redis-tls/tls_keys/redis.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEA3vJI3e7FNLdnM97BqWS4kjS9n0kPvsia82+SBNEJHzd2d+B/ +15jvLY8eA8oSK30aHh7rfdB5kCIyO+MdMz0xkKTKG6ynb/IM5fxZRbERlBup/tQj +OjGxeLf5N4qSm7EJSLUNh8xN3gS0moUA1nsUz2Pr/YC5YKPyzaGNciyJRJlaJh2p +LX8GQZPFEpCzlIkPxm2AKPZh/wh3GlbztFNA8/bIZod1xf0hjRLYEhgW5ny5DyT5 +9YFa5GCHYJmZL3r/izx6fKjfYBiA4Mglv4Ub6Cl+4JcaGpizIId60nbqcM4OgH+c +wYZ0bBDrkpNzCeJ7ozYW69u7wDJMf1SFREGNjwIDAQABAoIBAQCd3Mb1xq0Cv96m +ntNYI4aBmDCwieZJO/hj6Rtmp7Ei1n3fMiqzuYmaI92n5zxoaMPGkjRDOQvqoBwb +xZwWfP0Mo3Ksl9tTa/vwGXgI3fFJgzEizIlJGojEptgjmM5oybl+Lx0ui96TF5fe +VdEbDbnVg7ZFIRGiOVSigAMM1jY4SF9kTl6El5wLa/dEIV9Pmq9+A65nBOKZTHbU +urJsMtUOFH98GfcnfSvNtGSe7lNZZbUIdCEz0X3DBNk54zalvHSliYVCwM1Zlp8t +LgJzeCmfpqxDplpgaAFwOyc1TeXWw8fduDlWw+RIe7Ck4d2FIk9C/ATkdzmGa9YV +D8cr2tiJAoGBAPPYp5AHipyreb5AJgCgaceNrZdDITB/XBr++Suyybz1Ir8Z+3W5 +yCxgAj2qoGhMOS6CGp+KttTk7LzFInhXA+hlaICrsWiChgz1BWpI0VTvsOv09skh +i9rYCoVUQ9nB9nvztXbix2SW4vfXqS+7karQwH10wDAGqaR2yfbn0uxVAoGBAOoO +9YL9qRLXp+qhn8F2Z0Hc1flbCEj+PGBs5NmazJuQIjnluaHqJTHuejexabZcS4bl +5WvPubHH2igwCB/5ZiWNQNIF7+lVcvAAoQ4gxByLvHqRcp3CqNFgAJhCVQ1srAgX +JCEvwGK1DmQAdINtCIdmQRqOwb7xeHqSPM2d8TZTAoGAPwO2CZppT6NgirG5IGBT +9aW/Pl+yq/29p5xMd+Z0C8itegUU3o9sE0ucSKNXYJySClrE1oXaSZn/M6keB0s1 +T1EleFrmNcPFMIQBKj43GmP2rINZYxCwO5Wo4lusTRG6yL1qH5brQCtd6/5nUlZ+ +hk378G/DWqXeIQoxlwTBlSECgYEAx0p5bVGCtpJ9XWDE4Dtq7D+WybzjLxOaYRgX +O9l8wjBVlCqwhtcRWJSP5//d3PJ1NKVnVQcIPAHJFVLgeCko+mxtduarQmgJd6Vx +fNAVa6DnmQ1jJETs7Wnq17oTJV0UlcbucntwOhuj5y4kBwu9qVw9rtlCysxcIzGF +KCaFjhUCgYEArjavtpDghEWHzHtzk/gAV6JJMecsylE3N5aP/U0PWq3pQ/XQ9SdE +BxSRd7wEURd+OjvJfwkxJ8rO2XqfwdTAiOV0SRzCHN+Tuz9tcZxVc7tQwEVILAJw +UkIgBfGBOSEXSX3NBX3yk6Rri79Qkd7O1FDZ8y500/LEz61qPAsldJ0= +-----END RSA PRIVATE KEY----- diff --git a/shotover-proxy/examples/redis-tls/topology.yaml b/shotover-proxy/examples/redis-tls/topology.yaml new file mode 100644 index 000000000..90096e926 --- /dev/null +++ b/shotover-proxy/examples/redis-tls/topology.yaml @@ -0,0 +1,16 @@ +--- +sources: + redis_prod: + Redis: + batch_size_hint: 1 + listen_addr: "127.0.0.1:6379" + tls: + certificate_authority_path: "examples/redis-tls/tls_keys/ca.crt" + certificate_path: "examples/redis-tls/tls_keys/redis.crt" + private_key_path: "examples/redis-tls/tls_keys/redis.key" +chain_config: + redis_chain: + - RedisDestination: + remote_address: "127.0.0.1:1111" +source_to_chain_mapping: + redis_prod: redis_chain diff --git a/shotover-proxy/src/lib.rs b/shotover-proxy/src/lib.rs index b0df9c056..4f5bca030 100644 --- a/shotover-proxy/src/lib.rs +++ b/shotover-proxy/src/lib.rs @@ -7,4 +7,5 @@ pub mod protocols; pub mod runner; pub mod server; pub mod sources; +pub mod tls; pub mod transforms; diff --git a/shotover-proxy/src/server.rs b/shotover-proxy/src/server.rs index 1244f88eb..2a74e27e0 100644 --- a/shotover-proxy/src/server.rs +++ b/shotover-proxy/src/server.rs @@ -1,11 +1,14 @@ use crate::message::Messages; +use crate::tls::TlsAcceptor; use crate::transforms::chain::TransformChain; use crate::transforms::Wrapper; use anyhow::Result; use futures::StreamExt; use metrics::gauge; use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::{mpsc, watch, Semaphore}; use tokio::time; use tokio::time::timeout; @@ -69,6 +72,8 @@ pub struct TcpCodecListener { /// Used as part of the graceful shutdown process to wait for client /// connections to complete processing. pub shutdown_complete_tx: mpsc::Sender<()>, + + pub tls: Option, } impl TcpCodecListener { @@ -179,6 +184,8 @@ impl TcpCodecListener { // Notifies the receiver half once all clones are // dropped. _shutdown_complete: self.shutdown_complete_tx.clone(), + + tls: self.tls.clone(), }; // Spawn a new task to process the connections. Tokio tasks are like @@ -266,6 +273,46 @@ pub struct Handler { shutdown: Shutdown, _shutdown_complete: mpsc::Sender<()>, + + tls: Option, +} + +fn spawn_read_write_tasks< + C: Codec + 'static, + R: AsyncRead + Unpin + Send + 'static, + W: AsyncWrite + Unpin + Send + 'static, +>( + codec: C, + rx: R, + tx: W, + in_tx: UnboundedSender, + out_rx: UnboundedReceiver, +) { + let mut reader = FramedRead::new(rx, codec.clone()); + let writer = FramedWrite::new(tx, codec); + + tokio::spawn(async move { + while let Some(message) = reader.next().await { + match message { + Ok(message) => { + if let Err(error) = in_tx.send(message) { + warn!("failed to send message: {}", error); + return; + } + } + Err(error) => { + warn!("failed to decode message: {}", error); + return; + } + } + } + }); + + tokio::spawn(async move { + let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok); + let r = rx_stream.forward(writer).await; + debug!("Stream ended {:?}", r); + }); } impl Handler { @@ -287,33 +334,17 @@ impl Handler { // new request frame. let mut idle_time_seconds: u64 = 1; - let (in_tx, mut in_rx) = tokio::sync::mpsc::unbounded_channel::(); - let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::(); - - let (rx, tx) = stream.into_split(); - - let mut reader = FramedRead::new(rx, self.codec.clone()); - let writer = FramedWrite::new(tx, self.codec.clone()); - - tokio::spawn(async move { - while let Some(maybe_message) = reader.next().await { - match maybe_message { - Ok(resp_messages) => { - let _ = in_tx.send(resp_messages); - } - Err(e) => { - warn!("Frame error - {:?}", e); - break; - } - }; - } - }); - - tokio::spawn(async move { - let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok); - let r = rx_stream.forward(writer).await; - debug!("Stream ended {:?}", r); - }); + let (in_tx, mut in_rx) = mpsc::unbounded_channel::(); + let (out_tx, out_rx) = mpsc::unbounded_channel::(); + + if let Some(tls) = &self.tls { + let tls_stream = tls.accept(stream).await?; + let (rx, tx) = tokio::io::split(tls_stream); + spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx); + } else { + let (rx, tx) = stream.into_split(); + spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx); + }; while !self.shutdown.is_shutdown() { // While reading a request frame, also listen for the shutdown signal diff --git a/shotover-proxy/src/sources/cassandra_source.rs b/shotover-proxy/src/sources/cassandra_source.rs index 50e7fd946..8bb4cd273 100644 --- a/shotover-proxy/src/sources/cassandra_source.rs +++ b/shotover-proxy/src/sources/cassandra_source.rs @@ -82,6 +82,7 @@ impl CassandraSource { limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx: trigger_shutdown_rx.clone(), shutdown_complete_tx, + tls: None, }; let join_handle = Handle::current().spawn(async move { diff --git a/shotover-proxy/src/sources/redis_source.rs b/shotover-proxy/src/sources/redis_source.rs index ae344a5ab..fd071d93c 100644 --- a/shotover-proxy/src/sources/redis_source.rs +++ b/shotover-proxy/src/sources/redis_source.rs @@ -1,9 +1,10 @@ -use crate::transforms::chain::TransformChain; - use crate::config::topology::TopicHolder; use crate::protocols::redis_codec::RedisCodec; use crate::server::TcpCodecListener; use crate::sources::{Sources, SourcesFromConfig}; +use crate::tls::{TlsAcceptor, TlsConfig}; +use crate::transforms::chain::TransformChain; +use anyhow::Result; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -12,14 +13,13 @@ use tokio::sync::{mpsc, watch, Semaphore}; use tokio::task::JoinHandle; use tracing::{error, info}; -use anyhow::Result; - #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct RedisConfig { pub listen_addr: String, pub batch_size_hint: u64, pub connection_limit: Option, pub hard_connection_limit: Option, + pub tls: Option, } #[async_trait] @@ -31,18 +31,18 @@ impl SourcesFromConfig for RedisConfig { trigger_shutdown_rx: watch::Receiver, shutdown_complete_tx: mpsc::Sender<()>, ) -> Result> { - Ok(vec![Sources::Redis( - RedisSource::new( - chain, - self.listen_addr.clone(), - self.batch_size_hint, - trigger_shutdown_rx, - shutdown_complete_tx, - self.connection_limit, - self.hard_connection_limit, - ) - .await, - )]) + RedisSource::new( + chain, + self.listen_addr.clone(), + self.batch_size_hint, + trigger_shutdown_rx, + shutdown_complete_tx, + self.connection_limit, + self.hard_connection_limit, + self.tls.clone(), + ) + .await + .map(|x| vec![Sources::Redis(x)]) } } @@ -62,7 +62,8 @@ impl RedisSource { shutdown_complete_tx: mpsc::Sender<()>, connection_limit: Option, hard_connection_limit: Option, - ) -> RedisSource { + tls: Option, + ) -> Result { info!("Starting Redis source on [{}]", listen_addr); let name = "Redis Source"; @@ -76,6 +77,7 @@ impl RedisSource { limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx: trigger_shutdown_rx.clone(), shutdown_complete_tx, + tls: tls.map(|x| TlsAcceptor::new(x)).transpose()?, }; let join_handle = Handle::current().spawn(async move { @@ -104,10 +106,10 @@ impl RedisSource { Ok(()) }); - RedisSource { + Ok(RedisSource { name, join_handle, listen_addr, - } + }) } } diff --git a/shotover-proxy/src/tls.rs b/shotover-proxy/src/tls.rs new file mode 100644 index 000000000..7185f31ec --- /dev/null +++ b/shotover-proxy/src/tls.rs @@ -0,0 +1,82 @@ +use anyhow::{anyhow, Result}; +use openssl::ssl::Ssl; +use openssl::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod}; +use serde::{Deserialize, Serialize}; +use std::pin::Pin; +use std::sync::Arc; +use tokio::net::TcpStream; +use tokio_openssl::SslStream; +use tracing::warn; + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct TlsConfig { + /// Path to the certificate authority in PEM format + pub certificate_authority_path: String, + /// Path to the certificate in PEM format + pub certificate_path: String, + /// Path to the private key in PEM format + pub private_key_path: String, +} + +#[derive(Clone)] +pub struct TlsAcceptor { + acceptor: Arc, +} + +impl TlsAcceptor { + pub fn new(tls_config: TlsConfig) -> Result { + let mut builder = SslAcceptor::mozilla_modern_v5(SslMethod::tls())?; + builder.set_ca_file(tls_config.certificate_authority_path)?; + builder.set_private_key_file(tls_config.private_key_path, SslFiletype::PEM)?; + builder.set_certificate_chain_file(tls_config.certificate_path)?; + builder.check_private_key()?; + + Ok(TlsAcceptor { + acceptor: Arc::new(builder.build()), + }) + } + + pub async fn accept(&self, tcp_stream: TcpStream) -> Result> { + let ssl = Ssl::new(self.acceptor.context())?; + let mut ssl_stream = SslStream::new(ssl, tcp_stream)?; + + Pin::new(&mut ssl_stream) + .accept() + .await + .map_err(|x| anyhow!("Failed to accept TLS connection: {}", x))?; + + Ok(ssl_stream) + } +} + +#[derive(Clone)] +pub struct TlsConnector { + connector: Arc, +} + +impl TlsConnector { + pub fn new(tls_config: TlsConfig) -> Result { + let mut builder = SslConnector::builder(SslMethod::tls())?; + builder.set_ca_file(tls_config.certificate_authority_path)?; + builder.set_private_key_file(tls_config.private_key_path, SslFiletype::PEM)?; + builder.set_certificate_chain_file(tls_config.certificate_path)?; + + Ok(TlsConnector { + connector: Arc::new(builder.build()), + }) + } + + pub async fn connect(&self, tcp_stream: TcpStream) -> Result> { + warn!("Disabling TLS hostname verification for compatibility with redis, this needs to be investigated properly"); + let ssl = self + .connector + .configure()? + .verify_hostname(false) + .into_ssl("localhost")?; + + let mut ssl_stream = SslStream::new(ssl, tcp_stream)?; + Pin::new(&mut ssl_stream).connect().await?; + + Ok(ssl_stream) + } +} diff --git a/shotover-proxy/tests/helpers/mod.rs b/shotover-proxy/tests/helpers/mod.rs index 87a28cb86..437151475 100644 --- a/shotover-proxy/tests/helpers/mod.rs +++ b/shotover-proxy/tests/helpers/mod.rs @@ -1,7 +1,9 @@ use anyhow::Result; use redis::{Client, Connection}; use shotover_proxy::runner::{ConfigOpts, Runner}; +use shotover_proxy::tls::{TlsConfig, TlsConnector}; use std::net::TcpStream; +use std::pin::Pin; use std::thread; use std::time::Duration; use tokio::runtime::{Handle as RuntimeHandle, Runtime}; @@ -58,6 +60,47 @@ impl ShotoverManager { .get_connection() .unwrap() } + + #[allow(unused)] + pub async fn async_redis_connection(&self, port: u16) -> redis::aio::Connection { + use redis::aio::AsyncStream; + use tokio::net::TcpStream; + + ShotoverManager::wait_for_socket_to_open(port); + + let stream = Box::pin(TcpStream::connect(("127.0.0.1", port)).await.unwrap()); + let connection_info = Default::default(); + redis::aio::Connection::new( + &connection_info, + stream as Pin>, + ) + .await + .unwrap() + } + + #[allow(unused)] + pub async fn async_tls_redis_connection( + &self, + port: u16, + config: TlsConfig, + ) -> redis::aio::Connection { + use redis::aio::AsyncStream; + use tokio::net::TcpStream; + + ShotoverManager::wait_for_socket_to_open(port); + + let tcp_stream = TcpStream::connect(("127.0.0.1", port)).await.unwrap(); + let connector = TlsConnector::new(config).unwrap(); + let tls_stream = connector.connect(tcp_stream).await.unwrap(); + + let connection_info = Default::default(); + redis::aio::Connection::new( + &connection_info, + Box::pin(tls_stream) as Pin>, + ) + .await + .unwrap() + } } impl Drop for ShotoverManager { diff --git a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs index 7c689ef45..2ab4de8be 100644 --- a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs +++ b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs @@ -1,6 +1,7 @@ #![allow(clippy::let_unit_value)] use crate::helpers::ShotoverManager; +use shotover_proxy::tls::TlsConfig; use test_helpers::docker_compose::DockerCompose; @@ -610,6 +611,43 @@ fn test_pass_through() { run_all(&mut connection); } +#[tokio::test(flavor = "multi_thread")] +#[serial(redis)] +async fn test_tls() { + let _compose = DockerCompose::new("examples/redis-tls/docker-compose.yml"); + let shotover_manager = ShotoverManager::from_topology_file("examples/redis-tls/topology.yaml"); + + let tls_config = TlsConfig { + certificate_authority_path: "examples/redis-tls/tls_keys/ca.crt".into(), + certificate_path: "examples/redis-tls/tls_keys/redis.crt".into(), + private_key_path: "examples/redis-tls/tls_keys/redis.key".into(), + }; + + let mut connection = shotover_manager + .async_tls_redis_connection(6379, tls_config) + .await; + + redis::cmd("SET") + .arg("key1") + .arg(b"foo") + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + redis::cmd("SET") + .arg(&["key2", "bar"]) + .query_async::<_, ()>(&mut connection) + .await + .unwrap(); + + assert_eq!( + redis::cmd("MGET") + .arg(&["key1", "key2"]) + .query_async(&mut connection) + .await, + Ok(("foo".to_string(), b"bar".to_vec())) + ); +} + // #[test] // #[serial(redis)] #[allow(dead_code)]