Skip to content

Commit

Permalink
Allow user to force send to replica.
Browse files Browse the repository at this point in the history
  • Loading branch information
nihohit committed Sep 21, 2023
1 parent f0f184f commit b390580
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 28 deletions.
48 changes: 33 additions & 15 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ where

match route.slot_addr() {
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
SlotAddr::Replica => match self.read_from_replica_strategy {
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
ReadFromReplicaStrategy::AlwaysFromPrimary => {
self.connection_for_address(addrs.primary.as_str())
}
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
}
},
SlotAddr::ReplicaRequired => self.round_robin_read_from_replica(slot_map_value),
}
}

Expand All @@ -118,7 +119,7 @@ where
route: &Route,
) -> Option<ConnectionAndIdentifier<Connection>> {
self.lookup_route(route).or_else(|| {
if route.slot_addr() == SlotAddr::Replica {
if route.slot_addr() != SlotAddr::Master {
self.lookup_route(&Route::new(route.slot(), SlotAddr::Master))
} else {
None
Expand Down Expand Up @@ -257,7 +258,9 @@ mod tests {
expected_connections.contains(&found)
}

fn create_container() -> ConnectionsContainer<usize> {
fn create_container_with_strategy(
stragey: ReadFromReplicaStrategy,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Slot::new(1, 1000, "primary1".to_owned(), Vec::new()),
Expand Down Expand Up @@ -305,10 +308,14 @@ mod tests {
ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::RoundRobin,
read_from_replica_strategy: stragey,
}
}

fn create_container() -> ConnectionsContainer<usize> {
create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin)
}

#[test]
fn get_connection_for_primary_route() {
let container = create_container();
Expand Down Expand Up @@ -367,27 +374,27 @@ mod tests {
let container = create_container();

assert!(container
.connection_for_route(&Route::new(1001, SlotAddr::Replica))
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none());

assert_eq!(
21,
container
.connection_for_route(&Route::new(1002, SlotAddr::Replica))
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
21,
container
.connection_for_route(&Route::new(1500, SlotAddr::Replica))
.connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::Replica)),
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional)),
&[31, 32],
));
}
Expand All @@ -397,21 +404,21 @@ mod tests {
let container = create_container();

assert!(container
.connection_for_route(&Route::new(0, SlotAddr::Replica))
.connection_for_route(&Route::new(0, SlotAddr::ReplicaOptional))
.is_none());

assert_eq!(
1,
container
.connection_for_route(&Route::new(500, SlotAddr::Replica))
.connection_for_route(&Route::new(500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
1,
container
.connection_for_route(&Route::new(1000, SlotAddr::Replica))
.connection_for_route(&Route::new(1000, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand All @@ -425,12 +432,23 @@ mod tests {
assert_eq!(
31,
container
.connection_for_route(&Route::new(2001, SlotAddr::Replica))
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);
}

#[test]
fn get_replica_connection_for_replica_route_if_replica_is_required_even_if_strategy_is_always_from_primary(
) {
let container = create_container_with_strategy(ReadFromReplicaStrategy::AlwaysFromPrimary);

assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 32],
));
}

#[test]
fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() {
let mut container = create_container();
Expand All @@ -439,23 +457,23 @@ mod tests {
assert_eq!(
2,
container
.connection_for_route(&Route::new(1002, SlotAddr::Replica))
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
2,
container
.connection_for_route(&Route::new(1500, SlotAddr::Replica))
.connection_for_route(&Route::new(1500, SlotAddr::ReplicaOptional))
.unwrap()
.1
);

assert_eq!(
3,
container
.connection_for_route(&Route::new(2001, SlotAddr::Replica))
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaOptional))
.unwrap()
.1
);
Expand Down
2 changes: 1 addition & 1 deletion redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,7 @@ mod pipeline_routing_tests {

assert_eq!(
route_pipeline(&pipeline),
Some(Route::new(12182, SlotAddr::Replica))
Some(Route::new(12182, SlotAddr::ReplicaOptional))
);
}

Expand Down
22 changes: 12 additions & 10 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub(crate) fn combine_and_sort_array_results<'a>(
fn get_route(is_readonly: bool, key: &[u8]) -> Route {
let slot = get_slot(key);
if is_readonly {
Route::new(slot, SlotAddr::Replica)
Route::new(slot, SlotAddr::ReplicaOptional)
} else {
Route::new(slot, SlotAddr::Master)
}
Expand Down Expand Up @@ -545,10 +545,12 @@ impl Slot {
/// What type of node should a request be routed to, assuming read from replica is enabled.
#[derive(Eq, PartialEq, Clone, Copy, Debug, Hash)]
pub enum SlotAddr {
/// Primary node
/// The request must be routed to primary node
Master,
/// Replica node
Replica,
/// The request may be routed to a replica node
ReplicaOptional,
/// The request must be routed to replica node, if one exists
ReplicaRequired,
}

/// This is just a simplified version of [`Slot`],
Expand Down Expand Up @@ -751,7 +753,7 @@ mod tests {
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
slot(b"foo"),
SlotAddr::Replica,
SlotAddr::ReplicaOptional,
)),
)),
),
Expand Down Expand Up @@ -781,7 +783,7 @@ mod tests {
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(
slot(b"mystream"),
SlotAddr::Replica,
SlotAddr::ReplicaOptional,
)),
)),
),
Expand All @@ -800,7 +802,7 @@ mod tests {
assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
42, 50, 13, 10, 36, 54, 13, 10, 69, 88, 73, 83, 84, 83, 13, 10, 36, 49, 54, 13, 10,
244, 93, 23, 40, 126, 127, 253, 33, 89, 47, 185, 204, 171, 249, 96, 139, 13, 10
]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::Replica)))) if slot == 964));
]).unwrap()), Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(Route(slot, SlotAddr::ReplicaOptional)))) if slot == 964));

assert!(matches!(RoutingInfo::for_routable(&parse_redis_value(&[
42, 54, 13, 10, 36, 51, 13, 10, 83, 69, 84, 13, 10, 36, 49, 54, 13, 10, 36, 241,
Expand Down Expand Up @@ -839,9 +841,9 @@ mod tests {
cmd.arg("foo").arg("bar").arg("baz").arg("{bar}vaz");
let routing = RoutingInfo::for_routable(&cmd);
let mut expected = std::collections::HashMap::new();
expected.insert(Route(4813, SlotAddr::Replica), vec![3]);
expected.insert(Route(5061, SlotAddr::Replica), vec![2, 4]);
expected.insert(Route(12182, SlotAddr::Replica), vec![1]);
expected.insert(Route(4813, SlotAddr::ReplicaOptional), vec![3]);
expected.insert(Route(5061, SlotAddr::ReplicaOptional), vec![2, 4]);
expected.insert(Route(12182, SlotAddr::ReplicaOptional), vec![1]);

assert!(
matches!(routing.clone(), Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(vec))) if {
Expand Down
4 changes: 2 additions & 2 deletions redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ mod tests {
let mut addresses =
slot_map.addresses_for_multi_routing(&MultipleNodeRoutingInfo::MultiSlot(vec![
(Route::new(1, SlotAddr::Master), vec![]),
(Route::new(2001, SlotAddr::Replica), vec![]),
(Route::new(2001, SlotAddr::ReplicaOptional), vec![]),
]));
addresses.sort();
assert!(addresses.contains(&"node1:6379"));
Expand All @@ -753,7 +753,7 @@ mod tests {
#[test]
fn test_slot_map_rotate_read_replicas() {
let slot_map = get_slot_map(ReadFromReplicaStrategy::RoundRobin);
let route = Route::new(2001, SlotAddr::Replica);
let route = Route::new(2001, SlotAddr::ReplicaOptional);
let mut addresses = vec![
slot_map.slot_addr_for_route(&route).unwrap(),
slot_map.slot_addr_for_route(&route).unwrap(),
Expand Down

0 comments on commit b390580

Please sign in to comment.