From 84d09a636a54b0134afbb3100c98d1151b679ed1 Mon Sep 17 00:00:00 2001 From: eifrah-aws Date: Tue, 5 Nov 2024 12:50:52 +0200 Subject: [PATCH] Added `Telemetry` support for GLIDE client (core only) (#2545) --- glide-core/Cargo.toml | 3 + glide-core/redis-rs/redis/Cargo.toml | 2 + .../cluster_async/connections_container.rs | 67 +++++++++++- .../src/cluster_async/connections_logic.rs | 5 +- .../redis-rs/redis/src/cluster_async/mod.rs | 28 +++-- glide-core/src/client/mod.rs | 30 +++--- .../src/client/reconnecting_connection.rs | 29 ++++- glide-core/src/client/standalone_client.rs | 30 ++++-- glide-core/src/lib.rs | 1 + glide-core/telemetry/Cargo.toml | 11 ++ glide-core/telemetry/src/lib.rs | 68 ++++++++++++ glide-core/tests/test_client.rs | 100 +++++++++++++++++- glide-core/tests/utilities/cluster.rs | 16 ++- 13 files changed, 349 insertions(+), 41 deletions(-) create mode 100644 glide-core/telemetry/Cargo.toml create mode 100644 glide-core/telemetry/src/lib.rs diff --git a/glide-core/Cargo.toml b/glide-core/Cargo.toml index 150c0ff33d..bbbf08e0cc 100644 --- a/glide-core/Cargo.toml +++ b/glide-core/Cargo.toml @@ -18,6 +18,7 @@ redis = { path = "./redis-rs/redis", features = [ "cluster", "cluster-async", ] } +telemetrylib = { path = "./telemetry" } tokio = { version = "1", features = ["macros", "time"] } logger_core = { path = "../logger_core" } dispose = "0.5.0" @@ -37,6 +38,8 @@ once_cell = "1.18.0" sha1_smol = "1.0.0" nanoid = "0.4.0" async-trait = { version = "0.1.24" } +serde_json = "1" +serde = { version = "1", features = ["derive"] } [features] socket-layer = [ diff --git a/glide-core/redis-rs/redis/Cargo.toml b/glide-core/redis-rs/redis/Cargo.toml index 0179669eaf..6311869e10 100644 --- a/glide-core/redis-rs/redis/Cargo.toml +++ b/glide-core/redis-rs/redis/Cargo.toml @@ -102,6 +102,8 @@ tracing = "0.1" # Optional uuid support uuid = { version = "1.6.1", optional = true } +telemetrylib = { path = "../../telemetry" } + [features] default = [ "acl", diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs index 9d44b25cab..1f01a59d13 100644 --- a/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs +++ b/glide-core/redis-rs/redis/src/cluster_async/connections_container.rs @@ -6,6 +6,22 @@ use dashmap::DashMap; use futures::FutureExt; use rand::seq::IteratorRandom; use std::net::IpAddr; +use telemetrylib::Telemetry; + +/// Count the number of connections in a connections_map object +macro_rules! count_connections { + ($conn_map:expr) => {{ + let mut count = 0usize; + for a in $conn_map { + count = count.saturating_add(if a.management_connection.is_some() { + 2 + } else { + 1 + }); + } + count + }}; +} /// A struct that encapsulates a network connection along with its associated IP address. 
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -66,6 +82,15 @@ where
         }
     }
 
+    /// Return the number of underlying connections managed by this instance of ClusterNode
+    pub fn connections_count(&self) -> usize {
+        if self.management_connection.is_some() {
+            2
+        } else {
+            1
+        }
+    }
+
     pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
         match conn_type {
             ConnectionType::User => self.user_connection.conn.clone(),
@@ -106,6 +131,13 @@ pub(crate) struct ConnectionsContainer<Connection> {
     topology_hash: TopologyHash,
 }
 
+impl<Connection> Drop for ConnectionsContainer<Connection> {
+    fn drop(&mut self) {
+        let count = count_connections!(&self.connection_map);
+        Telemetry::decr_total_connections(count);
+    }
+}
+
 impl<Connection> Default for ConnectionsContainer<Connection> {
     fn default() -> Self {
         Self {
@@ -129,8 +161,14 @@ where
         read_from_replica_strategy: ReadFromReplicaStrategy,
         topology_hash: TopologyHash,
     ) -> Self {
+        let connection_map = connection_map.0;
+
+        // Update the telemetry with the number of connections
+        let count = count_connections!(&connection_map);
+        Telemetry::incr_total_connections(count);
+
         Self {
-            connection_map: connection_map.0,
+            connection_map,
             slot_map,
             read_from_replica_strategy,
             topology_hash,
@@ -142,7 +180,11 @@ where
         &mut self,
         other_connection_map: ConnectionsMap<Connection>,
     ) {
+        let conn_count_before = count_connections!(&self.connection_map);
         self.connection_map.extend(other_connection_map.0);
+        let conn_count_after = count_connections!(&self.connection_map);
+        // Update the number of connections by the difference
+        Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before));
     }
 
     /// Returns true if the address represents a known primary node.
@@ -275,20 +317,35 @@ where
         node: ClusterNode<Connection>,
     ) -> String {
         let address = address.into();
-        self.connection_map.insert(address.clone(), node);
+
+        // Increase the total number of connections by the number of connections managed by `node`
+        Telemetry::incr_total_connections(node.connections_count());
+
+        if let Some(old_conn) = self.connection_map.insert(address.clone(), node) {
+            // We are replacing a node. Reduce the counter by the number of connections managed by
+            // the old connection
+            Telemetry::decr_total_connections(old_conn.connections_count());
+        };
+
         address
     }
 
     pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
-        self.connection_map
-            .remove(address)
-            .map(|(_key, value)| value)
+        if let Some((_key, old_conn)) = self.connection_map.remove(address) {
+            Telemetry::decr_total_connections(old_conn.connections_count());
+            Some(old_conn)
+        } else {
+            None
+        }
     }
 
     pub(crate) fn len(&self) -> usize {
         self.connection_map.len()
     }
 
+    pub(crate) fn connection_map(&self) -> &DashMap<String, ClusterNode<Connection>> {
+        &self.connection_map
+    }
+
     pub(crate) fn get_current_topology_hash(&self) -> TopologyHash {
         self.topology_hash
     }
diff --git a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
index 7a2306ca19..25dfcbb9d1 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs
@@ -1,5 +1,3 @@
-use std::net::SocketAddr;
-
 use super::{
     connections_container::{ClusterNode, ConnectionWithIp},
     Connect,
@@ -11,6 +9,7 @@ use crate::{
     cluster_client::ClusterParams,
     ErrorKind, RedisError, RedisResult,
 };
+use std::net::SocketAddr;
 
 use futures::prelude::*;
 use futures_util::{future::BoxFuture, join};
@@ -113,6 +112,7 @@ where
     C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
 {
     match future::join(
+        // User connection
         create_connection(
             addr,
             params.clone(),
@@ -120,6 +120,7 @@
             false,
             glide_connection_options.clone(),
         ),
+        // Management connection
         create_connection(
             addr,
             params.clone(),
diff --git a/glide-core/redis-rs/redis/src/cluster_async/mod.rs b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
index 28024f7649..5deba4b534 100644
--- a/glide-core/redis-rs/redis/src/cluster_async/mod.rs
+++ b/glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -56,6 +56,7 @@ use tokio::task::JoinHandle;
 
 #[cfg(feature = "tokio-comp")]
 use crate::aio::DisconnectNotifier;
+use telemetrylib::Telemetry;
 
 use crate::{
     aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection, Runtime},
@@ -144,7 +145,7 @@ where
     /// # Arguments
     ///
     /// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
-    ///   for each subsequent iteration use the returned [`ScanStateRC`].
+    ///   for each subsequent iteration use the returned [`ScanStateRC`].
     /// * `count` - An optional count of keys requested,
     ///   the amount returned can vary and not obligated to return exactly count.
    /// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
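The connections-container changes earlier in this patch all follow one bookkeeping convention: a node contributes two connections to the gauge when it also holds a management connection and one otherwise, and every insertion, replacement, or removal is mirrored by a matching incr/decr call. The sketch below (not part of the patch; the helper functions are illustrative only) restates that convention using just the `telemetrylib` API introduced later in this diff:

use telemetrylib::Telemetry;

/// Illustrative helper: how many connections a node contributes to the gauge.
/// Mirrors the `ClusterNode::connections_count()` logic added above.
fn node_connection_count(has_management_connection: bool) -> usize {
    if has_management_connection { 2 } else { 1 }
}

/// Illustrative helper: bookkeeping when a node is added to the container.
fn on_node_added(has_management_connection: bool) {
    Telemetry::incr_total_connections(node_connection_count(has_management_connection));
}

/// Illustrative helper: bookkeeping when a node is removed or replaced.
fn on_node_removed(has_management_connection: bool) {
    Telemetry::decr_total_connections(node_connection_count(has_management_connection));
}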
@@ -181,7 +182,7 @@ where /// break; /// } /// } - /// keys + /// keys /// } /// ``` pub async fn cluster_scan( @@ -241,7 +242,7 @@ where /// break; /// } /// } - /// keys + /// keys /// } /// ``` pub async fn cluster_scan_with_pattern( @@ -495,14 +496,27 @@ pub(crate) struct ClusterConnInner { impl Dispose for ClusterConnInner { fn dispose(self) { + if let Ok(conn_lock) = self.inner.conn_lock.try_read() { + // Each node may contain user and *maybe* a management connection + let mut count = 0usize; + for node in conn_lock.connection_map() { + count = node.connections_count(); + } + Telemetry::decr_total_connections(count); + } + if let Some(handle) = self.periodic_checks_handler { #[cfg(feature = "tokio-comp")] handle.abort() } + if let Some(handle) = self.connections_validation_handler { #[cfg(feature = "tokio-comp")] handle.abort() } + + // Reduce the number of clients + Telemetry::decr_total_clients(1); } } @@ -1080,6 +1094,8 @@ where } } + // New client added + Telemetry::incr_total_clients(1); Ok(Disposable::new(connection)) } @@ -1185,7 +1201,7 @@ where Ok(connections.0) } - // Reconnet to the initial nodes provided by the user in the creation of the client, + // Reconnect to the initial nodes provided by the user in the creation of the client, // and try to refresh the slots based on the initial connections. // Being used when all cluster connections are unavailable. fn reconnect_to_initial_nodes(inner: Arc>) -> impl Future { @@ -1272,7 +1288,7 @@ where ); if !addrs_to_refresh.is_empty() { - // dont try existing nodes since we know a. it does not exist. b. exist but its connection is closed + // don't try existing nodes since we know a. it does not exist. b. exist but its connection is closed Self::refresh_connections( inner.clone(), addrs_to_refresh, @@ -1481,7 +1497,7 @@ where }); let wait_duration = rate_limiter.wait_duration(); if passed_time <= wait_duration { - debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?}, + debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?}, Wait duration = {:?}", passed_time, wait_duration); skip_slots_refresh = true; } diff --git a/glide-core/src/client/mod.rs b/glide-core/src/client/mod.rs index 6a8ebe278a..0ed66c4217 100644 --- a/glide-core/src/client/mod.rs +++ b/glide-core/src/client/mod.rs @@ -31,21 +31,22 @@ pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(2 pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60); pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250); pub const FINISHED_SCAN_CURSOR: &str = "finished"; -// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory: -// -// Expected maximum request rate: 50,000 requests/second -// Expected response time: 1 millisecond -// -// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is: -// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests -// -// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate. 
+ +/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory: +/// +/// Expected maximum request rate: 50,000 requests/second +/// Expected response time: 1 millisecond +/// +/// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is: +/// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests +/// +/// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate. pub const DEFAULT_MAX_INFLIGHT_REQUESTS: u32 = 1000; -// The connection check interval is currently not exposed to the user via ConnectionRequest, -// as improper configuration could negatively impact performance or pub/sub resiliency. -// A 3-second interval provides a reasonable balance between connection validation -// and performance overhead. +/// The connection check interval is currently not exposed to the user via ConnectionRequest, +/// as improper configuration could negatively impact performance or pub/sub resiliency. +/// A 3-second interval provides a reasonable balance between connection validation +/// and performance overhead. pub const CONNECTION_CHECKS_INTERVAL: Duration = Duration::from_secs(3); pub(super) fn get_port(address: &NodeAddress) -> u16 { @@ -712,8 +713,7 @@ impl Client { }) }) .await - .map_err(|_| ConnectionError::Timeout) - .and_then(|res| res) + .map_err(|_| ConnectionError::Timeout)? } } diff --git a/glide-core/src/client/reconnecting_connection.rs b/glide-core/src/client/reconnecting_connection.rs index 14311173ee..245e7900d3 100644 --- a/glide-core/src/client/reconnecting_connection.rs +++ b/glide-core/src/client/reconnecting_connection.rs @@ -13,6 +13,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; +use telemetrylib::Telemetry; use tokio::sync::{mpsc, Notify}; use tokio::task; use tokio::time::timeout; @@ -20,6 +21,15 @@ use tokio_retry::Retry; use super::{run_with_timeout, DEFAULT_CONNECTION_ATTEMPT_TIMEOUT}; +/// The reason behind the call to `reconnect()` +#[derive(PartialEq, Eq, Debug, Clone)] +pub enum ReconnectReason { + /// A connection was dropped (for any reason) + ConnectionDropped, + /// Connection creation error + CreateError, +} + /// The object that is used in order to recreate a connection after a disconnect. struct ConnectionBackend { /// This signal is reset when a connection disconnects, and set when a new `ConnectionState` has been set with a `Connected` state. @@ -125,6 +135,7 @@ async fn create_connection( .addr ), ); + Telemetry::incr_total_connections(1); Ok(ReconnectingConnection { inner: Arc::new(InnerReconnectingConnection { state: Mutex::new(ConnectionState::Connected(connection)), @@ -151,7 +162,7 @@ async fn create_connection( }), connection_options, }; - connection.reconnect(); + connection.reconnect(ReconnectReason::CreateError); Err((connection, err)) } } @@ -221,6 +232,9 @@ impl ReconnectingConnection { } pub(super) fn mark_as_dropped(&self) { + // Update the telemetry for each connection that is dropped. A dropped connection + // will not be re-connected, so update the telemetry here + Telemetry::decr_total_connections(1); self.inner .backend .client_dropped_flagged @@ -245,7 +259,10 @@ impl ReconnectingConnection { } } - pub(super) fn reconnect(&self) { + /// Attempt to re-connect the connection. 
+ /// + /// This function spawns a task to perform the reconnection in the background + pub(super) fn reconnect(&self, reason: ReconnectReason) { { let mut guard = self.inner.state.lock().unwrap(); if matches!(*guard, ConnectionState::Reconnecting) { @@ -259,6 +276,13 @@ impl ReconnectingConnection { log_debug("reconnect", "starting"); let connection_clone = self.clone(); + + if reason.eq(&ReconnectReason::ConnectionDropped) { + // Attempting to reconnect a connection that was dropped (for any reason) - update the telemetry by reducing + // the number of opened connections by 1, it will be incremented by 1 after a successful re-connect + Telemetry::decr_total_connections(1); + } + // The reconnect task is spawned instead of awaited here, so that the reconnect attempt will continue in the // background, regardless of whether the calling task is dropped or not. task::spawn(async move { @@ -293,6 +317,7 @@ impl ReconnectingConnection { .set(); *guard = ConnectionState::Connected(connection); } + Telemetry::incr_total_connections(1); return; } Err(_) => tokio::time::sleep(sleep_duration).await, diff --git a/glide-core/src/client/standalone_client.rs b/glide-core/src/client/standalone_client.rs index b1a699a546..961f67e516 100644 --- a/glide-core/src/client/standalone_client.rs +++ b/glide-core/src/client/standalone_client.rs @@ -2,7 +2,7 @@ * Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ use super::get_redis_connection_info; -use super::reconnecting_connection::ReconnectingConnection; +use super::reconnecting_connection::{ReconnectReason, ReconnectingConnection}; use super::{ConnectionRequest, NodeAddress, TlsMode}; use crate::retry_strategies::RetryStrategy; use futures::{future, stream, StreamExt}; @@ -14,6 +14,7 @@ use redis::cluster_routing::{self, is_readonly_cmd, ResponsePolicy, Routable, Ro use redis::{PushInfo, RedisError, RedisResult, Value}; use std::sync::atomic::AtomicUsize; use std::sync::Arc; +use telemetrylib::Telemetry; use tokio::sync::mpsc; use tokio::task; @@ -46,6 +47,13 @@ pub struct StandaloneClient { inner: Arc, } +impl Drop for StandaloneClient { + fn drop(&mut self) { + // Client was dropped, reduce the number of clients + Telemetry::decr_total_clients(1); + } +} + pub enum StandaloneClientConnectionError { NoAddressesProvided, FailedConnection(Vec<(Option, RedisError)>), @@ -193,6 +201,9 @@ impl StandaloneClient { Self::start_periodic_connection_check(node.clone()); } + // Successfully created new client. 
Update the telemetry + Telemetry::incr_total_clients(1); + Ok(Self { inner: Arc::new(DropWrapper { primary_index, @@ -260,7 +271,7 @@ impl StandaloneClient { match result { Err(err) if err.is_unrecoverable_error() => { log_warn("send request", format!("received disconnect error `{err}`")); - reconnecting_connection.reconnect(); + reconnecting_connection.reconnect(ReconnectReason::ConnectionDropped); Err(err) } _ => result, @@ -377,7 +388,7 @@ impl StandaloneClient { "pipeline request", format!("received disconnect error `{err}`"), ); - reconnecting_connection.reconnect(); + reconnecting_connection.reconnect(ReconnectReason::ConnectionDropped); Err(err) } _ => result, @@ -414,7 +425,7 @@ impl StandaloneClient { .is_err_and(|err| err.is_connection_dropped() || err.is_connection_refusal()) { log_debug("StandaloneClient", "heartbeat triggered reconnect"); - reconnecting_connection.reconnect(); + reconnecting_connection.reconnect(ReconnectReason::ConnectionDropped); } } }); @@ -435,6 +446,7 @@ impl StandaloneClient { "StandaloneClient", "connection checker stopped after connection was dropped", ); + // Client was dropped, checker can stop. return; } @@ -453,7 +465,7 @@ impl StandaloneClient { "StandaloneClient", "connection checker has triggered reconnect", ); - reconnecting_connection.reconnect(); + reconnecting_connection.reconnect(ReconnectReason::ConnectionDropped); } } }); @@ -483,7 +495,8 @@ async fn get_connection_and_replication_info( let mut multiplexed_connection = match reconnecting_connection.get_connection().await { Ok(multiplexed_connection) => multiplexed_connection, Err(err) => { - reconnecting_connection.reconnect(); + // NOTE: this block is never reached + reconnecting_connection.reconnect(ReconnectReason::ConnectionDropped); return Err((reconnecting_connection, err)); } }; @@ -492,7 +505,10 @@ async fn get_connection_and_replication_info( .send_packed_command(redis::cmd("INFO").arg("REPLICATION")) .await { - Ok(replication_status) => Ok((reconnecting_connection, replication_status)), + Ok(replication_status) => { + // Connection established + we got the INFO output + Ok((reconnecting_connection, replication_status)) + } Err(err) => Err((reconnecting_connection, err)), } } diff --git a/glide-core/src/lib.rs b/glide-core/src/lib.rs index 8da08e99f9..9b22a8bb55 100644 --- a/glide-core/src/lib.rs +++ b/glide-core/src/lib.rs @@ -17,3 +17,4 @@ pub mod scripts_container; pub use client::ConnectionRequest; pub mod cluster_scan_container; pub mod request_type; +pub use telemetrylib::Telemetry; diff --git a/glide-core/telemetry/Cargo.toml b/glide-core/telemetry/Cargo.toml new file mode 100644 index 0000000000..73b9cb25ea --- /dev/null +++ b/glide-core/telemetry/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "telemetrylib" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" +authors = ["Valkey GLIDE Maintainers"] + +[dependencies] +lazy_static = "1" +serde = { version = "1", features = ["derive"] } +serde_json = "1" diff --git a/glide-core/telemetry/src/lib.rs b/glide-core/telemetry/src/lib.rs new file mode 100644 index 0000000000..886e43a2c8 --- /dev/null +++ b/glide-core/telemetry/src/lib.rs @@ -0,0 +1,68 @@ +use lazy_static::lazy_static; +use serde::Serialize; +use std::sync::RwLock as StdRwLock; + +#[derive(Default, Serialize)] +#[allow(dead_code)] +pub struct Telemetry { + /// Total number of connections opened to Valkey + total_connections: usize, + /// Total number of GLIDE clients + total_clients: usize, +} + +lazy_static! 
{
+    static ref TELEMETRY: StdRwLock<Telemetry> = StdRwLock::<Telemetry>::default();
+}
+
+const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock for mutex. Poisoned mutex";
+const MUTEX_READ_ERR: &str = "Failed to obtain read lock for mutex. Poisoned mutex";
+
+impl Telemetry {
+    /// Increment the total number of connections by `incr_by`
+    /// Return the number of total connections after the increment
+    pub fn incr_total_connections(incr_by: usize) -> usize {
+        let mut t = TELEMETRY.write().expect(MUTEX_WRITE_ERR);
+        t.total_connections = t.total_connections.saturating_add(incr_by);
+        t.total_connections
+    }
+
+    /// Decrease the total number of connections by `decr_by`
+    /// Return the number of total connections after the decrease
+    pub fn decr_total_connections(decr_by: usize) -> usize {
+        let mut t = TELEMETRY.write().expect(MUTEX_WRITE_ERR);
+        t.total_connections = t.total_connections.saturating_sub(decr_by);
+        t.total_connections
+    }
+
+    /// Increment the total number of clients by `incr_by`
+    /// Return the number of total clients after the increment
+    pub fn incr_total_clients(incr_by: usize) -> usize {
+        let mut t = TELEMETRY.write().expect(MUTEX_WRITE_ERR);
+        t.total_clients = t.total_clients.saturating_add(incr_by);
+        t.total_clients
+    }
+
+    /// Decrease the total number of clients by `decr_by`
+    /// Return the number of total clients after the decrease
+    pub fn decr_total_clients(decr_by: usize) -> usize {
+        let mut t = TELEMETRY.write().expect(MUTEX_WRITE_ERR);
+        t.total_clients = t.total_clients.saturating_sub(decr_by);
+        t.total_clients
+    }
+
+    /// Return the number of active connections
+    pub fn total_connections() -> usize {
+        TELEMETRY.read().expect(MUTEX_READ_ERR).total_connections
+    }
+
+    /// Return the number of active clients
+    pub fn total_clients() -> usize {
+        TELEMETRY.read().expect(MUTEX_READ_ERR).total_clients
+    }
+
+    /// Reset the telemetry collected thus far
+    pub fn reset() {
+        *TELEMETRY.write().expect(MUTEX_WRITE_ERR) = Telemetry::default();
+    }
+}
diff --git a/glide-core/tests/test_client.rs b/glide-core/tests/test_client.rs
index 024c0d74bc..4e197b5c66 100644
--- a/glide-core/tests/test_client.rs
+++ b/glide-core/tests/test_client.rs
@@ -3,8 +3,27 @@
  */
 mod utilities;
 
+#[macro_export]
+/// Compare `$expected` with `$actual`. This macro will exit the test process
+/// if the assertion fails. Unlike `assert_eq!`, this also works in tasks.
+macro_rules!
async_assert_eq { + ($expected:expr, $actual:expr) => {{ + if $actual != $expected { + println!( + "{}:{}: Expected: {:?} != Actual: {:?}", + file!(), + line!(), + $actual, + $expected + ); + std::process::exit(1); + } + }}; +} + #[cfg(test)] pub(crate) mod shared_client_tests { + use glide_core::Telemetry; use std::collections::HashMap; use super::*; @@ -44,7 +63,9 @@ pub(crate) mod shared_client_tests { }) .await } - BackingServer::Cluster(cluster) => create_cluster_client(cluster, configuration).await, + BackingServer::Cluster(cluster) => { + create_cluster_client(cluster.as_ref(), configuration).await + } } } @@ -540,6 +561,83 @@ pub(crate) mod shared_client_tests { }); } + #[test] + #[serial_test::serial] + fn test_client_telemetry_standalone() { + Telemetry::reset(); + block_on_all(async move { + // create a server with 2 clients + let server_config = TestConfiguration { + use_tls: false, + ..Default::default() + }; + + let test_basics = utilities::setup_test_basics_internal(&server_config).await; + let server = BackingServer::Standalone(test_basics.server); + + // setup_test_basics_internal internally, starts a single client connection + assert_eq!(Telemetry::total_connections(), 1); + assert_eq!(Telemetry::total_clients(), 1); + + { + // Create 2 more clients, confirm that they are tracked + let _client1 = create_client(&server, server_config.clone()).await; + let _client2 = create_client(&server, server_config).await; + + // Each client maintains a single connection + assert_eq!(Telemetry::total_connections(), 3); + assert_eq!(Telemetry::total_clients(), 3); + + // Connections are dropped here + } + + // Confirm 1 connection & client remain + assert_eq!(Telemetry::total_connections(), 1); + assert_eq!(Telemetry::total_clients(), 1); + }); + } + + #[test] + #[serial_test::serial] + fn test_client_telemetry_cluster() { + Telemetry::reset(); + block_on_all(async { + let local_set = tokio::task::LocalSet::default(); + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + // We use 2 tasks to let "dispose" be called. In addition, the task that checks for the cleanup + // does not start until the cluster is up and running. We use a channel to communicate this between + // the tasks + local_set.spawn_local(async move { + let cluster = cluster::setup_default_cluster().await; + async_assert_eq!(Telemetry::total_connections(), 0); + async_assert_eq!(Telemetry::total_clients(), 0); + + // Each client opens 12 connections + println!("Creating 1st cluster client..."); + let _c1 = cluster::setup_default_client(&cluster).await; + async_assert_eq!(Telemetry::total_connections(), 12); + async_assert_eq!(Telemetry::total_clients(), 1); + + println!("Creating 2nd cluster client..."); + let _c2 = cluster::setup_default_client(&cluster).await; + async_assert_eq!(Telemetry::total_connections(), 24); + async_assert_eq!(Telemetry::total_clients(), 2); + + let _ = tx.send(1).await; + // client is dropped and eventually disposed here + }); + + local_set.spawn_local(async move { + let _ = rx.recv().await; + println!("Cluster terminated. 
Wait for the telemetry to clear");
+                tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
+                assert_eq!(Telemetry::total_connections(), 0);
+                assert_eq!(Telemetry::total_clients(), 0);
+            });
+            local_set.await;
+        });
+    }
+
     #[rstest]
     #[serial_test::serial]
     #[timeout(SHORT_CLUSTER_TEST_TIMEOUT)]
diff --git a/glide-core/tests/utilities/cluster.rs b/glide-core/tests/utilities/cluster.rs
index 9e7c356f4e..f12d9e911e 100644
--- a/glide-core/tests/utilities/cluster.rs
+++ b/glide-core/tests/utilities/cluster.rs
@@ -230,11 +230,11 @@ async fn setup_acl_for_cluster(
 }
 
 pub async fn create_cluster_client(
-    cluster: &Option<RedisCluster>,
+    cluster: Option<&RedisCluster>,
     mut configuration: TestConfiguration,
 ) -> Client {
     let addresses = if !configuration.shared_server {
-        cluster.as_ref().unwrap().get_server_addresses()
+        cluster.unwrap().get_server_addresses()
     } else {
         get_shared_cluster_addresses(configuration.use_tls)
     };
@@ -263,10 +263,20 @@ pub async fn setup_test_basics_internal(configuration: TestConfiguration) -> Clu
     } else {
         None
     };
-    let client = create_cluster_client(&cluster, configuration).await;
+    let client = create_cluster_client(cluster.as_ref(), configuration).await;
     ClusterTestBasics { cluster, client }
 }
 
+pub async fn setup_default_cluster() -> RedisCluster {
+    let test_config = TestConfiguration::default();
+    RedisCluster::new(false, &test_config.connection_info, None, None)
+}
+
+pub async fn setup_default_client(cluster: &RedisCluster) -> Client {
+    let test_config = TestConfiguration::default();
+    create_cluster_client(Some(cluster), test_config).await
+}
+
 pub async fn setup_test_basics(use_tls: bool) -> ClusterTestBasics {
     setup_test_basics_internal(TestConfiguration {
         use_tls,
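For orientation, a small self-contained sketch of how the `telemetrylib` counters introduced by this patch behave follows. The scenario is illustrative only (it is not taken from the test suite above) and assumes a binary that declares `telemetrylib` as a path dependency, the same way `glide-core` does in its Cargo.toml.

use telemetrylib::Telemetry;

fn main() {
    // Start from a clean slate, as the tests above do.
    Telemetry::reset();

    // A client that opens a single connection bumps both gauges.
    Telemetry::incr_total_clients(1);
    Telemetry::incr_total_connections(1);
    assert_eq!(Telemetry::total_clients(), 1);
    assert_eq!(Telemetry::total_connections(), 1);

    // Dropping the client reverses the bookkeeping. The counters use
    // saturating arithmetic, so an extra decrement cannot underflow.
    Telemetry::decr_total_connections(1);
    Telemetry::decr_total_clients(1);
    Telemetry::decr_total_clients(1);
    assert_eq!(Telemetry::total_connections(), 0);
    assert_eq!(Telemetry::total_clients(), 0);
}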