Skip to content

Commit

Permalink
Merge pull request #21 from nihohit/slot-coverage
Browse files Browse the repository at this point in the history
calculate_topology work with partial slot cover
  • Loading branch information
nihohit authored Aug 10, 2023
2 parents 2c1c8e4 + 960f092 commit d33c92d
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 79 deletions.
2 changes: 1 addition & 1 deletion redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ where
for conn in samples.iter_mut() {
let value = conn.req_command(&slot_cmd())?;
match parse_slots(&value, self.cluster_params.tls)
.and_then(|v| build_slot_map(&mut new_slots, v))
.map(|v| build_slot_map(&mut new_slots, v))
{
Ok(_) => {
result = Ok(new_slots);
Expand Down
185 changes: 107 additions & 78 deletions redis/src/cluster_topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,36 @@ pub const DEFAULT_REFRESH_SLOTS_RETRY_INITIAL_INTERVAL: Duration = Duration::fro
pub(crate) const SLOT_SIZE: u16 = 16384;

#[derive(Derivative)]
#[derivative(PartialEq, PartialOrd, Ord)]
#[derive(Debug, Eq)]
#[derivative(PartialEq, Eq)]
#[derive(Debug)]
pub(crate) struct TopologyView {
#[derivative(PartialOrd = "ignore", Ord = "ignore")]
pub(crate) hash_value: u64,
#[derivative(PartialEq = "ignore", PartialOrd = "ignore", Ord = "ignore")]
#[derivative(PartialEq = "ignore")]
pub(crate) topology_value: Value,
#[derivative(PartialEq = "ignore")]
pub(crate) nodes_count: u16,
#[derivative(PartialEq = "ignore")]
slots_and_count: Option<(u16, Vec<Slot>)>,
}

impl TopologyView {
// Tries to parse the `topology_value` field, and sets the parsed value, and the number of covered slots, in `slots_and_count`.
// If `slots_and_count` is already not `None`, the function will return early. This means that resetting
// `topology_value` after calling this brings the object into an inconsistent state.
fn parse_and_count_slots(&mut self, tls_mode: Option<TlsMode>) {
if self.slots_and_count.is_some() {
return;
}
self.slots_and_count =
parse_slots(&self.topology_value, tls_mode)
.ok()
.map(|parsed_slots| {
let slot_count = parsed_slots
.iter()
.fold(0, |acc, slot| acc + slot.end() - slot.start());
(slot_count, parsed_slots)
})
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -213,35 +234,10 @@ pub(crate) fn parse_slots(raw_slot_resp: &Value, tls: Option<TlsMode>) -> RedisR
Ok(result)
}

pub(crate) fn build_slot_map(slot_map: &mut SlotMap, mut slots_data: Vec<Slot>) -> RedisResult<()> {
slots_data.sort_by_key(|slot_data| slot_data.start());
let last_slot = slots_data.iter().try_fold(0, |prev_end, slot_data| {
if prev_end != slot_data.start() {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!(
"Received overlapping slots {} and {}..{}",
prev_end,
slot_data.start(),
slot_data.end()
),
)));
}
Ok(slot_data.end() + 1)
})?;

if last_slot != SLOT_SIZE {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Slot refresh error.",
format!("Lacks the slots >= {last_slot}"),
)));
}
pub(crate) fn build_slot_map(slot_map: &mut SlotMap, slots_data: Vec<Slot>) {
slot_map.clear();
slot_map.fill_slots(slots_data);
trace!("{:?}", slot_map);
Ok(())
}

fn calculate_hash<T: Hash>(t: &T) -> u64 {
Expand All @@ -263,66 +259,80 @@ pub(crate) fn calculate_topology(
)));
}
let mut hash_view_map = HashMap::new();
let mut new_slots = SlotMap::new();
for view in topology_views {
let hash_value = calculate_hash(&view);
let topology_entry = hash_view_map.entry(hash_value).or_insert(TopologyView {
hash_value,
topology_value: view,
nodes_count: 0,
slots_and_count: None,
});
topology_entry.nodes_count += 1;
}
let mut most_frequent_topology: Option<&TopologyView> = None;
let mut has_more_than_a_single_max = false;
let vec_iter = hash_view_map.values();
let mut non_unique_max_node_count = false;
let mut vec_iter = hash_view_map.into_values();
let mut most_frequent_topology = match vec_iter.next() {
Some(view) => view,
None => {
return Err(RedisError::from((
ErrorKind::ResponseError,
"No topology views found",
)));
}
};
// Find the most frequent topology view
for curr_view in vec_iter {
let max_view = match most_frequent_topology {
Some(view) => view,
None => {
most_frequent_topology = Some(curr_view);
continue;
}
};
match max_view.cmp(curr_view) {
for mut curr_view in vec_iter {
match most_frequent_topology
.nodes_count
.cmp(&curr_view.nodes_count)
{
std::cmp::Ordering::Less => {
most_frequent_topology = Some(curr_view);
has_more_than_a_single_max = false;
most_frequent_topology = curr_view;
non_unique_max_node_count = false;
}
std::cmp::Ordering::Equal => has_more_than_a_single_max = true,
std::cmp::Ordering::Greater => continue,
std::cmp::Ordering::Equal => {
non_unique_max_node_count = true;

// We choose as the greater view the one with higher slot coverage.
most_frequent_topology.parse_and_count_slots(tls_mode);
if let Some((slot_count, _)) = most_frequent_topology.slots_and_count {
curr_view.parse_and_count_slots(tls_mode);
let curr_slot_count = curr_view
.slots_and_count
.as_ref()
.map(|(slot_count, _)| *slot_count)
.unwrap_or(0);

if let std::cmp::Ordering::Less = slot_count.cmp(&curr_slot_count) {
most_frequent_topology = curr_view;
}
} else {
most_frequent_topology = curr_view;
}
}
}
}
let most_frequent_topology = match most_frequent_topology {
Some(view) => view,
None => unreachable!(),

let parse_and_built_result = |mut most_frequent_topology: TopologyView| {
let mut new_slots = SlotMap::new();
most_frequent_topology.parse_and_count_slots(tls_mode);
let slots_data = most_frequent_topology
.slots_and_count
.map(|(_, slots)| slots)
.ok_or(RedisError::from((
ErrorKind::ResponseError,
"Failed to parse the slots on the majority view",
)))?;
build_slot_map(&mut new_slots, slots_data);
Ok(new_slots)
};
if has_more_than_a_single_max {

if non_unique_max_node_count {
// More than a single most frequent view was found
// If we reached the last retry, or if we it's a 2-nodes cluster, we'll return all found topologies to be checked by the caller
// If we reached the last retry, or if we it's a 2-nodes cluster, we'll return a view with the highest slot coverage, and that is one of most agreed on views.
if curr_retry >= DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES || num_of_queried_nodes < 3 {
for (idx, topology_view) in hash_view_map.values().enumerate() {
match parse_slots(&topology_view.topology_value, tls_mode)
.and_then(|v| build_slot_map(&mut new_slots, v))
{
Ok(_) => {
return Ok(new_slots);
}
Err(e) => {
// If it's the last view, raise the error
if idx == hash_view_map.len() - 1 {
return Err(RedisError::from((
ErrorKind::ResponseError,
"Failed to obtain a majority in topology views and couldn't find complete slot coverage in any of the views",
e.to_string()
)));
} else {
continue;
}
}
}
}
return parse_and_built_result(most_frequent_topology);
}
return Err(RedisError::from((
ErrorKind::ResponseError,
Expand All @@ -334,9 +344,7 @@ pub(crate) fn calculate_topology(
let agreement_rate = most_frequent_topology.nodes_count as f32 / num_of_queried_nodes as f32;
const MIN_AGREEMENT_RATE: f32 = 0.2;
if agreement_rate >= MIN_AGREEMENT_RATE {
parse_slots(&most_frequent_topology.topology_value, tls_mode)
.and_then(|v| build_slot_map(&mut new_slots, v))?;
Ok(new_slots)
parse_and_built_result(most_frequent_topology)
} else {
Err(RedisError::from((
ErrorKind::ResponseError,
Expand Down Expand Up @@ -487,15 +495,36 @@ mod tests {
}

#[test]
fn test_topology_calculator_2_nodes_queried_no_majority_no_full_coverage_return_error() {
fn test_topology_calculator_2_nodes_queried_no_majority_no_full_coverage_prefer_fuller_coverage(
) {
// 2 nodes queried: No majority, no full slot coverage, should return error
let queried_nodes = 2;
let topology_results = vec![
get_view(&ViewType::SingleNodeViewMissingSlots),
get_view(&ViewType::TwoNodesViewMissingSlots),
];
let topology_view_res = calculate_topology(topology_results, 1, None, queried_nodes);
assert!(topology_view_res.is_err());
let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap();
let res: Vec<_> = topology_view.values().collect();
let node_1 = get_node_addr("node3", 6381);
let node_2 = get_node_addr("node4", 6382);
let expected: Vec<&SlotAddrs> = vec![&node_1, &node_2];
assert_eq!(res, expected);
}

#[test]
fn test_topology_calculator_3_nodes_queried_no_full_coverage_prefer_majority() {
// 2 nodes queried: No majority, no full slot coverage, should return error
let queried_nodes = 2;
let topology_results = vec![
get_view(&ViewType::SingleNodeViewMissingSlots),
get_view(&ViewType::TwoNodesViewMissingSlots),
get_view(&ViewType::SingleNodeViewMissingSlots),
];
let topology_view = calculate_topology(topology_results, 1, None, queried_nodes).unwrap();
let res: Vec<_> = topology_view.values().collect();
let node_1 = get_node_addr("node1", 6379);
let expected: Vec<&SlotAddrs> = vec![&node_1];
assert_eq!(res, expected);
}

#[test]
Expand Down

0 comments on commit d33c92d

Please sign in to comment.