Skip to content

Commit

Permalink
Fixups/review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Apr 22, 2024
1 parent b54414e commit ca913b6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 38 deletions.
16 changes: 10 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ const DEFAULT_SELFTERM_DEADLINE: Duration = Duration::from_secs(5);
const DEFAULT_CLUSTER_ID: &str = "Kubernetes";
const DEFAULT_CLUSTER_DOMAIN: &str = "cluster.local";
const DEFAULT_TTL: Duration = Duration::from_secs(60 * 60 * 24); // 24 hours
const DEFAULT_POOL_RELEASE: Duration = Duration::from_secs(60 * 5); // 5 minutes
const DEFAULT_POOL_UNUSED_RELEASE_TIMEOUT: Duration = Duration::from_secs(60 * 5); // 5 minutes
const DEFAULT_POOL_MAX_STREAMS_PER_CONNECTION: u16 = 100; //Go: 100, Hyper: 200, Envoy: 2147483647 (lol), Spec recommended minimum 100

const DEFAULT_INPOD_MARK: u32 = 1337;

Expand Down Expand Up @@ -128,8 +129,8 @@ pub struct Config {
// This can be used to effect flow control for "connection storms" when workload clients
// (such as loadgen clients) open many connections all at once.
//
// Note that this will only be checked and inner conns rebalanced accordingly when a new connection
// is requested from the pool, and not on every stream queue on that connection.
// Note that this will only be checked when a *new* connection
// is requested from the pool, and not on every *stream* queued on that connection.
// So if you request a single connection from a pool configured wiht a max streamcount of 200,
// and queue 500 streams on it, you will still exceed this limit and are at the mercy of hyper's
// default stream queuing.
Expand Down Expand Up @@ -333,11 +334,14 @@ pub fn construct_config(pc: ProxyConfig) -> Result<Config, Error> {
.get(DNS_CAPTURE_METADATA)
.map_or(false, |value| value.to_lowercase() == "true"),

pool_max_streams_per_conn: parse_default(POOL_MAX_STREAMS_PER_CONNECTION, 250)?,
pool_max_streams_per_conn: parse_default(
POOL_MAX_STREAMS_PER_CONNECTION,
DEFAULT_POOL_MAX_STREAMS_PER_CONNECTION,
)?,

pool_unused_release_timeout: match parse::<String>(POOL_UNUSED_RELEASE_TIMEOUT)? {
Some(ttl) => duration_str::parse(ttl).unwrap_or(DEFAULT_POOL_RELEASE),
None => DEFAULT_POOL_RELEASE,
Some(ttl) => duration_str::parse(ttl).unwrap_or(DEFAULT_POOL_UNUSED_RELEASE_TIMEOUT),
None => DEFAULT_POOL_UNUSED_RELEASE_TIMEOUT,
},

window_size: 4 * 1024 * 1024,
Expand Down
87 changes: 55 additions & 32 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ use flurry::HashMap;

use pingora_pool;

// This is merely a counter to track the overall number of conns this pool spawns
// to ensure we get unique poolkeys-per-new-conn, it is not a limit
static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);

// A relatively nonstandard HTTP/2 connection pool designed to allow multiplexing proxied workload connections
// over a (smaller) number of HTTP/2 mTLS tunnels.
//
Expand Down Expand Up @@ -75,6 +71,9 @@ struct PoolState {
established_conn_writelock: HashMap<u64, Option<Arc<Mutex<()>>>>,
close_pollers: futures::stream::FuturesUnordered<task::JoinHandle<()>>,
pool_unused_release_timeout: Duration,
// This is merely a counter to track the overall number of conns this pool spawns
// to ensure we get unique poolkeys-per-new-conn, it is not a limit
pool_global_conn_count: AtomicI32,
}

impl PoolState {
Expand Down Expand Up @@ -116,6 +115,37 @@ impl PoolState {
let _ = self.pool_notifier.send(true);
}

// Since we are using a hash key to do lookup on the inner pingora pool, do a get guard
// to make sure what we pull out actually deep-equals the workload_key, to avoid *sigh* crossing the streams.
fn guarded_get(
&self,
hash_key: &u64,
workload_key: &WorkloadKey,
) -> Result<Option<ConnClient>, Error> {
match self.connected_pool.get(hash_key) {
None => Ok(None),
Some(conn) => match Self::enforce_key_integrity(conn, workload_key) {
Err(e) => Err(e),
Ok(conn) => Ok(Some(conn)),
},
}
}

// Just for safety's sake, since we are using a hash thanks to pingora supporting arbitrary Eq, Hash
// types, do a deep equality test before returning the conn, returning an error if the conn's key does
// not equal the provided key
//
// this is a final safety check for collisions, we will throw up our hands and refuse to return the conn
fn enforce_key_integrity(
conn: ConnClient,
expected_key: &WorkloadKey,
) -> Result<ConnClient, Error> {
match conn.is_for_workload(expected_key) {
Ok(()) => Ok(conn),
Err(e) => Err(e),
}
}

// Does an initial, naive check to see if a conn exists for this key.
//
// If it does, WRITELOCK the mutex for that key, clone (or create), check in the clone,
Expand All @@ -133,7 +163,7 @@ impl PoolState {
&self,
workload_key: &WorkloadKey,
pool_key: &pingora_pool::ConnectionMeta,
) -> Option<ConnClient> {
) -> Result<Option<ConnClient>, Error> {
debug!("first checkout READGUARD");

let found_conn = {
Expand All @@ -142,10 +172,7 @@ impl PoolState {

trace!("pool connect outer map - check for keyed mutex");
let exist_conn_lock = self.established_conn_writelock.get(&pool_key.key, &guard);
match exist_conn_lock {
Some(e_conn_lock) => e_conn_lock.clone(),
None => None,
}
exist_conn_lock.and_then(|e_conn_lock| e_conn_lock.clone())
};
match found_conn {
Some(exist_conn_lock) => {
Expand All @@ -156,7 +183,8 @@ impl PoolState {
"first checkout - got writelock for conn with key {:#?} and hash {:#?}",
workload_key, pool_key.key
);
self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
let got = self.guarded_get(&pool_key.key, workload_key)?;
Ok(got.and_then(|e_conn| {
trace!(
"first checkout - inner pool - got existing conn for key {:#?}",
workload_key
Expand All @@ -171,9 +199,9 @@ impl PoolState {
self.checkin_conn(e_conn.clone(), pool_key.clone());
Some(e_conn)
}
})
}))
}
None => None,
None => Ok(None),
}
}
}
Expand All @@ -200,7 +228,10 @@ impl WorkloadHBONEPool {
let (timeout_send, timeout_recv) = watch::channel(false);
let max_count = cfg.pool_max_streams_per_conn;
let pool_duration = cfg.pool_unused_release_timeout;
// This is merely a counter to track the overall number of conns this pool spawns
// to ensure we get unique poolkeys-per-new-conn, it is not a limit
debug!("constructing pool with {:#?} streams per conn", max_count);

Self {
state: Arc::new(PoolState {
pool_notifier: timeout_tx,
Expand All @@ -212,6 +243,7 @@ impl WorkloadHBONEPool {
established_conn_writelock: HashMap::new(),
close_pollers: futures::stream::FuturesUnordered::new(),
pool_unused_release_timeout: pool_duration,
pool_global_conn_count: AtomicI32::new(0),
}),
cfg,
socket_factory,
Expand All @@ -236,7 +268,9 @@ impl WorkloadHBONEPool {
let hash_key = s.finish();
let pool_key = pingora_pool::ConnectionMeta::new(
hash_key,
GLOBAL_CONN_COUNT.fetch_add(1, Ordering::Relaxed),
self.state
.pool_global_conn_count
.fetch_add(1, Ordering::Relaxed),
);
debug!("initial attempt - try to get existing conn from pool");
// First, see if we can naively just check out a connection.
Expand All @@ -252,7 +286,7 @@ impl WorkloadHBONEPool {
let existing_conn = self
.state
.first_checkout_conn_from_pool(&workload_key, &pool_key)
.await;
.await?;

// Early return, no need to do anything else
if existing_conn.is_some() {
Expand Down Expand Up @@ -350,7 +384,7 @@ impl WorkloadHBONEPool {
// END take inner writelock
}
Err(_) => {
debug!("we didnt' win the lock, something else is creating a conn, wait for it");
debug!("we didn't win the lock, something else is creating a conn, wait for it");
// If we get here, it means the following are true:
// 1. At one point, there was a preexisting conn in the pool for this key.
// 2. When we checked, we got nothing for that key.
Expand All @@ -366,9 +400,10 @@ impl WorkloadHBONEPool {
"notified a new conn was enpooled, checking for hash {:#?}",
hash_key
);
// The sharded mutex for this connkey is already locked - someone else must be making a conn
// if they are, try to wait for it, but bail if we find one and it's got a maxed streamcount.
let existing_conn = self.state.connected_pool.get(&hash_key);

// Notifier fired, try and get a conn out for our key.
// If we do, make sure it's not maxed out on streams.
let existing_conn = self.state.guarded_get(&hash_key, &workload_key)?;
match existing_conn {
None => {
trace!("woke up on pool notification, but didn't find a conn for {:#?} yet", hash_key);
Expand Down Expand Up @@ -434,18 +469,6 @@ impl WorkloadHBONEPool {
Ok(r_conn)
}
}
.and_then(|conn| {
// Finally, we either have a conn or an error.
// Just for safety's sake, since we are using a hash thanks to pingora supporting arbitrary Eq, Hash
// types, do a deep equality test before returning the conn, returning an error if the conn's key does
// not equal the provided key
//
// this is a final safety check for collisions, we will throw up our hands and refuse to return the conn
match conn.is_for_workload(workload_key) {
Ok(()) => Ok(conn),
Err(e) => Err(e),
}
})
}

async fn spawn_new_pool_conn(
Expand Down Expand Up @@ -532,8 +555,8 @@ impl ConnClient {
self.sender.send_request(req)
}

pub fn is_for_workload(&self, wl_key: WorkloadKey) -> Result<(), crate::proxy::Error> {
if !(self.wl_key == wl_key) {
pub fn is_for_workload(&self, wl_key: &WorkloadKey) -> Result<(), crate::proxy::Error> {
if !(self.wl_key == *wl_key) {
Err(crate::proxy::Error::Generic(
"fetched connection does not match workload key!".into(),
))
Expand Down

0 comments on commit ca913b6

Please sign in to comment.