diff --git a/Cargo.lock b/Cargo.lock index ed7677d652954..02c889ab96620 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4593,6 +4593,23 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "ginepro" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eedbff62a689be48f58f32571dbf3d60c4a73b39740141dfe7ac942536ea27f7" +dependencies = [ + "anyhow", + "async-trait", + "http 0.2.9", + "thiserror", + "tokio", + "tonic 0.10.2", + "tower", + "tracing", + "trust-dns-resolver", +] + [[package]] name = "glob" version = "0.3.1" @@ -5121,6 +5138,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "idna" version = "0.5.0" @@ -5265,6 +5292,18 @@ version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a611371471e98973dbcab4e0ec66c31a10bc356eeb4d54a0e05eac8158fe38c" +[[package]] +name = "ipconfig" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" +dependencies = [ + "socket2 0.5.3", + "widestring", + "windows-sys 0.48.0", + "winreg", +] + [[package]] name = "ipnet" version = "2.8.0" @@ -5779,6 +5818,15 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "lz4" version = "1.24.0" @@ -8422,6 +8470,16 @@ dependencies = [ "winreg", ] +[[package]] +name = "resolv-conf" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52e44394d2086d010551b14b53b1f24e31647570cd1deb0379e2c21b329aba00" +dependencies = [ + "hostname", + "quick-error", +] + [[package]] name = "retain_mut" version = "0.1.7" @@ -10036,7 +10094,9 @@ dependencies = [ "arrow-schema 50.0.0", "arrow-select 50.0.0", "cfg-or-panic", + "futures", "futures-util", + "ginepro", "madsim-tokio", "madsim-tonic", "prometheus", @@ -12467,6 +12527,52 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "622b09ce2fe2df4618636fb92176d205662f59803f39e70d1c333393082de96c" +[[package]] +name = "trust-dns-proto" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3119112651c157f4488931a01e586aa459736e9d6046d3bd9105ffb69352d374" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 0.4.0", + "ipnet", + "once_cell", + "rand", + "smallvec", + "thiserror", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "trust-dns-resolver" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a3e6c3aff1718b3c73e395d1f35202ba2ffa847c6a62eea0db8fb4cfe30be6" +dependencies = [ + "cfg-if", + "futures-util", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.1", + "rand", + "resolv-conf", + "smallvec", + "thiserror", + "tokio", + "tracing", + "trust-dns-proto", +] + [[package]] name = "try-lock" version = "0.2.4" @@ -12618,7 +12724,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", - "idna", + "idna 0.5.0", "percent-encoding", "serde", ] @@ -13307,6 +13413,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "widestring" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8" + [[package]] name = "wiggle" version = "17.0.0" diff --git a/src/expr/udf/Cargo.toml b/src/expr/udf/Cargo.toml index 838f2e62958c3..b17ad7acadfc1 100644 --- a/src/expr/udf/Cargo.toml +++ b/src/expr/udf/Cargo.toml @@ -16,7 +16,9 @@ arrow-flight = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } cfg-or-panic = "0.2" +futures = "0.3" futures-util = "0.3.28" +ginepro = "0.7.0" prometheus = "0.13" risingwave_common = { workspace = true } static_assertions = "1" diff --git a/src/expr/udf/src/external.rs b/src/expr/udf/src/external.rs index f8d4cf6cc379e..7e9ac8eb068bd 100644 --- a/src/expr/udf/src/external.rs +++ b/src/expr/udf/src/external.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::str::FromStr; use std::time::Duration; use arrow_array::RecordBatch; @@ -22,8 +23,12 @@ use arrow_flight::flight_service_client::FlightServiceClient; use arrow_flight::{FlightData, FlightDescriptor}; use arrow_schema::Schema; use cfg_or_panic::cfg_or_panic; +use futures::executor::block_on; use futures_util::{stream, Stream, StreamExt, TryStreamExt}; +use ginepro::{LoadBalancedChannel, ResolutionStrategy}; +use risingwave_common::util::addr::HostAddr; use thiserror_ext::AsReport; +use tokio::time::Duration as TokioDuration; use tonic::transport::Channel; use crate::metrics::GLOBAL_METRICS; @@ -41,25 +46,40 @@ pub struct ArrowFlightUdfClient { impl ArrowFlightUdfClient { /// Connect to a UDF service. pub async fn connect(addr: &str) -> Result { - let conn = tonic::transport::Endpoint::new(addr.to_string())? - .timeout(Duration::from_secs(5)) - .connect_timeout(Duration::from_secs(5)) - .connect() - .await?; - let client = FlightServiceClient::new(conn); - Ok(Self { - client, - addr: addr.into(), - }) + Self::connect_inner( + addr, + ResolutionStrategy::Eager { + timeout: TokioDuration::from_secs(5), + }, + ) + .await } - /// Connect to a UDF service lazily (i.e. only when the first request is sent). + /// Connect to a UDF service lazily (i.e. only fetch IP list from DNS when the first request is sent). pub fn connect_lazy(addr: &str) -> Result { - let conn = tonic::transport::Endpoint::new(addr.to_string())? + block_on(async { Self::connect_inner(addr, ResolutionStrategy::Lazy).await }) + } + + async fn connect_inner(addr: &str, resolution_strategy: ResolutionStrategy) -> Result { + let addr = addr.strip_prefix("http://").ok_or_else(|| { + Error::service_error(format!("udf address must starts with http://: {}", addr)) + })?; + let host_addr = HostAddr::from_str(addr) + .map_err(|e| Error::service_error(format!("invalid address: {}, err: {}", addr, e)))?; + let channel = LoadBalancedChannel::builder((host_addr.host.clone(), host_addr.port)) + .dns_probe_interval(std::time::Duration::from_secs(5)) .timeout(Duration::from_secs(5)) .connect_timeout(Duration::from_secs(5)) - .connect_lazy(); - let client = FlightServiceClient::new(conn); + .resolution_strategy(resolution_strategy) + .channel() + .await + .map_err(|e| { + Error::service_error(format!( + "failed to create LoadBalancedChannel, address: {}, err: {}", + host_addr, e + )) + })?; + let client = FlightServiceClient::new(channel.into()); Ok(Self { client, addr: addr.into(),