Skip to content

Commit

Permalink
feat(udf): support client side load balancer for UDF
Browse files (browse the repository at this point in the history)
  • Loading branch information
KeXiangWang committed Feb 23, 2024
1 parent d2c547a commit 6ec6823
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 15 deletions.
114 changes: 113 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/expr/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
cfg-or-panic = "0.2"
futures = "0.3"
futures-util = "0.3.28"
ginepro = "0.7.0"
prometheus = "0.13"
risingwave_common = { workspace = true }
static_assertions = "1"
Expand Down
48 changes: 34 additions & 14 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::time::Duration;

use arrow_array::RecordBatch;
Expand All @@ -22,8 +23,12 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::{FlightData, FlightDescriptor};
use arrow_schema::Schema;
use cfg_or_panic::cfg_or_panic;
use futures::executor::block_on;
use futures_util::{stream, Stream, StreamExt, TryStreamExt};
use ginepro::{LoadBalancedChannel, ResolutionStrategy};
use risingwave_common::util::addr::HostAddr;
use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
Expand All @@ -41,25 +46,40 @@ pub struct ArrowFlightUdfClient {
impl ArrowFlightUdfClient {
/// Connect to a UDF service.
pub async fn connect(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self {
client,
addr: addr.into(),
})
Self::connect_inner(
addr,
ResolutionStrategy::Eager {
timeout: TokioDuration::from_secs(5),
},
)
.await
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
/// Connect to a UDF service lazily (i.e. only fetch the IP list from DNS when the first request is sent).
pub fn connect_lazy(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
block_on(async { Self::connect_inner(addr, ResolutionStrategy::Lazy).await })
}

async fn connect_inner(addr: &str, resolution_strategy: ResolutionStrategy) -> Result<Self> {
let addr = addr.strip_prefix("http://").ok_or_else(|| {
Error::service_error(format!("udf address must starts with http://: {}", addr))
})?;
let host_addr = HostAddr::from_str(addr)
.map_err(|e| Error::service_error(format!("invalid address: {}, err: {}", addr, e)))?;
let channel = LoadBalancedChannel::builder((host_addr.host.clone(), host_addr.port))
.dns_probe_interval(std::time::Duration::from_secs(5))
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
.resolution_strategy(resolution_strategy)
.channel()
.await
.map_err(|e| {
Error::service_error(format!(
"failed to create LoadBalancedChannel, address: {}, err: {}",
host_addr, e
))
})?;
let client = FlightServiceClient::new(channel.into());
Ok(Self {
client,
addr: addr.into(),
Expand Down

0 comments on commit 6ec6823

Please sign in to comment.