Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new routing option: SingleNodeRoutingInfo::RandomPrimary #194

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions redis/src/aio/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{AsyncStream, RedisResult, RedisRuntime, SocketAddr};
use async_trait::async_trait;
#[allow(unused_imports)] // silence warning in multiple configuration builds
eifrah-aws marked this conversation as resolved.
Show resolved Hide resolved
use std::{
future::Future,
io,
Expand Down
18 changes: 9 additions & 9 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ use std::str::FromStr;
use std::thread;
use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng, Rng};
use rand::{seq::IteratorRandom, thread_rng};

use crate::cluster_pipeline::UNROUTABLE_ERROR;
use crate::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo,
};
use crate::cluster_slotmap::SlotMap;
use crate::cluster_topology::{parse_and_count_slots, SLOT_SIZE};
use crate::cluster_topology::parse_and_count_slots;
use crate::cmd::{cmd, Cmd};
use crate::connection::{
connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo,
Expand Down Expand Up @@ -459,12 +459,9 @@ where
};

match RoutingInfo::for_routable(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => {
let mut rng = thread_rng();
Ok(addr_for_slot(Route::new(
rng.gen_range(0..SLOT_SIZE),
SlotAddr::Master,
))?)
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
| Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::RandomPrimary)) => {
Ok(addr_for_slot(Route::new_random_primary())?)
}
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
Ok(addr_for_slot(route)?)
Expand Down Expand Up @@ -730,6 +727,9 @@ where
SingleNodeRoutingInfo::SpecificNode(route) => {
self.get_connection(&mut connections, route)?
}
SingleNodeRoutingInfo::RandomPrimary => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there no distinction at line 462 between RandomPrimary and Random, but here they are processed differently? Is there a bug at line 462?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. The code for RandomPrimary is correct
  2. The RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random) was not changed (replaced the old code with a call to Route::new_random_primary()

Having said that, there is no bug on that line, however there can be an optimization: for example we could add a check to see whether the cmd is readonly, and in case it does, we could allow "read-from-replica" instead of "SlotAddr::Master`.

If you think that this is correct, we could open a new bug and fix it in a different PR (I prefer not to mix changes in a single PR)

self.get_connection(&mut connections, &Route::new_random_primary())?
}
SingleNodeRoutingInfo::ByAddress { host, port } => {
let address = format!("{host}:{port}");
let conn = self.get_connection_by_addr(&mut connections, &address)?;
Expand Down
6 changes: 6 additions & 0 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ impl<C> From<SingleNodeRoutingInfo> for InternalSingleNodeRouting<C> {
SingleNodeRoutingInfo::SpecificNode(route) => {
InternalSingleNodeRouting::SpecificNode(route)
}
SingleNodeRoutingInfo::RandomPrimary => {
InternalSingleNodeRouting::SpecificNode(Route::new_random_primary())
}
SingleNodeRoutingInfo::ByAddress { host, port } => {
InternalSingleNodeRouting::ByAddress(format!("{host}:{port}"))
}
Expand Down Expand Up @@ -620,6 +623,9 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult<Option<Route>>
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(route),
)) => Some(route),
Some(cluster_routing::RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary,
)) => Some(Route::new_random_primary()),
Some(cluster_routing::RoutingInfo::MultiNode(_)) => None,
Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress {
..
Expand Down
43 changes: 41 additions & 2 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub enum RoutingInfo {
pub enum SingleNodeRoutingInfo {
/// Route to any node at random
Random,
/// Route to any *primary* node
RandomPrimary,
/// Route to the node that matches the [Route]
SpecificNode(Route),
/// Route to the node with the given address.
Expand Down Expand Up @@ -610,7 +612,13 @@ impl RoutingInfo {
.and_then(|x| std::str::from_utf8(x).ok())
.and_then(|x| x.parse::<u64>().ok())?;
if key_count == 0 {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
if is_readonly_cmd(cmd) {
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
} else {
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary,
))
}
} else {
r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key))
}
Expand Down Expand Up @@ -949,6 +957,18 @@ impl Route {
pub fn slot_addr(&self) -> SlotAddr {
self.1
}

/// Returns a new Route for a random primary node
pub fn new_random_primary() -> Self {
Self::new(random_slot(), SlotAddr::Master)
}
}

/// Choose a random slot from `0..SLOT_SIZE` (excluding)
fn random_slot() -> u16 {
use rand::Rng;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please put "use" alongside all the others at the top

let mut rng = rand::thread_rng();
rng.gen_range(0..crate::cluster_topology::SLOT_SIZE)
}

#[cfg(test)]
Expand Down Expand Up @@ -1096,12 +1116,31 @@ mod tests {
cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0),
cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0),
] {
// EVAL / EVALSHA are expected to be routed to a RandomPrimary
assert_eq!(
RoutingInfo::for_routable(cmd),
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random))
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary
))
);
}

// FCALL (with 0 keys) is expected to be routed to a random primary node
assert_eq!(
RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(0)),
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::RandomPrimary
))
);

// While FCALL with N keys is expected to be routed to a specific node
assert_eq!(
RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")),
Some(RoutingInfo::SingleNode(
SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master))
))
);

for (cmd, expected) in [
(
cmd("EVAL")
Expand Down