Skip to content

Commit

Permalink
Added management connections to be used with the periodic checks
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Nov 12, 2023
1 parent 68ea13e commit 951617c
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 154 deletions.
106 changes: 25 additions & 81 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type IdentifierType = ArcStr;
#[derive(Clone, Eq, PartialEq, Debug)]
pub(crate) struct ClusterNode<Connection> {
pub user_connection: Connection,
pub management_connection: Option<Connection>,
pub management_connection: Connection,
pub ip: Option<IpAddr>,
}

Expand All @@ -22,7 +22,7 @@ where
{
pub(crate) fn new(
user_connection: Connection,
management_connection: Option<Connection>,
management_connection: Connection,
ip: Option<IpAddr>,
) -> Self {
Self {
Expand All @@ -35,10 +35,7 @@ where
pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
match conn_type {
ConnectionType::User => self.user_connection.clone(),
ConnectionType::Management => self
.management_connection
.clone()
.unwrap_or_else(|| self.user_connection.clone()),
ConnectionType::Management => self.management_connection.clone(),
}
}
}
Expand Down Expand Up @@ -286,18 +283,7 @@ mod tests {
use crate::cluster_routing::{Slot, SlotAddr};

use super::*;
impl<Connection> ClusterNode<Connection>
where
Connection: Clone,
{
pub(crate) fn new_only_with_user_conn(user_connection: Connection) -> Self {
Self {
user_connection,
management_connection: None,
ip: None,
}
}
}

fn remove_nodes(container: &mut ConnectionsContainer<usize>, identifiers: &[&str]) {
for identifier in identifiers {
container.remove_node(&Identifier((*identifier).into()));
Expand Down Expand Up @@ -325,24 +311,13 @@ mod tests {
let found = connection.unwrap().1;
expected_connections.contains(&found)
}
fn create_cluster_node(
connection: usize,
use_management_connections: bool,
) -> Option<ClusterNode<usize>> {
Some(ClusterNode::new(
connection,
if use_management_connections {
Some(connection * 10)
} else {
None
},
None,
))

fn create_cluster_node(connection: usize) -> Option<ClusterNode<usize>> {
Some(ClusterNode::new(connection, connection * 10, None))
}

fn create_container_with_strategy(
stragey: ReadFromReplicaStrategy,
use_management_connections: bool,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Expand All @@ -363,30 +338,12 @@ mod tests {
ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy.
);
let mut connection_map = HashMap::new();
connection_map.insert(
Identifier("primary1".into()),
create_cluster_node(1, use_management_connections),
);
connection_map.insert(
Identifier("primary2".into()),
create_cluster_node(2, use_management_connections),
);
connection_map.insert(
Identifier("primary3".into()),
create_cluster_node(3, use_management_connections),
);
connection_map.insert(
Identifier("replica2-1".into()),
create_cluster_node(21, use_management_connections),
);
connection_map.insert(
Identifier("replica3-1".into()),
create_cluster_node(31, use_management_connections),
);
connection_map.insert(
Identifier("replica3-2".into()),
create_cluster_node(32, use_management_connections),
);
connection_map.insert(Identifier("primary1".into()), create_cluster_node(1));
connection_map.insert(Identifier("primary2".into()), create_cluster_node(2));
connection_map.insert(Identifier("primary3".into()), create_cluster_node(3));
connection_map.insert(Identifier("replica2-1".into()), create_cluster_node(21));
connection_map.insert(Identifier("replica3-1".into()), create_cluster_node(31));
connection_map.insert(Identifier("replica3-2".into()), create_cluster_node(32));

ConnectionsContainer {
slot_map,
Expand All @@ -397,7 +354,7 @@ mod tests {
}

fn create_container() -> ConnectionsContainer<usize> {
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, false)
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin)
}

#[test]
Expand Down Expand Up @@ -525,8 +482,7 @@ mod tests {
#[test]
fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary(
) {
let container =
create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary, false);
let container = create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary);

assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
Expand Down Expand Up @@ -607,10 +563,8 @@ mod tests {
#[test]
fn get_connection_by_address_returns_added_connection() {
let mut container = create_container();
let identifier = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
let identifier = container
.replace_or_add_connection_for_address("foobar", create_cluster_node(4).unwrap());

assert_eq!(4, container.connection_for_identifier(&identifier).unwrap());
assert_eq!(
Expand Down Expand Up @@ -651,10 +605,8 @@ mod tests {
fn get_random_connections_returns_added_connection() {
let mut container = create_container();
remove_all_connections(&mut container);
let identifier = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
let identifier = container
.replace_or_add_connection_for_address("foobar", create_cluster_node(4).unwrap());
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.collect();
Expand All @@ -676,7 +628,7 @@ mod tests {

#[test]
fn get_random_management_connections() {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::Management)
.map(|pair| pair.1)
Expand All @@ -701,10 +653,7 @@ mod tests {
#[test]
fn get_all_user_connections_returns_added_connection() {
let mut container = create_container();
container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
container.replace_or_add_connection_for_address("foobar", create_cluster_node(4).unwrap());

let mut connections: Vec<_> = container
.all_node_connections()
Expand Down Expand Up @@ -765,10 +714,7 @@ mod tests {
container.remove_node(&Identifier("primary1".into()));
assert_eq!(container.len(), 5);

container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
container.replace_or_add_connection_for_address("foobar", create_cluster_node(4).unwrap());
assert_eq!(container.len(), 6);
}

Expand All @@ -782,10 +728,8 @@ mod tests {
container.remove_node(&Identifier("foobar".into()));
assert_eq!(container.len(), 6);

container.replace_or_add_connection_for_address(
"primary1",
ClusterNode::new_only_with_user_conn(4),
);
container
.replace_or_add_connection_for_address("primary1", create_cluster_node(4).unwrap());
assert_eq!(container.len(), 6);
}

Expand All @@ -794,7 +738,7 @@ mod tests {
let mut container = create_container();

let connection = container.remove_node(&Identifier("primary1".into()));
assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1)));
assert_eq!(connection, create_cluster_node(1));

let non_connection = container.remove_node(&Identifier("foobar".into()));
assert_eq!(non_connection, None);
Expand Down
Loading

0 comments on commit 951617c

Please sign in to comment.