Skip to content

Commit

Permalink
Added management connections to the async cluster client to perform p…
Browse files Browse the repository at this point in the history
…eriodic cluster checks
  • Loading branch information
barshaul committed Oct 4, 2023
1 parent b6b2e9a commit ad7dc11
Show file tree
Hide file tree
Showing 5 changed files with 600 additions and 138 deletions.
186 changes: 147 additions & 39 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,40 @@ type IdentifierType = ArcStr;

#[derive(Clone, Eq, PartialEq, Debug)]
pub(crate) struct ClusterNode<Connection> {
pub connection: Connection,
pub ip: Option<IpAddr>,
user_connection: Connection,
management_connection: Option<Connection>,
ip: Option<IpAddr>,
}

impl<Connection> ClusterNode<Connection>
where
Connection: Clone,
{
pub(crate) fn new(connection: Connection, ip: Option<IpAddr>) -> Self {
Self { connection, ip }
pub(crate) fn new(
user_connection: Connection,
management_connection: Option<Connection>,
ip: Option<IpAddr>,
) -> Self {
Self {
user_connection,
management_connection,
ip,
}
}

pub(crate) fn get_user_connection(&self) -> Connection {
self.user_connection.clone()
}

pub(crate) fn get_management_connection(&self) -> Option<Connection> {
self.management_connection.as_ref().cloned()
}

pub(crate) fn get_ip(&self) -> Option<IpAddr> {
self.ip
}
}

/// This opaque type allows us to change the way that the connections are organized
/// internally without refactoring the calling code.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -119,6 +141,15 @@ where
}
}

fn get_connection_for_node(node: &ClusterNode<Connection>, use_management: bool) -> Connection {
if use_management {
node.get_management_connection()
.unwrap_or_else(|| node.get_user_connection())
} else {
node.get_user_connection()
}
}

pub(crate) fn connection_for_route(
&self,
route: &Route,
Expand All @@ -134,11 +165,18 @@ where

pub(crate) fn all_node_connections(
&self,
use_management: bool,
) -> impl Iterator<Item = ConnectionAndIdentifier<Connection>> + '_ {
self.connection_map.iter().filter_map(|(identifier, node)| {
node.as_ref()
.map(|node| (identifier.clone(), node.connection.clone()))
})
self.connection_map
.iter()
.filter_map(move |(identifier, node)| {
node.as_ref().map(|node| {
(
identifier.clone(),
Self::get_connection_for_node(node, use_management),
)
})
})
}

pub(crate) fn all_primary_connections(
Expand All @@ -163,7 +201,7 @@ where

pub(crate) fn connection_for_identifier(&self, identifier: &Identifier) -> Option<Connection> {
let node = self.connection_map.get(identifier)?.as_ref()?;
Some(node.connection.clone())
Some(node.get_user_connection().clone())
}

pub(crate) fn connection_for_address(
Expand All @@ -189,6 +227,7 @@ where
pub(crate) fn random_connections(
&self,
amount: usize,
use_management: bool,
) -> impl Iterator<Item = ConnectionAndIdentifier<Connection>> + '_ {
self.connection_map
.iter()
Expand All @@ -199,7 +238,10 @@ where
})
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(|(identifier, node)| (identifier.clone(), node.connection.clone()))
.map(move |(identifier, node)| {
let conn = Self::get_connection_for_node(node, use_management);
(identifier.clone(), conn)
})
}

pub(crate) fn replace_or_add_connection_for_address(
Expand All @@ -222,7 +264,7 @@ where
pub(crate) fn len(&self) -> usize {
self.connection_map
.iter()
.filter(|(_, conn_option)| conn_option.is_some())
.filter(|(_, node)| node.is_some())
.count()
}

Expand All @@ -238,7 +280,18 @@ mod tests {
use crate::cluster_routing::{Slot, SlotAddr};

use super::*;

impl<Connection> ClusterNode<Connection>
where
Connection: Clone,
{
pub(crate) fn new_only_with_user_conn(user_connection: Connection) -> Self {
Self {
user_connection,
management_connection: None,
ip: None,
}
}
}
fn remove_connections(container: &mut ConnectionsContainer<usize>, identifiers: &[&str]) {
for identifier in identifiers {
container.remove_connection(&Identifier((*identifier).into()));
Expand Down Expand Up @@ -266,9 +319,24 @@ mod tests {
let found = connection.unwrap().1;
expected_connections.contains(&found)
}
fn create_cluster_node(
connection: usize,
use_management_connections: bool,
) -> Option<ClusterNode<usize>> {
Some(ClusterNode::new(
connection,
if use_management_connections {
Some(connection * 10)
} else {
None
},
None,
))
}

fn create_container_with_strategy(
stragey: ReadFromReplicaStrategy,
use_management_connections: bool,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Expand All @@ -291,27 +359,27 @@ mod tests {
let mut connection_map = HashMap::new();
connection_map.insert(
Identifier("primary1".into()),
Some(ClusterNode::new(1, None)),
create_cluster_node(1, use_management_connections),
);
connection_map.insert(
Identifier("primary2".into()),
Some(ClusterNode::new(2, None)),
create_cluster_node(2, use_management_connections),
);
connection_map.insert(
Identifier("primary3".into()),
Some(ClusterNode::new(3, None)),
create_cluster_node(3, use_management_connections),
);
connection_map.insert(
Identifier("replica2-1".into()),
Some(ClusterNode::new(21, None)),
create_cluster_node(21, use_management_connections),
);
connection_map.insert(
Identifier("replica3-1".into()),
Some(ClusterNode::new(31, None)),
create_cluster_node(31, use_management_connections),
);
connection_map.insert(
Identifier("replica3-2".into()),
Some(ClusterNode::new(32, None)),
create_cluster_node(32, use_management_connections),
);

ConnectionsContainer {
Expand All @@ -323,7 +391,7 @@ mod tests {
}

fn create_container() -> ConnectionsContainer<usize> {
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin)
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, false)
}

#[test]
Expand Down Expand Up @@ -451,7 +519,8 @@ mod tests {
#[test]
fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary(
) {
let container = create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary);
let container =
create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary, false);

assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
Expand Down Expand Up @@ -532,8 +601,10 @@ mod tests {
#[test]
fn get_connection_by_address_returns_added_connection() {
let mut container = create_container();
let identifier =
container.replace_or_add_connection_for_address("foobar", ClusterNode::new(4, None));
let identifier = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);

assert_eq!(4, container.connection_for_identifier(&identifier).unwrap());
assert_eq!(
Expand All @@ -546,8 +617,10 @@ mod tests {
fn get_random_connections_without_repetitions() {
let container = create_container();

let random_connections: HashSet<_> =
container.random_connections(3).map(|pair| pair.1).collect();
let random_connections: HashSet<_> = container
.random_connections(3, false)
.map(|pair| pair.1)
.collect();

assert_eq!(random_connections.len(), 3);
assert!(random_connections
Expand All @@ -560,16 +633,18 @@ mod tests {
let mut container = create_container();
remove_all_connections(&mut container);

assert_eq!(0, container.random_connections(1).count());
assert_eq!(0, container.random_connections(1, false).count());
}

#[test]
fn get_random_connections_returns_added_connection() {
let mut container = create_container();
remove_all_connections(&mut container);
let identifier =
container.replace_or_add_connection_for_address("foobar", ClusterNode::new(4, None));
let random_connections: Vec<_> = container.random_connections(1).collect();
let identifier = container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
let random_connections: Vec<_> = container.random_connections(1, false).collect();

assert_eq!(vec![(identifier, 4)], random_connections);
}
Expand All @@ -578,7 +653,7 @@ mod tests {
fn get_random_connections_is_bound_by_the_number_of_connections_in_the_map() {
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000)
.random_connections(1000, false)
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand All @@ -587,10 +662,22 @@ mod tests {
}

#[test]
fn get_all_nodes() {
fn get_random_management_connections() {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, true)
.map(|pair| pair.1)
.collect();
random_connections.sort();

assert_eq!(random_connections, vec![10, 20, 30, 210, 310, 320]);
}

#[test]
fn get_all_user_connections() {
let container = create_container();
let mut connections: Vec<_> = container
.all_node_connections()
.all_node_connections(false)
.map(|conn| conn.1)
.collect();
connections.sort();
Expand All @@ -599,12 +686,15 @@ mod tests {
}

#[test]
fn get_all_nodes_returns_added_connection() {
fn get_all_user_connections_returns_added_connection() {
let mut container = create_container();
container.replace_or_add_connection_for_address("foobar", ClusterNode::new(4, None));
container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);

let mut connections: Vec<_> = container
.all_node_connections()
.all_node_connections(false)
.map(|conn| conn.1)
.collect();
connections.sort();
Expand All @@ -613,19 +703,31 @@ mod tests {
}

#[test]
fn get_all_nodes_does_not_return_removed_connection() {
fn get_all_user_connections_does_not_return_removed_connection() {
let mut container = create_container();
container.remove_connection(&Identifier("primary1".into()));

let mut connections: Vec<_> = container
.all_node_connections()
.all_node_connections(false)
.map(|conn| conn.1)
.collect();
connections.sort();

assert_eq!(vec![2, 3, 21, 31, 32], connections);
}

#[test]
fn get_all_management_connections() {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut connections: Vec<_> = container
.all_node_connections(true)
.map(|conn| conn.1)
.collect();
connections.sort();

assert_eq!(vec![10, 20, 30, 210, 310, 320], connections);
}

#[test]
fn get_all_primaries() {
let container = create_container();
Expand Down Expand Up @@ -662,7 +764,10 @@ mod tests {
container.remove_connection(&Identifier("primary1".into()));
assert_eq!(container.len(), 5);

container.replace_or_add_connection_for_address("foobar", ClusterNode::new(4, None));
container.replace_or_add_connection_for_address(
"foobar",
ClusterNode::new_only_with_user_conn(4),
);
assert_eq!(container.len(), 6);
}

Expand All @@ -676,7 +781,10 @@ mod tests {
container.remove_connection(&Identifier("foobar".into()));
assert_eq!(container.len(), 6);

container.replace_or_add_connection_for_address("primary1", ClusterNode::new(4, None));
container.replace_or_add_connection_for_address(
"primary1",
ClusterNode::new_only_with_user_conn(4),
);
assert_eq!(container.len(), 6);
}

Expand All @@ -685,7 +793,7 @@ mod tests {
let mut container = create_container();

let connection = container.remove_connection(&Identifier("primary1".into()));
assert_eq!(connection, Some(ClusterNode::new(1, None)));
assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1)));

let non_connection = container.remove_connection(&Identifier("foobar".into()));
assert_eq!(non_connection, None);
Expand Down
Loading

0 comments on commit ad7dc11

Please sign in to comment.