From e621000a156b3a29502cbfc77d966a227e40c02b Mon Sep 17 00:00:00 2001 From: NickAc <32451103+NickAcPT@users.noreply.github.com> Date: Sat, 2 Nov 2024 12:56:43 +0000 Subject: [PATCH] Implement load-balancing for outgoing requests --- nmsr-aas/Cargo.toml | 4 +- nmsr-aas/src/model/armor/manager.rs | 2 +- nmsr-aas/src/model/resolver/mojang/client.rs | 2 + nmsr-aas/src/utils/config.rs | 11 +- nmsr-aas/src/utils/http_client.rs | 134 +++++++++++++++++-- 5 files changed, 135 insertions(+), 18 deletions(-) diff --git a/nmsr-aas/Cargo.toml b/nmsr-aas/Cargo.toml index c92f416..71631bc 100644 --- a/nmsr-aas/Cargo.toml +++ b/nmsr-aas/Cargo.toml @@ -39,7 +39,7 @@ serde_json = { workspace = true } base64 = "0.22" # Hyper - HTTP client -hyper = { version = "1.4", features = ["client"] } +hyper = { version = "1.5", features = ["client"] } hyper-util = { version = "0.1", features = [ "client", "client-legacy", @@ -91,7 +91,7 @@ thiserror = { workspace = true } derive_more = { workspace = true } # Tower - Service framework -tower = { version = "0.5", features = ["buffer", "limit", "timeout", "retry"] } +tower = { version = "0.5", features = ["buffer", "limit", "timeout", "retry", "balance"] } tower-http = { version = "0.6", features = [ "set-header", "trace", diff --git a/nmsr-aas/src/model/armor/manager.rs b/nmsr-aas/src/model/armor/manager.rs index d96697a..53e1468 100644 --- a/nmsr-aas/src/model/armor/manager.rs +++ b/nmsr-aas/src/model/armor/manager.rs @@ -83,7 +83,7 @@ impl VanillaMinecraftArmorManager { .explain("Unable to create armor cache folder".to_string())?; let manager = Self { - client: NmsrHttpClient::new(20, 5 * 60 /* 5 minutes */, 5), + client: NmsrHttpClient::new(20, 5 * 60 /* 5 minutes */, 5, &[]), material_location, trims_location, }; diff --git a/nmsr-aas/src/model/resolver/mojang/client.rs b/nmsr-aas/src/model/resolver/mojang/client.rs index 5160896..58557c5 100644 --- a/nmsr-aas/src/model/resolver/mojang/client.rs +++ b/nmsr-aas/src/model/resolver/mojang/client.rs @@ -38,6 +38,7 @@ impl MojangClient { mojank.session_server_rate_limit, mojank.session_server_timeout, mojank.session_server_retries, + &mojank.outgoing_addresses ), name_lookup_client: NmsrHttpClient::new( mojank @@ -45,6 +46,7 @@ impl MojangClient { .unwrap_or(mojank.session_server_rate_limit), mojank.session_server_timeout, mojank.session_server_retries, + &mojank.outgoing_addresses ), mojank_config: mojank, }) diff --git a/nmsr-aas/src/utils/config.rs b/nmsr-aas/src/utils/config.rs index ccd04ef..d79d51b 100644 --- a/nmsr-aas/src/utils/config.rs +++ b/nmsr-aas/src/utils/config.rs @@ -1,8 +1,5 @@ use std::{ - collections::HashMap, - fs::Metadata, - path::PathBuf, - time::{Duration, SystemTime}, + collections::HashMap, fs::Metadata, net::IpAddr, path::PathBuf, time::{Duration, SystemTime} }; use chrono::{DateTime, Local}; @@ -69,6 +66,7 @@ impl Default for ModelCacheConfiguration { } } } + #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(default)] pub struct MojankConfiguration { @@ -118,6 +116,9 @@ pub struct MojankConfiguration { /// The template to use for resolving player cape textures. pub textures_server_cape_url_template: String, + + /// The outgoing addresses for load-balancing requests to Mojang servers. + pub outgoing_addresses: Vec, } pub const DEFAULT_TEXTURES_SERVER_SKIN_URL_TEMPLATE: &str = @@ -146,6 +147,8 @@ impl Default for MojankConfiguration { .to_string(), textures_server_cape_url_template: DEFAULT_TEXTURES_SERVER_SKIN_URL_TEMPLATE .to_string(), + + outgoing_addresses: Vec::new(), } } } diff --git a/nmsr-aas/src/utils/http_client.rs b/nmsr-aas/src/utils/http_client.rs index b96a5b0..e8d2644 100644 --- a/nmsr-aas/src/utils/http_client.rs +++ b/nmsr-aas/src/utils/http_client.rs @@ -9,11 +9,15 @@ use hyper_util::{ }; use std::{ future::{ready, Ready}, + net::IpAddr, time::Duration, }; use tower::{ + balance::p2c::Balance, buffer::Buffer, + discover::ServiceList, limit::RateLimit, + load::{CompleteOnResponse, PendingRequestsDiscover}, retry::{Policy, Retry}, timeout::{Timeout, TimeoutLayer}, Service, ServiceBuilder, ServiceExt, @@ -51,13 +55,42 @@ pub(crate) type NmsrTraceLayer = Trace< DefaultOnFailure, >; -pub struct NmsrHttpClient { - inner: Buffer, >> as Service>>::Future>, +pub(crate) type HttpClientInnerService = + Buffer< + Request, + >> as Service< + Request, + >>::Future, + >; + +pub enum NmsrHttpClient { + SingleIp { + inner: HttpClientInnerService, + }, + LoadBalanced { + inner: Buffer< + Request, + >>, + Request, + > as Service>>::Future, + >, + }, } impl NmsrHttpClient { - pub fn new(rate_limit_per_second: u64, request_timeout_seconds: u64, request_retries_count: usize) -> Self { - create_http_client(rate_limit_per_second, request_timeout_seconds, request_retries_count) + pub fn new( + rate_limit_per_second: u64, + request_timeout_seconds: u64, + request_retries_count: usize, + client_ips: &[IpAddr], + ) -> Self { + create_http_client( + rate_limit_per_second, + request_timeout_seconds, + request_retries_count, + client_ips, + ) } #[instrument(skip(self, parent_span, on_error), parent = parent_span, err)] @@ -75,8 +108,20 @@ impl NmsrHttpClient { unreachable!("Empty body should not error: {}", e) })))?; - let response = { - let mut svc = self.inner.clone(); + let response = if let NmsrHttpClient::SingleIp { inner } = self { + let mut svc = inner.clone(); + + let service = svc + .ready() + .await + .map_err(MojangRequestError::BoxedRequestError)?; + + service + .call(request) + .await + .map_err(MojangRequestError::BoxedRequestError)? + } else if let NmsrHttpClient::LoadBalanced { inner } = self { + let mut svc = inner.clone(); let service = svc .ready() @@ -87,6 +132,8 @@ impl NmsrHttpClient { .call(request) .await .map_err(MojangRequestError::BoxedRequestError)? + } else { + unreachable!("Invalid NmsrHttpClient variant") }; if response.status() != StatusCode::OK { @@ -104,17 +151,78 @@ impl NmsrHttpClient { } } -fn create_http_client(rate_limit_per_second: u64, request_timeout_seconds: u64, request_retries_count: usize) -> NmsrHttpClient { +fn create_http_client( + rate_limit_per_second: u64, + request_timeout_seconds: u64, + request_retries_count: usize, + client_ips: &[IpAddr], +) -> NmsrHttpClient { + if client_ips.is_empty() { + create_http_client_internal( + rate_limit_per_second, + request_timeout_seconds, + request_retries_count, + None, + ) + } else if client_ips.len() == 1 { + create_http_client_internal( + rate_limit_per_second, + request_timeout_seconds, + request_retries_count, + Some(client_ips[0]), + ) + } else { + let clients = client_ips + .into_iter() + .map(|ip| { + create_http_client_internal( + rate_limit_per_second, + request_timeout_seconds, + request_retries_count, + Some(*ip), + ) + }) + .flat_map(|svc| { + if let NmsrHttpClient::SingleIp { inner } = svc { + Some(inner) + } else { + None + } + }) + .collect::>(); + + let discover = ServiceList::new(clients); + let load = PendingRequestsDiscover::new(discover, CompleteOnResponse::default()); + let balanced = Balance::new(load); + + let balanced = ServiceBuilder::new() + .buffer(rate_limit_per_second.saturating_mul(2) as usize) + .check_clone() + .service(balanced); + + NmsrHttpClient::LoadBalanced { inner: balanced } + } +} + +fn create_http_client_internal( + rate_limit_per_second: u64, + request_timeout_seconds: u64, + request_retries_count: usize, + client_ip: Option, +) -> NmsrHttpClient { let mut http = HttpConnector::new(); http.set_nodelay(true); http.enforce_http(false); + http.set_local_address(client_ip); let tls = TlsConnector::new().expect("Expected TLS connector to be valid"); let https = HttpsConnector::from((http, tls.into())); // A new higher level client from hyper is in the works, so we gotta use the legacy one - let client = Client::builder(TokioExecutor::new()).build(https); + let client = Client::builder(TokioExecutor::new()) + .http2_keep_alive_while_idle(true) + .build(https); let tracing = TraceLayer::new_for_http().on_body_chunk(()).on_eos(()); @@ -135,11 +243,11 @@ fn create_http_client(rate_limit_per_second: u64, request_timeout_seconds: u64, .check_clone() .service(client); - NmsrHttpClient { inner: service } + NmsrHttpClient::SingleIp { inner: service } } #[derive(Copy, Clone, Debug)] -struct MojankRetryPolicy { +pub(crate) struct MojankRetryPolicy { attempts: usize, } @@ -152,7 +260,11 @@ impl MojankRetryPolicy { impl Policy, Res, P> for MojankRetryPolicy { type Future = Ready<()>; - fn retry(&mut self, _req: &mut Request, result: &mut Result) -> Option { + fn retry( + &mut self, + _req: &mut Request, + result: &mut Result, + ) -> Option { match result { Ok(_) => None, Err(_) => {