Skip to content

Commit

Permalink
Core: Release the read lock while creating connections inrefresh_conn…
Browse files Browse the repository at this point in the history
…ections
  • Loading branch information
barshaul committed Nov 6, 2024
1 parent f0fbaea commit 59386d8
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
* Node: Add `JSON.STRLEN` and `JSON.STRAPPEND` command ([#2537](https://github.com/valkey-io/valkey-glide/pull/2537))
* Node: Add `FT.SEARCH` ([#2551](https://github.com/valkey-io/valkey-glide/pull/2551))
* Python: Fix example ([#2556](https://github.com/valkey-io/valkey-glide/issues/2556))
* Core: Release the read lock while creating connections in `refresh_connections` ([#2630](https://github.com/valkey-io/valkey-glide/issues/2630))

#### Breaking Changes

Expand Down
90 changes: 47 additions & 43 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1306,51 +1306,55 @@ where
check_existing_conn: bool,
) {
info!("Started refreshing connections to {:?}", addresses);
let connections_container = inner.conn_lock.read().await;
let cluster_params = &inner.cluster_params;
let subscriptions_by_address = &inner.subscriptions_by_address;
let glide_connection_options = &inner.glide_connection_options;
let mut tasks = FuturesUnordered::new();
let inner = inner.clone();

stream::iter(addresses.into_iter())
.fold(
&*connections_container,
|connections_container, address| async move {
let node_option = if check_existing_conn {
connections_container.remove_node(&address)
} else {
None
};
for address in addresses.into_iter() {
let inner = inner.clone();

// override subscriptions for this connection
let mut cluster_params = cluster_params.clone();
let subs_guard = subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
drop(subs_guard);
let node = get_or_create_conn(
&address,
node_option,
&cluster_params,
conn_type,
glide_connection_options.clone(),
)
.await;
match node {
Ok(node) => {
connections_container
.replace_or_add_connection_for_address(address, node);
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address, err
);
}
}
connections_container
},
)
.await;
info!("refresh connections completed");
tasks.push(async move {
let node_option = if check_existing_conn {
let connections_container = inner.conn_lock.read().await;
connections_container.remove_node(&address)
} else {
None
};

// Override subscriptions for this connection
let mut cluster_params = inner.cluster_params.clone();
let subs_guard = inner.subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(&address).cloned();
drop(subs_guard);

let node = get_or_create_conn(
&address,
node_option,
&cluster_params,
conn_type,
inner.glide_connection_options.clone(),
)
.await;

(address, node)
});
}

// Poll connection tasks as soon as each one finishes
while let Some(result) = tasks.next().await {
match result {
(address, Ok(node)) => {
let connections_container = inner.conn_lock.read().await;
connections_container.replace_or_add_connection_for_address(address, node);
}
(address, Err(err)) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address, err
);
}
}
}
debug!("refresh connections completed");
}

async fn aggregate_results(
Expand Down

0 comments on commit 59386d8

Please sign in to comment.