Skip to content

Commit

Permalink
Revert "revert"
Browse files Browse the repository at this point in the history
This reverts commit f2f2286.
  • Loading branch information
esemeniuc committed Feb 2, 2024
1 parent f2f2286 commit 432784b
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 16 deletions.
8 changes: 4 additions & 4 deletions core/src/proxy/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn generate_auth_tokens(
auth_service_client: &mut AuthServiceClient<Channel>,
// used to sign challenges
keypair: &Keypair,
) -> crate::proxy::Result<(
) -> crate::proxy::ProxyResult<(
Token, /* access_token */
Token, /* refresh_token */
)> {
Expand Down Expand Up @@ -105,7 +105,7 @@ pub async fn maybe_refresh_auth_tokens(
cluster_info: &Arc<ClusterInfo>,
connection_timeout: &Duration,
refresh_within_s: u64,
) -> crate::proxy::Result<(
) -> crate::proxy::ProxyResult<(
Option<Token>, // access token
Option<Token>, // refresh token
)> {
Expand Down Expand Up @@ -159,7 +159,7 @@ pub async fn maybe_refresh_auth_tokens(
pub async fn refresh_access_token(
auth_service_client: &mut AuthServiceClient<Channel>,
refresh_token: &Token,
) -> crate::proxy::Result<Token> {
) -> crate::proxy::ProxyResult<Token> {
let response = auth_service_client
.refresh_access_token(RefreshAccessTokenRequest {
refresh_token: refresh_token.value.clone(),
Expand All @@ -172,7 +172,7 @@ pub async fn refresh_access_token(
/// An invalid token is one where any of its fields are None or the token itself is None.
/// Performs the necessary validations on the auth tokens before returning,
/// i.e. it is safe to call .unwrap() on the token fields from the call-site.
fn get_validated_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
fn get_validated_token(maybe_token: Option<Token>) -> crate::proxy::ProxyResult<Token> {
let token = maybe_token
.ok_or_else(|| ProxyError::BadAuthenticationToken("received a null token".to_string()))?;
if token.expires_at_utc.is_none() {
Expand Down
10 changes: 5 additions & 5 deletions core/src/proxy/block_engine_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl BlockEngineStage {
exit: &Arc<AtomicBool>,
block_builder_fee_info: &Arc<Mutex<BlockBuilderFeeInfo>>,
connection_timeout: &Duration,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();

Expand Down Expand Up @@ -294,7 +294,7 @@ impl BlockEngineStage {
connection_timeout: &Duration,
keypair: Arc<Keypair>,
cluster_info: &Arc<ClusterInfo>,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
let subscribe_packets_stream = timeout(
*connection_timeout,
client.subscribe_packets(block_engine::SubscribePacketsRequest {}),
Expand Down Expand Up @@ -369,7 +369,7 @@ impl BlockEngineStage {
keypair: Arc<Keypair>,
cluster_info: &Arc<ClusterInfo>,
connection_timeout: &Duration,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
const METRICS_TICK: Duration = Duration::from_secs(1);
const MAINTENANCE_TICK: Duration = Duration::from_secs(10 * 60);
let refresh_within_s: u64 = METRICS_TICK.as_secs().saturating_mul(3).saturating_div(2);
Expand Down Expand Up @@ -453,7 +453,7 @@ impl BlockEngineStage {
maybe_bundles_response: Result<Option<block_engine::SubscribeBundlesResponse>, Status>,
bundle_sender: &Sender<Vec<PacketBundle>>,
block_engine_stats: &mut BlockEngineStageStats,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
let bundles_response = maybe_bundles_response?.ok_or(ProxyError::GrpcStreamDisconnected)?;
let bundles: Vec<PacketBundle> = bundles_response
.bundles
Expand Down Expand Up @@ -491,7 +491,7 @@ impl BlockEngineStage {
banking_packet_sender: &BankingPacketSender,
trust_packets: bool,
block_engine_stats: &mut BlockEngineStageStats,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
if let Some(batch) = resp.batch {
if batch.packets.is_empty() {
saturating_add_assign!(block_engine_stats.num_empty_packets, 1);
Expand Down
5 changes: 3 additions & 2 deletions core/src/proxy/fetch_stage_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::proxy::{relayer_stage::RelayerConfig, HeartbeatEvent, ProxyError, Result},
crate::proxy::{relayer_stage::RelayerConfig, HeartbeatEvent, ProxyError, ProxyResult},
crossbeam_channel::{select, tick, Receiver, Sender},
solana_client::connection_cache::Protocol,
solana_gossip::{cluster_info::ClusterInfo, contact_info},
Expand Down Expand Up @@ -79,8 +79,9 @@ impl FetchStageManager {
packet_intercept_rx: &Receiver<PacketBatch>,
packet_tx: &Sender<PacketBatch>,
exit: &Arc<AtomicBool>,
) -> Result<()> {
) -> ProxyResult<()> {
// Contact info to gossip to the network if no heartbeats are received from relayer

let my_fallback_contact_info = cluster_info.my_contact_info();
let local_relayer_config = global_relayer_config.lock().unwrap().clone();

Expand Down
2 changes: 1 addition & 1 deletion core/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
tonic::Status,
};

pub type Result<T> = std::result::Result<T, ProxyError>;
pub type ProxyResult<T> = Result<T, ProxyError>;
type HeartbeatEvent = (SocketAddr, SocketAddr);

#[derive(Error, Debug)]
Expand Down
8 changes: 4 additions & 4 deletions core/src/proxy/relayer_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl RelayerStage {
banking_packet_sender: &BankingPacketSender,
exit: &Arc<AtomicBool>,
connection_timeout: &Duration,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
// Get a copy of configs here in case they have changed at runtime
let keypair = cluster_info.keypair().clone();

Expand Down Expand Up @@ -284,7 +284,7 @@ impl RelayerStage {
keypair: Arc<Keypair>,
cluster_info: &Arc<ClusterInfo>,
connection_timeout: &Duration,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
let heartbeat_event: HeartbeatEvent = {
let tpu_config = timeout(
*connection_timeout,
Expand Down Expand Up @@ -354,7 +354,7 @@ impl RelayerStage {
keypair: Arc<Keypair>,
cluster_info: &Arc<ClusterInfo>,
connection_timeout: &Duration,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
const METRICS_TICK: Duration = Duration::from_secs(1);
let refresh_within_s: u64 = METRICS_TICK.as_secs().saturating_mul(3).saturating_div(2);

Expand Down Expand Up @@ -433,7 +433,7 @@ impl RelayerStage {
trust_packets: bool,
banking_packet_sender: &BankingPacketSender,
relayer_stats: &mut RelayerStageStats,
) -> crate::proxy::Result<()> {
) -> crate::proxy::ProxyResult<()> {
match subscribe_packets_resp.msg {
None => {
saturating_add_assign!(relayer_stats.num_empty_messages, 1);
Expand Down

0 comments on commit 432784b

Please sign in to comment.