Skip to content

Commit

Permalink
Fixing PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
barshaul committed Dec 28, 2023
1 parent 877e5f2 commit d31c834
Showing 1 changed file with 11 additions and 34 deletions.
45 changes: 11 additions & 34 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1720,33 +1720,25 @@ fn warn_management_conn_faild(addr: &str, err: RedisError) {
);
}

fn create_or_change_async_node<C>(
fn create_async_node<C>(
user_conn: C,
management_conn: Option<C>,
ip: Option<IpAddr>,
node: Option<AsyncClusterNode<C>>,
) -> AsyncClusterNode<C>
where
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
let user_conn = async { user_conn }.boxed().shared();
let management_conn =
management_conn.map(|management_conn| async { management_conn }.boxed().shared());
if let Some(mut node) = node {
node.user_connection = user_conn;
node.management_connection = management_conn;
node.ip = ip;
node
} else {
AsyncClusterNode::new(user_conn, management_conn, ip)
}

AsyncClusterNode::new(user_conn, management_conn, ip)
}

async fn connect_and_check_all_connections<C>(
addr: &str,
params: ClusterParams,
socket_addr: Option<SocketAddr>,
node: Option<AsyncClusterNode<C>>,
) -> RedisResult<AsyncClusterNode<C>>
where
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
Expand All @@ -1769,29 +1761,19 @@ where
.await
.ok()
.map(|_| management_conn);
Ok(create_or_change_async_node(
user_conn,
management_conn,
user_ip,
node,
))
Ok(create_async_node(user_conn, management_conn, user_ip))
} else {
// Use only the connection with the latest IP address
warn_mismatch_ip(addr, user_ip, management_ip);
if has_dns_changed(addr, &user_ip.unwrap()).await {
// The user_ip is incorrect. Use the created `management_conn` for the user connection
user_conn = management_conn;
setup_user_connection(&mut user_conn, params).await?;
Ok(create_or_change_async_node(
user_conn,
None,
management_ip,
node,
))
Ok(create_async_node(user_conn, None, management_ip))
} else {
// The user_ip is correct. Use the user connetion and drop the management connection
setup_user_connection(&mut user_conn, params).await?;
Ok(create_or_change_async_node(user_conn, None, user_ip, node))
Ok(create_async_node(user_conn, None, user_ip))
}
}
}
Expand All @@ -1800,7 +1782,7 @@ where
warn_management_conn_faild(addr, err);
let (mut user_conn, user_ip): (C, Option<IpAddr>) = conn;
setup_user_connection(&mut user_conn, params).await?;
Ok(create_or_change_async_node(user_conn, None, user_ip, node))
Ok(create_async_node(user_conn, None, user_ip))
}
(Err(err_1), Err(err_2)) => {
// Neither of the connections succeeded.
Expand Down Expand Up @@ -1896,14 +1878,9 @@ where
.ok()
.map(|(conn, _ip): (C, Option<IpAddr>)| conn);
}
Ok(create_or_change_async_node(
user_conn,
management_conn,
ip,
Some(node),
))
Ok(create_async_node(user_conn, management_conn, ip))
} else {
Ok(create_or_change_async_node(user_conn, None, ip, None))
Ok(create_async_node(user_conn, None, ip))
}
}
RefreshConnectionType::OnlyManagementConnection => {
Expand All @@ -1912,11 +1889,11 @@ where
Some(node) => {
connect_and_check_only_management_conn(addr, params, socket_addr, node).await
}
None => connect_and_check_all_connections(addr, params, socket_addr, node).await,
None => connect_and_check_all_connections(addr, params, socket_addr).await,
}
}
RefreshConnectionType::AllConnections => {
connect_and_check_all_connections(addr, params, socket_addr, node).await
connect_and_check_all_connections(addr, params, socket_addr).await
}
}
}
Expand Down

0 comments on commit d31c834

Please sign in to comment.