From ef945e1259db853a95fb02e1254a3a6f7d446385 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 26 Feb 2024 23:47:57 -0500 Subject: [PATCH] feat(udf): support client side load balancer for UDF (#15200) --- Cargo.lock | 114 +++++++++++++++++++++++++++++- e2e_test/error_ui/simple/main.slt | 22 ++++-- src/expr/udf/Cargo.toml | 2 + src/expr/udf/src/external.rs | 68 +++++++++++++----- 4 files changed, 184 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34325a869c18..676be08f8703 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4578,6 +4578,23 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "ginepro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eedbff62a689be48f58f32571dbf3d60c4a73b39740141dfe7ac942536ea27f7" +dependencies = [ + "anyhow", + "async-trait", + "http 0.2.9", + "thiserror", + "tokio", + "tonic 0.10.2", + "tower", + "tracing", + "trust-dns-resolver", +] + [[package]] name = "glob" version = "0.3.1" @@ -5106,6 +5123,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -5250,6 +5277,18 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a611371471e98973dbcab4e0ec66c31a10bc356eeb4d54a0e05eac8158fe38c" +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2 0.5.3", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -5764,6 +5803,15 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lz4" version = "1.24.0" @@ -8412,6 +8460,16 @@ dependencies = [ "winreg", ] +[[package]] +name = "resolv-conf" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" +dependencies = [ + "hostname", + "quick-error", +] + [[package]] name = "retain_mut" version = "0.1.7" @@ -10020,7 +10078,9 @@ dependencies = [ "arrow-schema 50.0.0", "arrow-select 50.0.0", "cfg-or-panic", + "futures", "futures-util", + "ginepro", "madsim-tokio", "madsim-tonic", "prometheus", @@ -12451,6 +12511,52 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622b09ce2fe2df4618636fb92176d205662f59803f39e70d1c333393082de96c" +[[package]] +name = "trust-dns-proto" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3119112651c157f4488931a01e586aa459736e9d6046d3bd9105ffb69352d374" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "trust-dns-resolver" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a3e6c3aff1718b3c73e395d1f35202ba2ffa847c6a62eea0db8fb4cfe30be6" +dependencies = [ + "cfg-if", + "futures-util", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.1", + "rand", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", + "trust-dns-proto", +] + [[package]] name = "try-lock" version = "0.2.4" @@ -12602,7 +12708,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", "serde", ] @@ -13291,6 +13397,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "widestring" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8" + [[package]] name = "wiggle" version = "16.0.0" diff --git a/e2e_test/error_ui/simple/main.slt b/e2e_test/error_ui/simple/main.slt index b4cebbdfeff7..a0eba4a60d09 100644 --- a/e2e_test/error_ui/simple/main.slt +++ b/e2e_test/error_ui/simple/main.slt @@ -9,15 +9,27 @@ Near "selet" statement error -create function int_42() returns int as int_42 using link 'localhost:8815'; +create function int_42() returns int as int_42 using link '555.0.0.1:8815'; +---- +db error: ERROR: Failed to run the query + +Caused by: + Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address + + +statement error +create function int_42() returns int as int_42 using link '55.55.55.55:5555'; ---- db error: ERROR: Failed to run the query Caused by these errors (recent errors listed first): - 1: failed to connect to UDF service - 2: transport error - 3: error trying to connect - 4: invalid URL, scheme is missing + 1: failed to check UDF signature + 2: failed to send requests to UDF service + 3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} } + 4: transport error + 5: error trying to connect + 6: tcp connect error + 7: deadline has elapsed statement error diff --git a/src/expr/udf/Cargo.toml b/src/expr/udf/Cargo.toml index 838f2e62958c..b17ad7acadfc 100644 --- a/src/expr/udf/Cargo.toml +++ b/src/expr/udf/Cargo.toml @@ -16,7 +16,9 @@ arrow-flight = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } cfg-or-panic = "0.2" +futures = "0.3" futures-util = "0.3.28" +ginepro = "0.7.0" prometheus = "0.13" risingwave_common = { workspace = true } static_assertions = "1" diff --git a/src/expr/udf/src/external.rs b/src/expr/udf/src/external.rs index f8d4cf6cc379..7eb8e1d9f9b5 100644 --- a/src/expr/udf/src/external.rs +++ b/src/expr/udf/src/external.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; use std::time::Duration; use arrow_array::RecordBatch; @@ -22,13 +23,23 @@ use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::{FlightData, FlightDescriptor}; use arrow_schema::Schema; use cfg_or_panic::cfg_or_panic; -use futures_util::{stream, Stream, StreamExt, TryStreamExt}; +use futures_util::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; +use ginepro::{LoadBalancedChannel, ResolutionStrategy}; +use risingwave_common::util::addr::HostAddr; use thiserror_ext::AsReport; +use tokio::time::Duration as TokioDuration; use tonic::transport::Channel; use crate::metrics::GLOBAL_METRICS; use crate::{Error, Result}; +// Interval between two successive probes of the UDF DNS. +const DNS_PROBE_INTERVAL_SECS: u64 = 5; +// Timeout duration for performing an eager DNS resolution. +const EAGER_DNS_RESOLVE_TIMEOUT_SECS: u64 = 5; +const REQUEST_TIMEOUT_SECS: u64 = 5; +const CONNECT_TIMEOUT_SECS: u64 = 5; + /// Client for external function service based on Arrow Flight. #[derive(Debug)] pub struct ArrowFlightUdfClient { @@ -41,25 +52,50 @@ pub struct ArrowFlightUdfClient { impl ArrowFlightUdfClient { /// Connect to a UDF service. pub async fn connect(addr: &str) -> Result { - let conn = tonic::transport::Endpoint::new(addr.to_string())? - .timeout(Duration::from_secs(5)) - .connect_timeout(Duration::from_secs(5)) - .connect() - .await?; - let client = FlightServiceClient::new(conn); - Ok(Self { - client, - addr: addr.into(), - }) + Self::connect_inner( + addr, + ResolutionStrategy::Eager { + timeout: TokioDuration::from_secs(EAGER_DNS_RESOLVE_TIMEOUT_SECS), + }, + ) + .await } /// Connect to a UDF service lazily (i.e. only when the first request is sent). pub fn connect_lazy(addr: &str) -> Result { - let conn = tonic::transport::Endpoint::new(addr.to_string())? - .timeout(Duration::from_secs(5)) - .connect_timeout(Duration::from_secs(5)) - .connect_lazy(); - let client = FlightServiceClient::new(conn); + Self::connect_inner(addr, ResolutionStrategy::Lazy) + .now_or_never() + .unwrap() + } + + async fn connect_inner( + mut addr: &str, + resolution_strategy: ResolutionStrategy, + ) -> Result { + if addr.starts_with("http://") { + addr = addr.strip_prefix("http://").unwrap(); + } + if addr.starts_with("https://") { + addr = addr.strip_prefix("https://").unwrap(); + } + let host_addr = HostAddr::from_str(addr).map_err(|e| { + Error::service_error(format!("invalid address: {}, err: {}", addr, e.as_report())) + })?; + let channel = LoadBalancedChannel::builder((host_addr.host.clone(), host_addr.port)) + .dns_probe_interval(std::time::Duration::from_secs(DNS_PROBE_INTERVAL_SECS)) + .timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS)) + .connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS)) + .resolution_strategy(resolution_strategy) + .channel() + .await + .map_err(|e| { + Error::service_error(format!( + "failed to create LoadBalancedChannel, address: {}, err: {}", + host_addr, + e.as_report() + )) + })?; + let client = FlightServiceClient::new(channel.into()); Ok(Self { client, addr: addr.into(),