Skip to content

Commit

Permalink
Do cache warmer for vote using quic if enabled (#3997)
Browse files Browse the repository at this point in the history
* Do cache warmer for vote using quic if enabled

* use then instead of then_some

* clippy issue

* refactor code

* address some feedback from Behzad
  • Loading branch information
lijunwangs authored Dec 15, 2024
1 parent d6302dc commit 141db09
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 30 deletions.
42 changes: 29 additions & 13 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,21 +332,16 @@ impl Tvu {
cluster_info.clone(),
poh_recorder.clone(),
tower_storage,
vote_connection_cache,
vote_connection_cache.clone(),
);

let warm_quic_cache_service = connection_cache.and_then(|connection_cache| {
if connection_cache.use_quic() {
Some(WarmQuicCacheService::new(
connection_cache.clone(),
cluster_info.clone(),
poh_recorder.clone(),
exit.clone(),
))
} else {
None
}
});
let warm_quic_cache_service = create_cache_warmer_if_needed(
connection_cache,
vote_connection_cache,
cluster_info,
poh_recorder,
&exit,
);

let cost_update_service = CostUpdateService::new(blockstore.clone(), cost_update_receiver);

Expand Down Expand Up @@ -417,6 +412,27 @@ impl Tvu {
}
}

fn create_cache_warmer_if_needed(
connection_cache: Option<&Arc<ConnectionCache>>,
vote_connection_cache: Arc<ConnectionCache>,
cluster_info: &Arc<ClusterInfo>,
poh_recorder: &Arc<RwLock<PohRecorder>>,
exit: &Arc<AtomicBool>,
) -> Option<WarmQuicCacheService> {
let tpu_connection_cache = connection_cache.filter(|cache| cache.use_quic()).cloned();
let vote_connection_cache = Some(vote_connection_cache).filter(|cache| cache.use_quic());

(tpu_connection_cache.is_some() || vote_connection_cache.is_some()).then(|| {
WarmQuicCacheService::new(
tpu_connection_cache,
vote_connection_cache,
cluster_info.clone(),
poh_recorder.clone(),
exit.clone(),
)
})
}

#[cfg(test)]
pub mod tests {
use {
Expand Down
71 changes: 54 additions & 17 deletions core/src/warm_quic_cache_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@ use {
rand::{thread_rng, Rng},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_gossip::cluster_info::ClusterInfo,
solana_gossip::{
cluster_info::ClusterInfo,
contact_info::{ContactInfo, Error},
},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::pubkey::Pubkey,
std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
Expand All @@ -26,13 +31,43 @@ const CACHE_OFFSET_SLOT: i64 = 100;
const CACHE_JITTER_SLOT: i64 = 20;

impl WarmQuicCacheService {
fn warmup_connection(
cache: Option<&ConnectionCache>,
cluster_info: &ClusterInfo,
leader_pubkey: &Pubkey,
contact_info_selector: impl Fn(&ContactInfo) -> Result<SocketAddr, Error>,
log_context: &str,
) {
if let Some(connection_cache) = cache {
if let Some(Ok(addr)) =
cluster_info.lookup_contact_info(leader_pubkey, contact_info_selector)
{
let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_data(&[]) {
warn!(
"Failed to warmup QUIC connection to the leader {leader_pubkey:?} at {addr:?}, \
Context: {log_context}, Error: {err:?}"
);
}
}
}
}

pub fn new(
connection_cache: Arc<ConnectionCache>,
tpu_connection_cache: Option<Arc<ConnectionCache>>,
vote_connection_cache: Option<Arc<ConnectionCache>>,
cluster_info: Arc<ClusterInfo>,
poh_recorder: Arc<RwLock<PohRecorder>>,
exit: Arc<AtomicBool>,
) -> Self {
assert!(matches!(*connection_cache, ConnectionCache::Quic(_)));
assert!(matches!(
tpu_connection_cache.as_deref(),
None | Some(ConnectionCache::Quic(_))
));
assert!(matches!(
vote_connection_cache.as_deref(),
None | Some(ConnectionCache::Quic(_))
));
let thread_hdl = Builder::new()
.name("solWarmQuicSvc".to_string())
.spawn(move || {
Expand All @@ -48,20 +83,22 @@ impl WarmQuicCacheService {
.map_or(true, |last_leader| last_leader != leader_pubkey)
{
maybe_last_leader = Some(leader_pubkey);
if let Some(Ok(addr)) = cluster_info
.lookup_contact_info(&leader_pubkey, |node| {
node.tpu(Protocol::QUIC)
})
{
let conn = connection_cache.get_connection(&addr);
if let Err(err) = conn.send_data(&[]) {
warn!(
"Failed to warmup QUIC connection to the leader {:?}, \
Error {:?}",
leader_pubkey, err
);
}
}
// Warm cache for regular transactions
Self::warmup_connection(
tpu_connection_cache.as_deref(),
&cluster_info,
&leader_pubkey,
|node| node.tpu(Protocol::QUIC),
"tpu",
);
// Warm cache for vote
Self::warmup_connection(
vote_connection_cache.as_deref(),
&cluster_info,
&leader_pubkey,
|node| node.tpu_vote(Protocol::QUIC),
"vote",
);
}
}
sleep(Duration::from_millis(200));
Expand Down

0 comments on commit 141db09

Please sign in to comment.