Skip to content

Commit

Permalink
Merge branch 'main' into scram_over_mtls_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jun 13, 2024
2 parents 4ccda32 + 796e346 commit 0f3d3a2
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 24 deletions.
26 changes: 26 additions & 0 deletions shotover-proxy/tests/redis_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::thread::sleep;
use std::time::Duration;
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::docker_compose;
use test_helpers::metrics::assert_metrics_key_value;
use test_helpers::shotover_process::{Count, EventMatcher, Level};

pub mod assert;
Expand Down Expand Up @@ -44,6 +45,8 @@ async fn passthrough_standard() {
shotover
.shutdown_and_then_consume_events(&[invalid_frame_event()])
.await;

assert_failed_requests_metric_is_incremented_on_error_response().await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down Expand Up @@ -321,3 +324,26 @@ async fn cluster_dr() {

shotover.shutdown_and_then_consume_events(&[]).await;
}

pub async fn assert_failed_requests_metric_is_incremented_on_error_response() {
let shotover = shotover_process("tests/test-configs/redis/passthrough/topology.yaml")
.start()
.await;
let mut connection = redis_connection::new_async("127.0.0.1", 6379).await;

redis::cmd("INVALID_COMMAND")
.arg("foo")
.query_async::<_, ()>(&mut connection)
.await
.unwrap_err();

// Redis client driver initialization sends 2 CLIENT SETINFO commands which trigger 2 errors
// because those commands are not available in the currently used redis version.
assert_metrics_key_value(
r#"shotover_failed_requests_count{chain="redis",transform="RedisSinkSingle"}"#,
"3",
)
.await;

shotover.shutdown_and_then_consume_events(&[]).await;
}
64 changes: 40 additions & 24 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::stream::FuturesOrdered;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryFutureExt};
use itertools::Itertools;
use metrics::counter;
use metrics::{counter, Counter};
use rand::rngs::SmallRng;
use rand::seq::IteratorRandom;
use rand::SeedableRng;
Expand Down Expand Up @@ -61,14 +61,14 @@ impl TransformConfig for RedisSinkClusterConfig {
RedisAuthenticator {},
self.tls.clone(),
)?;
Ok(Box::new(RedisSinkClusterBuilder {
first_contact_points: self.first_contact_points.clone(),
direct_destination: self.direct_destination.clone(),
connection_count: self.connection_count.unwrap_or(1),
Ok(Box::new(RedisSinkClusterBuilder::new(
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count.unwrap_or(1),
connection_pool,
chain_name: transform_context.chain_name,
shared_topology: Arc::new(RwLock::new(Topology::new())),
}))
transform_context.chain_name,
Arc::new(RwLock::new(Topology::new())),
)))
}

fn up_chain_protocol(&self) -> UpChainProtocol {
Expand All @@ -85,8 +85,32 @@ pub struct RedisSinkClusterBuilder {
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<RedisCodecBuilder, RedisAuthenticator, UsernamePasswordToken>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
failed_requests: Counter,
}

impl RedisSinkClusterBuilder {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
) -> Self {
RedisSinkClusterBuilder {
first_contact_points,
direct_destination,
connection_count,
connection_pool,
shared_topology,
failed_requests: counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => NAME),
}
}
}

impl TransformBuilder for RedisSinkClusterBuilder {
Expand All @@ -95,9 +119,9 @@ impl TransformBuilder for RedisSinkClusterBuilder {
self.first_contact_points.clone(),
self.direct_destination.clone(),
self.connection_count,
self.chain_name.clone(),
self.shared_topology.clone(),
self.connection_pool.clone(),
self.failed_requests.clone(),
))
}

Expand Down Expand Up @@ -126,7 +150,6 @@ impl Topology {
}

pub struct RedisSinkCluster {
chain_name: String,
has_run_init: bool,
topology: Topology,
shared_topology: Arc<RwLock<Topology>>,
Expand All @@ -140,23 +163,23 @@ pub struct RedisSinkCluster {
first_contact_points: Vec<String>,
direct_destination: Option<String>,
token: Option<UsernamePasswordToken>,
failed_requests: Counter,
}

impl RedisSinkCluster {
fn new(
first_contact_points: Vec<String>,
direct_destination: Option<String>,
connection_count: usize,
chain_name: String,
shared_topology: Arc<RwLock<Topology>>,
connection_pool: ConnectionPool<
RedisCodecBuilder,
RedisAuthenticator,
UsernamePasswordToken,
>,
failed_requests: Counter,
) -> Self {
let sink_cluster = RedisSinkCluster {
chain_name: chain_name.clone(),
RedisSinkCluster {
has_run_init: false,
first_contact_points,
direct_destination,
Expand All @@ -170,11 +193,8 @@ impl RedisSinkCluster {
reason_for_no_nodes: None,
rebuild_connections: true,
token: None,
};

counter!("shotover_failed_requests_count", "chain" => chain_name, "transform" => sink_cluster.get_name());

sink_cluster
failed_requests,
}
}

async fn direct_connection(&mut self) -> Result<&UnboundedSender<Request>> {
Expand All @@ -196,10 +216,6 @@ impl RedisSinkCluster {
Ok(self.direct_connection.as_ref().unwrap())
}

fn get_name(&self) -> &'static str {
NAME
}

#[inline]
async fn dispatch_message(&mut self, mut message: Message) -> Result<ResponseFuture> {
let command = match message.frame() {
Expand Down Expand Up @@ -617,7 +633,7 @@ impl RedisSinkCluster {

#[inline(always)]
fn send_error_response(&self, message: &str) -> Result<ResponseFuture> {
counter!("shotover_failed_requests_count", "chain" => self.chain_name.clone(), "transform" => self.get_name()).increment(1);
self.failed_requests.increment(1);
short_circuit(RedisFrame::Error(message.into()))
}

Expand Down

0 comments on commit 0f3d3a2

Please sign in to comment.