diff --git a/src/proxy/pool.rs b/src/proxy/pool.rs index f469ee9b0..b8c9f632b 100644 --- a/src/proxy/pool.rs +++ b/src/proxy/pool.rs @@ -120,11 +120,11 @@ impl WorkloadHBONEPool { // // If many `connects` request a connection to the same dest at once, all will wait until exactly // one connection is created, before deciding if they should create more or just use that one. - pub async fn connect(&mut self, key: WorkloadKey) -> Result { + pub async fn connect(&mut self, workload_key: WorkloadKey) -> Result { debug!("pool connect START"); // TODO BML this may not be collision resistant/slow. It should be resistant enough for workloads tho. let mut s = DefaultHasher::new(); - key.hash(&mut s); + workload_key.hash(&mut s); let hash_key = s.finish(); let pool_key = pingora_pool::ConnectionMeta::new( hash_key, @@ -143,7 +143,7 @@ impl WorkloadHBONEPool { // This is so we can backpressure correctly if 1000 tasks all demand a new connection // to the same key at once, and not eagerly open 1000 tunnel connections. let existing_conn = self - .first_checkout_conn_from_pool(&key, hash_key, &pool_key) + .first_checkout_conn_from_pool(&workload_key, &pool_key) .await; debug!("pool connect GOT EXISTING"); @@ -219,17 +219,17 @@ impl WorkloadHBONEPool { // So, carry on doing that. debug!("appears we need a new conn, retaining connlock"); debug!("nothing else is creating a conn, make one"); - let pool_conn = self.spawn_new_pool_conn(key.clone()).await; + let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await; let client = ConnClient{ sender: pool_conn?, stream_count: Arc::new(AtomicU16::new(0)), stream_count_max: self.max_streamcount, - wl_key: key.clone(), + wl_key: workload_key.clone(), }; debug!( "starting new conn for key {:#?} with pk {:#?}", - key, pool_key + workload_key, pool_key ); debug!("dropping lock"); Some(client) @@ -265,7 +265,7 @@ impl WorkloadHBONEPool { // We found a conn, but it's already maxed out. // Return None and create another. if e_conn.at_max_streamcount() { - debug!("found existing conn for key {:#?}, but streamcount is maxed", key); + debug!("found existing conn for key {:#?}, but streamcount is maxed", workload_key); break None; } debug!("pool connect LOOP END"); @@ -310,13 +310,13 @@ impl WorkloadHBONEPool { // (streamcount maxed, etc) // Start a new one, clone the underlying client, return a copy, and put the other back in the pool. None => { - debug!("spawning new conn for key {:#?} to replace", key); - let pool_conn = self.spawn_new_pool_conn(key.clone()).await; + debug!("spawning new conn for key {:#?} to replace", workload_key); + let pool_conn = self.spawn_new_pool_conn(workload_key.clone()).await; let r_conn = ConnClient{ sender: pool_conn?, stream_count: Arc::new(AtomicU16::new(0)), stream_count_max: self.max_streamcount, - wl_key: key.clone(), + wl_key: workload_key.clone(), }; self.checkin_conn(r_conn.clone(), pool_key.clone()); Ok(r_conn) @@ -330,7 +330,7 @@ impl WorkloadHBONEPool { // not equal the provided key // // this is a final safety check for collisions, we will throw up our hands and refuse to return the conn - match conn.is_for_workload(key) { + match conn.is_for_workload(workload_key) { Ok(()) => { return Ok(conn) } @@ -343,22 +343,21 @@ impl WorkloadHBONEPool { } async fn first_checkout_conn_from_pool( &self, - key: &WorkloadKey, - hash_key: u64, + workload_key: &WorkloadKey, pool_key: &pingora_pool::ConnectionMeta, ) -> Option { debug!("first checkout READLOCK"); let map_read_lock = self.established_conn_writelock.read().await; - match map_read_lock.get(&hash_key) { + match map_read_lock.get(&pool_key.key) { Some(exist_conn_lock) => { debug!("first checkout INNER WRITELOCK"); let _conn_lock = exist_conn_lock.as_ref().unwrap().lock().await; - debug!("getting conn for key {:#?} and hash {:#?}", key, hash_key); - self.connected_pool.get(&hash_key).and_then(|e_conn| { - debug!("got existing conn for key {:#?}", key); + debug!("getting conn for key {:#?} and hash {:#?}", workload_key, pool_key.key); + self.connected_pool.get(&pool_key.key).and_then(|e_conn| { + debug!("got existing conn for key {:#?}", workload_key); if e_conn.at_max_streamcount() { - debug!("got conn for key {:#?}, but streamcount is maxed", key); + debug!("got conn for key {:#?}, but streamcount is maxed", workload_key); None } else { self.checkin_conn(e_conn.clone(), pool_key.clone());