Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update redis-rs #1418

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 125 additions & 101 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ serde = { version = "1.0.111", features = ["derive"] }
serde_yaml = "0.9.17"
uuid = { version = "1.0.0", features = ["serde", "v4"] }
reqwest = "0.11.6"
redis = { version = "0.23.3", features = ["tokio-comp", "cluster"] }
redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] }
cdrs-tokio = "8.0"
cassandra-protocol = "3.0"
tracing = "0.1.15"
Expand Down
5 changes: 3 additions & 2 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ async-trait.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
async-once-cell = "0.5.2"
fred = { version = "7.0.0", features = ["enable-rustls"] }
fred = { version = "8.0.0", features = ["enable-rustls"] }
tokio-bin-process.workspace = true
rustls-pemfile = "1.0.2"
rustls-pemfile = "2.0.0"
rustls-pki-types = "1.1.0"
aws-throwaway.workspace = true
windsock = { path = "../windsock" }
regex = "1.7.0"
Expand Down
66 changes: 24 additions & 42 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ use async_trait::async_trait;
use aws_throwaway::Ec2Instance;
use fred::{
prelude::*,
rustls::{Certificate, ClientConfig, PrivateKey, RootCertStore},
rustls::{ClientConfig, RootCertStore},
};
use itertools::Itertools;
use rustls_pemfile::{certs, Item};
use rustls_pemfile::Item;
use rustls_pki_types::{CertificateDer, PrivateKeyDer};
use shotover::{
config::chain::TransformChainConfig,
sources::SourceConfig,
Expand Down Expand Up @@ -323,14 +324,14 @@ impl Bench for RedisBench {

let mut config = RedisConfig::from_url(address).unwrap();
if let Encryption::Tls = self.encryption {
let private_key = load_private_key("tests/test-configs/redis/tls/certs/localhost.key");
let certs = load_certs("tests/test-configs/redis/tls/certs/localhost.crt");
let private_key =
load_private_key("tests/test-configs/redis/tls/certs/localhost.key").unwrap();
let certs = load_certs("tests/test-configs/redis/tls/certs/localhost.crt").unwrap();
config.tls = Some(
ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(load_ca(
"tests/test-configs/redis/tls/certs/localhost_CA.crt",
))
.with_root_certificates(
load_ca("tests/test-configs/redis/tls/certs/localhost_CA.crt").unwrap(),
)
.with_client_auth_cert(certs, private_key)
.unwrap()
.into(),
Expand Down Expand Up @@ -378,48 +379,29 @@ impl Bench for RedisBench {
}
}

// TODO: when fred updates its rustls version recopy these functions from shotover/src/tls.rs

fn load_certs(path: &str) -> Vec<Certificate> {
load_certs_inner(path)
.with_context(|| format!("Failed to read certs at {path:?}"))
.unwrap()
}
fn load_certs_inner(path: &str) -> Result<Vec<Certificate>> {
certs(&mut BufReader::new(File::open(path)?))
fn load_certs(path: &str) -> Result<Vec<CertificateDer<'static>>> {
rustls_pemfile::certs(&mut BufReader::new(File::open(path)?))
.collect::<Result<Vec<_>, _>>()
.context("Error while parsing PEM")
.map(|certs| certs.into_iter().map(Certificate).collect())
}

fn load_private_key(path: &str) -> PrivateKey {
load_private_key_inner(path)
.with_context(|| format!("Failed to read private key at {path:?}"))
.unwrap()
}
fn load_private_key_inner(path: &str) -> Result<PrivateKey> {
let keys = rustls_pemfile::read_all(&mut BufReader::new(File::open(path)?))
.context("Error while parsing PEM")?;
keys.into_iter()
.find_map(|item| match item {
Item::RSAKey(x) | Item::PKCS8Key(x) => Some(PrivateKey(x)),
_ => None,
})
.ok_or_else(|| anyhow!("No suitable keys found in PEM"))
fn load_private_key(path: &str) -> Result<PrivateKeyDer<'static>> {
for key in rustls_pemfile::read_all(&mut BufReader::new(File::open(path)?)) {
match key.context("Error while parsing PEM")? {
Item::Pkcs8Key(x) => return Ok(x.into()),
Item::Pkcs1Key(x) => return Ok(x.into()),
_ => {}
}
}
Err(anyhow!("No suitable keys found in PEM"))
}

fn load_ca(path: &str) -> RootCertStore {
load_ca_inner(path)
.with_context(|| format!("Failed to load CA at {path:?}"))
.unwrap()
}
fn load_ca_inner(path: &str) -> Result<RootCertStore> {
fn load_ca(path: &str) -> Result<RootCertStore> {
let mut pem = BufReader::new(File::open(path)?);
let certs = rustls_pemfile::certs(&mut pem).context("Error while parsing PEM")?;

let mut root_cert_store = RootCertStore::empty();
for cert in certs {
for cert in rustls_pemfile::certs(&mut pem) {
root_cert_store
.add(&Certificate(cert))
.add(cert.context("Error while parsing PEM")?)
.context("Failed to add cert to cert store")?;
}
Ok(root_cert_store)
Expand Down
17 changes: 8 additions & 9 deletions shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::redis_int_tests::assert::*;
use bytes::BytesMut;
use fred::clients::RedisClient;
use fred::interfaces::ClientLike;
use futures::{Future, StreamExt};
use rand::{thread_rng, Rng};
use rand_distr::Alphanumeric;
Expand Down Expand Up @@ -1265,16 +1267,13 @@ pub async fn test_dr_auth() {

/// A driver variant of this test case is provided so that we can ensure that
/// at least one driver handles this as we expect.
pub async fn test_trigger_transform_failure_driver(connection: &mut Connection) {
/// Fred is used here as redis-rs sends an unconfigurable `CLIENT SETINFO` command and ignores the result on connection init.
/// This results in the error message being completely dropped with redis-rs.
pub async fn test_trigger_transform_failure_driver(client: &RedisClient) {
assert_eq!(
redis::cmd("SET")
.arg("foo")
.arg(42)
.query_async::<_, ()>(connection)
.await
.unwrap_err()
.to_string(),
"An error was signalled by the server - ResponseError: Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
// fred sends a `CLIENT` command on startup to which shotover will reply with an error
client.wait_for_connect().await.unwrap_err().details(),
"ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination \"127.0.0.1:1111\" 2: Connection refused (os error 111)".to_string()
);
}

Expand Down
10 changes: 8 additions & 2 deletions shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::shotover_process;
use basic_driver_tests::*;
use fred::clients::RedisClient;
use fred::interfaces::ClientLike;
use fred::types::RedisConfig;
use redis::aio::Connection;
use redis::Commands;

Expand Down Expand Up @@ -47,9 +50,12 @@ async fn passthrough_redis_down() {
let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml")
.start()
.await;
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;
let client = RedisClient::new(RedisConfig::default(), None, None, None);

test_trigger_transform_failure_driver(&mut connection).await;
{
let _shutdown_handle = client.connect();
test_trigger_transform_failure_driver(&client).await;
}
test_trigger_transform_failure_raw().await;

test_invalid_frame().await;
Expand Down
5 changes: 3 additions & 2 deletions shotover-proxy/tests/runner/observability_int_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ async fn test_metrics() {
let shotover = shotover_process("tests/test-configs/null-redis/topology.yaml")
.start()
.await;
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;

// Expected string looks unnatural because it is sorted in alphabetical order to make it match the sorted error output
let expected = r#"
Expand Down Expand Up @@ -98,7 +97,8 @@ shotover_transform_total_count{transform="QueryCounter"}
"#;
assert_metrics_has_keys("", expected).await;

// Check we still get the metrics after sending a couple requests
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;

redis::cmd("SET")
.arg("the_key")
.arg(42)
Expand Down Expand Up @@ -131,6 +131,7 @@ shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile
shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="0.99"}
shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="0.999"}
shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="1"}
shotover_query_count{name="redis-chain",query="CLIENT",type="redis"}
shotover_query_count{name="redis-chain",query="GET",type="redis"}
shotover_query_count{name="redis-chain",query="SET",type="redis"}
"#;
Expand Down
27 changes: 24 additions & 3 deletions shotover-proxy/tests/transforms/log_to_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,34 @@ async fn log_to_file() {
.start()
.await;

// CLIENT SETINFO requests sent by driver during connection handshake
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;
let request = std::fs::read("message-log/1/requests/message1.bin").unwrap();
assert_eq_string(
&request,
"*4\r\n$6\r\nCLIENT\r\n$7\r\nSETINFO\r\n$8\r\nLIB-NAME\r\n$8\r\nredis-rs\r\n",
);
let response = std::fs::read("message-log/1/responses/message1.bin").unwrap();
assert_eq_string(
&response,
"-ERR Unknown subcommand or wrong number of arguments for 'SETINFO'. Try CLIENT HELP\r\n",
);
let request = std::fs::read("message-log/1/requests/message2.bin").unwrap();
assert_eq_string(
&request,
"*4\r\n$6\r\nCLIENT\r\n$7\r\nSETINFO\r\n$7\r\nLIB-VER\r\n$6\r\n0.24.0\r\n",
);
let response = std::fs::read("message-log/1/responses/message2.bin").unwrap();
assert_eq_string(
&response,
"-ERR Unknown subcommand or wrong number of arguments for 'SETINFO'. Try CLIENT HELP\r\n",
);

// SET sent by command
assert_ok(redis::cmd("SET").arg("foo").arg(42), &mut connection).await;
let request = std::fs::read("message-log/1/requests/message1.bin").unwrap();
let request = std::fs::read("message-log/1/requests/message3.bin").unwrap();
assert_eq_string(&request, "*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$2\r\n42\r\n");

let response = std::fs::read("message-log/1/responses/message1.bin").unwrap();
let response = std::fs::read("message-log/1/responses/message3.bin").unwrap();
assert_eq_string(&response, "+OK\r\n");

shotover.shutdown_and_then_consume_events(&[]).await;
Expand Down
27 changes: 19 additions & 8 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,24 @@ async fn test_log_with_mismatch() {

assert_eq!("42", result);
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))"]
.shutdown_and_then_consume_events(&[
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))", "Redis BulkString(b\"42\"))"]
tee response: ["Redis BulkString(b\"41\"))", "Redis BulkString(b\"41\"))"]"#,
),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::tee")
.with_message(
r#"Tee mismatch:
chain response: ["Redis BulkString(b\"42\"))"]
tee response: ["Redis BulkString(b\"41\"))"]"#,
)])
),
])
.await;
}

Expand Down Expand Up @@ -287,6 +297,7 @@ async fn test_switch_main_chain() {
shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_count(tokio_bin_process::event_matcher::Count::Times(3))])
// 1 warning per loop above + 1 warning from the redis-rs driver connection handshake
.with_count(tokio_bin_process::event_matcher::Count::Times(4))])
.await;
}
4 changes: 2 additions & 2 deletions shotover/src/transforms/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl Transform for Tee {

if !chain_response.eq(&tee_response) {
debug!(
"Tee mismatch: \nchain response: {:?} \ntee response: {:?}",
"Tee mismatch:\nchain response: {:?}\ntee response: {:?}",
chain_response
.iter_mut()
.map(|m| m.to_high_level_string())
Expand Down Expand Up @@ -261,7 +261,7 @@ impl Transform for Tee {

if !chain_response.eq(&tee_response) {
warn!(
"Tee mismatch: \nchain response: {:?} \ntee response: {:?}",
"Tee mismatch:\nchain response: {:?}\ntee response: {:?}",
chain_response
.iter_mut()
.map(|m| m.to_high_level_string())
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ tracing-subscriber.workspace = true
serde_yaml.workspace = true
anyhow.workspace = true
rcgen.workspace = true
rdkafka = { version = "0.35", features = ["cmake-build"], optional = true }
rdkafka = { version = "0.36", features = ["cmake-build"], optional = true }
docker-compose-runner = "0.3.0"
# TODO: make scylla reexport these so we dont have to import random old versions
bigdecimal = "0.2.2"
Expand Down
Loading