Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement TLS support for incoming redis connections #141

Merged
merged 3 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ rand_distr = "0.4.1"
cached = "0.25.0"
pin-project = "1.0.1"
pin-project-lite = "0.2"
tokio-openssl = "0.6.2"
openssl = { version = "0.10.36", features = ["vendored"] }

# Error handling
thiserror = "1.0"
Expand Down
6 changes: 6 additions & 0 deletions shotover-proxy/examples/redis-tls/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.3"
services:
redis-one:
image: library/redis:5.0.9
ports:
- "1111:6379"
3 changes: 3 additions & 0 deletions shotover-proxy/examples/redis-tls/redis-cli.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

redis-cli --tls --cert tls_keys/redis.crt --key tls_keys/redis.key --cacert tls_keys/ca.crt "$@"
31 changes: 31 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/ca.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-----BEGIN CERTIFICATE-----
MIIFSzCCAzOgAwIBAgIUWPH3dRgC/YwYZZCXib8gwJ+O8xYwDQYJKoZIhvcNAQEL
BQAwNTETMBEGA1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUg
QXV0aG9yaXR5MB4XDTIxMDgxNjA1NTcxN1oXDTMxMDgxNDA1NTcxN1owNTETMBEG
A1UECgwKUmVkaXMgVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAw+X26W/dGC+2sJHiJkbf
Hq39KSWoXrCvXFP6FXuGqdEIZzaUokOSrNR0aueasOyc3hQ+B+N1DyPlPXDYxcll
yZdcbRvUT85tprheke6P1ofA369fWpm6aZIQf1ahsBpZ5KLwavqo1MC/5eqvvloy
vwomLvmjONSjnbvbofgRkFH0mETo91qh9tAK6wLSEUwZhKFsBIajP5cQ+IlL8QVi
0sIc5fIrmAPUdK2NwI1ly1L48BuBT2n46nm3J9ZX5Px+9NvsDT7VA+g5o1Pfer62
gfmEdQdWzdRBWRw6HLMuvN3w6mL/3++W5LzhF846szebzhue84XzuGSJT5i6s3LP
dgWTweJTLP7imb2JFaZDcAwDPMXUhYqhU6wst+mlsqUX9XHR9MajZluxIQLXAom/
ZUSVYjlOvtTmFAOEqe/eKxM/ZX4+umnQDIMirvtxOV8jtO02/Yog2nSs3XLNQXL5
tIwDX2A/HErzvSOsMMwKggUBChBorELaSzy1bMNoSw0t6CQ+74fT7iV7nWKKvpmf
Zfeb7WOu2XF/zjNyy9ghQkiARTupCzZY2G7kCMCtXy7Q/I3ip5LWhiywwDOgkibI
Elp0AZEy3MvLfM/7kFroWd7NFQ739o7nkU1G6hhgKIGzUqP83ozwzjHuDvrE2yXw
Pi41ze7Iy+GvlKPgkwsVR9MCAwEAAaNTMFEwHQYDVR0OBBYEFLw7OfSQI/lsTRe/
4Pu/ctJDj/a6MB8GA1UdIwQYMBaAFLw7OfSQI/lsTRe/4Pu/ctJDj/a6MA8GA1Ud
EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggIBACwGR6DnefMO3n52bfujvwan
qRnH87IlnFrGDW1yXNAfs/9hh8UHPRNgvOzU/OlJBMWF0r+T0QwChcgqwZ/kQ5RG
GbkZeAA69Ail+LYra6p9r6tuTyZtGkpOLI+w73Qb6GS4q5agYMJjz7OU3M7m7dsw
lkYfqSxHtXc5otdxG9dlblRW5jpEoD9ry5Oh7f1FIGMgncOvYGX2zgLUr2Qn65aL
+Ung0KTmZtwPjzfWzHQEPx0hFcnh/K3xilhJ4P+8lc71aWpjnffvrONlX0dJzSzm
3xkCkorHagnAFoJwIM5/boe77PaHb4A4ehmkzKYIzMxbfGnePkUwFmGhmxgTHgtu
67voFBfIuWoEZNdl44Mb28JY7zYTJ81fi/RSnxqMg+2oxaqZlF+Y/c81fHBzOaSM
/txPmt5WWqC6aId+Mmmnmx1IpEvooc0KI3c+XPikHEWx/Z+FPD595vUaq6aiuzSs
tAvk6Z1OiArBKQcurQoHBJqfOQiu9f3+VebZ7sG3f7aRLDWpfF3JB6sHKlGl1tXY
HWBg+46ezLT3JJn820YZ2j8zZY0fGx5OwMwM64WLyIaxIiHhh+TtLaOy5+97S4LI
syet2sEZHPc3IPoBxMzeWbAt+qi74baX1U91M2/eHVPArEm3SXs0I6MFn/aQJ3IN
yAUJEnGE62qXdU0pa/LQ
-----END CERTIFICATE-----
23 changes: 23 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/redis.crt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID6DCCAdACFDSqN+WRPnbixT+8vJtDyZZnjxhaMA0GCSqGSIb3DQEBCwUAMDUx
EzARBgNVBAoMClJlZGlzIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhv
cml0eTAeFw0yMTA4MTYwNTU3MTdaFw0yMjA4MTYwNTU3MTdaMCwxEzARBgNVBAoM
ClJlZGlzIFRlc3QxFTATBgNVBAMMDEdlbmVyaWMtY2VydDCCASIwDQYJKoZIhvcN
AQEBBQADggEPADCCAQoCggEBAN7ySN3uxTS3ZzPewalkuJI0vZ9JD77ImvNvkgTR
CR83dnfgf9eY7y2PHgPKEit9Gh4e633QeZAiMjvjHTM9MZCkyhusp2/yDOX8WUWx
EZQbqf7UIzoxsXi3+TeKkpuxCUi1DYfMTd4EtJqFANZ7FM9j6/2AuWCj8s2hjXIs
iUSZWiYdqS1/BkGTxRKQs5SJD8ZtgCj2Yf8IdxpW87RTQPP2yGaHdcX9IY0S2BIY
FuZ8uQ8k+fWBWuRgh2CZmS96/4s8enyo32AYgODIJb+FG+gpfuCXGhqYsyCHetJ2
6nDODoB/nMGGdGwQ65KTcwnie6M2Fuvbu8AyTH9UhURBjY8CAwEAATANBgkqhkiG
9w0BAQsFAAOCAgEAX2uLRTsYvnIWq13p0h3BwTZa2EHUC2XeTVRgMPxLqz7loUo6
ty29aDaFuLgkn0Fc5jU4kChYtSRnZXbAanmVnwtk6uOYDPFLm8MHoH61+fipXib/
IINoz4JindM8NiKQwA5xEcefnNpmQVt5TTx3H7cR5WBa0o/eCyGMyFWWWhbr5+Xg
Tye5y2uE2AQnUYLW1rb2K5VRLROvh8VN6O+T50vHLblUJ7YGhF17gDUPtO6qMtgQ
A4+yrR8JBMriweZlNULtOOsd50ubQTEGPyNCnXXw5/+g+/YG0WAgb2DccRavX4ux
sSMij9iP9pWTAosw110sYL/syS3xLTwk4TGZv1lw0gGJW85OI6TKGVP0d0Gs9NGg
e5nvOk48btMXqHfXMFO0AqkGcZd/cGNGsng06nYEnmaBkLXhTelphLZmTOmwWDTE
L9Y2/c31ltnWP1aO95QfDKQUWQN8j+VZJn5ZhFZtc3DqhUxFDX1q1GaicQGGE+4w
66bfO33XZwo9VvKAJ8SCmLWNMjyqGiWhNedfVbV73GGwlQEGuvfKipJrilcDyvII
oRRI+88gpjgzQAmhkQQVfv0Fed232JRB3moLR+H2GGBVLRUeVGJiYIGGTTiKpdkF
FMkekmq5U+olKjzNbD97oFujRf4kGjyvFv5iDUAWyA34aTvXG/ax3RfSSUc=
-----END CERTIFICATE-----
27 changes: 27 additions & 0 deletions shotover-proxy/examples/redis-tls/tls_keys/redis.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA3vJI3e7FNLdnM97BqWS4kjS9n0kPvsia82+SBNEJHzd2d+B/
15jvLY8eA8oSK30aHh7rfdB5kCIyO+MdMz0xkKTKG6ynb/IM5fxZRbERlBup/tQj
OjGxeLf5N4qSm7EJSLUNh8xN3gS0moUA1nsUz2Pr/YC5YKPyzaGNciyJRJlaJh2p
LX8GQZPFEpCzlIkPxm2AKPZh/wh3GlbztFNA8/bIZod1xf0hjRLYEhgW5ny5DyT5
9YFa5GCHYJmZL3r/izx6fKjfYBiA4Mglv4Ub6Cl+4JcaGpizIId60nbqcM4OgH+c
wYZ0bBDrkpNzCeJ7ozYW69u7wDJMf1SFREGNjwIDAQABAoIBAQCd3Mb1xq0Cv96m
ntNYI4aBmDCwieZJO/hj6Rtmp7Ei1n3fMiqzuYmaI92n5zxoaMPGkjRDOQvqoBwb
xZwWfP0Mo3Ksl9tTa/vwGXgI3fFJgzEizIlJGojEptgjmM5oybl+Lx0ui96TF5fe
VdEbDbnVg7ZFIRGiOVSigAMM1jY4SF9kTl6El5wLa/dEIV9Pmq9+A65nBOKZTHbU
urJsMtUOFH98GfcnfSvNtGSe7lNZZbUIdCEz0X3DBNk54zalvHSliYVCwM1Zlp8t
LgJzeCmfpqxDplpgaAFwOyc1TeXWw8fduDlWw+RIe7Ck4d2FIk9C/ATkdzmGa9YV
D8cr2tiJAoGBAPPYp5AHipyreb5AJgCgaceNrZdDITB/XBr++Suyybz1Ir8Z+3W5
yCxgAj2qoGhMOS6CGp+KttTk7LzFInhXA+hlaICrsWiChgz1BWpI0VTvsOv09skh
i9rYCoVUQ9nB9nvztXbix2SW4vfXqS+7karQwH10wDAGqaR2yfbn0uxVAoGBAOoO
9YL9qRLXp+qhn8F2Z0Hc1flbCEj+PGBs5NmazJuQIjnluaHqJTHuejexabZcS4bl
5WvPubHH2igwCB/5ZiWNQNIF7+lVcvAAoQ4gxByLvHqRcp3CqNFgAJhCVQ1srAgX
JCEvwGK1DmQAdINtCIdmQRqOwb7xeHqSPM2d8TZTAoGAPwO2CZppT6NgirG5IGBT
9aW/Pl+yq/29p5xMd+Z0C8itegUU3o9sE0ucSKNXYJySClrE1oXaSZn/M6keB0s1
T1EleFrmNcPFMIQBKj43GmP2rINZYxCwO5Wo4lusTRG6yL1qH5brQCtd6/5nUlZ+
hk378G/DWqXeIQoxlwTBlSECgYEAx0p5bVGCtpJ9XWDE4Dtq7D+WybzjLxOaYRgX
O9l8wjBVlCqwhtcRWJSP5//d3PJ1NKVnVQcIPAHJFVLgeCko+mxtduarQmgJd6Vx
fNAVa6DnmQ1jJETs7Wnq17oTJV0UlcbucntwOhuj5y4kBwu9qVw9rtlCysxcIzGF
KCaFjhUCgYEArjavtpDghEWHzHtzk/gAV6JJMecsylE3N5aP/U0PWq3pQ/XQ9SdE
BxSRd7wEURd+OjvJfwkxJ8rO2XqfwdTAiOV0SRzCHN+Tuz9tcZxVc7tQwEVILAJw
UkIgBfGBOSEXSX3NBX3yk6Rri79Qkd7O1FDZ8y500/LEz61qPAsldJ0=
-----END RSA PRIVATE KEY-----
16 changes: 16 additions & 0 deletions shotover-proxy/examples/redis-tls/topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
sources:
redis_prod:
Redis:
batch_size_hint: 1
listen_addr: "127.0.0.1:6379"
tls:
certificate_authority_path: "examples/redis-tls/tls_keys/ca.crt"
certificate_path: "examples/redis-tls/tls_keys/redis.crt"
private_key_path: "examples/redis-tls/tls_keys/redis.key"
chain_config:
redis_chain:
- RedisDestination:
remote_address: "127.0.0.1:1111"
source_to_chain_mapping:
redis_prod: redis_chain
1 change: 1 addition & 0 deletions shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ pub mod protocols;
pub mod runner;
pub mod server;
pub mod sources;
pub mod tls;
pub mod transforms;
85 changes: 58 additions & 27 deletions shotover-proxy/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::message::Messages;
use crate::tls::TlsAcceptor;
use crate::transforms::chain::TransformChain;
use crate::transforms::Wrapper;
use anyhow::Result;
use futures::StreamExt;
use metrics::gauge;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, watch, Semaphore};
use tokio::time;
use tokio::time::timeout;
Expand Down Expand Up @@ -69,6 +72,8 @@ pub struct TcpCodecListener<C: Codec> {
/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
pub shutdown_complete_tx: mpsc::Sender<()>,

pub tls: Option<TlsAcceptor>,
}

impl<C: Codec + 'static> TcpCodecListener<C> {
Expand Down Expand Up @@ -179,6 +184,8 @@ impl<C: Codec + 'static> TcpCodecListener<C> {
// Notifies the receiver half once all clones are
// dropped.
_shutdown_complete: self.shutdown_complete_tx.clone(),

tls: self.tls.clone(),
};

// Spawn a new task to process the connections. Tokio tasks are like
Expand Down Expand Up @@ -266,6 +273,46 @@ pub struct Handler<C: Codec> {
shutdown: Shutdown,

_shutdown_complete: mpsc::Sender<()>,

tls: Option<TlsAcceptor>,
}

fn spawn_read_write_tasks<
C: Codec + 'static,
R: AsyncRead + Unpin + Send + 'static,
W: AsyncWrite + Unpin + Send + 'static,
>(
codec: C,
rx: R,
tx: W,
in_tx: UnboundedSender<Messages>,
out_rx: UnboundedReceiver<Messages>,
) {
let mut reader = FramedRead::new(rx, codec.clone());
let writer = FramedWrite::new(tx, codec);

tokio::spawn(async move {
while let Some(message) = reader.next().await {
match message {
Ok(message) => {
if let Err(error) = in_tx.send(message) {
warn!("failed to send message: {}", error);
return;
}
}
Err(error) => {
warn!("failed to decode message: {}", error);
return;
}
}
}
});

tokio::spawn(async move {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok);
let r = rx_stream.forward(writer).await;
debug!("Stream ended {:?}", r);
});
}

impl<C: Codec + 'static> Handler<C> {
Expand All @@ -287,33 +334,17 @@ impl<C: Codec + 'static> Handler<C> {
// new request frame.
let mut idle_time_seconds: u64 = 1;

let (in_tx, mut in_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<Messages>();

let (rx, tx) = stream.into_split();

let mut reader = FramedRead::new(rx, self.codec.clone());
let writer = FramedWrite::new(tx, self.codec.clone());

tokio::spawn(async move {
while let Some(maybe_message) = reader.next().await {
match maybe_message {
Ok(resp_messages) => {
let _ = in_tx.send(resp_messages);
}
Err(e) => {
warn!("Frame error - {:?}", e);
break;
}
};
}
});

tokio::spawn(async move {
let rx_stream = UnboundedReceiverStream::new(out_rx).map(Ok);
let r = rx_stream.forward(writer).await;
debug!("Stream ended {:?}", r);
});
let (in_tx, mut in_rx) = mpsc::unbounded_channel::<Messages>();
let (out_tx, out_rx) = mpsc::unbounded_channel::<Messages>();

if let Some(tls) = &self.tls {
let tls_stream = tls.accept(stream).await?;
let (rx, tx) = tokio::io::split(tls_stream);
spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx);
} else {
let (rx, tx) = stream.into_split();
spawn_read_write_tasks(self.codec.clone(), rx, tx, in_tx, out_rx);
};

while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/src/sources/cassandra_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl CassandraSource {
limit_connections: Arc::new(Semaphore::new(connection_limit.unwrap_or(512))),
trigger_shutdown_rx: trigger_shutdown_rx.clone(),
shutdown_complete_tx,
tls: None,
};

let join_handle = Handle::current().spawn(async move {
Expand Down
Loading