Skip to content

Commit

Permalink
feat(udf): support client side load balancer for UDF (#15200)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Feb 27, 2024
1 parent 8cc30b1 commit 1ab3fdd
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 22 deletions.
114 changes: 113 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@ Near "selet"


statement error
create function int_42() returns int as int_42 using link 'localhost:8815';
create function int_42() returns int as int_42 using link '555.0.0.1:8815';
----
db error: ERROR: Failed to run the query

Caused by:
Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address


statement error
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to connect to UDF service
2: transport error
3: error trying to connect
4: invalid URL, scheme is missing
1: failed to check UDF signature
2: failed to send requests to UDF service
3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} }
4: transport error
5: error trying to connect
6: tcp connect error
7: deadline has elapsed


statement error
Expand Down
2 changes: 2 additions & 0 deletions src/expr/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
cfg-or-panic = "0.2"
futures = "0.3"
futures-util = "0.3.28"
ginepro = "0.7.0"
prometheus = "0.13"
risingwave_common = { workspace = true }
static_assertions = "1"
Expand Down
68 changes: 52 additions & 16 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::time::Duration;

use arrow_array::RecordBatch;
Expand All @@ -22,13 +23,23 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::{FlightData, FlightDescriptor};
use arrow_schema::Schema;
use cfg_or_panic::cfg_or_panic;
use futures_util::{stream, Stream, StreamExt, TryStreamExt};
use futures_util::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
use ginepro::{LoadBalancedChannel, ResolutionStrategy};
use risingwave_common::util::addr::HostAddr;
use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval between two successive probes of the UDF DNS.
const DNS_PROBE_INTERVAL_SECS: u64 = 5;
// Timeout duration for performing an eager DNS resolution.
const EAGER_DNS_RESOLVE_TIMEOUT_SECS: u64 = 5;
const REQUEST_TIMEOUT_SECS: u64 = 5;
const CONNECT_TIMEOUT_SECS: u64 = 5;

/// Client for external function service based on Arrow Flight.
#[derive(Debug)]
pub struct ArrowFlightUdfClient {
Expand All @@ -41,25 +52,50 @@ pub struct ArrowFlightUdfClient {
impl ArrowFlightUdfClient {
/// Connect to a UDF service.
pub async fn connect(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self {
client,
addr: addr.into(),
})
Self::connect_inner(
addr,
ResolutionStrategy::Eager {
timeout: TokioDuration::from_secs(EAGER_DNS_RESOLVE_TIMEOUT_SECS),
},
)
.await
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
pub fn connect_lazy(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
Self::connect_inner(addr, ResolutionStrategy::Lazy)
.now_or_never()
.unwrap()
}

async fn connect_inner(
mut addr: &str,
resolution_strategy: ResolutionStrategy,
) -> Result<Self> {
if addr.starts_with("http://") {
addr = addr.strip_prefix("http://").unwrap();
}
if addr.starts_with("https://") {
addr = addr.strip_prefix("https://").unwrap();
}
let host_addr = HostAddr::from_str(addr).map_err(|e| {
Error::service_error(format!("invalid address: {}, err: {}", addr, e.as_report()))
})?;
let channel = LoadBalancedChannel::builder((host_addr.host.clone(), host_addr.port))
.dns_probe_interval(std::time::Duration::from_secs(DNS_PROBE_INTERVAL_SECS))
.timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS))
.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
.resolution_strategy(resolution_strategy)
.channel()
.await
.map_err(|e| {
Error::service_error(format!(
"failed to create LoadBalancedChannel, address: {}, err: {}",
host_addr,
e.as_report()
))
})?;
let client = FlightServiceClient::new(channel.into());
Ok(Self {
client,
addr: addr.into(),
Expand Down

0 comments on commit 1ab3fdd

Please sign in to comment.