From 8522882f41cf6eda6ea1d8d06d5d0d8bd8dcdaca Mon Sep 17 00:00:00 2001 From: barshaul Date: Wed, 9 Oct 2024 19:51:05 +0000 Subject: [PATCH] some fixes --- redis/src/cluster_async/mod.rs | 70 +++++++++++++++---------------- redis/tests/test_cluster_async.rs | 4 +- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 967136ede..87a159e90 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -1836,6 +1836,7 @@ where core: Core, response_policy: Option, ) -> OperationResult { + trace!("execute_on_multiple_nodes"); let connections_container = core.conn_lock.read().await; if connections_container.is_empty() { return OperationResult::Err(( @@ -2119,20 +2120,22 @@ where } ConnectionCheck::RandomConnection => { let read_guard = core.conn_lock.read().await; - let random_conns_option = read_guard.random_connections(1, ConnectionType::User); - if let Some(mut random_connections) = random_conns_option { - let (random_address, random_conn_future) = - random_connections.next().ok_or(RedisError::from(( + read_guard + .random_connections(1, ConnectionType::User) + .and_then(|mut random_connections| { + random_connections.next().map( + |(random_address, random_conn_future)| async move { + (random_address, random_conn_future.await) + }, + ) + }) + .ok_or_else(|| { + RedisError::from(( ErrorKind::AllConnectionsUnavailable, "No random connection found", - )))?; - return Ok((random_address, random_conn_future.await)); - } else { - return Err(RedisError::from(( - ErrorKind::AllConnectionsUnavailable, - "No random connection found", - ))); - } + )) + })? + .await } }; @@ -2151,23 +2154,21 @@ where RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) { Ok(_) => { trace!("Recovered!"); - (Some(ConnectionState::PollComplete), Poll::Ready(Ok(()))) + (ConnectionState::PollComplete, Poll::Ready(Ok(()))) } Err(err) => { trace!("Recover slots failed!"); let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable { - Some(ConnectionState::Recover(RecoverFuture::Reconnect( - Box::pin(ClusterConnInner::reconnect_to_initial_nodes( - self.inner.clone(), - )), + ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin( + ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()), ))) } else { - Some(ConnectionState::Recover(RecoverFuture::RecoverSlots( - Box::pin(Self::refresh_slots_and_subscriptions_with_retries( + ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin( + Self::refresh_slots_and_subscriptions_with_retries( self.inner.clone(), &RefreshPolicy::Throttable, - )), + ), ))) }; (next_state, Poll::Ready(Err(err))) @@ -2176,12 +2177,10 @@ where RecoverFuture::Reconnect(ref mut future) => { ready!(future.as_mut().poll(cx)); trace!("Reconnected connections"); - (Some(ConnectionState::PollComplete), Poll::Ready(Ok(()))) + (ConnectionState::PollComplete, Poll::Ready(Ok(()))) } }; - if let Some(state) = next_state { - self.state = state; - } + self.state = next_state; poll } @@ -2476,19 +2475,18 @@ async fn calculate_topology_from_random_nodes<'a, C>( where C: ConnectionLike + Connect + Clone + Send + Sync + 'static, { - let requested_nodes = match read_guard - .random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement) + let requested_nodes = if let Some(random_conns) = + read_guard.random_connections(num_of_nodes_to_query, ConnectionType::PreferManagement) { - Some(random_conns) => random_conns, - None => { - return ( - Err(RedisError::from(( - ErrorKind::AllConnectionsUnavailable, - "No available connections to refresh slots from", - ))), - vec![], - ) - } + random_conns + } else { + return ( + Err(RedisError::from(( + ErrorKind::AllConnectionsUnavailable, + "No available connections to refresh slots from", + ))), + vec![], + ); }; let topology_join_results = futures::future::join_all(requested_nodes.map(|(addr, conn)| async move { diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index f64863581..c0a2d12a8 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -2584,7 +2584,6 @@ mod cluster_async { respond_startup_two_nodes(name, cmd)?; let i = requests.fetch_add(1, atomic::Ordering::SeqCst); match i { - // Respond that the key exists on a node that does not yet have a connection: 0 => Err(Err(RedisError::from((ErrorKind::IoError, "io-error")))), _ => { panic!("Expected not to be retried!") @@ -3294,9 +3293,8 @@ mod cluster_async { } if use_sharded { - let mut cmd = redis::cmd("SPUBLISH"); // validate SPUBLISH - let result = cmd + let result = redis::cmd("SPUBLISH") .arg("test_channel_?") .arg("test_message") .query_async(&mut publishing_con)