Introduce a fast reconnect process for async cluster connections. #184
Conversation
The process is periodic and can be configured via `ClusterParams`. It ensures that all expected user connections exist and have not been passively closed; the expected connections are calculated from the current slot map. Additionally, for the Tokio runtime, a disconnect notification is available, allowing the reconnect process to be triggered immediately without waiting for the periodic check. This process is especially important for pub/sub support, as passive disconnects can render a pub/sub subscriber inoperative. Three integration tests are introduced with this feature: a generic fast reconnect test, pub/sub resilience to passive disconnects, and pub/sub resilience to scale-out.
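For intuition, here is a minimal, self-contained sketch of the check loop described above; the names and structure are assumptions for illustration, not the exact code in this PR:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio::time::timeout;

// Illustrative sketch of the periodic reconnect loop (names assumed).
async fn connections_validation_loop(interval: Duration, disconnect: Arc<Notify>) {
    loop {
        // Sleep for the configured interval, but wake early if a connection
        // reports a passive disconnect (the Tokio-only instant notification).
        let _ = timeout(interval, disconnect.notified()).await;

        // In the real implementation: derive the expected set of nodes from
        // the current slot map, then reconnect any user connection that is
        // missing or has been passively closed.
        validate_user_connections().await;
    }
}

// Placeholder for the validation step; hypothetical, for illustration only.
async fn validate_user_connections() {}
```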
redis/src/client.rs (outdated):

```rust
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
```
In the context of `redis-rs`, this would be a breaking change since it's an exposed user API. However, since we're only using it internally within Glide, and if we're OK with breaking these APIs, I think this is the right time to modify this function to accept a `ConnectionOptions` struct (or another appropriate name) that internally holds all connection handlers/options. This change would reduce the need to modify the entire chain of internal function calls (like `get_multiplexed_async_connection_with_timeouts`, etc.) and to fix all tests that use these APIs each time we add a new option.
What do you think about changing it to:

```rust
pub struct ConnectionOptions {
    push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
    disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
}

pub async fn get_multiplexed_async_connection(
    &self,
    connection_options: ConnectionOptions,
)
```
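For illustration, a hypothetical call site under this proposal could look like the following; `ConnectionOptions` and the new signature come from the snippet above, everything else is assumed:

```rust
// Hypothetical usage sketch; assumes the fields are constructible here.
async fn connect(client: &redis::Client) -> redis::RedisResult<()> {
    let options = ConnectionOptions {
        push_sender: None,         // no push-notification channel in this example
        disconnect_notifier: None, // no disconnect notifier either
    };
    // Adding a future option would only change ConnectionOptions,
    // not this signature or the chain of internal calls behind it.
    let _conn = client.get_multiplexed_async_connection(options).await?;
    Ok(())
}
```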
LGTM
done
redis/src/cluster_async/mod.rs (outdated):
```rust
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
#[cfg(feature = "tokio-comp")]
tokio_notify: Arc<Notify>,
```
Instead of passing the tokio `Notify` as another parameter, why not expand the `DisconnectNotifier` trait to have a `notified` API?
Yep, that was my original attempt, but I got:

```text
102 | async fn notified(&mut self);
    | -----^^^^^^^^^^^^^^^^^^^^^^^^
    | |
    | `async` because of this
    |
    = note: `async` trait functions are not currently supported
```

There might be some crates that handle this, but I don't want to go down this rabbit hole. Do you know how to do it?
done
`#[async_trait::async_trait]`
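For reference, a minimal sketch of how the trait could expose an async `notified` API via the async-trait crate; the signature is taken from the compiler error quoted above, the trait bounds are assumed:

```rust
use async_trait::async_trait;

#[async_trait]
pub trait DisconnectNotifier: Send + Sync {
    // Wait until a disconnect notification arrives. With #[async_trait]
    // this desugars into a method returning a boxed future, which is what
    // makes it usable behind a trait object like Box<dyn DisconnectNotifier>.
    async fn notified(&mut self);
}
```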
```rust
// Validate all existing user connections and try to reconnect if necessary.
// In addition, as a safety measure, drop nodes that do not have any assigned slots.
```
The problem with removing connections that aren't found in the slot map is that we might inadvertently remove newly added nodes received through a MOVED error before they are added to the slot map. This issue will persist even after fixing MOVED errors to update the slot map for specific slots, because updating the slot map based on a MOVED error is handled inside the `refresh_slots` task, which is spawned separately. Meanwhile, new connections can be established in the `get_connection` method after a MOVED error and might execute before the `refresh_slots` task runs.

For example, consider the following scenario:

1. A MOVED error is received with a new node address X.
2. The `refresh_slots` task is spawned to update the specific slot or perform a full slot refresh.
3. The request that received the MOVED error calls `get_connection` and creates a new connection for the moved node X.
4. `validate_all_user_connections` is called, finds X in the connections map but not in the slots map, and removes it from the connection map.
5. Another request encountering the same MOVED error doesn't find the connection and creates a new one.
6. This cycle continues until node X is eventually added to the slots map, which might happen quickly or could take longer if a full slots refresh is required and multiple iterations are needed for it to complete.

Given that during a full `refresh_slots` operation the connection map is completely replaced with a new one that contains only the nodes from the newly discovered map, the risk of connection leaks accumulating over time is minimal. Therefore, weighing the tradeoff between prematurely removing connections (which could lead to repeatedly closing new connections, causing higher latency and risking connection storms) versus temporarily storing non-relevant connections, it might be safer to leave the cleanup to be handled solely by the `refresh_slots` process.

However, if we skip the cleanup, we need to ensure that nodes present in the connection map but not in the slot map aren't added to `addrs_to_refresh`. Otherwise, we risk repeatedly trying to refresh the connection of a stale node. A sketch of such a guard follows.
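A minimal sketch of that guard, assuming `addrs_to_refresh: Vec<String>` and an `all_nodes_with_slots: HashSet<String>` built from the slot map as discussed in this thread (not the exact code in the PR):

```rust
// Keep only addresses the slot map (the source of truth) knows about, so we
// never loop on refreshing a node that is no longer assigned any slots.
addrs_to_refresh.retain(|addr| all_nodes_with_slots.contains(addr));
```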
Yes, I am aware of that behavior.
First of all, periodic syncing must include both adding and removing connections as required by the source of truth (the slot map); there is no way around this symmetry requirement.
Secondly, this behavior is due to an insufficient step (3): if we create a connection, then it means we believe it is valid, and so we should also update the slot map. Creating the connection alone is not sufficient.
I thought about complementing step (3), but we discussed it and agreed that you'll complement it in a more specific piece of work. Do you want me to do it in this PR?
Discussed; this will be addressed in a dedicated piece of work.
redis/src/cluster_async/mod.rs (outdated):
```rust
connections_container
    .slot_map
    .addresses_for_all_nodes()
    .iter()
    .for_each(|addr| {
        all_nodes_with_slots.insert(String::from(*addr));
    });
```

Suggested change:

```rust
all_nodes_with_slots = connections_container
    .slot_map
    .addresses_for_all_nodes()
    .iter()
    .map(|addr| String::from(*addr))
    .collect();
```
LGTM
done
```rust
let mut addrs_to_refresh = Vec::new();
for (addr, con_fut) in &all_valid_conns {
    let con = con_fut.clone().await;
    if con.is_closed() {
```
I think the distinction between connections that are still present in the connection map with `is_closed` set to true, and those that have been removed because the client failed to reestablish their connection, isn't clear. Could you document this difference? When do we expect to see each?
yes, will do
done
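For reference, the requested distinction could be documented along these lines; the wording here is assumed, not the actual comment added in the PR:

```rust
// Assumed documentation sketch:
//
// A connection can be unusable in two observable states:
// 1. Still present in the connection map, but `is_closed()` returns true:
//    the peer dropped it passively and we have not yet reconnected. The
//    periodic check picks these up for refresh.
// 2. Absent from the connection map entirely: a previous reconnect attempt
//    failed and the node was removed, so a new connection must be created
//    from scratch on the next refresh.
```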
```rust
// dont try existing nodes since we know a. it does not exist. b. exist but its connection is closed
Self::refresh_connections(
    inner.clone(),
    addrs_to_refresh,
    RefreshConnectionType::AllConnections,
    false,
)
```

Suggested change:

```rust
Self::refresh_connections(
    inner.clone(),
    addrs_to_refresh,
    RefreshConnectionType::AllConnections,
    // don't check the existing connections since we know either (a) the node does not exist, or (b) it exists but its connection is closed
    false,
)
```

or

```rust
// don't check the existing connections since we know either (a) the node does not exist, or (b) it exists but its connection is closed
let check_existing_conns = false;
Self::refresh_connections(
    inner.clone(),
    addrs_to_refresh,
    RefreshConnectionType::AllConnections,
    check_existing_conns,
)
```
the first option is better
done
redis/src/cluster_async/mod.rs (outdated):
```rust
async fn refresh_connections(
    inner: Arc<InnerCore<C>>,
    addresses: Vec<String>,
    conn_type: RefreshConnectionType,
    try_existing_node: bool,
```
This option isn't clear. Maybe rename it to `check_existing_conn` and document what it does.
yes
done
redis/src/cluster_async/mod.rs (outdated):
```rust
let node_option = if try_existing_node {
    connections_container.remove_node(&address)
} else {
    Option::None
```

Suggested change:

```diff
-    Option::None
+    None
```
yep
done
redis/src/cluster_async/mod.rs (outdated):
```rust
#[cfg(feature = "tokio-comp")]
let _ = timeout(interval_duration, async {
    inner.tokio_notify.notified().await;
})
.await;
#[cfg(not(feature = "tokio-comp"))]
let _ = boxed_sleep(interval_duration).await;
```

Suggested change:

```rust
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
use async_std::future::timeout;
#[cfg(feature = "tokio-comp")]
use tokio::time::timeout;

if let Some(notifier) = &inner.disconnect_notifier {
    let _ = timeout(interval_duration, notifier.notified()).await;
} else {
    let _ = boxed_sleep(interval_duration).await;
}
```
nice, will try
done
Note! This PR must be followed by a PR to glide-core implementing similar functionality for CMD.
Issue #, if available:
valkey-io/valkey-glide#2042