Skip to content

Commit

Permalink
Move response policy into multi-node routing.
Browse files Browse the repository at this point in the history
That's the only scenario where it's needed.
  • Loading branch information
nihohit committed Sep 12, 2023
1 parent 2608f96 commit bb0171f
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 61 deletions.
2 changes: 1 addition & 1 deletion redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ where
Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => {
Some(route)
}
Some(RoutingInfo::MultiNode(multi_node_routing)) => {
Some(RoutingInfo::MultiNode((multi_node_routing, _response_policy))) => {
return self.execute_on_multiple_nodes(func, multi_node_routing);
}
None => fail!(UNROUTABLE_ERROR),
Expand Down
18 changes: 6 additions & 12 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ where
routing: CommandRouting::Route(
routing.or_else(|| RoutingInfo::for_routable(cmd, allow_replicas)),
),
response_policy: RoutingInfo::response_policy(cmd),
},
sender,
})
Expand Down Expand Up @@ -226,7 +225,6 @@ enum CmdArg<C> {
Cmd {
cmd: Arc<Cmd>,
routing: CommandRouting<C>,
response_policy: Option<ResponsePolicy>,
},
Pipeline {
pipeline: Arc<crate::Pipeline>,
Expand Down Expand Up @@ -813,7 +811,6 @@ where
cmd: CmdArg::Cmd {
cmd,
routing: CommandRouting::Connection { addr, conn },
response_policy: None,
},
redirect: None,
},
Expand All @@ -836,7 +833,6 @@ where
cmd: Arc<Cmd>,
redirect: Option<Redirect>,
routing: CommandRouting<C>,
response_policy: Option<ResponsePolicy>,
core: Core<C>,
asking: bool,
) -> (OperationTarget, RedisResult<Response>) {
Expand All @@ -847,7 +843,10 @@ where
} else {
match routing {
// commands that are sent to multiple nodes are handled here.
CommandRouting::Route(Some(RoutingInfo::MultiNode(multi_node_routing))) => {
CommandRouting::Route(Some(RoutingInfo::MultiNode((
multi_node_routing,
response_policy,
)))) => {
assert!(!asking);
assert!(redirect.is_none());
return Self::execute_on_multiple_nodes(
Expand Down Expand Up @@ -906,13 +905,8 @@ where
let asking = matches!(&info.redirect, Some(Redirect::Ask(_)));

match info.cmd {
CmdArg::Cmd {
cmd,
routing,
response_policy,
} => {
Self::try_cmd_request(cmd, info.redirect, routing, response_policy, core, asking)
.await
CmdArg::Cmd { cmd, routing } => {
Self::try_cmd_request(cmd, info.redirect, routing, core, asking).await
}
CmdArg::Pipeline {
pipeline,
Expand Down
145 changes: 99 additions & 46 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,40 @@ pub(crate) enum Redirect {
Ask(String),
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum LogicalAggregateOp {
/// Logical bitwise aggregating operators.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LogicalAggregateOp {
/// Aggregate by bitwise &&
And,
// Or, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum AggregateOp {
/// Numerical aggreagting operators.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AggregateOp {
/// Choose minimal value
Min,
/// Sum all values
Sum,
// Max, omitted due to dead code warnings. ATM this value isn't constructed anywhere
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum ResponsePolicy {
/// Policy on how to combine multiple responses into one.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ResponsePolicy {
/// Waits for one request to succeed, and return its results. Return error if all requests fail.
OneSucceeded,
/// Waits for one request to succeed with a non-empty value. Returns error if all requests fail or return `Nil`.
OneSucceededNonEmpty,
/// Waits for all requests to succeed, and the returns one of the successes. Returns the error on the first received error.
AllSucceeded,
/// Aggregates success results according to a logical bitwise operator. Returns error on any failed request, or on a response that doesn't conform to 0 or 1.
AggregateLogical(LogicalAggregateOp),
/// Aggregates success results according to a numeric operator. Returns error on any failed request, or on a response that isn't an integer.
Aggregate(AggregateOp),
/// Aggreagte array responses into a single array. Returns error on any failed request, or on a response that isn't an array.
CombineArrays,
/// Handling is not defined by the Redis standard. Will receive a special case
Special,
}

Expand All @@ -44,7 +57,7 @@ pub enum RoutingInfo {
/// Route to single node
SingleNode(SingleNodeRoutingInfo),
/// Route to multiple nodes
MultiNode(MultipleNodeRoutingInfo),
MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
}

/// Defines which single node should receive a request.
Expand Down Expand Up @@ -220,51 +233,55 @@ where
Some(if routes.len() == 1 {
RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(routes.pop().unwrap().0))
} else {
RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(routes))
RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::MultiSlot(routes),
ResponsePolicy::for_command(cmd),
))
})
}

impl RoutingInfo {
pub(crate) fn response_policy<R>(r: &R) -> Option<ResponsePolicy>
where
R: Routable + ?Sized,
{
use ResponsePolicy::*;
let cmd = &r.command()?[..];
impl ResponsePolicy {
/// Parse the command for the matching response policy.
pub fn for_command(cmd: &[u8]) -> Option<ResponsePolicy> {
match cmd {
b"SCRIPT EXISTS" => Some(AggregateLogical(LogicalAggregateOp::And)),
b"SCRIPT EXISTS" => Some(ResponsePolicy::AggregateLogical(LogicalAggregateOp::And)),

b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK"
| b"LATENCY RESET" => Some(Aggregate(AggregateOp::Sum)),
b"DBSIZE" | b"DEL" | b"EXISTS" | b"SLOWLOG LEN" | b"TOUCH" | b"UNLINK" => {
Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
}

b"WAIT" => Some(Aggregate(AggregateOp::Min)),
b"WAIT" => Some(ResponsePolicy::Aggregate(AggregateOp::Min)),

b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
| b"CLIENT SETINFO" | b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE"
| b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
| b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"MEMORY PURGE" | b"MSET" | b"PING"
| b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => Some(AllSucceeded),
b"CONFIG SET" | b"FLUSHALL" | b"FLUSHDB" | b"FUNCTION DELETE" | b"FUNCTION FLUSH"
| b"FUNCTION LOAD" | b"FUNCTION RESTORE" | b"LATENCY RESET" | b"MEMORY PURGE"
| b"MSET" | b"PING" | b"SCRIPT FLUSH" | b"SCRIPT LOAD" | b"SLOWLOG RESET" => {
Some(ResponsePolicy::AllSucceeded)
}

b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(CombineArrays),
b"KEYS" | b"MGET" | b"SLOWLOG GET" => Some(ResponsePolicy::CombineArrays),

b"FUNCTION KILL" | b"SCRIPT KILL" => Some(OneSucceeded),
b"FUNCTION KILL" | b"SCRIPT KILL" => Some(ResponsePolicy::OneSucceeded),

// This isn't based on response_tips, but on the discussion here - https://github.com/redis/redis/issues/12410
b"RANDOMKEY" => Some(OneSucceededNonEmpty),
b"RANDOMKEY" => Some(ResponsePolicy::OneSucceededNonEmpty),

b"LATENCY GRAPH" | b"LATENCY HISTOGRAM" | b"LATENCY HISTORY" | b"LATENCY DOCTOR"
| b"LATENCY LATEST" => Some(Special),
| b"LATENCY LATEST" => Some(ResponsePolicy::Special),

b"FUNCTION STATS" => Some(Special),
b"FUNCTION STATS" => Some(ResponsePolicy::Special),

b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => Some(Special),
b"MEMORY MALLOC-STATS" | b"MEMORY DOCTOR" | b"MEMORY STATS" => {
Some(ResponsePolicy::Special)
}

b"INFO" => Some(Special),
b"INFO" => Some(ResponsePolicy::Special),

_ => None,
}
}
}

impl RoutingInfo {
/// Returns the routing info for `r`.
pub fn for_routable<R>(r: &R, allow_replica: bool) -> Option<RoutingInfo>
where
Expand All @@ -291,14 +308,20 @@ impl RoutingInfo {
| b"MEMORY MALLOC-STATS"
| b"MEMORY DOCTOR"
| b"MEMORY STATS"
| b"INFO" => Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::AllMasters)),
| b"INFO" => Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
ResponsePolicy::for_command(cmd),
))),

b"ACL SETUSER" | b"ACL DELUSER" | b"ACL SAVE" | b"CLIENT SETNAME"
| b"CLIENT SETINFO" | b"SLOWLOG GET" | b"SLOWLOG LEN" | b"SLOWLOG RESET"
| b"CONFIG SET" | b"CONFIG RESETSTAT" | b"CONFIG REWRITE" | b"SCRIPT FLUSH"
| b"SCRIPT LOAD" | b"LATENCY RESET" | b"LATENCY GRAPH" | b"LATENCY HISTOGRAM"
| b"LATENCY HISTORY" | b"LATENCY DOCTOR" | b"LATENCY LATEST" => {
Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::AllNodes))
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
ResponsePolicy::for_command(cmd),
)))
}

b"MGET" | b"DEL" | b"EXISTS" | b"UNLINK" | b"TOUCH" => multi_shard(r, cmd, 1, false),
Expand Down Expand Up @@ -618,7 +641,10 @@ impl Route {

#[cfg(test)]
mod tests {
use super::{MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr};
use super::{
AggregateOp, MultipleNodeRoutingInfo, ResponsePolicy, Route, RoutingInfo,
SingleNodeRoutingInfo, SlotAddr,
};
use crate::{cluster_topology::slot, cmd, parser::parse_redis_value};

#[test]
Expand Down Expand Up @@ -697,21 +723,48 @@ mod tests {

// Assert expected RoutingInfo explicitly:

for cmd in vec![
cmd("FLUSHALL"),
cmd("FLUSHDB"),
cmd("DBSIZE"),
cmd("PING"),
cmd("INFO"),
cmd("KEYS"),
cmd("SCRIPT KILL"),
] {
for cmd in [cmd("FLUSHALL"), cmd("FLUSHDB"), cmd("PING")] {
assert_eq!(
RoutingInfo::for_routable(&cmd, false),
Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::AllMasters))
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
Some(ResponsePolicy::AllSucceeded)
)))
);
}

assert_eq!(
RoutingInfo::for_routable(&cmd("DBSIZE"), false),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
Some(ResponsePolicy::Aggregate(AggregateOp::Sum))
)))
);

assert_eq!(
RoutingInfo::for_routable(&cmd("SCRIPT KILL"), false),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
Some(ResponsePolicy::OneSucceeded)
)))
);

assert_eq!(
RoutingInfo::for_routable(&cmd("INFO"), false),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
Some(ResponsePolicy::Special)
)))
);

assert_eq!(
RoutingInfo::for_routable(&cmd("KEYS"), false),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
Some(ResponsePolicy::CombineArrays)
)))
);

for cmd in vec![
cmd("SCAN"),
cmd("SHUTDOWN"),
Expand Down Expand Up @@ -879,7 +932,7 @@ mod tests {
expected.insert(Route(12182, SlotAddr::Master), vec![1]);

assert!(
matches!(routing.clone(), Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(vec))) if {
matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::Aggregate(AggregateOp::Sum))))) if {
let routes = vec.clone().into_iter().collect();
expected == routes
}),
Expand All @@ -895,7 +948,7 @@ mod tests {
expected.insert(Route(12182, SlotAddr::Replica), vec![1]);

assert!(
matches!(routing.clone(), Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::MultiSlot(vec))) if {
matches!(routing.clone(), Some(RoutingInfo::MultiNode((MultipleNodeRoutingInfo::MultiSlot(vec), Some(ResponsePolicy::CombineArrays)))) if {
let routes = vec.clone().into_iter().collect();
expected ==routes
}),
Expand Down
10 changes: 8 additions & 2 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,10 @@ fn test_async_cluster_route_according_to_passed_argument() {
cmd.arg("test");
let _ = runtime.block_on(connection.send_packed_command(
&cmd,
Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::AllMasters)),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllMasters,
None,
))),
));
{
let mut touched_ports = touched_ports.lock().unwrap();
Expand All @@ -1067,7 +1070,10 @@ fn test_async_cluster_route_according_to_passed_argument() {

let _ = runtime.block_on(connection.send_packed_command(
&cmd,
Some(RoutingInfo::MultiNode(MultipleNodeRoutingInfo::AllNodes)),
Some(RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
None,
))),
));
{
let mut touched_ports = touched_ports.lock().unwrap();
Expand Down

0 comments on commit bb0171f

Please sign in to comment.