Cleanup comments
Signed-off-by: Benjamin Leggett <[email protected]>
bleggett committed Apr 18, 2024
1 parent 30c19f4 commit b9cf358
Showing 1 changed file with 53 additions and 66 deletions.
119 changes: 53 additions & 66 deletions src/proxy/pool.rs
@@ -32,7 +32,7 @@ use tokio::sync::watch;
use tokio::task;

use tokio::sync::Mutex;
use tracing::{debug, error};
use tracing::{debug, error, trace};

use crate::config;
use crate::identity::{Identity, SecretManager};
@@ -41,6 +41,8 @@ use flurry::HashMap;

use pingora_pool;

// This is merely a counter to track the overall number of conns this pool spawns,
// to ensure we get unique poolkeys per new conn. It is not a limit.
static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);
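As context for how this counter feeds the pool (mirroring the connect() path further down in this diff), a minimal sketch; new_pool_key is an illustrative helper, not a function in this file, and assumes pingora_pool's ConnectionMeta::new(group_key, id) signature:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicI32, Ordering};

static CONN_COUNT: AtomicI32 = AtomicI32::new(0);

// Hash the workload key so equivalent destinations share a pool group,
// then attach a globally unique id so each new conn is a distinct entry.
fn new_pool_key<K: Hash>(workload_key: &K) -> pingora_pool::ConnectionMeta {
    let mut s = DefaultHasher::new();
    workload_key.hash(&mut s);
    pingora_pool::ConnectionMeta::new(s.finish(), CONN_COUNT.fetch_add(1, Ordering::Relaxed))
}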

// A relatively nonstandard HTTP/2 connection pool designed to allow multiplexing proxied workload connections
@@ -53,8 +55,6 @@ static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);
// by flow control throttling.
#[derive(Clone)]
pub struct WorkloadHBONEPool {
// this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
// and has no actual hyper/http/connection logic.
cfg: config::Config,
socket_factory: Arc<dyn SocketFactory + Send + Sync>,
cert_manager: Arc<SecretManager>,
@@ -63,14 +63,15 @@ pub struct WorkloadHBONEPool {
max_streamcount: u16,
}
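Not part of this diff, but for orientation, a hedged usage sketch based only on the connect() signature shown below; handle_request and its plumbing are hypothetical:

// Each proxied request clones the (cheaply Clone) pool and asks for a client
// by workload key; the pool checks out an existing h2 conn or spawns one.
async fn handle_request(mut pool: WorkloadHBONEPool, key: WorkloadKey) -> Result<(), Error> {
    let client = pool.connect(key).await?; // multiplexed onto a pooled conn
    // ... tunnel the request over `client` ...
    Ok(())
}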

// PoolState is effectively the gnarly inner state stuff that needs thread/task sync, and should be wrapped in a Mutex.
struct PoolState {
pool_notifier: watch::Sender<bool>, // This is already impl clone? rustc complains that it isn't, tho
timeout_tx: watch::Sender<bool>, // This is already impl clone? rustc complains that it isn't, tho
timeout_rx: watch::Receiver<bool>,
// this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
// and has no actual hyper/http/connection logic.
connected_pool: Arc<pingora_pool::ConnectionPool<ConnClient>>,
// this must be a readlockable list-of-locks, so we can lock per-key, not globally, and avoid holding up all conn attempts
// this must be an atomic/concurrent-safe list-of-locks, so we can lock per-key, not globally, and avoid holding up all conn attempts
established_conn_writelock: HashMap<u64, Option<Arc<Mutex<()>>>>,
close_pollers: futures::stream::FuturesUnordered<task::JoinHandle<()>>,
pool_unused_release_timeout: Duration,
@@ -86,7 +87,8 @@ impl PoolState {
//
// Idle poppers are safe to invoke if the conn they are popping is already gone
// from the inner queue, so we will start one for every insert, let them run or terminate on their own,
// and poll them to completion on shutdown.
// and poll them to completion on shutdown - any duplicates from repeated checkouts/checkins of the same conn
// will simply resolve as a no-op in order.
//
// Note that "idle" in the context of this pool means "no one has asked for it or dropped it in X time, so prune it".
//
Expand Down Expand Up @@ -114,6 +116,19 @@ impl PoolState {
let _ = self.pool_notifier.send(true);
}

// Does an initial, naive check to see if a conn exists for this key.
//
// If it does, WRITELOCK the mutex for that key, clone (or create), check in the clone,
// and return the other reference for use.
//
// Otherwise, return None.
//
// It is important that the *initial* check here is authoritative, hence the locks, as
// we must know if this is a connection for a key *nobody* has tried to start yet,
// or if other things have already established conns for this key.
//
// This is so we can backpressure correctly if 1000 tasks all demand a new connection
// to the same key at once, and not eagerly open 1000 tunnel connections.
async fn first_checkout_conn_from_pool(
&self,
workload_key: &WorkloadKey,
@@ -122,38 +137,30 @@ impl PoolState {
debug!("first checkout READGUARD");

let found_conn = {
// BEGIN take outer readlock
debug!("pool connect MAP OUTER READ/WRITE GUARD");
trace!("pool connect outer map - take guard");
let guard = self.established_conn_writelock.guard();
debug!("pool connect MAP OUTER READ/WRITE START");

//OLD
// debug!("pool connect MAP OUTER READLOCK START");
// let map_read_lock = self.state.established_conn_writelock.read().await;
// debug!("pool connect MAP OUTER READLOCK END");
trace!("pool connect outer map - check for keyed mutex");
let exist_conn_lock = self.established_conn_writelock.get(&pool_key.key, &guard);
// BEGIN take inner writelock
debug!("pool connect MAP INNER WRITELOCK START");
match exist_conn_lock {
Some(e_conn_lock) => e_conn_lock.clone(),
None => None,
}
// exist_conn_lock.as_ref().unwrap().clone()
};
// let guard = self.established_conn_writelock.guard();
// debug!("pool connect MAP OUTER READGUARD GOT");
match found_conn {
// match map_read_lock.get(&pool_key.key) {
Some(exist_conn_lock) => {
debug!("first checkout INNER WRITELOCK");
debug!("first checkout - found mutex for key, waiting for writelock");
let _conn_lock = exist_conn_lock.as_ref().lock().await;

debug!(
"getting conn for key {:#?} and hash {:#?}",
"first checkout - got writelock for conn with key {:#?} and hash {:#?}",
workload_key, pool_key.key
);
self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
debug!("got existing conn for key {:#?}", workload_key);
trace!(
"first checkout - inner pool - got existing conn for key {:#?}",
workload_key
);
if e_conn.at_max_streamcount() {
debug!(
"got conn for key {:#?}, but streamcount is maxed",
@@ -171,9 +178,11 @@ impl PoolState {
}
}
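Condensed, the checkout path above is: clone the keyed mutex out under a short-lived map guard, await the per-key lock, then read the inner pool. A sketch of that shape (streamcount handling elided), assuming this file's imports and types plus flurry's guard()/get() and tokio's async Mutex:

async fn checkout(
    locks: &HashMap<u64, Option<Arc<Mutex<()>>>>,
    pool: &pingora_pool::ConnectionPool<ConnClient>,
    key: u64,
) -> Option<ConnClient> {
    // Clone the keyed mutex out, dropping the flurry guard before awaiting -
    // the guard must not be held across an await point.
    let keyed_lock = {
        let guard = locks.guard();
        locks.get(&key, &guard)?.clone()
    }?;
    let _held = keyed_lock.lock().await; // serialize checkouts per key
    pool.get(&key) // None here means "key known, but all conns checked out"
}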

// When the Arc-wrapped PoolState is finally dropped, trigger the drain,
// which will terminate all connection driver spawns, as well as cancel all outstanding eviction timeout spawns
impl Drop for PoolState {
fn drop(&mut self) {
debug!("poolstate dropping, cancelling all outstanding pool eviction timeout spawns");
debug!("poolstate dropping, stopping all connection drivers and cancelling all outstanding eviction timeout spawns");
let _ = self.timeout_tx.send(true);
}
}
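The drain signal sent here is again a watch channel; a sketch of how a connection driver task can observe it (illustrative - the real driver code lives elsewhere in this file):

use tokio::sync::watch;

// Drive the h2 connection until it completes on its own, or until PoolState's
// Drop sends on timeout_tx and every driver sees changed() fire.
async fn drive_conn(conn: impl std::future::Future<Output = ()>, mut drain_rx: watch::Receiver<bool>) {
    tokio::select! {
        _ = conn => { /* connection finished normally */ }
        _ = drain_rx.changed() => { /* pool dropped: stop driving */ }
    }
}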
Expand Down Expand Up @@ -221,15 +230,15 @@ impl WorkloadHBONEPool {
pub async fn connect(&mut self, workload_key: WorkloadKey) -> Result<ConnClient, Error> {
debug!("pool connect START");
// TODO BML this may not be collision resistant, and may be slow. It should be resistant enough for workloads tho.
// We are doing a deep-equals check at the end to mitigate any collisions, will see about bumping Pingora
let mut s = DefaultHasher::new();
workload_key.hash(&mut s);
let hash_key = s.finish();
let pool_key = pingora_pool::ConnectionMeta::new(
hash_key,
GLOBAL_CONN_COUNT.fetch_add(1, Ordering::Relaxed),
);

debug!("pool connect GET EXISTING");
debug!("initial attempt - try to get existing conn from pool");
// First, see if we can naively just check out a connection.
// This should be the common case, except for the first establishment of a new connection/key.
// This will be done under outer readlock (nonexclusive)/inner keyed writelock (exclusive).
@@ -245,37 +254,27 @@ impl WorkloadHBONEPool {
.first_checkout_conn_from_pool(&workload_key, &pool_key)
.await;

debug!("pool connect GOT EXISTING");
if existing_conn.is_some() {
debug!("using existing conn, connect future will be dropped on the floor");
debug!("initial attempt - found existing conn, done");
Ok(existing_conn.unwrap())
} else {
// We couldn't get a conn. This means either nobody has tried to establish any conns for this key yet,
// or they have, but no conns are currently available
// (because someone else has checked all of them out and not put any back yet)
//
// So, we wil writelock the outer lock to see if an inner lock entry exists for our key.
//
// critical block - this writelocks the entire pool for all tasks/threads
// as we check to see if anyone has ever inserted a sharded mutex for this key.
// So, we will take a nonexclusive readlock on the lockmap, to see if an inner lock
// exists for our key.
//
// If not, we insert one.
//
// We want to hold this for as short as possible a time, and drop it
// before we hold it over an await.
//
// this is is the ONLY block where we should hold a writelock on the whole mutex map
// for the rest, a readlock (nonexclusive) is sufficient.
{
debug!("pool connect MAP OUTER READ/WRITE GUARD");
debug!("didn't find a connection for key {:#?}, making sure lockmap has entry", hash_key);
let guard = self.state.established_conn_writelock.guard();
debug!("pool connect MAP OUTER READ/WRITE START");
match self.state.established_conn_writelock.try_insert(hash_key, Some(Arc::new(Mutex::new(()))), &guard) {
Ok(_) => {
debug!("inserting conn mutex for key {:#?}", hash_key);
debug!("inserting conn mutex for key {:#?} into lockmap", hash_key);
}
Err(_) => {
debug!("already have conn for key {:#?}", hash_key);
debug!("already have conn for key {:#?} in lockmap", hash_key);
}
}
}
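The critical block above leans on flurry's atomic try_insert, so two racing tasks can't both create an entry for the same key; condensed (same imports as this file):

// Ensure a per-key mutex entry exists; losing the try_insert race just means
// another task created it first, which is equally fine.
fn ensure_lock_entry(locks: &HashMap<u64, Option<Arc<Mutex<()>>>>, key: u64) {
    let guard = locks.guard();
    match locks.try_insert(key, Some(Arc::new(Mutex::new(()))), &guard) {
        Ok(_) => trace!("inserted lock entry for key {key}"),
        Err(_) => trace!("lock entry for key {key} already exists"),
    }
}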
@@ -285,7 +284,7 @@ impl WorkloadHBONEPool {
// 2. We can now, under readlock(nonexclusive) in the outer map, attempt to
// take the inner writelock for our specific key (exclusive).
//
// This unblocks other tasks spawning connections against other keys, but blocks other
// This doesn't block other tasks spawning connections against other keys, but DOES block other
// tasks spawning connections against THIS key - which is what we want.

// NOTE: This inner, key-specific mutex is a tokio::async::Mutex, and not a stdlib sync mutex.
@@ -296,37 +295,29 @@ impl WorkloadHBONEPool {
// hold it over does not resolve.
//
// HOWEVER. Here we know this connection will either establish or timeout
// and we WANT other tasks to go back to sleep if there is an outstanding lock.
// So the downsides are actually useful (we WANT task contention -
// to block other parallel tasks from trying to spawn a connection if we are already doing so)
// and we WANT other tasks to go back to sleep if a task is already trying to create a new connection for this key.
//
let found_conn = {
// BEGIN take outer readlock
debug!("pool connect MAP OUTER READ/WRITE GUARD");
// So the downsides are actually useful (we WANT task contention -
// to block other parallel tasks from trying to spawn a connection for this key if we are already doing so)
let inner_conn_lock = {
trace!("fallback attempt - getting keyed lock out of lockmap");
let guard = self.state.established_conn_writelock.guard();
debug!("pool connect MAP OUTER READ/WRITE START");


//OLD
// debug!("pool connect MAP OUTER READLOCK START");
// let map_read_lock = self.state.established_conn_writelock.read().await;
// debug!("pool connect MAP OUTER READLOCK END");
let exist_conn_lock = self.state.established_conn_writelock.get(&hash_key, &guard).unwrap();
// BEGIN take inner writelock
debug!("pool connect MAP INNER WRITELOCK START");
trace!("fallback attempt - got keyed lock out of lockmap");
exist_conn_lock.as_ref().unwrap().clone()
};
// drop(guard);

let got_conn = match found_conn.try_lock() {
debug!("appears we need a new conn, attempting to win connlock for wl key {:#?}", workload_key);
let got_conn = match inner_conn_lock.try_lock() {
Ok(_guard) => {
// BEGIN take inner writelock
// If we get here, it means the following are true:
// 1. We did not get a connection for our key.
// 2. We have the exclusive inner writelock to create a new connection for our key.
//
// So, carry on doing that.
debug!("appears we need a new conn, retaining connlock");
debug!("nothing else is creating a conn, make one");
debug!("nothing else is creating a conn and we won the lock, make one");
let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await;
let client = ConnClient{
sender: pool_conn?,
@@ -339,12 +330,11 @@ impl WorkloadHBONEPool {
"starting new conn for key {:#?} with pk {:#?}",
workload_key, pool_key
);
debug!("dropping lock");
Some(client)
// END take inner writelock
}
Err(_) => {
debug!("something else is creating a conn, wait for it");
debug!("we didnt' win the lock, something else is creating a conn, wait for it");
// If we get here, it means the following are true:
// 1. At one point, there was a preexisting conn in the pool for this key.
// 2. When we checked, we got nothing for that key.
@@ -356,7 +346,7 @@ loop {
loop {
match self.pool_watcher.changed().await {
Ok(_) => {
debug!(
trace!(
"notified a new conn was enpooled, checking for hash {:#?}",
hash_key
);
@@ -365,7 +355,7 @@
let existing_conn = self.state.connected_pool.get(&hash_key);
match existing_conn {
None => {
debug!("got nothing");
trace!("woke up on pool notification, but didn't find a conn for {:#?} yet", hash_key);
continue;
}
Some(e_conn) => {
@@ -376,13 +366,11 @@
debug!("found existing conn for key {:#?}, but streamcount is maxed", workload_key);
break None;
}
debug!("pool connect LOOP END");
break Some(e_conn);
}
}
}
Err(_) => {
// END take outer readlock
return Err(Error::WorkloadHBONEPoolDraining);
}
}
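The wait loop above is the receiver half of the checkin notification; trimmed to its essentials (same types as this file):

// Wake on every checkin and re-check the pool for our key; bail out if the
// pool is draining. Ok(None) means "found one, but its streamcount is maxed".
async fn wait_for_conn(
    watcher: &mut watch::Receiver<bool>,
    pool: &pingora_pool::ConnectionPool<ConnClient>,
    key: u64,
) -> Result<Option<ConnClient>, Error> {
    loop {
        watcher
            .changed()
            .await
            .map_err(|_| Error::WorkloadHBONEPoolDraining)?;
        match pool.get(&key) {
            None => continue, // a checkin happened, but not for our key
            Some(conn) if conn.at_max_streamcount() => return Ok(None),
            Some(conn) => return Ok(Some(conn)),
        }
    }
}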
Expand Down Expand Up @@ -430,7 +418,6 @@ impl WorkloadHBONEPool {
Ok(r_conn)
}
}
// END take outer readlock
}.and_then(|conn| {
// Finally, we either have a conn or an error.
// Just for safety's sake, since we are using a hash thanks to pingora supporting arbitrary Eq, Hash
