diff --git a/custom-transforms-example/config/topology.yaml b/custom-transforms-example/config/topology.yaml index 64ea361a5..10c343f14 100644 --- a/custom-transforms-example/config/topology.yaml +++ b/custom-transforms-example/config/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/custom-transforms-example/src/redis_get_rewrite.rs b/custom-transforms-example/src/redis_get_rewrite.rs index 735540794..ee6af5129 100644 --- a/custom-transforms-example/src/redis_get_rewrite.rs +++ b/custom-transforms-example/src/redis_get_rewrite.rs @@ -1,7 +1,7 @@ use anyhow::Result; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use shotover::frame::{Frame, MessageType, RedisFrame}; +use shotover::frame::{Frame, MessageType, ValkeyFrame}; use shotover::message::{MessageIdSet, Messages}; use shotover::transforms::{ ChainState, Transform, TransformBuilder, TransformConfig, TransformContextConfig, @@ -28,7 +28,7 @@ impl TransformConfig for RedisGetRewriteConfig { } fn up_chain_protocol(&self) -> UpChainProtocol { - UpChainProtocol::MustBeOneOf(vec![MessageType::Redis]) + UpChainProtocol::MustBeOneOf(vec![MessageType::Valkey]) } fn down_chain_protocol(&self) -> DownChainProtocol { @@ -95,8 +95,8 @@ impl Transform for RedisGetRewrite { } fn is_get(frame: &Frame) -> bool { - if let Frame::Redis(RedisFrame::Array(array)) = frame { - if let Some(RedisFrame::BulkString(first)) = array.first() { + if let Frame::Valkey(ValkeyFrame::Array(array)) = frame { + if let Some(ValkeyFrame::BulkString(first)) = array.first() { first.eq_ignore_ascii_case(b"GET") } else { false @@ -108,5 +108,5 @@ fn is_get(frame: &Frame) -> bool { fn rewrite_get(frame: &mut Frame, result: &str) { tracing::info!("Replaced {frame:?} with BulkString(\"{result}\")"); - *frame = Frame::Redis(RedisFrame::BulkString(result.to_owned().into())); + *frame = Frame::Valkey(ValkeyFrame::BulkString(result.to_owned().into())); } diff --git a/shotover-proxy/benches/windsock/redis/bench.rs b/shotover-proxy/benches/windsock/redis/bench.rs index 3ff8c359a..f4bd75bf0 100644 --- a/shotover-proxy/benches/windsock/redis/bench.rs +++ b/shotover-proxy/benches/windsock/redis/bench.rs @@ -130,15 +130,17 @@ impl RedisBench { } } - common::generate_topology(SourceConfig::Redis(shotover::sources::redis::RedisConfig { - name: "redis".to_owned(), - listen_addr: host_address, - connection_limit: None, - hard_connection_limit: None, - tls: tls_acceptor, - timeout: None, - chain: TransformChainConfig(transforms), - })) + common::generate_topology(SourceConfig::Valkey( + shotover::sources::redis::ValkeyConfig { + name: "redis".to_owned(), + listen_addr: host_address, + connection_limit: None, + hard_connection_limit: None, + tls: tls_acceptor, + timeout: None, + chain: TransformChainConfig(transforms), + }, + )) } async fn run_aws_shotover( diff --git a/shotover-proxy/config/topology.yaml b/shotover-proxy/config/topology.yaml index 20d74067b..9f357ee39 100644 --- a/shotover-proxy/config/topology.yaml +++ b/shotover-proxy/config/topology.yaml @@ -2,9 +2,9 @@ --- # The list of sources. sources: - # The source, change from Redis to the source type of the database protocol you are receiving messages in. + # The source, change from Valkey to the source type of the database protocol you are receiving messages in. # For a list of possible sources: https://docs.shotover.io/sources - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs index 6c603ea76..ab5cd1606 100644 --- a/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs +++ b/shotover-proxy/tests/redis_int_tests/basic_driver_tests.rs @@ -10,7 +10,7 @@ use rand::{thread_rng, Rng}; use redis::aio::Connection; use redis::cluster::ClusterConnection; use redis::{AsyncCommands, Commands, ErrorKind, RedisError, Value}; -use shotover::frame::RedisFrame; +use shotover::frame::ValkeyFrame; use shotover::tcp; use std::collections::{HashMap, HashSet}; use std::thread::sleep; @@ -1304,7 +1304,7 @@ pub async fn test_trigger_transform_failure_raw() { assert_eq!( read_redis_message(&mut connection).await, - RedisFrame::Error(format!("ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination 127.0.0.1:1111 2: Connection refused (os error {CONNECTION_REFUSED_OS_ERROR})").into()) + ValkeyFrame::Error(format!("ERR Internal shotover (or custom transform) bug: Chain failed to send and/or receive messages, the connection will now be closed. Caused by: 0: RedisSinkSingle transform failed 1: Failed to connect to destination 127.0.0.1:1111 2: Connection refused (os error {CONNECTION_REFUSED_OS_ERROR})").into()) ); // If the connection was closed by shotover then we will succesfully read 0 bytes. @@ -1317,7 +1317,7 @@ pub async fn test_trigger_transform_failure_raw() { assert_eq!(amount, 0); } -async fn read_redis_message(connection: &mut TcpStream) -> RedisFrame { +async fn read_redis_message(connection: &mut TcpStream) -> ValkeyFrame { let mut buffer = BytesMut::new(); loop { if let Ok(Some((result, len))) = diff --git a/shotover-proxy/tests/redis_int_tests/mod.rs b/shotover-proxy/tests/redis_int_tests/mod.rs index ac0e3575b..0388aed9a 100644 --- a/shotover-proxy/tests/redis_int_tests/mod.rs +++ b/shotover-proxy/tests/redis_int_tests/mod.rs @@ -23,7 +23,7 @@ fn invalid_frame_event() -> EventMatcher { .with_level(Level::Warn) .with_target("shotover::server") .with_message( - r#"failed to decode message: Error decoding redis frame + r#"failed to decode message: Error decoding valkey frame Caused by: Decode Error: frame_type: Invalid frame type."#, diff --git a/shotover-proxy/tests/runner/observability_int_tests.rs b/shotover-proxy/tests/runner/observability_int_tests.rs index dab8f7591..f510f1323 100644 --- a/shotover-proxy/tests/runner/observability_int_tests.rs +++ b/shotover-proxy/tests/runner/observability_int_tests.rs @@ -129,19 +129,19 @@ shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="0.99"} shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="0.999"} shotover_chain_latency_seconds{chain="redis",client_details="127.0.0.1",quantile="1"} -shotover_query_count{name="redis-chain",query="CLIENT",type="redis"} -shotover_query_count{name="redis-chain",query="GET",type="redis"} -shotover_query_count{name="redis-chain",query="SET",type="redis"} +shotover_query_count{name="redis-chain",query="CLIENT",type="valkey"} +shotover_query_count{name="redis-chain",query="GET",type="valkey"} +shotover_query_count{name="redis-chain",query="SET",type="valkey"} "#; assert_metrics_has_keys(expected, expected_new).await; assert_metrics_key_value( - r#"shotover_query_count{name="redis-chain",query="GET",type="redis"}"#, + r#"shotover_query_count{name="redis-chain",query="GET",type="valkey"}"#, "1", ) .await; assert_metrics_key_value( - r#"shotover_query_count{name="redis-chain",query="SET",type="redis"}"#, + r#"shotover_query_count{name="redis-chain",query="SET",type="valkey"}"#, "2", ) .await; diff --git a/shotover-proxy/tests/runner/runner_int_tests.rs b/shotover-proxy/tests/runner/runner_int_tests.rs index 97a0b206a..5400c79ce 100644 --- a/shotover-proxy/tests/runner/runner_int_tests.rs +++ b/shotover-proxy/tests/runner/runner_int_tests.rs @@ -123,7 +123,7 @@ async fn test_shotover_shutdown_when_protocol_mismatch() { Caused by: Topology errors - Transform RedisSinkSingle requires upchain protocol to be one of [Redis] but was Cassandra + Transform RedisSinkSingle requires upchain protocol to be one of [Valkey] but was Cassandra "#, )]) .await; diff --git a/shotover-proxy/tests/test-configs/invalid_non_terminating_last.yaml b/shotover-proxy/tests/test-configs/invalid_non_terminating_last.yaml index 01e102083..150bc64c9 100644 --- a/shotover-proxy/tests/test-configs/invalid_non_terminating_last.yaml +++ b/shotover-proxy/tests/test-configs/invalid_non_terminating_last.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/invalid_subchains.yaml b/shotover-proxy/tests/test-configs/invalid_subchains.yaml index ec007501a..e2d888270 100644 --- a/shotover-proxy/tests/test-configs/invalid_subchains.yaml +++ b/shotover-proxy/tests/test-configs/invalid_subchains.yaml @@ -1,13 +1,13 @@ --- sources: - - Redis: + - Valkey: name: "redis1" listen_addr: "127.0.0.1:6379" chain: - NullSink - NullSink - DebugPrinter - - Redis: + - Valkey: name: "redis2" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/invalid_terminating_not_last.yaml b/shotover-proxy/tests/test-configs/invalid_terminating_not_last.yaml index be13a9428..59f33c30e 100644 --- a/shotover-proxy/tests/test-configs/invalid_terminating_not_last.yaml +++ b/shotover-proxy/tests/test-configs/invalid_terminating_not_last.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/log-to-file/topology.yaml b/shotover-proxy/tests/test-configs/log-to-file/topology.yaml index 39ec9672e..06451cc70 100644 --- a/shotover-proxy/tests/test-configs/log-to-file/topology.yaml +++ b/shotover-proxy/tests/test-configs/log-to-file/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/null-redis/topology.yaml b/shotover-proxy/tests/test-configs/null-redis/topology.yaml index 988519216..fb11e49ae 100644 --- a/shotover-proxy/tests/test-configs/null-redis/topology.yaml +++ b/shotover-proxy/tests/test-configs/null-redis/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml b/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml index 39131b4bb..7ff4dc357 100644 --- a/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml +++ b/shotover-proxy/tests/test-configs/query_type_filter/simple.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis1" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -8,8 +8,8 @@ sources: - QueryTypeFilter: DenyList: [Write] - DebugReturner: - Redis: "42" - - Redis: + Valkey: "42" + - Valkey: name: "redis2" listen_addr: "127.0.0.1:6380" connection_limit: 3000000 @@ -17,4 +17,4 @@ sources: - QueryTypeFilter: AllowList: [Read] - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/redis/cluster-auth/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-auth/topology.yaml index 2633233aa..70a0b09c1 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-auth/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-auth/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml index 1ec8b686c..1e40d7989 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-dr/topology.yaml @@ -1,7 +1,7 @@ # This example will replicate all commands to the DR datacenter on a best effort basis --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 diff --git a/shotover-proxy/tests/test-configs/redis/cluster-handling/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-handling/topology.yaml index cb4db09a5..355c71c6b 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-handling/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-handling/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology-encode.yaml b/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology-encode.yaml index 9c20a0d30..a9ea2bae0 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology-encode.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology-encode.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology.yaml index 8ba91bcdb..2f4aae166 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-hiding/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-ports-rewrite/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-ports-rewrite/topology.yaml index a315d64e3..17f5a8d64 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-ports-rewrite/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-ports-rewrite/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6380" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-encode.yaml b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-encode.yaml index 9e77c323a..6ace09980 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-encode.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-encode.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-no-source-encryption.yaml b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-no-source-encryption.yaml index 54d3bbdea..5ba5e954d 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-no-source-encryption.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology-no-source-encryption.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology.yaml index f29fb892f..a71886b13 100644 --- a/shotover-proxy/tests/test-configs/redis/cluster-tls/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/cluster-tls/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/redis/passthrough/topology-encode.yaml b/shotover-proxy/tests/test-configs/redis/passthrough/topology-encode.yaml index 36304a3ff..3a5f35aa1 100644 --- a/shotover-proxy/tests/test-configs/redis/passthrough/topology-encode.yaml +++ b/shotover-proxy/tests/test-configs/redis/passthrough/topology-encode.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/passthrough/topology.yaml b/shotover-proxy/tests/test-configs/redis/passthrough/topology.yaml index 93968e3b0..3b7f1726b 100644 --- a/shotover-proxy/tests/test-configs/redis/passthrough/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/passthrough/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" chain: diff --git a/shotover-proxy/tests/test-configs/redis/tls-no-client-auth/topology.yaml b/shotover-proxy/tests/test-configs/redis/tls-no-client-auth/topology.yaml index 124642b4a..523b2a7d9 100644 --- a/shotover-proxy/tests/test-configs/redis/tls-no-client-auth/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/tls-no-client-auth/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/redis/tls-no-verify-hostname/topology.yaml b/shotover-proxy/tests/test-configs/redis/tls-no-verify-hostname/topology.yaml index 2ae2d207b..ebfaed89d 100644 --- a/shotover-proxy/tests/test-configs/redis/tls-no-verify-hostname/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/tls-no-verify-hostname/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/redis/tls/topology-encode.yaml b/shotover-proxy/tests/test-configs/redis/tls/topology-encode.yaml index 4699b0f5d..e2a056933 100644 --- a/shotover-proxy/tests/test-configs/redis/tls/topology-encode.yaml +++ b/shotover-proxy/tests/test-configs/redis/tls/topology-encode.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/redis/tls/topology.yaml b/shotover-proxy/tests/test-configs/redis/tls/topology.yaml index fcb841a53..629eab380 100644 --- a/shotover-proxy/tests/test-configs/redis/tls/topology.yaml +++ b/shotover-proxy/tests/test-configs/redis/tls/topology.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" tls: diff --git a/shotover-proxy/tests/test-configs/tee/fail.yaml b/shotover-proxy/tests/test-configs/tee/fail.yaml index b30b69c8f..84af5f1c4 100644 --- a/shotover-proxy/tests/test-configs/tee/fail.yaml +++ b/shotover-proxy/tests/test-configs/tee/fail.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "42" + Valkey: "42" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml index d735a235c..fec8798ec 100644 --- a/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/fail_with_mismatch.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "41" + Valkey: "41" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/ignore.yaml b/shotover-proxy/tests/test-configs/tee/ignore.yaml index 2fbecae5d..22966bc99 100644 --- a/shotover-proxy/tests/test-configs/tee/ignore.yaml +++ b/shotover-proxy/tests/test-configs/tee/ignore.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "42" + Valkey: "42" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml index 8a0d859c7..5f0fb21a8 100644 --- a/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/ignore_with_mismatch.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "41" + Valkey: "41" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/log.yaml b/shotover-proxy/tests/test-configs/tee/log.yaml index 5aff0384d..36e042c19 100644 --- a/shotover-proxy/tests/test-configs/tee/log.yaml +++ b/shotover-proxy/tests/test-configs/tee/log.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "42" + Valkey: "42" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml index 10305bb4a..5320244f2 100644 --- a/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/log_with_mismatch.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -12,6 +12,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "41" + Valkey: "41" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/subchain.yaml b/shotover-proxy/tests/test-configs/tee/subchain.yaml index 9bc3bb6cb..f9e542162 100644 --- a/shotover-proxy/tests/test-configs/tee/subchain.yaml +++ b/shotover-proxy/tests/test-configs/tee/subchain.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -16,6 +16,6 @@ sources: buffer_size: 10000 chain: - DebugReturner: - Redis: "42" + Valkey: "42" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml b/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml index dd02461ef..afe89c0ad 100644 --- a/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml +++ b/shotover-proxy/tests/test-configs/tee/subchain_with_mismatch.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis" listen_addr: "127.0.0.1:6379" connection_limit: 3000000 @@ -18,6 +18,6 @@ sources: - QueryTypeFilter: DenyList: [Read] - DebugReturner: - Redis: "41" + Valkey: "41" - DebugReturner: - Redis: "42" + Valkey: "42" diff --git a/shotover-proxy/tests/test-configs/tee/switch_chain.yaml b/shotover-proxy/tests/test-configs/tee/switch_chain.yaml index f6c435ff0..d68af33ce 100644 --- a/shotover-proxy/tests/test-configs/tee/switch_chain.yaml +++ b/shotover-proxy/tests/test-configs/tee/switch_chain.yaml @@ -1,6 +1,6 @@ --- sources: - - Redis: + - Valkey: name: "redis-1" listen_addr: "127.0.0.1:6371" connection_limit: @@ -11,10 +11,10 @@ sources: switch_port: 1231 chain: - DebugReturner: - Redis: "b" + Valkey: "b" - DebugReturner: - Redis: "a" - - Redis: + Valkey: "a" + - Valkey: name: "redis-2" listen_addr: "127.0.0.1:6372" connection_limit: @@ -27,10 +27,10 @@ sources: switch_port: 1232 chain: - DebugReturner: - Redis: "b" + Valkey: "b" - DebugReturner: - Redis: "a" - - Redis: + Valkey: "a" + - Valkey: name: "redis-3" listen_addr: "127.0.0.1:6373" connection_limit: @@ -41,6 +41,6 @@ sources: switch_port: 1233 chain: - DebugReturner: - Redis: "b" + Valkey: "b" - DebugReturner: - Redis: "a" + Valkey: "a" diff --git a/shotover-proxy/tests/transforms/tee.rs b/shotover-proxy/tests/transforms/tee.rs index d995fe266..c4a1f5a0b 100644 --- a/shotover-proxy/tests/transforms/tee.rs +++ b/shotover-proxy/tests/transforms/tee.rs @@ -84,8 +84,8 @@ async fn test_log_with_mismatch() { .with_target("shotover::transforms::tee") .with_message( r#"Tee mismatch: -result-source response: Redis BulkString(b"42") -other response: Redis BulkString(b"41")"#, +result-source response: Valkey BulkString(b"42") +other response: Valkey BulkString(b"41")"#, )]) .await; } @@ -255,8 +255,8 @@ async fn test_switch_main_chain() { .with_target("shotover::transforms::tee") .with_message( r#"Tee mismatch: -result-source response: Redis BulkString(b"a") -other response: Redis BulkString(b"b")"#, +result-source response: Valkey BulkString(b"a") +other response: Valkey BulkString(b"b")"#, ), EventMatcher::new() .with_level(Level::Warn) @@ -265,8 +265,8 @@ other response: Redis BulkString(b"b")"#, .with_target("shotover::transforms::tee") .with_message( r#"Tee mismatch: -result-source response: Redis BulkString(b"b") -other response: Redis BulkString(b"a")"#, +result-source response: Valkey BulkString(b"b") +other response: Valkey BulkString(b"a")"#, ), ]) .await; diff --git a/shotover/benches/benches/chain.rs b/shotover/benches/benches/chain.rs index 7bb0055fb..9b783be8e 100644 --- a/shotover/benches/benches/chain.rs +++ b/shotover/benches/benches/chain.rs @@ -6,7 +6,7 @@ use hex_literal::hex; use shotover::codec::CodecState; use shotover::frame::cassandra::{parse_statement_single, Tracing}; use shotover::frame::{CassandraFrame, CassandraOperation, Frame}; -use shotover::frame::{MessageType, RedisFrame}; +use shotover::frame::{MessageType, ValkeyFrame}; use shotover::message::{Message, MessageIdMap, QueryType}; use shotover::transforms::cassandra::peers_rewrite::CassandraPeersRewrite; use shotover::transforms::chain::{TransformChain, TransformChainBuilder}; @@ -33,7 +33,7 @@ fn criterion_benchmark(c: &mut Criterion) { { let chain = TransformChainBuilder::new(vec![Box::::default()], "bench"); let chain_state = ChainState::new_with_addr( - vec![Message::from_frame(Frame::Redis(RedisFrame::Null))], + vec![Message::from_frame(Frame::Valkey(ValkeyFrame::Null))], "127.0.0.1:6379".parse().unwrap(), ); @@ -49,7 +49,7 @@ fn criterion_benchmark(c: &mut Criterion) { { let chain = TransformChainBuilder::new(vec![Box::::default()], "bench"); let chain_state = ChainState::new_with_addr( - vec![Message::from_frame(Frame::Redis(RedisFrame::Null))], + vec![Message::from_frame(Frame::Valkey(ValkeyFrame::Null))], "127.0.0.1:6379".parse().unwrap(), ); @@ -69,20 +69,20 @@ fn criterion_benchmark(c: &mut Criterion) { filter: Filter::DenyList(vec![QueryType::Read]), filtered_requests: MessageIdMap::default(), }), - Box::new(DebugReturner::new(Response::Redis("a".into()))), + Box::new(DebugReturner::new(Response::Valkey("a".into()))), ], "bench", ); let chain_state = ChainState::new_with_addr( vec![ - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"SET")), - RedisFrame::BulkString(Bytes::from_static(b"foo")), - RedisFrame::BulkString(Bytes::from_static(b"bar")), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"SET")), + ValkeyFrame::BulkString(Bytes::from_static(b"foo")), + ValkeyFrame::BulkString(Bytes::from_static(b"bar")), ]))), - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"GET")), - RedisFrame::BulkString(Bytes::from_static(b"foo")), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"GET")), + ValkeyFrame::BulkString(Bytes::from_static(b"foo")), ]))), ], "127.0.0.1:6379".parse().unwrap(), @@ -106,11 +106,13 @@ fn criterion_benchmark(c: &mut Criterion) { "bench", ); let chain_state = ChainState::new_with_addr( - vec![Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"SET")), - RedisFrame::BulkString(Bytes::from_static(b"foo")), - RedisFrame::BulkString(Bytes::from_static(b"bar")), - ])))], + vec![Message::from_frame(Frame::Valkey(ValkeyFrame::Array( + vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"SET")), + ValkeyFrame::BulkString(Bytes::from_static(b"foo")), + ValkeyFrame::BulkString(Bytes::from_static(b"bar")), + ], + )))], "127.0.0.1:6379".parse().unwrap(), ); @@ -133,7 +135,7 @@ fn criterion_benchmark(c: &mut Criterion) { } .get_builder(TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }), ) .unwrap(), @@ -239,7 +241,7 @@ fn criterion_benchmark(c: &mut Criterion) { } .get_builder(TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }), ) .unwrap(), @@ -283,14 +285,14 @@ fn criterion_benchmark(c: &mut Criterion) { ); let chain_state = ChainState::new_with_addr( vec![ - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"SET")), - RedisFrame::BulkString(Bytes::from_static(b"foo")), - RedisFrame::BulkString(Bytes::from_static(b"bar")), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"SET")), + ValkeyFrame::BulkString(Bytes::from_static(b"foo")), + ValkeyFrame::BulkString(Bytes::from_static(b"bar")), ]))), - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"GET")), - RedisFrame::BulkString(Bytes::from_static(b"foo")), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"GET")), + ValkeyFrame::BulkString(Bytes::from_static(b"foo")), ]))), ], "127.0.0.1:6379".parse().unwrap(), diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index 3d5036b79..e17024984 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -59,7 +59,7 @@ pub enum CodecState { compression: Compression, }, #[cfg(feature = "redis")] - Redis, + Valkey, #[cfg(feature = "kafka")] Kafka(KafkaCodecState), Dummy, diff --git a/shotover/src/codec/redis.rs b/shotover/src/codec/redis.rs index 8edbfe864..3b81d2e3a 100644 --- a/shotover/src/codec/redis.rs +++ b/shotover/src/codec/redis.rs @@ -3,7 +3,7 @@ use std::time::Instant; use super::{CodecWriteError, Direction}; use crate::codec::{CodecBuilder, CodecReadError}; -use crate::frame::{Frame, MessageType, RedisFrame}; +use crate::frame::{Frame, MessageType, ValkeyFrame}; use crate::message::{Encodable, Message, MessageId, Messages}; use anyhow::{anyhow, Result}; use bytes::BytesMut; @@ -13,14 +13,14 @@ use redis_protocol::resp2::encode::extend_encode; use tokio_util::codec::{Decoder, Encoder}; #[derive(Clone)] -pub struct RedisCodecBuilder { +pub struct ValkeyCodecBuilder { direction: Direction, message_latency: Histogram, } -impl CodecBuilder for RedisCodecBuilder { - type Decoder = RedisDecoder; - type Encoder = RedisEncoder; +impl CodecBuilder for ValkeyCodecBuilder { + type Decoder = ValkeyDecoder; + type Encoder = ValkeyEncoder; fn new(direction: Direction, destination_name: String) -> Self { let message_latency = super::message_latency(direction, destination_name); @@ -30,7 +30,7 @@ impl CodecBuilder for RedisCodecBuilder { } } - fn build(&self) -> (RedisDecoder, RedisEncoder) { + fn build(&self) -> (ValkeyDecoder, ValkeyEncoder) { let (tx, rx) = match self.direction { Direction::Source => (None, None), Direction::Sink => { @@ -39,13 +39,13 @@ impl CodecBuilder for RedisCodecBuilder { } }; ( - RedisDecoder::new(rx, self.direction), - RedisEncoder::new(tx, self.direction, self.message_latency.clone()), + ValkeyDecoder::new(rx, self.direction), + ValkeyEncoder::new(tx, self.direction, self.message_latency.clone()), ) } fn protocol(&self) -> MessageType { - MessageType::Redis + MessageType::Valkey } } @@ -58,27 +58,27 @@ pub enum RequestType { Subscribe, /// a unsubscribe Unsubscribe, - /// redis reset + /// valkey reset Reset, /// Everything else Other, } -pub struct RedisEncoder { +pub struct ValkeyEncoder { // Some when Sink (because it sends requests) request_header_tx: Option>, direction: Direction, message_latency: Histogram, } -pub struct RedisDecoder { +pub struct ValkeyDecoder { // Some when Sink (because it receives responses) request_header_rx: Option>, direction: Direction, is_subscribed: bool, } -impl RedisDecoder { +impl ValkeyDecoder { pub fn new( request_header_rx: Option>, direction: Direction, @@ -91,24 +91,24 @@ impl RedisDecoder { } } -impl Decoder for RedisDecoder { +impl Decoder for ValkeyDecoder { type Item = Messages; type Error = CodecReadError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let received_at = Instant::now(); - match decode_bytes_mut(src) - .map_err(|e| CodecReadError::Parser(anyhow!(e).context("Error decoding redis frame")))? - { + match decode_bytes_mut(src).map_err(|e| { + CodecReadError::Parser(anyhow!(e).context("Error decoding valkey frame")) + })? { Some((frame, _size, bytes)) => { tracing::debug!( - "{}: incoming redis message:\n{}", + "{}: incoming valkey message:\n{}", self.direction, pretty_hex::pretty_hex(&bytes) ); let mut message = Message::from_bytes_and_frame_at_instant( bytes, - Frame::Redis(frame), + Frame::Valkey(frame), Some(received_at), ); @@ -119,7 +119,7 @@ impl Decoder for RedisDecoder { // * `unsubscribe` - a response to an UNSUBSCRIBE, PUNSUBSCRIBE or SUNSUBSCRIBE request // * `message` - a subscription message // - // Additionally redis will: + // Additionally valkey will: // * accept a few regular commands while in pubsub mode: PING, RESET and QUIT // * return an error response when a nonexistent or non pubsub compatible command is used // @@ -129,13 +129,13 @@ impl Decoder for RedisDecoder { // Determine if message is a `message` subscription message // - // Because PING, RESET, QUIT and error responses never return a RedisFrame::Array starting with `message`, + // Because PING, RESET, QUIT and error responses never return a ValkeyFrame::Array starting with `message`, // they have no way to collide with the `message` value of a subscription message. // So while we are in subscription mode we can use that to determine if an // incoming message is a subscription message. let is_subscription_message = if self.is_subscribed { - if let Some(Frame::Redis(RedisFrame::Array(array))) = message.frame() { - if let [RedisFrame::BulkString(ty), ..] = array.as_slice() { + if let Some(Frame::Valkey(ValkeyFrame::Array(array))) = message.frame() { + if let [ValkeyFrame::BulkString(ty), ..] = array.as_slice() { ty.as_ref() == b"message" } else { false @@ -158,15 +158,15 @@ impl Decoder for RedisDecoder { if !is_subscription_message { if let Some(rx) = self.request_header_rx.as_ref() { let request_info = rx.recv().map_err(|_| { - CodecReadError::Parser(anyhow!("redis encoder half was lost")) + CodecReadError::Parser(anyhow!("valkey encoder half was lost")) })?; message.set_request_id(request_info.id); match request_info.ty { RequestType::Subscribe | RequestType::Unsubscribe => { - if let Some(Frame::Redis(RedisFrame::Array(array))) = + if let Some(Frame::Valkey(ValkeyFrame::Array(array))) = message.frame() { - if let Some(RedisFrame::Integer( + if let Some(ValkeyFrame::Integer( number_of_subscribed_channels, )) = array.get(2) { @@ -188,7 +188,7 @@ impl Decoder for RedisDecoder { } } -impl RedisEncoder { +impl ValkeyEncoder { pub fn new( request_header_tx: Option>, direction: Direction, @@ -202,18 +202,18 @@ impl RedisEncoder { } } -impl Encoder for RedisEncoder { +impl Encoder for ValkeyEncoder { type Error = CodecWriteError; fn encode(&mut self, item: Messages, dst: &mut BytesMut) -> Result<(), Self::Error> { item.into_iter().try_for_each(|mut m| { let start = dst.len(); - m.ensure_message_type(MessageType::Redis) + m.ensure_message_type(MessageType::Valkey) .map_err(CodecWriteError::Encoder)?; let received_at = m.received_from_source_or_sink_at; if let Some(tx) = self.request_header_tx.as_ref() { - let ty = if let Some(Frame::Redis(RedisFrame::Array(array))) = m.frame() { - if let Some(RedisFrame::BulkString(bytes)) = array.first() { + let ty = if let Some(Frame::Valkey(ValkeyFrame::Array(array))) = m.frame() { + if let Some(ValkeyFrame::BulkString(bytes)) = array.first() { match bytes.to_ascii_uppercase().as_slice() { b"SUBSCRIBE" | b"PSUBSCRIBE" | b"SSUBSCRIBE" => RequestType::Subscribe, b"UNSUBSCRIBE" | b"PUNSUBSCRIBE" | b"SUNSUBSCRIBE" => { @@ -237,17 +237,17 @@ impl Encoder for RedisEncoder { Ok(()) } Encodable::Frame(frame) => { - let item = frame.into_redis().unwrap(); + let item = frame.into_valkey().unwrap(); extend_encode(dst, &item) .map(|_| ()) - .map_err(|e| anyhow!("Redis encoding error: {} - {:#?}", e, item)) + .map_err(|e| anyhow!("Valkey encoding error: {} - {:#?}", e, item)) } }; if let Some(received_at) = received_at { self.message_latency.record(received_at.elapsed()); } tracing::debug!( - "{}: outgoing redis message:\n{}", + "{}: outgoing valkey message:\n{}", self.direction, pretty_hex::pretty_hex(&&dst[start..]) ); @@ -257,9 +257,9 @@ impl Encoder for RedisEncoder { } #[cfg(test)] -mod redis_tests { +mod valkey_tests { - use crate::codec::{redis::RedisCodecBuilder, CodecBuilder, Direction}; + use crate::codec::{redis::ValkeyCodecBuilder, CodecBuilder, Direction}; use bytes::BytesMut; use hex_literal::hex; use pretty_assertions::assert_eq; @@ -289,7 +289,7 @@ mod redis_tests { fn test_frame(raw_frame: &[u8]) { let (mut decoder, mut encoder) = - RedisCodecBuilder::new(Direction::Source, "redis".to_owned()).build(); + ValkeyCodecBuilder::new(Direction::Source, "valkey".to_owned()).build(); let message = decoder .decode(&mut BytesMut::from(raw_frame)) .unwrap() diff --git a/shotover/src/config/topology.rs b/shotover/src/config/topology.rs index 6535be7f0..162b1a523 100644 --- a/shotover/src/config/topology.rs +++ b/shotover/src/config/topology.rs @@ -87,7 +87,7 @@ mod topology_tests { use crate::transforms::null::NullSinkConfig; use crate::transforms::TransformConfig; use crate::{ - sources::{redis::RedisConfig, Source, SourceConfig}, + sources::{redis::ValkeyConfig, Source, SourceConfig}, transforms::{ parallel_map::ParallelMapConfig, redis::cache::RedisConfig as RedisCacheConfig, }, @@ -96,8 +96,8 @@ mod topology_tests { use std::collections::HashMap; use tokio::sync::watch; - fn create_source_from_chain_redis(chain: Vec>) -> Vec { - vec![SourceConfig::Redis(RedisConfig { + fn create_source_from_chain_valkey(chain: Vec>) -> Vec { + vec![SourceConfig::Valkey(ValkeyConfig { name: "foo".to_string(), listen_addr: "127.0.0.1:0".to_string(), connection_limit: None, @@ -123,10 +123,10 @@ mod topology_tests { })] } - async fn run_test_topology_redis( + async fn run_test_topology_valkey( chain: Vec>, ) -> anyhow::Result> { - let sources = create_source_from_chain_redis(chain); + let sources = create_source_from_chain_valkey(chain); let topology = Topology { sources }; @@ -155,7 +155,7 @@ foo source: Chain cannot be empty "#; - let error = run_test_topology_redis(vec![]) + let error = run_test_topology_valkey(vec![]) .await .unwrap_err() .to_string(); @@ -164,7 +164,7 @@ foo source: #[tokio::test] async fn test_validate_chain_valid_chain() { - run_test_topology_redis(vec![Box::new(DebugPrinterConfig), Box::new(NullSinkConfig)]) + run_test_topology_valkey(vec![Box::new(DebugPrinterConfig), Box::new(NullSinkConfig)]) .await .unwrap(); } @@ -183,7 +183,7 @@ foo source: Check https://docs.shotover.io/transforms.html#coalesce for more information. "#; - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(CoalesceConfig { flush_when_buffered_message_count: None, flush_when_millis_since_last_flush: None, @@ -205,7 +205,7 @@ foo source: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#; - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(NullSinkConfig), Box::new(NullSinkConfig), @@ -225,7 +225,7 @@ foo source: Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. "#; - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), @@ -246,7 +246,7 @@ foo source: Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. "#; - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(NullSinkConfig), @@ -260,7 +260,7 @@ foo source: } #[tokio::test] - async fn test_validate_chain_valid_subchain_cassandra_redis_cache() { + async fn test_validate_chain_valid_subchain_cassandra_valkey_cache() { let caching_schema = HashMap::new(); run_test_topology_cassandra(vec![ @@ -281,7 +281,7 @@ foo source: } #[tokio::test] - async fn test_validate_chain_invalid_subchain_cassandra_redis_cache() { + async fn test_validate_chain_invalid_subchain_cassandra_valkey_cache() { let expected = r#"Topology errors foo source: foo chain: @@ -313,7 +313,7 @@ foo source: #[tokio::test] async fn test_validate_chain_valid_subchain_parallel_map() { - run_test_topology_redis(vec![ + run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(ParallelMapConfig { @@ -340,7 +340,7 @@ foo source: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#; - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(ParallelMapConfig { @@ -378,7 +378,7 @@ foo source: Box::new(NullSinkConfig), ]); - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(ParallelMapConfig { @@ -409,7 +409,7 @@ foo source: Box::new(DebugPrinterConfig), ]); - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(ParallelMapConfig { @@ -442,7 +442,7 @@ foo source: Box::new(DebugPrinterConfig), ]); - let error = run_test_topology_redis(vec![ + let error = run_test_topology_valkey(vec![ Box::new(DebugPrinterConfig), Box::new(DebugPrinterConfig), Box::new(ParallelMapConfig { @@ -464,8 +464,8 @@ foo source: Source name "foo" occurred more than once. Make sure all source names are unique. The names will be used in logging and metrics. "#; - let mut sources = create_source_from_chain_redis(vec![Box::new(NullSinkConfig)]); - sources.extend(create_source_from_chain_redis(vec![Box::new( + let mut sources = create_source_from_chain_valkey(vec![Box::new(NullSinkConfig)]); + sources.extend(create_source_from_chain_valkey(vec![Box::new( NullSinkConfig, )])); diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index df4195748..83ab04aa9 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -536,7 +536,7 @@ impl DummyResponseInserter { #[cfg(all(test, feature = "redis"))] mod tests { use super::DummyResponseInserter; - use crate::frame::{Frame, RedisFrame}; + use crate::frame::{Frame, ValkeyFrame}; use crate::message::Message; use pretty_assertions::assert_eq; @@ -544,18 +544,18 @@ mod tests { Message::from_frame(Frame::Dummy) } - fn redis_request() -> Message { - Message::from_frame(Frame::Redis(RedisFrame::Null)) + fn valkey_request() -> Message { + Message::from_frame(Frame::Valkey(ValkeyFrame::Null)) } - fn redis_response(request: &Message) -> Message { - let mut message = Message::from_frame(Frame::Redis(RedisFrame::Null)); + fn valkey_response(request: &Message) -> Message { + let mut message = Message::from_frame(Frame::Valkey(ValkeyFrame::Null)); message.set_request_id(request.id()); message } - fn redis_response_unsolicited() -> Message { - Message::from_frame(Frame::Redis(RedisFrame::Null)) + fn valkey_response_unsolicited() -> Message { + Message::from_frame(Frame::Valkey(ValkeyFrame::Null)) } #[test] @@ -583,35 +583,35 @@ mod tests { assert_eq!(inserter.pending_requests_count(), 0); } - // send one redis request + // send one valkey request { - let mut requests = vec![redis_request()]; + let mut requests = vec![valkey_request()]; inserter.process_requests(&mut requests); let mut responses = vec![]; inserter.process_responses(&mut responses, 0); assert_eq!(responses, []); - // received redis response + // received valkey response assert_eq!(inserter.pending_requests_count(), 1); - responses.insert(0, redis_response(&requests[0])); + responses.insert(0, valkey_response(&requests[0])); inserter.process_responses(&mut responses, 0); - assert_eq!(responses, vec![redis_response(&requests[0])]); + assert_eq!(responses, vec![valkey_response(&requests[0])]); assert_eq!(inserter.pending_requests_count(), 0); } - // send one dummy request and then one redis request + // send one dummy request and then one valkey request { - let mut requests = vec![dummy(), redis_request()]; + let mut requests = vec![dummy(), valkey_request()]; inserter.process_requests(&mut requests); let mut responses = vec![]; inserter.process_responses(&mut responses, 0); assert_eq!(responses, vec![dummy()]); - // received redis response + // received valkey response assert_eq!(inserter.pending_requests_count(), 1); - responses.insert(1, redis_response(&requests[1])); + responses.insert(1, valkey_response(&requests[1])); inserter.process_responses(&mut responses, 1); - assert_eq!(responses, vec![dummy(), redis_response(&requests[1])]); + assert_eq!(responses, vec![dummy(), valkey_response(&requests[1])]); assert_eq!(inserter.pending_requests_count(), 0); // try_recv with no responses @@ -620,22 +620,22 @@ mod tests { assert_eq!(inserter.pending_requests_count(), 0); } - // send one redis request and then one dummy request + // send one valkey request and then one dummy request { - let mut requests = vec![redis_request(), dummy()]; + let mut requests = vec![valkey_request(), dummy()]; inserter.process_requests(&mut requests); let mut responses = vec![]; inserter.process_responses(&mut responses, 0); assert_eq!(responses, vec![]); - // received redis response + // received valkey response assert_eq!(inserter.pending_requests_count(), 2); - responses.insert(0, redis_response(&requests[0])); + responses.insert(0, valkey_response(&requests[0])); inserter.process_responses(&mut responses, 0); - assert_eq!(responses, vec![redis_response(&requests[0]), dummy()]); + assert_eq!(responses, vec![valkey_response(&requests[0]), dummy()]); assert_eq!(inserter.pending_requests_count(), 0); inserter.process_responses(&mut responses, 2); - assert_eq!(responses, vec![redis_response(&requests[0]), dummy()]); + assert_eq!(responses, vec![valkey_response(&requests[0]), dummy()]); assert_eq!(inserter.pending_requests_count(), 0); // try_recv with no responses @@ -644,27 +644,27 @@ mod tests { assert_eq!(inserter.pending_requests_count(), 0); } - // send one redis request and then one dummy request - // an unsolicited response is received before and after the redis response + // send one valkey request and then one dummy request + // an unsolicited response is received before and after the valkey response { - let mut requests = vec![redis_request(), dummy()]; + let mut requests = vec![valkey_request(), dummy()]; inserter.process_requests(&mut requests); let mut responses = vec![]; inserter.process_responses(&mut responses, 0); assert_eq!(responses, vec![]); - // received redis response surrounded by unsolicted responses + // received valkey response surrounded by unsolicted responses assert_eq!(inserter.pending_requests_count(), 2); - responses.insert(0, redis_response_unsolicited()); - responses.insert(0, redis_response(&requests[1])); - responses.insert(0, redis_response_unsolicited()); + responses.insert(0, valkey_response_unsolicited()); + responses.insert(0, valkey_response(&requests[1])); + responses.insert(0, valkey_response_unsolicited()); inserter.process_responses(&mut responses, 0); assert_eq!( responses, vec![ - redis_response_unsolicited(), - redis_response(&requests[0]), - redis_response_unsolicited(), + valkey_response_unsolicited(), + valkey_response(&requests[0]), + valkey_response_unsolicited(), dummy() ] ); @@ -673,9 +673,9 @@ mod tests { assert_eq!( responses, vec![ - redis_response_unsolicited(), - redis_response(&requests[0]), - redis_response_unsolicited(), + valkey_response_unsolicited(), + valkey_response(&requests[0]), + valkey_response_unsolicited(), dummy() ] ); diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index aa9c61745..67335d838 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -14,7 +14,7 @@ use kafka::KafkaFrame; #[cfg(feature = "opensearch")] pub use opensearch::OpenSearchFrame; #[cfg(feature = "redis")] -pub use redis_protocol::resp2::types::BytesFrame as RedisFrame; +pub use redis_protocol::resp2::types::BytesFrame as ValkeyFrame; use std::fmt::{Display, Formatter, Result as FmtResult}; #[cfg(feature = "cassandra")] @@ -30,7 +30,7 @@ pub mod value; #[derive(PartialEq, Debug, Clone, Copy)] pub enum MessageType { #[cfg(feature = "redis")] - Redis, + Valkey, #[cfg(feature = "cassandra")] Cassandra, #[cfg(feature = "kafka")] @@ -46,7 +46,7 @@ impl MessageType { #[cfg(feature = "cassandra")] MessageType::Cassandra => false, #[cfg(feature = "redis")] - MessageType::Redis => true, + MessageType::Valkey => true, #[cfg(feature = "kafka")] MessageType::Kafka => true, #[cfg(feature = "opensearch")] @@ -60,7 +60,7 @@ impl MessageType { #[cfg(feature = "cassandra")] MessageType::Cassandra => "cql", #[cfg(feature = "redis")] - MessageType::Redis => "redis", + MessageType::Valkey => "redis", #[cfg(feature = "kafka")] MessageType::Kafka => "kafka", #[cfg(feature = "opensearch")] @@ -76,7 +76,7 @@ impl From<&CodecState> for MessageType { #[cfg(feature = "cassandra")] CodecState::Cassandra { .. } => Self::Cassandra, #[cfg(feature = "redis")] - CodecState::Redis => Self::Redis, + CodecState::Valkey => Self::Valkey, #[cfg(feature = "kafka")] CodecState::Kafka { .. } => Self::Kafka, #[cfg(feature = "opensearch")] @@ -94,7 +94,7 @@ impl Frame { compression: Compression::None, }, #[cfg(feature = "redis")] - Frame::Redis(_) => CodecState::Redis, + Frame::Valkey(_) => CodecState::Valkey, #[cfg(feature = "kafka")] Frame::Kafka(_) => CodecState::Kafka(KafkaCodecState { request_header: None, @@ -112,7 +112,7 @@ pub enum Frame { #[cfg(feature = "cassandra")] Cassandra(CassandraFrame), #[cfg(feature = "redis")] - Redis(RedisFrame), + Valkey(ValkeyFrame), #[cfg(feature = "kafka")] Kafka(KafkaFrame), /// Represents a message that must exist due to shotovers requirement that every request has a corresponding response. @@ -134,8 +134,8 @@ impl Frame { CassandraFrame::from_bytes(bytes, codec_state.as_cassandra()).map(Frame::Cassandra) } #[cfg(feature = "redis")] - MessageType::Redis => redis_protocol::resp2::decode::decode_bytes(&bytes) - .map(|x| Frame::Redis(x.unwrap().0)) + MessageType::Valkey => redis_protocol::resp2::decode::decode_bytes(&bytes) + .map(|x| Frame::Valkey(x.unwrap().0)) .map_err(|e| anyhow!("{e:?}")), #[cfg(feature = "kafka")] MessageType::Kafka => { @@ -150,7 +150,7 @@ impl Frame { pub fn name(&self) -> &'static str { match self { #[cfg(feature = "redis")] - Frame::Redis(_) => "Redis", + Frame::Valkey(_) => "Valkey", #[cfg(feature = "cassandra")] Frame::Cassandra(_) => "Cassandra", #[cfg(feature = "kafka")] @@ -166,7 +166,7 @@ impl Frame { #[cfg(feature = "cassandra")] Frame::Cassandra(_) => MessageType::Cassandra, #[cfg(feature = "redis")] - Frame::Redis(_) => MessageType::Redis, + Frame::Valkey(_) => MessageType::Valkey, #[cfg(feature = "kafka")] Frame::Kafka(_) => MessageType::Kafka, Frame::Dummy => MessageType::Dummy, @@ -176,11 +176,11 @@ impl Frame { } #[cfg(feature = "redis")] - pub fn redis(&mut self) -> Result<&mut RedisFrame> { + pub fn valkey(&mut self) -> Result<&mut ValkeyFrame> { match self { - Frame::Redis(frame) => Ok(frame), + Frame::Valkey(frame) => Ok(frame), frame => Err(anyhow!( - "Expected redis frame but received {} frame", + "Expected valkey frame but received {} frame", frame.name() )), } @@ -198,11 +198,11 @@ impl Frame { } #[cfg(feature = "redis")] - pub fn into_redis(self) -> Result { + pub fn into_valkey(self) -> Result { match self { - Frame::Redis(frame) => Ok(frame), + Frame::Valkey(frame) => Ok(frame), frame => Err(anyhow!( - "Expected redis frame but received {} frame", + "Expected valkey frame but received {} frame", frame.name() )), } @@ -239,7 +239,7 @@ impl Display for Frame { #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame), #[cfg(feature = "redis")] - Frame::Redis(frame) => write!(f, "Redis {:?}", frame), + Frame::Valkey(frame) => write!(f, "Valkey {:?}", frame), #[cfg(feature = "kafka")] Frame::Kafka(frame) => write!(f, "Kafka {}", frame), Frame::Dummy => write!(f, "Shotover internal dummy message"), diff --git a/shotover/src/frame/redis.rs b/shotover/src/frame/redis.rs index 4d9019e96..d70d0ede1 100644 --- a/shotover/src/frame/redis.rs +++ b/shotover/src/frame/redis.rs @@ -1,10 +1,10 @@ -use crate::frame::RedisFrame; +use crate::frame::ValkeyFrame; use crate::message::QueryType; #[inline] -pub fn redis_query_type(frame: &RedisFrame) -> QueryType { - if let RedisFrame::Array(frames) = frame { - if let Some(RedisFrame::BulkString(bytes)) = frames.first() { +pub fn valkey_query_type(frame: &ValkeyFrame) -> QueryType { + if let ValkeyFrame::Array(frames) = frame { + if let Some(ValkeyFrame::BulkString(bytes)) = frames.first() { return match bytes.to_ascii_uppercase().as_slice() { b"APPEND" | b"BITCOUNT" | b"STRLEN" | b"GET" | b"GETRANGE" | b"MGET" | b"LRANGE" | b"LINDEX" | b"LLEN" | b"SCARD" | b"SISMEMBER" | b"SMEMBERS" @@ -18,9 +18,9 @@ pub fn redis_query_type(frame: &RedisFrame) -> QueryType { QueryType::Write } -pub fn redis_query_name(frame: &RedisFrame) -> Option { - if let RedisFrame::Array(array) = frame { - if let Some(RedisFrame::BulkString(v)) = array.first() { +pub fn valkey_query_name(frame: &ValkeyFrame) -> Option { + if let ValkeyFrame::Array(array) = frame { + if let Some(ValkeyFrame::BulkString(v)) = array.first() { let upper_bytes = v.to_ascii_uppercase(); match String::from_utf8(upper_bytes) { Ok(query_type) => { @@ -28,7 +28,7 @@ pub fn redis_query_name(frame: &RedisFrame) -> Option { } Err(err) => { tracing::error!( - "Failed to convert redis bulkstring to string, err: {:?}", + "Failed to convert valkey bulkstring to string, err: {:?}", err ) } diff --git a/shotover/src/frame/value/redis.rs b/shotover/src/frame/value/redis.rs index 7b970197b..10466bdbc 100644 --- a/shotover/src/frame/value/redis.rs +++ b/shotover/src/frame/value/redis.rs @@ -1,51 +1,51 @@ use super::{GenericValue, IntSize}; -use crate::frame::RedisFrame; +use crate::frame::ValkeyFrame; -impl From for GenericValue { - fn from(f: RedisFrame) -> Self { +impl From for GenericValue { + fn from(f: ValkeyFrame) -> Self { match f { - RedisFrame::SimpleString(s) => { + ValkeyFrame::SimpleString(s) => { GenericValue::Strings(String::from_utf8_lossy(&s).to_string()) } - RedisFrame::Error(e) => GenericValue::Strings(e.to_string()), - RedisFrame::Integer(i) => GenericValue::Integer(i, IntSize::I64), - RedisFrame::BulkString(b) => GenericValue::Bytes(b), - RedisFrame::Array(a) => { + ValkeyFrame::Error(e) => GenericValue::Strings(e.to_string()), + ValkeyFrame::Integer(i) => GenericValue::Integer(i, IntSize::I64), + ValkeyFrame::BulkString(b) => GenericValue::Bytes(b), + ValkeyFrame::Array(a) => { GenericValue::List(a.iter().cloned().map(GenericValue::from).collect()) } - RedisFrame::Null => GenericValue::Null, + ValkeyFrame::Null => GenericValue::Null, } } } -impl From<&RedisFrame> for GenericValue { - fn from(f: &RedisFrame) -> Self { +impl From<&ValkeyFrame> for GenericValue { + fn from(f: &ValkeyFrame) -> Self { match f.clone() { - RedisFrame::SimpleString(s) => { + ValkeyFrame::SimpleString(s) => { GenericValue::Strings(String::from_utf8_lossy(s.as_ref()).to_string()) } - RedisFrame::Error(e) => GenericValue::Strings(e.to_string()), - RedisFrame::Integer(i) => GenericValue::Integer(i, IntSize::I64), - RedisFrame::BulkString(b) => GenericValue::Bytes(b), - RedisFrame::Array(a) => { + ValkeyFrame::Error(e) => GenericValue::Strings(e.to_string()), + ValkeyFrame::Integer(i) => GenericValue::Integer(i, IntSize::I64), + ValkeyFrame::BulkString(b) => GenericValue::Bytes(b), + ValkeyFrame::Array(a) => { GenericValue::List(a.iter().cloned().map(GenericValue::from).collect()) } - RedisFrame::Null => GenericValue::Null, + ValkeyFrame::Null => GenericValue::Null, } } } -impl From for RedisFrame { - fn from(value: GenericValue) -> RedisFrame { +impl From for ValkeyFrame { + fn from(value: GenericValue) -> ValkeyFrame { match value { - GenericValue::Null => RedisFrame::Null, - GenericValue::Bytes(b) => RedisFrame::BulkString(b), - GenericValue::Strings(s) => RedisFrame::SimpleString(s.into()), - GenericValue::Integer(i, _) => RedisFrame::Integer(i), - GenericValue::Float(f) => RedisFrame::SimpleString(f.to_string().into()), - GenericValue::Boolean(b) => RedisFrame::Integer(i64::from(b)), - GenericValue::Inet(i) => RedisFrame::SimpleString(i.to_string().into()), - GenericValue::List(l) => RedisFrame::Array(l.into_iter().map(|v| v.into()).collect()), + GenericValue::Null => ValkeyFrame::Null, + GenericValue::Bytes(b) => ValkeyFrame::BulkString(b), + GenericValue::Strings(s) => ValkeyFrame::SimpleString(s.into()), + GenericValue::Integer(i, _) => ValkeyFrame::Integer(i), + GenericValue::Float(f) => ValkeyFrame::SimpleString(f.to_string().into()), + GenericValue::Boolean(b) => ValkeyFrame::Integer(i64::from(b)), + GenericValue::Inet(i) => ValkeyFrame::SimpleString(i.to_string().into()), + GenericValue::List(l) => ValkeyFrame::Array(l.into_iter().map(|v| v.into()).collect()), GenericValue::Ascii(_a) => todo!(), GenericValue::Double(_d) => todo!(), GenericValue::Set(_s) => todo!(), diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 160f7888f..587113d2c 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -4,7 +4,7 @@ use crate::codec::CodecState; #[cfg(feature = "cassandra")] use crate::frame::{cassandra, cassandra::CassandraMetadata}; #[cfg(feature = "redis")] -use crate::frame::{redis::redis_query_type, RedisFrame}; +use crate::frame::{redis::valkey_query_type, ValkeyFrame}; use crate::frame::{Frame, MessageType}; use anyhow::{anyhow, Context, Result}; use bytes::Bytes; @@ -23,7 +23,7 @@ pub enum Metadata { #[cfg(feature = "cassandra")] Cassandra(CassandraMetadata), #[cfg(feature = "redis")] - Redis, + Valkey, #[cfg(feature = "kafka")] Kafka, #[cfg(feature = "opensearch")] @@ -38,12 +38,12 @@ impl Metadata { #[allow(unreachable_code)] Ok(Message::from_frame(match self { #[cfg(feature = "redis")] - Metadata::Redis => { - // Redis errors can not contain newlines at the protocol level + Metadata::Valkey => { + // Valkey errors can not contain newlines at the protocol level let message = format!("ERR {error}") .replace("\r\n", " ") .replace('\n', " "); - Frame::Redis(RedisFrame::Error(message.into())) + Frame::Valkey(ValkeyFrame::Error(message.into())) } #[cfg(feature = "cassandra")] Metadata::Cassandra(meta) => Frame::Cassandra(meta.to_error_response(error)), @@ -126,7 +126,7 @@ impl Message { } /// This method should be called when you have both a Frame and matching raw bytes of a message. - /// This is expected to be used only by codecs that are decoding a protocol that does not include length of the message in the header. e.g. redis + /// This is expected to be used only by codecs that are decoding a protocol that does not include length of the message in the header. e.g. valkey /// Providing both the raw bytes and Frame results in better performance if they are both already available. pub fn from_bytes_and_frame_at_instant( bytes: Bytes, @@ -239,7 +239,7 @@ impl Message { /// Return the MessageId of the request that resulted in this message /// Returns None when: /// * The message is a request - /// * The message is a response but was not created in response to a request. e.g. Cassandra events and redis pubsub + /// * The message is a response but was not created in response to a request. e.g. Cassandra events and valkey pubsub pub fn request_id(&self) -> Option { self.request_id } @@ -332,7 +332,7 @@ impl Message { .. } => match message_type { #[cfg(feature = "redis")] - MessageType::Redis => nonzero!(1u32), + MessageType::Valkey => nonzero!(1u32), #[cfg(feature = "cassandra")] MessageType::Cassandra => cassandra::raw_frame::cell_count(bytes)?, #[cfg(feature = "kafka")] @@ -345,7 +345,7 @@ impl Message { #[cfg(feature = "cassandra")] Frame::Cassandra(frame) => frame.cell_count()?, #[cfg(feature = "redis")] - Frame::Redis(_) => nonzero!(1u32), + Frame::Valkey(_) => nonzero!(1u32), #[cfg(feature = "kafka")] Frame::Kafka(_) => todo!(), Frame::Dummy => nonzero!(1u32), @@ -376,7 +376,7 @@ impl Message { #[cfg(feature = "cassandra")] Some(Frame::Cassandra(cassandra)) => cassandra.get_query_type(), #[cfg(feature = "redis")] - Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we cant define methods on RedisFrame + Some(Frame::Valkey(valkey)) => valkey_query_type(valkey), // free-standing function as we cant define methods on ValkeyFrame #[cfg(feature = "kafka")] Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), @@ -425,7 +425,7 @@ impl Message { Ok(Metadata::Cassandra(cassandra::raw_frame::metadata(bytes)?)) } #[cfg(feature = "redis")] - MessageType::Redis => Ok(Metadata::Redis), + MessageType::Valkey => Ok(Metadata::Valkey), #[cfg(feature = "kafka")] MessageType::Kafka => Ok(Metadata::Kafka), MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), @@ -438,7 +438,7 @@ impl Message { #[cfg(feature = "kafka")] Frame::Kafka(_) => Ok(Metadata::Kafka), #[cfg(feature = "redis")] - Frame::Redis(_) => Ok(Metadata::Redis), + Frame::Valkey(_) => Ok(Metadata::Valkey), Frame::Dummy => Err(anyhow!("dummy has no metadata")), #[cfg(feature = "opensearch")] Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), @@ -460,7 +460,7 @@ impl Message { pub(crate) fn response_is_dummy(&mut self) -> bool { match self.message_type() { #[cfg(feature = "redis")] - MessageType::Redis => false, + MessageType::Valkey => false, #[cfg(feature = "cassandra")] MessageType::Cassandra => false, #[cfg(feature = "kafka")] @@ -495,7 +495,7 @@ impl Message { #[cfg(feature = "cassandra")] Metadata::Cassandra(metadata) => Frame::Cassandra(metadata.backpressure_response()), #[cfg(feature = "redis")] - Metadata::Redis => unimplemented!(), + Metadata::Valkey => unimplemented!(), #[cfg(feature = "kafka")] Metadata::Kafka => unimplemented!(), #[cfg(feature = "opensearch")] @@ -533,7 +533,7 @@ impl Message { #[cfg(feature = "cassandra")] Frame::Cassandra(cassandra) => Some(cassandra.stream_id), #[cfg(feature = "redis")] - Frame::Redis(_) => None, + Frame::Valkey(_) => None, #[cfg(feature = "kafka")] Frame::Kafka(_) => None, Frame::Dummy => None, diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 15dfa1ab1..bbb6e4df3 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -837,7 +837,7 @@ impl PendingRequests { fn new(message_type: MessageType) -> Self { match message_type { #[cfg(feature = "redis")] - MessageType::Redis => PendingRequests::Ordered(vec![]), + MessageType::Valkey => PendingRequests::Ordered(vec![]), #[cfg(feature = "cassandra")] MessageType::Cassandra => PendingRequests::Unordered(Default::default()), #[cfg(feature = "kafka")] diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index 646577182..b27f92933 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -7,7 +7,7 @@ use crate::sources::kafka::{KafkaConfig, KafkaSource}; #[cfg(feature = "opensearch")] use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; #[cfg(feature = "redis")] -use crate::sources::redis::{RedisConfig, RedisSource}; +use crate::sources::redis::{ValkeyConfig, ValkeySource}; use anyhow::Result; use serde::{Deserialize, Serialize}; use tokio::sync::watch; @@ -34,7 +34,7 @@ pub enum Source { #[cfg(feature = "cassandra")] Cassandra(CassandraSource), #[cfg(feature = "redis")] - Redis(RedisSource), + Valkey(ValkeySource), #[cfg(feature = "kafka")] Kafka(KafkaSource), #[cfg(feature = "opensearch")] @@ -47,7 +47,7 @@ impl Source { #[cfg(feature = "cassandra")] Source::Cassandra(c) => c.join_handle, #[cfg(feature = "redis")] - Source::Redis(r) => r.join_handle, + Source::Valkey(r) => r.join_handle, #[cfg(feature = "kafka")] Source::Kafka(r) => r.join_handle, #[cfg(feature = "opensearch")] @@ -62,7 +62,7 @@ pub enum SourceConfig { #[cfg(feature = "cassandra")] Cassandra(CassandraConfig), #[cfg(feature = "redis")] - Redis(RedisConfig), + Valkey(ValkeyConfig), #[cfg(feature = "kafka")] Kafka(KafkaConfig), #[cfg(feature = "opensearch")] @@ -78,7 +78,7 @@ impl SourceConfig { #[cfg(feature = "cassandra")] SourceConfig::Cassandra(c) => c.get_source(trigger_shutdown_rx).await, #[cfg(feature = "redis")] - SourceConfig::Redis(r) => r.get_source(trigger_shutdown_rx).await, + SourceConfig::Valkey(r) => r.get_source(trigger_shutdown_rx).await, #[cfg(feature = "kafka")] SourceConfig::Kafka(r) => r.get_source(trigger_shutdown_rx).await, #[cfg(feature = "opensearch")] @@ -91,7 +91,7 @@ impl SourceConfig { #[cfg(feature = "cassandra")] SourceConfig::Cassandra(c) => &c.name, #[cfg(feature = "redis")] - SourceConfig::Redis(r) => &r.name, + SourceConfig::Valkey(r) => &r.name, #[cfg(feature = "kafka")] SourceConfig::Kafka(r) => &r.name, #[cfg(feature = "opensearch")] diff --git a/shotover/src/sources/redis.rs b/shotover/src/sources/redis.rs index 837f0de77..90bc47765 100644 --- a/shotover/src/sources/redis.rs +++ b/shotover/src/sources/redis.rs @@ -1,4 +1,4 @@ -use crate::codec::{redis::RedisCodecBuilder, CodecBuilder, Direction}; +use crate::codec::{redis::ValkeyCodecBuilder, CodecBuilder, Direction}; use crate::config::chain::TransformChainConfig; use crate::server::TcpCodecListener; use crate::sources::{Source, Transport}; @@ -13,7 +13,7 @@ use tracing::{error, info}; #[derive(Serialize, Deserialize, Debug)] #[serde(deny_unknown_fields)] -pub struct RedisConfig { +pub struct ValkeyConfig { pub name: String, pub listen_addr: String, pub connection_limit: Option, @@ -23,13 +23,13 @@ pub struct RedisConfig { pub chain: TransformChainConfig, } -impl RedisConfig { +impl ValkeyConfig { pub async fn get_source( &self, trigger_shutdown_rx: watch::Receiver, ) -> Result> { - Ok(Source::Redis( - RedisSource::new( + Ok(Source::Valkey( + ValkeySource::new( self.name.clone(), &self.chain, self.listen_addr.clone(), @@ -45,11 +45,11 @@ impl RedisConfig { } #[derive(Debug)] -pub struct RedisSource { +pub struct ValkeySource { pub join_handle: JoinHandle<()>, } -impl RedisSource { +impl ValkeySource { #![allow(clippy::too_many_arguments)] pub async fn new( name: String, @@ -60,15 +60,15 @@ impl RedisSource { hard_connection_limit: Option, tls: Option, timeout: Option, - ) -> Result> { - info!("Starting Redis source on [{}]", listen_addr); + ) -> Result> { + info!("Starting Valkey source on [{}]", listen_addr); let mut listener = TcpCodecListener::new( chain_config, name.clone(), listen_addr.clone(), hard_connection_limit.unwrap_or(false), - RedisCodecBuilder::new(Direction::Source, name), + ValkeyCodecBuilder::new(Direction::Source, name), Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), trigger_shutdown_rx.clone(), tls.as_ref().map(TlsAcceptor::new).transpose()?, @@ -93,6 +93,6 @@ impl RedisSource { } }); - Ok(RedisSource { join_handle }) + Ok(ValkeySource { join_handle }) } } diff --git a/shotover/src/transforms/coalesce.rs b/shotover/src/transforms/coalesce.rs index 7399351df..514e4f36d 100644 --- a/shotover/src/transforms/coalesce.rs +++ b/shotover/src/transforms/coalesce.rs @@ -111,7 +111,7 @@ impl Transform for Coalesce { #[cfg(all(test, feature = "redis"))] mod test { - use crate::frame::{Frame, RedisFrame}; + use crate::frame::{Frame, ValkeyFrame}; use crate::message::Message; use crate::transforms::chain::TransformAndMetrics; use crate::transforms::coalesce::Coalesce; @@ -132,7 +132,7 @@ mod test { let mut chain = vec![TransformAndMetrics::new(Box::new(Loopback::default()))]; let requests: Vec<_> = (0..25) - .map(|_| Message::from_frame(Frame::Redis(RedisFrame::Null))) + .map(|_| Message::from_frame(Frame::Valkey(ValkeyFrame::Null))) .collect(); assert_responses_len(&mut chain, &mut coalesce, &requests, 0).await; @@ -154,7 +154,7 @@ mod test { let mut chain = vec![TransformAndMetrics::new(Box::new(Loopback::default()))]; let requests: Vec<_> = (0..25) - .map(|_| Message::from_frame(Frame::Redis(RedisFrame::Null))) + .map(|_| Message::from_frame(Frame::Valkey(ValkeyFrame::Null))) .collect(); assert_responses_len(&mut chain, &mut coalesce, &requests, 0).await; @@ -177,7 +177,7 @@ mod test { let mut chain = vec![TransformAndMetrics::new(Box::new(Loopback::default()))]; let requests: Vec<_> = (0..25) - .map(|_| Message::from_frame(Frame::Redis(RedisFrame::Null))) + .map(|_| Message::from_frame(Frame::Valkey(ValkeyFrame::Null))) .collect(); assert_responses_len(&mut chain, &mut coalesce, &requests, 0).await; diff --git a/shotover/src/transforms/debug/returner.rs b/shotover/src/transforms/debug/returner.rs index 95c75c607..fa9b02b0b 100644 --- a/shotover/src/transforms/debug/returner.rs +++ b/shotover/src/transforms/debug/returner.rs @@ -40,7 +40,7 @@ pub enum Response { #[serde(skip)] Message(Message), #[cfg(feature = "redis")] - Redis(String), + Valkey(String), Fail, } @@ -89,10 +89,10 @@ impl Transform for DebugReturner { Ok(message) } #[cfg(feature = "redis")] - Response::Redis(string) => { - use crate::frame::{Frame, RedisFrame}; + Response::Valkey(string) => { + use crate::frame::{Frame, ValkeyFrame}; use crate::message::Message; - let mut message = Message::from_frame(Frame::Redis(RedisFrame::BulkString( + let mut message = Message::from_frame(Frame::Valkey(ValkeyFrame::BulkString( string.to_string().into(), ))); message.set_request_id(request.id()); diff --git a/shotover/src/transforms/filter.rs b/shotover/src/transforms/filter.rs index c6cad8f7f..f425d8f26 100644 --- a/shotover/src/transforms/filter.rs +++ b/shotover/src/transforms/filter.rs @@ -104,7 +104,7 @@ impl Transform for QueryTypeFilter { mod test { use super::Filter; use crate::frame::Frame; - use crate::frame::RedisFrame; + use crate::frame::ValkeyFrame; use crate::message::MessageIdMap; use crate::message::{Message, QueryType}; use crate::transforms::chain::TransformAndMetrics; @@ -125,15 +125,15 @@ mod test { let messages: Vec<_> = (0..26) .map(|i| { if i % 2 == 0 { - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("GET".into()), - RedisFrame::BulkString("key".into()), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("GET".into()), + ValkeyFrame::BulkString("key".into()), ]))) } else { - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("SET".into()), - RedisFrame::BulkString("key".into()), - RedisFrame::BulkString("value".into()), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("SET".into()), + ValkeyFrame::BulkString("key".into()), + ValkeyFrame::BulkString("value".into()), ]))) } }) @@ -150,17 +150,17 @@ mod test { if i % 2 == 0 { assert_eq!( frame, - &Frame::Redis(RedisFrame::Error( + &Frame::Valkey(ValkeyFrame::Error( "ERR Message was filtered out by shotover".into() )), ) } else { assert_eq!( frame, - &Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("SET".into()), - RedisFrame::BulkString("key".into()), - RedisFrame::BulkString("value".into()), + &Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("SET".into()), + ValkeyFrame::BulkString("key".into()), + ValkeyFrame::BulkString("value".into()), ])) ) } @@ -180,15 +180,15 @@ mod test { let messages: Vec<_> = (0..26) .map(|i| { if i % 2 == 0 { - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("GET".into()), - RedisFrame::BulkString("key".into()), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("GET".into()), + ValkeyFrame::BulkString("key".into()), ]))) } else { - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("SET".into()), - RedisFrame::BulkString("key".into()), - RedisFrame::BulkString("value".into()), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("SET".into()), + ValkeyFrame::BulkString("key".into()), + ValkeyFrame::BulkString("value".into()), ]))) } }) @@ -205,17 +205,17 @@ mod test { if i % 2 == 0 { assert_eq!( frame, - &Frame::Redis(RedisFrame::Error( + &Frame::Valkey(ValkeyFrame::Error( "ERR Message was filtered out by shotover".into() )), ) } else { assert_eq!( frame, - &Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("SET".into()), - RedisFrame::BulkString("key".into()), - RedisFrame::BulkString("value".into()), + &Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("SET".into()), + ValkeyFrame::BulkString("key".into()), + ValkeyFrame::BulkString("value".into()), ])) ) } diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 41e7efa58..229f56636 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -77,11 +77,11 @@ impl Transform for QueryCounter { } } #[cfg(feature = "redis")] - Some(Frame::Redis(frame)) => { - if let Some(query_type) = crate::frame::redis::redis_query_name(frame) { - self.increment_counter(query_type, "redis"); + Some(Frame::Valkey(frame)) => { + if let Some(query_type) = crate::frame::redis::valkey_query_name(frame) { + self.increment_counter(query_type, "valkey"); } else { - self.increment_counter("unknown".to_string(), "redis"); + self.increment_counter("unknown".to_string(), "valkey"); } } #[cfg(feature = "kafka")] diff --git a/shotover/src/transforms/redis/cache.rs b/shotover/src/transforms/redis/cache.rs index 24d63ecfd..2e848c0d8 100644 --- a/shotover/src/transforms/redis/cache.rs +++ b/shotover/src/transforms/redis/cache.rs @@ -1,5 +1,5 @@ use crate::config::chain::TransformChainConfig; -use crate::frame::{CassandraFrame, CassandraOperation, Frame, MessageType, RedisFrame}; +use crate::frame::{CassandraFrame, CassandraOperation, Frame, MessageType, ValkeyFrame}; use crate::message::{Message, MessageIdMap, Messages, Metadata}; use crate::transforms::chain::{TransformChain, TransformChainBuilder}; use crate::transforms::{ @@ -20,7 +20,7 @@ use std::collections::{BTreeMap, HashMap}; use std::net::SocketAddr; use tracing::{error, warn}; -/// Data is stored in Redis as a Hash (hset/hget) and constructed from the cassandra SELECT statement +/// Data is stored in Valkey as a Hash (hset/hget) and constructed from the cassandra SELECT statement /// * The name of the hash is constructed from: the FROM component and partition + range keys as per the TableCacheSchema configuration /// * The name of the field in the hash is constructed from: the SELECT component and the WHERE component excluding the partition + range keys used in the hash name /// * The contents of field in the hash is: the raw bytes of a cassandra response from a SELECT @@ -35,7 +35,7 @@ use tracing::{error, warn}; /// range_key: [] /// then this cassandra query: /// `SELECT a, b, c as g FROM keyspace1.table2 WHERE e='foo' a[2]=3` -/// will result in this redis command: +/// will result in this valkey command: /// `hset "keyspace1.table2:'foo'" "a b c WHERE a[2]=3" $SELECT_RESPONSE_BYTES` // TODO: ensure quoted identifiers wont cause collisions in the above described format @@ -103,7 +103,7 @@ impl TransformConfig for RedisConfig { let transform_context_config = TransformContextConfig { chain_name: "cache_chain".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; Ok(Box::new(SimpleRedisCacheBuilder { @@ -185,10 +185,10 @@ impl SimpleRedisCache { match build_redis_key_from_cql3(query, table_cache_schema) { Ok(address) => { return Some(Message::from_frame_diverged( - Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("HGET".into()), - RedisFrame::BulkString(address.key), - RedisFrame::BulkString(address.field), + Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("HGET".into()), + ValkeyFrame::BulkString(address.key), + ValkeyFrame::BulkString(address.field), ])), request, )); @@ -214,13 +214,13 @@ impl SimpleRedisCache { ) .expect("There must be a pending request, since we store a pending request for all redis requests"); let cassandra_frame = match redis_response.frame() { - Some(Frame::Redis(redis_frame)) => { + Some(Frame::Valkey(redis_frame)) => { match redis_frame { - RedisFrame::Error(err) => { + ValkeyFrame::Error(err) => { error!("Redis cache server returned error: {err:?}"); None } - RedisFrame::BulkString(redis_bytes) => { + ValkeyFrame::BulkString(redis_bytes) => { match CassandraFrame::from_bytes(redis_bytes.clone(), Compression::None) { Ok(mut response_frame) => { @@ -252,7 +252,7 @@ impl SimpleRedisCache { } } } - RedisFrame::Null => { + ValkeyFrame::Null => { self.missed_requests.increment(1); None } @@ -308,7 +308,7 @@ impl SimpleRedisCache { /// TODO make this drop only the specified keys not the entire cache fn drop_table(&self, _statement: &CassandraStatement, response: &Message) -> Message { Message::from_frame_at_instant( - Frame::Redis(RedisFrame::Array(vec![RedisFrame::BulkString( + Frame::Valkey(ValkeyFrame::Array(vec![ValkeyFrame::BulkString( "FLUSHDB".into(), )])), response.received_from_source_or_sink_at, @@ -328,9 +328,9 @@ impl SimpleRedisCache { build_redis_key_from_cql3(statement, table_cache_schema) { return Some(Message::from_frame_at_instant( - Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("DEL".into()), - RedisFrame::BulkString(address.key), + Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("DEL".into()), + ValkeyFrame::BulkString(address.key), ])), response.received_from_source_or_sink_at, )); @@ -358,11 +358,11 @@ impl SimpleRedisCache { let encoded = frame.clone().encode(Compression::None); return Ok(Some(Message::from_frame_at_instant( - Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("HSET".into()), - RedisFrame::BulkString(address.key), - RedisFrame::BulkString(address.field), - RedisFrame::BulkString(encoded.into()), + Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("HSET".into()), + ValkeyFrame::BulkString(address.key), + ValkeyFrame::BulkString(address.field), + ValkeyFrame::BulkString(encoded.into()), ])), response.received_from_source_or_sink_at, ))); diff --git a/shotover/src/transforms/redis/cluster_ports_rewrite.rs b/shotover/src/transforms/redis/cluster_ports_rewrite.rs index ab9a2452f..a35996cee 100644 --- a/shotover/src/transforms/redis/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/redis/cluster_ports_rewrite.rs @@ -1,6 +1,6 @@ use crate::frame::Frame; use crate::frame::MessageType; -use crate::frame::RedisFrame; +use crate::frame::ValkeyFrame; use crate::message::{MessageIdMap, Messages}; use crate::transforms::DownChainProtocol; use crate::transforms::TransformContextBuilder; @@ -31,7 +31,7 @@ impl TransformConfig for RedisClusterPortsRewriteConfig { } fn up_chain_protocol(&self) -> UpChainProtocol { - UpChainProtocol::MustBeOneOf(vec![MessageType::Redis]) + UpChainProtocol::MustBeOneOf(vec![MessageType::Valkey]) } fn down_chain_protocol(&self) -> DownChainProtocol { @@ -127,14 +127,14 @@ impl Transform for RedisClusterPortsRewrite { /// Rewrites the ports of a response to a CLUSTER SLOTS message to `new_port` fn rewrite_port_slot(frame: &mut Frame, new_port: u16) -> Result<()> { - if let Frame::Redis(RedisFrame::Array(array)) = frame { + if let Frame::Valkey(ValkeyFrame::Array(array)) = frame { for elem in array.iter_mut() { - if let RedisFrame::Array(slot) = elem { + if let ValkeyFrame::Array(slot) = elem { for (index, mut frame) in slot.iter_mut().enumerate() { match (index, &mut frame) { (0..=1, _) => {} - (_, RedisFrame::Array(target)) => match target.as_mut_slice() { - [RedisFrame::BulkString(_ip), RedisFrame::Integer(port), ..] => { + (_, ValkeyFrame::Array(target)) => match target.as_mut_slice() { + [ValkeyFrame::BulkString(_ip), ValkeyFrame::Integer(port), ..] => { *port = new_port.into(); } _ => bail!("expected host-port in slot map but was: {:?}", frame), @@ -151,14 +151,14 @@ fn rewrite_port_slot(frame: &mut Frame, new_port: u16) -> Result<()> { /// Get a mutable reference to the CSV string inside a response to CLUSTER NODES or REPLICAS fn get_buffer(frame: &mut Frame) -> Option<&mut Bytes> { // CLUSTER NODES - if let Frame::Redis(RedisFrame::BulkString(buf)) = frame { + if let Frame::Valkey(ValkeyFrame::BulkString(buf)) = frame { return Some(buf); } // CLUSTER REPLICAS - if let Frame::Redis(RedisFrame::Array(array)) = frame { + if let Frame::Valkey(ValkeyFrame::Array(array)) = frame { for item in array.iter_mut() { - if let RedisFrame::BulkString(buf) = item { + if let ValkeyFrame::BulkString(buf) = item { return Some(buf); } } @@ -225,9 +225,9 @@ fn rewrite_port_node(frame: &mut Frame, new_port: u16) -> Result<()> { /// Determines if the supplied Redis Frame is a `CLUSTER NODES` request /// or `CLUSTER REPLICAS` which returns the same response as `CLUSTER NODES` fn is_cluster_nodes(frame: &Frame) -> bool { - if let Frame::Redis(RedisFrame::Array(array)) = frame { + if let Frame::Valkey(ValkeyFrame::Array(array)) = frame { match array.as_slice() { - [RedisFrame::BulkString(one), RedisFrame::BulkString(two), ..] => { + [ValkeyFrame::BulkString(one), ValkeyFrame::BulkString(two), ..] => { one.eq_ignore_ascii_case(b"CLUSTER") && (two.eq_ignore_ascii_case(b"NODES") || two.eq_ignore_ascii_case(b"REPLICAS")) } @@ -240,9 +240,9 @@ fn is_cluster_nodes(frame: &Frame) -> bool { /// Determines if the supplied Redis Frame is a `CLUSTER SLOTS` request fn is_cluster_slots(frame: &Frame) -> bool { - if let Frame::Redis(RedisFrame::Array(array)) = frame { + if let Frame::Valkey(ValkeyFrame::Array(array)) = frame { match array.as_slice() { - [RedisFrame::BulkString(one), RedisFrame::BulkString(two), ..] => { + [ValkeyFrame::BulkString(one), ValkeyFrame::BulkString(two), ..] => { one.eq_ignore_ascii_case(b"CLUSTER") && two.eq_ignore_ascii_case(b"SLOTS") } [..] => false, @@ -255,7 +255,7 @@ fn is_cluster_slots(frame: &Frame) -> bool { #[cfg(test)] mod test { use super::*; - use crate::codec::redis::RedisDecoder; + use crate::codec::redis::ValkeyDecoder; use crate::codec::Direction; use crate::transforms::redis::sink_cluster::parse_slots; use pretty_assertions::assert_eq; @@ -271,16 +271,16 @@ mod test { ]; for combo in combos { - let frame = Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(combo.0)), - RedisFrame::BulkString(Bytes::from_static(combo.1)), + let frame = Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(combo.0)), + ValkeyFrame::BulkString(Bytes::from_static(combo.1)), ])); assert!(is_cluster_slots(&frame)); } - let frame = Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"GET")), - RedisFrame::BulkString(Bytes::from_static(b"key1")), + let frame = Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"GET")), + ValkeyFrame::BulkString(Bytes::from_static(b"key1")), ])); assert!(!is_cluster_slots(&frame)); @@ -302,16 +302,16 @@ mod test { ]; for combo in combos { - let frame = Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(combo.0)), - RedisFrame::BulkString(Bytes::from_static(combo.1)), + let frame = Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(combo.0)), + ValkeyFrame::BulkString(Bytes::from_static(combo.1)), ])); assert!(is_cluster_nodes(&frame)); } - let frame = Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString(Bytes::from_static(b"GET")), - RedisFrame::BulkString(Bytes::from_static(b"key1")), + let frame = Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString(Bytes::from_static(b"GET")), + ValkeyFrame::BulkString(Bytes::from_static(b"key1")), ])); assert!(!is_cluster_nodes(&frame)); @@ -320,7 +320,7 @@ mod test { #[test] fn test_rewrite_port_slots() { let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n"; - let mut codec = RedisDecoder::new(None, Direction::Sink); + let mut codec = ValkeyDecoder::new(None, Direction::Sink); let mut message = codec .decode(&mut slots_pcap.into()) .unwrap() @@ -331,7 +331,7 @@ mod test { rewrite_port_slot(message.frame().unwrap(), 6380).unwrap(); let slots_frames = match message.frame().unwrap() { - Frame::Redis(RedisFrame::Array(frames)) => frames, + Frame::Valkey(ValkeyFrame::Array(frames)) => frames, frame => panic!("bad input: {frame:?}"), }; @@ -385,12 +385,12 @@ c852007a1c3b726534e6866456c1f2002fc442d9 172.31.0.6:1234@16379 myself,master - 0 f9553ea7fc23905476efec1f949b4b3e41a44103 :1234@0 slave,noaddr c852007a1c3b726534e6866456c1f2002fc442d9 1634273478445 1634273478445 3 disconnected "; - let mut raw_frame = Frame::Redis(RedisFrame::BulkString(Bytes::from_static(bulk_string))); + let mut raw_frame = Frame::Valkey(ValkeyFrame::BulkString(Bytes::from_static(bulk_string))); rewrite_port_node(&mut raw_frame, 1234).unwrap(); assert_eq!( raw_frame, - Frame::Redis(RedisFrame::BulkString(Bytes::from_static(expected_string))) + Frame::Valkey(ValkeyFrame::BulkString(Bytes::from_static(expected_string))) ); } } diff --git a/shotover/src/transforms/redis/sink_cluster.rs b/shotover/src/transforms/redis/sink_cluster.rs index 1b251bb8b..dc8fb4ff2 100644 --- a/shotover/src/transforms/redis/sink_cluster.rs +++ b/shotover/src/transforms/redis/sink_cluster.rs @@ -1,6 +1,6 @@ -use crate::codec::redis::RedisCodecBuilder; +use crate::codec::redis::ValkeyCodecBuilder; use crate::codec::{CodecBuilder, Direction}; -use crate::frame::{Frame, MessageType, RedisFrame}; +use crate::frame::{Frame, MessageType, ValkeyFrame}; use crate::message::{Message, Messages}; use crate::tls::TlsConnectorConfig; use crate::transforms::redis::RedisError; @@ -57,7 +57,7 @@ impl TransformConfig for RedisSinkClusterConfig { ) -> Result> { let connection_pool = ConnectionPool::new_with_auth( Duration::from_millis(self.connect_timeout_ms), - RedisCodecBuilder::new(Direction::Sink, "RedisSinkCluster".to_owned()), + ValkeyCodecBuilder::new(Direction::Sink, "RedisSinkCluster".to_owned()), RedisAuthenticator {}, self.tls.clone(), )?; @@ -72,7 +72,7 @@ impl TransformConfig for RedisSinkClusterConfig { } fn up_chain_protocol(&self) -> UpChainProtocol { - UpChainProtocol::MustBeOneOf(vec![MessageType::Redis]) + UpChainProtocol::MustBeOneOf(vec![MessageType::Valkey]) } fn down_chain_protocol(&self) -> DownChainProtocol { @@ -84,7 +84,7 @@ pub struct RedisSinkClusterBuilder { first_contact_points: Vec, direct_destination: Option, connection_count: usize, - connection_pool: ConnectionPool, + connection_pool: ConnectionPool, shared_topology: Arc>, failed_requests: Counter, } @@ -95,7 +95,7 @@ impl RedisSinkClusterBuilder { direct_destination: Option, connection_count: usize, connection_pool: ConnectionPool< - RedisCodecBuilder, + ValkeyCodecBuilder, RedisAuthenticator, UsernamePasswordToken, >, @@ -157,7 +157,7 @@ pub struct RedisSinkCluster { load_scores: HashMap<(String, usize), usize>, rng: SmallRng, connection_count: usize, - connection_pool: ConnectionPool, + connection_pool: ConnectionPool, reason_for_no_nodes: Option<&'static str>, rebuild_connections: bool, first_contact_points: Vec, @@ -173,7 +173,7 @@ impl RedisSinkCluster { connection_count: usize, shared_topology: Arc>, connection_pool: ConnectionPool< - RedisCodecBuilder, + ValkeyCodecBuilder, RedisAuthenticator, UsernamePasswordToken, >, @@ -219,8 +219,8 @@ impl RedisSinkCluster { #[inline] async fn dispatch_message(&mut self, mut message: Message) -> Result { let command = match message.frame() { - Some(Frame::Redis(RedisFrame::Array(ref command))) => command, - None => bail!("Failed to parse redis frame"), + Some(Frame::Valkey(ValkeyFrame::Array(ref command))) => command, + None => bail!("Failed to parse valkey frame"), message => bail!("syntax error: bad command: {message:?}"), }; @@ -280,7 +280,7 @@ impl RedisSinkCluster { Ok(Box::pin(async move { let response = responses .fold(None, |acc, response| async move { - if let Some((_, RedisFrame::Error(_))) = acc { + if let Some((_, ValkeyFrame::Error(_))) = acc { acc } else { match response { @@ -290,7 +290,7 @@ impl RedisSinkCluster { }) => Some(( message.received_from_source_or_sink_at, match message.frame().unwrap() { - Frame::Redis(frame) => { + Frame::Valkey(frame) => { let new_frame = frame.take(); match acc { Some((_, prev_frame)) => routing_info @@ -299,13 +299,15 @@ impl RedisSinkCluster { None => new_frame, } } - _ => unreachable!("direct response from a redis sink"), + _ => unreachable!("direct response from a valkey sink"), }, )), Ok(Response { response: Err(e), .. - }) => Some((None, RedisFrame::Error(e.to_string().into()))), - Err(e) => Some((None, RedisFrame::Error(e.to_string().into()))), + }) => Some((None, ValkeyFrame::Error(e.to_string().into()))), + Err(e) => { + Some((None, ValkeyFrame::Error(e.to_string().into()))) + } } } }) @@ -314,7 +316,7 @@ impl RedisSinkCluster { let (received_at, response) = response.unwrap(); Ok(Response { response: Ok(Message::from_frame_at_instant( - Frame::Redis(response), + Frame::Valkey(response), received_at, )), }) @@ -559,13 +561,13 @@ impl RedisSinkCluster { } RoutingInfo::Auth => self.on_auth(message).await, RoutingInfo::Unsupported => { - short_circuit(RedisFrame::Error( + short_circuit(ValkeyFrame::Error( Str::from_inner(Bytes::from_static(b"ERR unknown command - Shotover RedisSinkCluster does not not support this command")).unwrap(), )) } - RoutingInfo::ShortCircuitNil => short_circuit(RedisFrame::Null), + RoutingInfo::ShortCircuitNil => short_circuit(ValkeyFrame::Null), RoutingInfo::ShortCircuitOk => { - short_circuit(RedisFrame::SimpleString(Bytes::from_static(b"OK"))) + short_circuit(ValkeyFrame::SimpleString(Bytes::from_static(b"OK"))) } } } @@ -595,13 +597,13 @@ impl RedisSinkCluster { async fn on_auth(&mut self, mut message: Message) -> Result { let command = match message.frame() { - Some(Frame::Redis(RedisFrame::Array(ref command))) => command, - None => bail!("Failed to parse redis frame"), + Some(Frame::Valkey(ValkeyFrame::Array(ref command))) => command, + None => bail!("Failed to parse valkey frame"), message => bail!("syntax error: bad command: {message:?}"), }; let mut args = command.iter().skip(1).rev().map(|f| match f { - RedisFrame::BulkString(s) => Ok(s), + ValkeyFrame::BulkString(s) => Ok(s), _ => bail!("syntax error: expected bulk string"), }); @@ -619,7 +621,7 @@ impl RedisSinkCluster { let token = UsernamePasswordToken { username, password }; match self.build_connections(Some(token)).await { - Ok(()) => short_circuit(RedisFrame::SimpleString("OK".into())), + Ok(()) => short_circuit(ValkeyFrame::SimpleString("OK".into())), Err(TransformError::Upstream(RedisError::BadCredentials)) => { self.send_error_response("WRONGPASS invalid username-password") } @@ -634,13 +636,13 @@ impl RedisSinkCluster { #[inline(always)] fn send_error_response(&self, message: &str) -> Result { self.failed_requests.increment(1); - short_circuit(RedisFrame::Error(message.into())) + short_circuit(ValkeyFrame::Error(message.into())) } // TODO: calls to this function should be completely replaced with calls to short_circuit that provide more specific error messages fn short_circuit_with_error(&self) -> Result { warn!("Could not route request - short circuiting"); - short_circuit(RedisFrame::Error( + short_circuit(ValkeyFrame::Error( "ERR Shotover RedisSinkCluster does not not support this command used in this way" .into(), )) @@ -721,9 +723,9 @@ pub enum ResponseJoin { impl RoutingInfo { #[inline(always)] - pub fn for_command_frame(args: &[RedisFrame]) -> Result { + pub fn for_command_frame(args: &[ValkeyFrame]) -> Result { let command_name = match args.first() { - Some(RedisFrame::BulkString(command_name)) => command_name.to_ascii_uppercase(), + Some(ValkeyFrame::BulkString(command_name)) => command_name.to_ascii_uppercase(), _ => bail!("syntax error: bad command name"), }; @@ -736,7 +738,7 @@ impl RoutingInfo { // // With this implementation, running `KEYS *` in a large production environment could OoM shotover. // But I assume in such an environment the nodes would be configured to not allow running KEYS. - // So each redis node would return an error and then we would return a single error to the client which would be fine. + // So each valkey node would return an error and then we would return a single error to the client which would be fine. b"KEYS" => RoutingInfo::AllMasters(ResponseJoin::ArrayJoin), // The LASTSAVE command is needed to confirm that a previous BGSAVE command has succeed. // In order to maintain this use case we query every node and return the oldest save time. @@ -748,19 +750,19 @@ impl RoutingInfo { RoutingInfo::AllNodes(ResponseJoin::First) } b"SCRIPT" => match args.get(1) { - Some(RedisFrame::BulkString(a)) if a.eq_ignore_ascii_case(b"KILL") => { + Some(ValkeyFrame::BulkString(a)) if a.eq_ignore_ascii_case(b"KILL") => { RoutingInfo::Unsupported } _ => RoutingInfo::AllMasters(ResponseJoin::First), }, // * We cant reasonably support CLIENT SETNAME/GETNAME in shotover - // - Connections are pooled so we cant forward it to the redis node + // - Connections are pooled so we cant forward it to the valkey node // - We cant just implement the functionality shotover side because the connection names are supposed to be viewable from CLIENT LIST // * However we dont want to just fail them either as clients like jedis would break so: // - We just pretend to accept CLIENT SETNAME returning an "OK" but without hitting any nodes // - We just pretend to handle CLIENT GETNAME always returning nil without hitting any nodes b"CLIENT" => match args.get(1) { - Some(RedisFrame::BulkString(sub_command)) => { + Some(ValkeyFrame::BulkString(sub_command)) => { match sub_command.to_ascii_uppercase().as_slice() { b"SETNAME" => RoutingInfo::ShortCircuitOk, b"GETNAME" => RoutingInfo::ShortCircuitNil, @@ -773,7 +775,7 @@ impl RoutingInfo { b"SCAN" | b"SHUTDOWN" | b"SLAVEOF" | b"REPLICAOF" | b"MOVE" | b"BITOP" | b"CONFIG" | b"SLOWLOG" | b"INFO" | b"TIME" => RoutingInfo::Unsupported, b"EVALSHA" | b"EVAL" => match args.get(2) { - Some(RedisFrame::BulkString(key_count)) => { + Some(ValkeyFrame::BulkString(key_count)) => { if key_count.as_ref() == b"0" { RoutingInfo::Random } else { @@ -795,7 +797,7 @@ impl RoutingInfo { b"XREAD" | b"XREADGROUP" => args .iter() .position(|a| match a { - RedisFrame::BulkString(a) => a.eq_ignore_ascii_case(b"STREAMS"), + ValkeyFrame::BulkString(a) => a.eq_ignore_ascii_case(b"STREAMS"), _ => false, }) .and_then(|streams_position| { @@ -805,7 +807,7 @@ impl RoutingInfo { .unwrap_or(RoutingInfo::Unsupported), b"AUTH" => RoutingInfo::Auth, // These are stateless commands that return a response. - // We just need a single redis node to handle this for us so shotover can pretend to be a single node. + // We just need a single valkey node to handle this for us so shotover can pretend to be a single node. // So we just pick a node at random. b"ECHO" | b"PING" => RoutingInfo::Random, b"HELLO" => RoutingInfo::Unsupported, @@ -817,8 +819,8 @@ impl RoutingInfo { } #[inline(always)] - pub fn for_key(key: &RedisFrame) -> Option { - if let RedisFrame::BulkString(key) = key { + pub fn for_key(key: &ValkeyFrame) -> Option { + if let ValkeyFrame::BulkString(key) = key { let key = get_hashtag(key).unwrap_or(key); Some(RoutingInfo::Slot( crc16::State::::calculate(key) % SLOT_SIZE as u16, @@ -838,26 +840,26 @@ impl RoutingInfo { } impl ResponseJoin { - pub fn join(&self, prev_frame: RedisFrame, next_frame: RedisFrame) -> RedisFrame { + pub fn join(&self, prev_frame: ValkeyFrame, next_frame: ValkeyFrame) -> ValkeyFrame { match self { ResponseJoin::IntegerMin => match (prev_frame, next_frame) { - (RedisFrame::Integer(prev), RedisFrame::Integer(next)) => { - RedisFrame::Integer(prev.min(next)) + (ValkeyFrame::Integer(prev), ValkeyFrame::Integer(next)) => { + ValkeyFrame::Integer(prev.min(next)) } - _ => RedisFrame::Error("One of the redis frames was not an integer".into()), + _ => ValkeyFrame::Error("One of the valkey frames was not an integer".into()), }, ResponseJoin::IntegerSum => match (prev_frame, next_frame) { - (RedisFrame::Integer(prev), RedisFrame::Integer(next)) => { - RedisFrame::Integer(prev + next) + (ValkeyFrame::Integer(prev), ValkeyFrame::Integer(next)) => { + ValkeyFrame::Integer(prev + next) } - _ => RedisFrame::Error("One of the redis frames was not an integer".into()), + _ => ValkeyFrame::Error("One of the valkey frames was not an integer".into()), }, ResponseJoin::ArrayJoin => match (prev_frame, next_frame) { - (RedisFrame::Array(mut prev), RedisFrame::Array(next)) => { + (ValkeyFrame::Array(mut prev), ValkeyFrame::Array(next)) => { prev.extend(next); - RedisFrame::Array(prev) + ValkeyFrame::Array(prev) } - _ => RedisFrame::Error("One of the redis frames was not an array".into()), + _ => ValkeyFrame::Error("One of the valkey frames was not an array".into()), }, ResponseJoin::First => prev_frame, } @@ -865,7 +867,7 @@ impl ResponseJoin { } fn build_slot_to_server( - frames: &[RedisFrame], + frames: &[ValkeyFrame], slot_entries: &mut Vec<(String, u16, u16)>, start: u16, end: u16, @@ -873,7 +875,7 @@ fn build_slot_to_server( ensure!(start <= end, "invalid slot range: {}-{}", start, end); ensure!(frames.len() >= 2, "expected at least two fields"); - let ip = if let RedisFrame::BulkString(ref ip) = frames[0] { + let ip = if let ValkeyFrame::BulkString(ref ip) = frames[0] { std::str::from_utf8(ip.as_ref()).context("Failed to parse IP address as utf8")? } else { bail!("unexpected type for ip"); @@ -884,7 +886,7 @@ fn build_slot_to_server( return Ok(()); } - let port = if let RedisFrame::Integer(port) = frames[1] { + let port = if let ValkeyFrame::Integer(port) = frames[1] { port } else { bail!("unexpected type for port"); @@ -895,25 +897,25 @@ fn build_slot_to_server( Ok(()) } -pub fn parse_slots(results: &[RedisFrame]) -> Result { +pub fn parse_slots(results: &[ValkeyFrame]) -> Result { let mut master_entries: Vec<(String, u16, u16)> = vec![]; let mut replica_entries: Vec<(String, u16, u16)> = vec![]; for result in results { match result { - RedisFrame::Array(result) => { + ValkeyFrame::Array(result) => { let mut start: u16 = 0; let mut end: u16 = 0; for (index, item) in result.iter().enumerate() { match (index, item) { - (0, RedisFrame::Integer(i)) => start = *i as u16, - (1, RedisFrame::Integer(i)) => end = *i as u16, - (2, RedisFrame::Array(master)) => { + (0, ValkeyFrame::Integer(i)) => start = *i as u16, + (1, ValkeyFrame::Integer(i)) => end = *i as u16, + (2, ValkeyFrame::Array(master)) => { build_slot_to_server(master, &mut master_entries, start, end) .context("failed to decode master slots")? } - (_, RedisFrame::Array(replica)) => { + (_, ValkeyFrame::Array(replica)) => { build_slot_to_server(replica, &mut replica_entries, start, end) .context("failed to decode replica slots")?; } @@ -937,17 +939,17 @@ async fn get_topology_from_node( ) -> Result { let return_chan_rx = send_message_request( &sender, - Message::from_frame(Frame::Redis(RedisFrame::Array(vec![ - RedisFrame::BulkString("CLUSTER".into()), - RedisFrame::BulkString("SLOTS".into()), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(vec![ + ValkeyFrame::BulkString("CLUSTER".into()), + ValkeyFrame::BulkString("SLOTS".into()), ]))), )?; match receive_frame_response(return_chan_rx).await? { - RedisFrame::Array(results) => { + ValkeyFrame::Array(results) => { parse_slots(&results).map_err(|e| TransformError::Protocol(e.to_string())) } - RedisFrame::Error(message) => { + ValkeyFrame::Error(message) => { Err(TransformError::Upstream(RedisError::from_message(&message))) } frame => Err(TransformError::Protocol(format!( @@ -985,24 +987,24 @@ fn send_message_request( } #[inline(always)] -async fn receive_frame_response(receiver: oneshot::Receiver) -> Result { +async fn receive_frame_response(receiver: oneshot::Receiver) -> Result { let Response { response, .. } = receiver.await?; match response?.frame() { - Some(Frame::Redis(frame)) => Ok(frame.take()), - None => Err(anyhow!("Failed to parse redis frame")), - response => Err(anyhow!("Unexpected redis response: {response:?}")), + Some(Frame::Valkey(frame)) => Ok(frame.take()), + None => Err(anyhow!("Failed to parse valkey frame")), + response => Err(anyhow!("Unexpected valkey response: {response:?}")), } } -fn short_circuit(frame: RedisFrame) -> Result { +fn short_circuit(frame: ValkeyFrame) -> Result { let (one_tx, one_rx) = oneshot::channel::(); one_tx .send(Response { - response: Ok(Message::from_frame(Frame::Redis(frame))), + response: Ok(Message::from_frame(Frame::Valkey(frame))), }) - .map_err(|_| anyhow!("Failed to send short circuited redis frame"))?; + .map_err(|_| anyhow!("Failed to send short circuited valkey frame"))?; Ok(Box::pin(async { one_rx @@ -1024,18 +1026,18 @@ impl Transform for RedisSinkCluster { if !self.has_run_init { self.topology = (*self.shared_topology.read().await).clone(); if self.topology.channels.is_empty() { - // The code paths for authenticated and unauthenticated redis are quite different. - // * For unauthenticated redis this initial build_connections should succeed. + // The code paths for authenticated and unauthenticated valkey are quite different. + // * For unauthenticated valkey this initial build_connections should succeed. // + This is required to process the messages we are about to receive. // + We also share the results to skip having to run build_connections again for new connection - // * For authenticated redis this initial build_connections always fails + // * For authenticated valkey this initial build_connections always fails // + The first message to come through should be an AUTH command which will give us the credentials required for us to run build_connections. // As soon as we receive it we will rerun build_connections so we can process other message types afterwards. - // + It is important we do not share the results of the successful build_connections as that would leak authenticated shotover<->redis connections to other client<->shotover connections. + // + It is important we do not share the results of the successful build_connections as that would leak authenticated shotover<->valkey connections to other client<->shotover connections. if let Err(err) = self.build_connections(self.token.clone()).await { match err { TransformError::Upstream(RedisError::NotAuthenticated) => { - // Build_connections sent an internal `CLUSTER SLOTS` command to redis and redis refused to respond because it is enforcing authentication. + // Build_connections sent an internal `CLUSTER SLOTS` command to valkey and valkey refused to respond because it is enforcing authentication. // When the client sends an AUTH message we will rerun build_connections. } _ => tracing::warn!("Error when building connections: {err:?}"), @@ -1058,7 +1060,7 @@ impl Transform for RedisSinkCluster { for message in chain_state.requests.drain(..) { responses.push_back(match self.dispatch_message(message).await { Ok(response) => response, - Err(e) => short_circuit(RedisFrame::Error(format!("ERR {e}").into())).unwrap(), + Err(e) => short_circuit(ValkeyFrame::Error(format!("ERR {e}").into())).unwrap(), }) } @@ -1071,7 +1073,7 @@ impl Transform for RedisSinkCluster { trace!("Got resp {:?}", s); let Response { response } = s.or_else(|e| -> Result { Ok(Response { - response: Ok(Message::from_frame(Frame::Redis(RedisFrame::Error( + response: Ok(Message::from_frame(Frame::Valkey(ValkeyFrame::Error( format!("ERR Could not route request - {e}").into(), )))), }) @@ -1079,7 +1081,7 @@ impl Transform for RedisSinkCluster { let mut response = response?; match response.frame() { - Some(Frame::Redis(frame)) => { + Some(Frame::Valkey(frame)) => { match Redirection::parse(frame) { Some(Redirection::Moved { slot, server }) => { debug!("Got MOVE {} {}", slot, server); @@ -1120,9 +1122,9 @@ enum Redirection { } impl Redirection { - fn parse(frame: &RedisFrame) -> Option { + fn parse(frame: &ValkeyFrame) -> Option { match frame { - RedisFrame::Error(err) => { + ValkeyFrame::Error(err) => { let mut tokens = err.split(' '); match tokens.next()? { "MOVED" => Some(Redirection::Moved { @@ -1163,29 +1165,29 @@ impl Authenticator for RedisAuthenticator { sender: &mut UnboundedSender, token: &UsernamePasswordToken, ) -> Result<(), TransformError> { - let mut auth_args = vec![RedisFrame::BulkString(Bytes::from_static(b"AUTH"))]; + let mut auth_args = vec![ValkeyFrame::BulkString(Bytes::from_static(b"AUTH"))]; // Support non-ACL / username-less. if let Some(username) = &token.username { - auth_args.push(RedisFrame::BulkString(username.clone())); + auth_args.push(ValkeyFrame::BulkString(username.clone())); } - auth_args.push(RedisFrame::BulkString(token.password.clone())); + auth_args.push(ValkeyFrame::BulkString(token.password.clone())); let return_rx = send_message_request( sender, - Message::from_frame(Frame::Redis(RedisFrame::Array(auth_args))), + Message::from_frame(Frame::Valkey(ValkeyFrame::Array(auth_args))), )?; match receive_frame_response(return_rx).await? { - RedisFrame::SimpleString(s) if s == "OK" => { + ValkeyFrame::SimpleString(s) if s == "OK" => { trace!("authenticated upstream as user: {:?}", token.username); Ok(()) } - RedisFrame::SimpleString(s) => Err(TransformError::Protocol(format!( + ValkeyFrame::SimpleString(s) => Err(TransformError::Protocol(format!( "expected OK but got: {s:?}" ))), - RedisFrame::Error(e) => Err(TransformError::Upstream(RedisError::from_message(&e))), + ValkeyFrame::Error(e) => Err(TransformError::Upstream(RedisError::from_message(&e))), f => Err(TransformError::Protocol(format!( "unexpected response type: {f:?}" ))), @@ -1195,7 +1197,7 @@ impl Authenticator for RedisAuthenticator { #[cfg(test)] mod test { use super::*; - use crate::codec::redis::RedisDecoder; + use crate::codec::redis::ValkeyDecoder; use crate::codec::Direction; use pretty_assertions::assert_eq; use tokio_util::codec::Decoder; @@ -1205,7 +1207,7 @@ mod test { // Wireshark capture from a Redis cluster with 3 masters and 3 replicas. let slots_pcap: &[u8] = b"*3\r\n*4\r\n:10923\r\n:16383\r\n*3\r\n$12\r\n192.168.80.6\r\n:6379\r\n$40\r\n3a7c357ed75d2aa01fca1e14ef3735a2b2b8ffac\r\n*3\r\n$12\r\n192.168.80.3\r\n:6379\r\n$40\r\n77c01b0ddd8668fff05e3f6a8aaf5f3ccd454a79\r\n*4\r\n:5461\r\n:10922\r\n*3\r\n$12\r\n192.168.80.5\r\n:6379\r\n$40\r\n969c6215d064e68593d384541ceeb57e9520dbed\r\n*3\r\n$12\r\n192.168.80.2\r\n:6379\r\n$40\r\n3929f69990a75be7b2d49594c57fe620862e6fd6\r\n*4\r\n:0\r\n:5460\r\n*3\r\n$12\r\n192.168.80.7\r\n:6379\r\n$40\r\n15d52a65d1fc7a53e34bf9193415aa39136882b2\r\n*3\r\n$12\r\n192.168.80.4\r\n:6379\r\n$40\r\ncd023916a3528fae7e606a10d8289a665d6c47b0\r\n"; - let mut codec = RedisDecoder::new(None, Direction::Sink); + let mut codec = ValkeyDecoder::new(None, Direction::Sink); let mut message = codec .decode(&mut slots_pcap.into()) @@ -1215,7 +1217,7 @@ mod test { .unwrap(); let slots_frames = match message.frame().unwrap() { - Frame::Redis(RedisFrame::Array(frames)) => frames, + Frame::Valkey(ValkeyFrame::Array(frames)) => frames, frame => panic!("bad input: {frame:?}"), }; diff --git a/shotover/src/transforms/redis/sink_single.rs b/shotover/src/transforms/redis/sink_single.rs index bc842a16b..db8bfdbcc 100644 --- a/shotover/src/transforms/redis/sink_single.rs +++ b/shotover/src/transforms/redis/sink_single.rs @@ -1,13 +1,13 @@ use crate::codec::{CodecBuilder, Direction}; use crate::connection::SinkConnection; -use crate::frame::{Frame, MessageType, RedisFrame}; +use crate::frame::{Frame, MessageType, ValkeyFrame}; use crate::message::Messages; use crate::tls::{TlsConnector, TlsConnectorConfig}; use crate::transforms::{ ChainState, DownChainProtocol, Transform, TransformBuilder, TransformConfig, TransformContextBuilder, UpChainProtocol, }; -use crate::{codec::redis::RedisCodecBuilder, transforms::TransformContextConfig}; +use crate::{codec::redis::ValkeyCodecBuilder, transforms::TransformContextConfig}; use anyhow::Result; use async_trait::async_trait; use metrics::{counter, Counter}; @@ -44,7 +44,7 @@ impl TransformConfig for RedisSinkSingleConfig { } fn up_chain_protocol(&self) -> UpChainProtocol { - UpChainProtocol::MustBeOneOf(vec![MessageType::Redis]) + UpChainProtocol::MustBeOneOf(vec![MessageType::Valkey]) } fn down_chain_protocol(&self) -> DownChainProtocol { @@ -119,7 +119,7 @@ impl Transform for RedisSinkSingle { chain_state: &'shorter mut ChainState<'longer>, ) -> Result { if self.connection.is_none() { - let codec = RedisCodecBuilder::new(Direction::Sink, "RedisSinkSingle".to_owned()); + let codec = ValkeyCodecBuilder::new(Direction::Sink, "RedisSinkSingle".to_owned()); self.connection = Some( SinkConnection::new( &self.address, @@ -144,7 +144,7 @@ impl Transform for RedisSinkSingle { .try_recv_into(&mut responses) { for response in &mut responses { - if let Some(Frame::Redis(RedisFrame::Error(_))) = response.frame() { + if let Some(Frame::Valkey(ValkeyFrame::Error(_))) = response.frame() { self.failed_requests.increment(1); } } @@ -166,7 +166,7 @@ impl Transform for RedisSinkSingle { .await?; for response in &mut responses[responses_len_old..] { - if let Some(Frame::Redis(RedisFrame::Error(_))) = response.frame() { + if let Some(Frame::Valkey(ValkeyFrame::Error(_))) = response.frame() { self.failed_requests.increment(1); } if response.request_id().is_some() { diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index 8f2c81d80..8b7bb2ab2 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -609,7 +609,7 @@ mod tests { let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate(); @@ -628,7 +628,7 @@ mod tests { let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate().join("\n"); @@ -649,7 +649,7 @@ mod tests { }; let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate(); @@ -667,7 +667,7 @@ mod tests { }; let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate(); @@ -688,7 +688,7 @@ mod tests { let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate().join("\n"); @@ -712,7 +712,7 @@ mod tests { let transform_context_config = TransformContextConfig { chain_name: "".into(), - up_chain_protocol: MessageType::Redis, + up_chain_protocol: MessageType::Valkey, }; let transform = config.get_builder(transform_context_config).await.unwrap(); let result = transform.validate(); diff --git a/shotover/src/transforms/util/cluster_connection_pool.rs b/shotover/src/transforms/util/cluster_connection_pool.rs index 3f010f824..d4c71cfbd 100644 --- a/shotover/src/transforms/util/cluster_connection_pool.rs +++ b/shotover/src/transforms/util/cluster_connection_pool.rs @@ -308,7 +308,7 @@ async fn rx_process( #[cfg(all(test, feature = "redis"))] mod test { use super::spawn_read_write_tasks; - use crate::codec::redis::RedisCodecBuilder; + use crate::codec::redis::ValkeyCodecBuilder; use crate::codec::{CodecBuilder, Direction}; use std::mem; use std::time::Duration; @@ -340,7 +340,7 @@ mod test { let stream = TcpStream::connect(("127.0.0.1", port)).await.unwrap(); let (rx, tx) = stream.into_split(); - let codec = RedisCodecBuilder::new(Direction::Sink, "redis".to_owned()); + let codec = ValkeyCodecBuilder::new(Direction::Sink, "valkey".to_owned()); let sender = spawn_read_write_tasks(&codec, rx, tx); assert!(remote.await.unwrap()); @@ -380,7 +380,7 @@ mod test { let stream = TcpStream::connect(("127.0.0.1", port)).await.unwrap(); let (rx, tx) = stream.into_split(); - let codec = RedisCodecBuilder::new(Direction::Sink, "redis".to_owned()); + let codec = ValkeyCodecBuilder::new(Direction::Sink, "valkey".to_owned()); // Drop sender immediately. std::mem::drop(spawn_read_write_tasks(&codec, rx, tx));