Skip to content

Commit

Permalink
Update redis-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 21, 2024
1 parent 4da469a commit dd8155d
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 171 deletions.
226 changes: 125 additions & 101 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ serde = { version = "1.0.111", features = ["derive"] }
serde_yaml = "0.9.17"
uuid = { version = "1.0.0", features = ["serde", "v4"] }
reqwest = "0.11.6"
redis = { version = "0.23.3", features = ["tokio-comp", "cluster"] }
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
cdrs-tokio = "8.0"
cassandra-protocol = "3.0"
tracing = "0.1.15"
Expand Down
5 changes: 3 additions & 2 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ async-trait.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
async-once-cell = "0.5.2"
fred = { version = "7.0.0", features = ["enable-rustls"] }
fred = { version = "8.0.0", features = ["enable-rustls"] }
tokio-bin-process.workspace = true
rustls-pemfile = "1.0.2"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = { path = "../windsock" }
regex = "1.7.0"
Expand Down
66 changes: 24 additions & 42 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use async_trait::async_trait;
use aws_throwaway::Ec2Instance;
use fred::{
prelude::*,
rustls::{Certificate, ClientConfig, PrivateKey, RootCertStore},
rustls::{ClientConfig, RootCertStore},
};
use itertools::Itertools;
use rustls_pemfile::{certs, Item};
use rustls_pemfile::Item;
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use shotover::{
config::chain::TransformChainConfig,
sources::SourceConfig,
Expand Down Expand Up @@ -323,14 +324,14 @@ impl Bench for RedisBench {

let mut config = RedisConfig::from_url(address).unwrap();
if let Encryption::Tls = self.encryption {
let private_key = load_private_key("tests/test-configs/redis/tls/certs/localhost.key");
let certs = load_certs("tests/test-configs/redis/tls/certs/localhost.crt");
let private_key =
load_private_key("tests/test-configs/redis/tls/certs/localhost.key").unwrap();
let certs = load_certs("tests/test-configs/redis/tls/certs/localhost.crt").unwrap();
config.tls = Some(
ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(load_ca(
"tests/test-configs/redis/tls/certs/localhost_CA.crt",
))
.with_root_certificates(
load_ca("tests/test-configs/redis/tls/certs/localhost_CA.crt").unwrap(),
)
.with_client_auth_cert(certs, private_key)
.unwrap()
.into(),
Expand Down Expand Up @@ -378,48 +379,29 @@ impl Bench for RedisBench {
}
}

// TODO: when fred updates its rustls version recopy these functions from shotover/src/tls.rs

fn load_certs(path: &str) -> Vec<Certificate> {
load_certs_inner(path)
.with_context(|| format!("Failed to read certs at {path:?}"))
.unwrap()
}
fn load_certs_inner(path: &str) -> Result<Vec<Certificate>> {
certs(&mut BufReader::new(File::open(path)?))
fn load_certs(path: &str) -> Result<Vec<CertificateDer<'static>>> {
rustls_pemfile::certs(&mut BufReader::new(File::open(path)?))
.collect::<Result<Vec<_>, _>>()
.context("Error while parsing PEM")
.map(|certs| certs.into_iter().map(Certificate).collect())
}

fn load_private_key(path: &str) -> PrivateKey {
load_private_key_inner(path)
.with_context(|| format!("Failed to read private key at {path:?}"))
.unwrap()
}
fn load_private_key_inner(path: &str) -> Result<PrivateKey> {
let keys = rustls_pemfile::read_all(&mut BufReader::new(File::open(path)?))
.context("Error while parsing PEM")?;
keys.into_iter()
.find_map(|item| match item {
Item::RSAKey(x) | Item::PKCS8Key(x) => Some(PrivateKey(x)),
_ => None,
})
.ok_or_else(|| anyhow!("No suitable keys found in PEM"))
fn load_private_key(path: &str) -> Result<PrivateKeyDer<'static>> {
for key in rustls_pemfile::read_all(&mut BufReader::new(File::open(path)?)) {
match key.context("Error while parsing PEM")? {
Item::Pkcs8Key(x) => return Ok(x.into()),
Item::Pkcs1Key(x) => return Ok(x.into()),
_ => {}
}
}
Err(anyhow!("No suitable keys found in PEM"))
}

fn load_ca(path: &str) -> RootCertStore {
load_ca_inner(path)
.with_context(|| format!("Failed to load CA at {path:?}"))
.unwrap()
}
fn load_ca_inner(path: &str) -> Result<RootCertStore> {
fn load_ca(path: &str) -> Result<RootCertStore> {
let mut pem = BufReader::new(File::open(path)?);
let certs = rustls_pemfile::certs(&mut pem).context("Error while parsing PEM")?;

let mut root_cert_store = RootCertStore::empty();
for cert in certs {
for cert in rustls_pemfile::certs(&mut pem) {
root_cert_store
.add(&Certificate(cert))
.add(cert.context("Error while parsing PEM")?)
.context("Failed to add cert to cert store")?;
}
Ok(root_cert_store)
Expand Down
17 changes: 8 additions & 9 deletions shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::redis_int_tests::assert::*;
use bytes::BytesMut;
use fred::clients::RedisClient;
use fred::interfaces::ClientLike;
use futures::{Future, StreamExt};
use rand::{thread_rng, Rng};
use rand_distr::Alphanumeric;
Expand Down Expand Up @@ -1265,16 +1267,13 @@ pub async fn test_dr_auth() {

/// A driver variant of this test case is provided so that we can ensure that
/// at least one driver handles this as we expect.
pub async fn test_trigger_transform_failure_driver(connection: &mut Connection) {
/// Fred is used here as redis-rs sends an unconfigurable `CLIENT SETINFO` command and ignores the result on connection init.
/// This results in the error message being completely dropped with redis-rs.
pub async fn test_trigger_transform_failure_driver(client: &RedisClient) {
assert_eq!(
redis::cmd("SET")
.arg("foo")
.arg(42)
.query_async::<_, ()>(connection)
.await
.unwrap_err()
.to_string(),
"An error was signalled by the server - ResponseError: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
// fred sends a `CLIENT` command on startup to which shotover will reply with an error
client.wait_for_connect().await.unwrap_err().details(),
"ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
);
}

Expand Down
10 changes: 8 additions & 2 deletions shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::shotover_process;
use basic_driver_tests::*;
use fred::clients::RedisClient;
use fred::interfaces::ClientLike;
use fred::types::RedisConfig;
use redis::aio::Connection;
use redis::Commands;

Expand Down Expand Up @@ -47,9 +50,12 @@ async fn passthrough_redis_down() {
let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml")
.start()
.await;
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;
let client = RedisClient::new(RedisConfig::default(), None, None, None);

test_trigger_transform_failure_driver(&mut connection).await;
{
let _shutdown_handle = client.connect();
test_trigger_transform_failure_driver(&client).await;
}
test_trigger_transform_failure_raw().await;

test_invalid_frame().await;
Expand Down
27 changes: 24 additions & 3 deletions shotover-proxy/tests/transforms/log_to_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,34 @@ async fn log_to_file() {
.start()
.await;

// CLIENT SETINFO requests sent by driver during connection handshake
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;
let request = std::fs::read("message-log/1/requests/message1.bin").unwrap();
assert_eq_string(
&request,
"*4\r\n$6\r\nCLIENT\r\n$7\r\nSETINFO\r\n$8\r\nLIB-NAME\r\n$8\r\nredis-rs\r\n",
);
let response = std::fs::read("message-log/1/responses/message1.bin").unwrap();
assert_eq_string(
&response,
"-ERR Unknown subcommand or wrong number of arguments for 'SETINFO'. Try CLIENT HELP\r\n",
);
let request = std::fs::read("message-log/1/requests/message2.bin").unwrap();
assert_eq_string(
&request,
"*4\r\n$6\r\nCLIENT\r\n$7\r\nSETINFO\r\n$7\r\nLIB-VER\r\n$6\r\n0.24.0\r\n",
);
let response = std::fs::read("message-log/1/responses/message2.bin").unwrap();
assert_eq_string(
&response,
"-ERR Unknown subcommand or wrong number of arguments for 'SETINFO'. Try CLIENT HELP\r\n",
);

// SET sent by command
assert_ok(redis::cmd("SET").arg("foo").arg(42), &mut connection).await;
let request = std::fs::read("message-log/1/requests/message1.bin").unwrap();
let request = std::fs::read("message-log/1/requests/message3.bin").unwrap();
assert_eq_string(&request, "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$2\r\n42\r\n");

let response = std::fs::read("message-log/1/responses/message1.bin").unwrap();
let response = std::fs::read("message-log/1/responses/message3.bin").unwrap();
assert_eq_string(&response, "+OK\r\n");

shotover.shutdown_and_then_consume_events(&[]).await;
Expand Down
27 changes: 19 additions & 8 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,24 @@ async fn test_log_with_mismatch() {

assert_eq!("42", result);
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))"]
.shutdown_and_then_consume_events(&[
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))", "Redis BulkString(b\"42\"))"]
tee response: ["Redis BulkString(b\"41\"))", "Redis BulkString(b\"41\"))"]"#,
),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))"]
tee response: ["Redis BulkString(b\"41\"))"]"#,
)])
),
])
.await;
}

Expand Down Expand Up @@ -287,6 +297,7 @@ async fn test_switch_main_chain() {
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_count(tokio_bin_process::event_matcher::Count::Times(3))])
// 1 warning per loop above + 1 warning from the redis-rs driver connection handshake
.with_count(tokio_bin_process::event_matcher::Count::Times(4))])
.await;
}
4 changes: 2 additions & 2 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Transform for Tee {

if !chain_response.eq(&tee_response) {
debug!(
"Tee mismatch: \nchain response: {:?} \ntee response: {:?}",
"Tee mismatch:\nchain response: {:?}\ntee response: {:?}",
chain_response
.iter_mut()
.map(|m| m.to_high_level_string())
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Transform for Tee {

if !chain_response.eq(&tee_response) {
warn!(
"Tee mismatch: \nchain response: {:?} \ntee response: {:?}",
"Tee mismatch:\nchain response: {:?}\ntee response: {:?}",
chain_response
.iter_mut()
.map(|m| m.to_high_level_string())
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ tracing-subscriber.workspace = true
serde_yaml.workspace = true
anyhow.workspace = true
rcgen.workspace = true
rdkafka = { version = "0.35", features = ["cmake-build"], optional = true }
rdkafka = { version = "0.36", features = ["cmake-build"], optional = true }
docker-compose-runner = "0.3.0"
# TODO: make scylla reexport these so we dont have to import random old versions
bigdecimal = "0.2.2"
Expand Down

0 comments on commit dd8155d

Please sign in to comment.