Skip to content

Commit

Permalink
Added Telemetry support for GLIDE client (core only) (#2545)
Browse files Browse the repository at this point in the history
  • Loading branch information
eifrah-aws authored Nov 5, 2024
1 parent 65bc84c commit 84d09a6
Show file tree
Hide file tree
Showing 13 changed files with 349 additions and 41 deletions.
3 changes: 3 additions & 0 deletions glide-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ redis = { path = "./redis-rs/redis", features = [
"cluster",
"cluster-async",
] }
telemetrylib = { path = "./telemetry" }
tokio = { version = "1", features = ["macros", "time"] }
logger_core = { path = "../logger_core" }
dispose = "0.5.0"
Expand All @@ -37,6 +38,8 @@ once_cell = "1.18.0"
sha1_smol = "1.0.0"
nanoid = "0.4.0"
async-trait = { version = "0.1.24" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }

[features]
socket-layer = [
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ tracing = "0.1"
# Optional uuid support
uuid = { version = "1.6.1", optional = true }

telemetrylib = { path = "../../telemetry" }

[features]
default = [
"acl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use telemetrylib::Telemetry;

/// Count the number of connections in a connections_map object
macro_rules! count_connections {
($conn_map:expr) => {{
let mut count = 0usize;
for a in $conn_map {
count = count.saturating_add(if a.management_connection.is_some() {
2
} else {
1
});
}
count
}};
}

/// A struct that encapsulates a network connection along with its associated IP address.
#[derive(Clone, Eq, PartialEq, Debug)]
Expand Down Expand Up @@ -66,6 +82,15 @@ where
}
}

/// Return the number of underlying connections managed by this instance of ClusterNode
pub fn connections_count(&self) -> usize {
if self.management_connection.is_some() {
2
} else {
1
}
}

pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
match conn_type {
ConnectionType::User => self.user_connection.conn.clone(),
Expand Down Expand Up @@ -106,6 +131,13 @@ pub(crate) struct ConnectionsContainer<Connection> {
topology_hash: TopologyHash,
}

impl<Connection> Drop for ConnectionsContainer<Connection> {
fn drop(&mut self) {
let count = count_connections!(&self.connection_map);
Telemetry::decr_total_connections(count);
}
}

impl<Connection> Default for ConnectionsContainer<Connection> {
fn default() -> Self {
Self {
Expand All @@ -129,8 +161,14 @@ where
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
) -> Self {
let connection_map = connection_map.0;

// Update the telemetry with the number of connections
let count = count_connections!(&connection_map);
Telemetry::incr_total_connections(count);

Self {
connection_map: connection_map.0,
connection_map,
slot_map,
read_from_replica_strategy,
topology_hash,
Expand All @@ -142,7 +180,11 @@ where
&mut self,
other_connection_map: ConnectionsMap<Connection>,
) {
let conn_count_before = count_connections!(&self.connection_map);
self.connection_map.extend(other_connection_map.0);
let conn_count_after = count_connections!(&self.connection_map);
// Update the number of connections by the difference
Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before));
}

/// Returns true if the address represents a known primary node.
Expand Down Expand Up @@ -275,20 +317,35 @@ where
node: ClusterNode<Connection>,
) -> String {
let address = address.into();
self.connection_map.insert(address.clone(), node);

// Increase the total number of connections by the number of connections managed by `node`
Telemetry::incr_total_connections(node.connections_count());

if let Some(old_conn) = self.connection_map.insert(address.clone(), node) {
// We are replacing a node. Reduce the counter by the number of connections managed by
// the old connection
Telemetry::decr_total_connections(old_conn.connections_count());
};
address
}

pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
self.connection_map
.remove(address)
.map(|(_key, value)| value)
if let Some((_key, old_conn)) = self.connection_map.remove(address) {
Telemetry::decr_total_connections(old_conn.connections_count());
Some(old_conn)
} else {
None
}
}

pub(crate) fn len(&self) -> usize {
self.connection_map.len()
}

pub(crate) fn connection_map(&self) -> &DashMap<String, ClusterNode<Connection>> {
&self.connection_map
}

pub(crate) fn get_current_topology_hash(&self) -> TopologyHash {
self.topology_hash
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::net::SocketAddr;

use super::{
connections_container::{ClusterNode, ConnectionWithIp},
Connect,
Expand All @@ -11,6 +9,7 @@ use crate::{
cluster_client::ClusterParams,
ErrorKind, RedisError, RedisResult,
};
use std::net::SocketAddr;

use futures::prelude::*;
use futures_util::{future::BoxFuture, join};
Expand Down Expand Up @@ -113,13 +112,15 @@ where
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
{
match future::join(
// User connection
create_connection(
addr,
params.clone(),
socket_addr,
false,
glide_connection_options.clone(),
),
// Management connection
create_connection(
addr,
params.clone(),
Expand Down
28 changes: 22 additions & 6 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use tokio::task::JoinHandle;

#[cfg(feature = "tokio-comp")]
use crate::aio::DisconnectNotifier;
use telemetrylib::Telemetry;

use crate::{
aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection, Runtime},
Expand Down Expand Up @@ -144,7 +145,7 @@ where
/// # Arguments
///
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// for each subsequent iteration use the returned [`ScanStateRC`].
/// * `count` - An optional count of keys requested,
/// the amount returned can vary and not obligated to return exactly count.
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
Expand Down Expand Up @@ -181,7 +182,7 @@ where
/// break;
/// }
/// }
/// keys
/// keys
/// }
/// ```
pub async fn cluster_scan(
Expand Down Expand Up @@ -241,7 +242,7 @@ where
/// break;
/// }
/// }
/// keys
/// keys
/// }
/// ```
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
Expand Down Expand Up @@ -495,14 +496,27 @@ pub(crate) struct ClusterConnInner<C> {

impl<C> Dispose for ClusterConnInner<C> {
fn dispose(self) {
if let Ok(conn_lock) = self.inner.conn_lock.try_read() {
// Each node may contain user and *maybe* a management connection
let mut count = 0usize;
for node in conn_lock.connection_map() {
count = node.connections_count();
}
Telemetry::decr_total_connections(count);
}

if let Some(handle) = self.periodic_checks_handler {
#[cfg(feature = "tokio-comp")]
handle.abort()
}

if let Some(handle) = self.connections_validation_handler {
#[cfg(feature = "tokio-comp")]
handle.abort()
}

// Reduce the number of clients
Telemetry::decr_total_clients(1);
}
}

Expand Down Expand Up @@ -1080,6 +1094,8 @@ where
}
}

// New client added
Telemetry::incr_total_clients(1);
Ok(Disposable::new(connection))
}

Expand Down Expand Up @@ -1185,7 +1201,7 @@ where
Ok(connections.0)
}

// Reconnet to the initial nodes provided by the user in the creation of the client,
// Reconnect to the initial nodes provided by the user in the creation of the client,
// and try to refresh the slots based on the initial connections.
// Being used when all cluster connections are unavailable.
fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
Expand Down Expand Up @@ -1272,7 +1288,7 @@ where
);

if !addrs_to_refresh.is_empty() {
// dont try existing nodes since we know a. it does not exist. b. exist but its connection is closed
// don't try existing nodes since we know a. it does not exist. b. exist but its connection is closed
Self::refresh_connections(
inner.clone(),
addrs_to_refresh,
Expand Down Expand Up @@ -1481,7 +1497,7 @@ where
});
let wait_duration = rate_limiter.wait_duration();
if passed_time <= wait_duration {
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
Wait duration = {:?}", passed_time, wait_duration);
skip_slots_refresh = true;
}
Expand Down
30 changes: 15 additions & 15 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(2
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
pub const FINISHED_SCAN_CURSOR: &str = "finished";
// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
//
// Expected maximum request rate: 50,000 requests/second
// Expected response time: 1 millisecond
//
// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is:
// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests
//
// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate.

/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
///
/// Expected maximum request rate: 50,000 requests/second
/// Expected response time: 1 millisecond
///
/// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is:
/// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests
///
/// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate.
pub const DEFAULT_MAX_INFLIGHT_REQUESTS: u32 = 1000;

// The connection check interval is currently not exposed to the user via ConnectionRequest,
// as improper configuration could negatively impact performance or pub/sub resiliency.
// A 3-second interval provides a reasonable balance between connection validation
// and performance overhead.
/// The connection check interval is currently not exposed to the user via ConnectionRequest,
/// as improper configuration could negatively impact performance or pub/sub resiliency.
/// A 3-second interval provides a reasonable balance between connection validation
/// and performance overhead.
pub const CONNECTION_CHECKS_INTERVAL: Duration = Duration::from_secs(3);

pub(super) fn get_port(address: &NodeAddress) -> u16 {
Expand Down Expand Up @@ -712,8 +713,7 @@ impl Client {
})
})
.await
.map_err(|_| ConnectionError::Timeout)
.and_then(|res| res)
.map_err(|_| ConnectionError::Timeout)?
}
}

Expand Down
Loading

0 comments on commit 84d09a6

Please sign in to comment.