Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): support client side load balancer for UDF #15200

Merged
merged 5 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 113 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,27 @@ Near "selet"


statement error
create function int_42() returns int as int_42 using link 'localhost:8815';
create function int_42() returns int as int_42 using link '555.0.0.1:8815';
----
db error: ERROR: Failed to run the query

Caused by:
Flight service error: invalid address: 555.0.0.1:8815, err: failed to parse address: http://555.0.0.1:8815: invalid IPv4 address


statement error
create function int_42() returns int as int_42 using link '55.55.55.55:5555';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: failed to connect to UDF service
2: transport error
3: error trying to connect
4: invalid URL, scheme is missing
1: failed to check UDF signature
2: failed to send requests to UDF service
3: status: Unavailable, message: "error trying to connect: tcp connect error: deadline has elapsed", details: [], metadata: MetadataMap { headers: {} }
4: transport error
5: error trying to connect
6: tcp connect error
7: deadline has elapsed


statement error
Expand Down
2 changes: 2 additions & 0 deletions src/expr/udf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ arrow-flight = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
cfg-or-panic = "0.2"
futures = "0.3"
futures-util = "0.3.28"
ginepro = "0.7.0"
prometheus = "0.13"
risingwave_common = { workspace = true }
static_assertions = "1"
Expand Down
68 changes: 52 additions & 16 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str::FromStr;
use std::time::Duration;

use arrow_array::RecordBatch;
Expand All @@ -22,13 +23,23 @@ use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::{FlightData, FlightDescriptor};
use arrow_schema::Schema;
use cfg_or_panic::cfg_or_panic;
use futures_util::{stream, Stream, StreamExt, TryStreamExt};
use futures_util::{stream, FutureExt, Stream, StreamExt, TryStreamExt};
use ginepro::{LoadBalancedChannel, ResolutionStrategy};
use risingwave_common::util::addr::HostAddr;
use thiserror_ext::AsReport;
use tokio::time::Duration as TokioDuration;
use tonic::transport::Channel;

use crate::metrics::GLOBAL_METRICS;
use crate::{Error, Result};

// Interval, in seconds, between two successive probes of the UDF DNS.
const DNS_PROBE_INTERVAL_SECS: u64 = 5;
// Timeout duration, in seconds, for performing an eager DNS resolution.
const EAGER_DNS_RESOLVE_TIMEOUT_SECS: u64 = 5;
// Value, in seconds, handed to the channel builder's `timeout` setting.
const REQUEST_TIMEOUT_SECS: u64 = 5;
// Value, in seconds, handed to the channel builder's `connect_timeout` setting.
const CONNECT_TIMEOUT_SECS: u64 = 5;

/// Client for external function service based on Arrow Flight.
#[derive(Debug)]
pub struct ArrowFlightUdfClient {
Expand All @@ -41,25 +52,50 @@ pub struct ArrowFlightUdfClient {
impl ArrowFlightUdfClient {
/// Connect to a UDF service.
///
/// The trailing expression delegates to `connect_inner` with
/// `ResolutionStrategy::Eager`, whose timeout is built from
/// `EAGER_DNS_RESOLVE_TIMEOUT_SECS` seconds.
///
/// # Errors
///
/// Returns whatever error `connect_inner` yields for the given `addr`.
///
/// NOTE(review): this listing is a rendered diff without +/- markers; the
/// `Endpoint`-based statements below appear to be the previous implementation
/// that the `connect_inner` call replaces — confirm against the merged file.
pub async fn connect(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect()
.await?;
let client = FlightServiceClient::new(conn);
Ok(Self {
client,
addr: addr.into(),
})
Self::connect_inner(
addr,
ResolutionStrategy::Eager {
timeout: TokioDuration::from_secs(EAGER_DNS_RESOLVE_TIMEOUT_SECS),
},
)
.await
}

/// Connect to a UDF service lazily (i.e. only when the first request is sent).
///
/// The trailing expression calls `connect_inner` with `ResolutionStrategy::Lazy`
/// and drives the returned future synchronously with a single poll.
///
/// # Panics
///
/// Panics if the `connect_inner` future is not ready on its first poll
/// (`now_or_never` then yields `None`, which is unwrapped).
///
/// NOTE(review): this listing is a rendered diff without +/- markers; the
/// `Endpoint`-based statements below appear to be the previous implementation
/// that the `connect_inner` call replaces — confirm against the merged file.
pub fn connect_lazy(addr: &str) -> Result<Self> {
let conn = tonic::transport::Endpoint::new(addr.to_string())?
.timeout(Duration::from_secs(5))
.connect_timeout(Duration::from_secs(5))
.connect_lazy();
let client = FlightServiceClient::new(conn);
Self::connect_inner(addr, ResolutionStrategy::Lazy)
// Poll the future exactly once; `None` means it was still pending.
.now_or_never()
// Presumably the lazy strategy never suspends, so this should not panic — TODO confirm.
.unwrap()
}

async fn connect_inner(
mut addr: &str,
resolution_strategy: ResolutionStrategy,
) -> Result<Self> {
if addr.starts_with("http://") {
addr = addr.strip_prefix("http://").unwrap();
}
if addr.starts_with("https://") {
addr = addr.strip_prefix("https://").unwrap();
}
let host_addr = HostAddr::from_str(addr).map_err(|e| {
Error::service_error(format!("invalid address: {}, err: {}", addr, e.as_report()))
})?;
let channel = LoadBalancedChannel::builder((host_addr.host.clone(), host_addr.port))
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
.dns_probe_interval(std::time::Duration::from_secs(DNS_PROBE_INTERVAL_SECS))
.timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS))
.connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
.resolution_strategy(resolution_strategy)
.channel()
.await
.map_err(|e| {
Error::service_error(format!(
"failed to create LoadBalancedChannel, address: {}, err: {}",
host_addr,
e.as_report()
))
})?;
let client = FlightServiceClient::new(channel.into());
Ok(Self {
client,
addr: addr.into(),
Expand Down
Loading