Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): support always_retry_on_network_error config for udf functions #15163

Merged
merged 11 commits into from
Feb 27, 2024
Merged
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pkill python3

sqllogictest -p 4566 -d dev './e2e_test/udf/alter_function.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/always_retry_python.slt'
# FIXME: flaky test
# sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

Expand Down
75 changes: 75 additions & 0 deletions e2e_test/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 10s

statement ok
CREATE FUNCTION sleep_always_retry(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815' WITH ( always_retry_on_network_error = true );

statement ok
CREATE FUNCTION sleep_no_retry(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815';

# Create a table with 30 records
statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t select 0 from generate_series(1, 30);

statement ok
flush;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE MATERIALIZED VIEW mv_no_retry AS SELECT sleep_no_retry(v1) as s1 from t;

# Create a Materialized View
statement ok
CREATE MATERIALIZED VIEW mv_always_retry AS SELECT sleep_always_retry(v1) as s1 from t;

# Immediately kill the server, sleep for 1minute.
system ok
pkill -9 -i python && sleep 60

# Restart the server
system ok
python3 e2e_test/udf/test.py &

# Wait for materialized view to be complete
statement ok
wait;

query I
SELECT count(*) FROM mv_always_retry where s1 is NULL;
----
0

query B
SELECT count(*) > 0 FROM mv_no_retry where s1 is NULL;
----
t

statement ok
SET STREAMING_RATE_LIMIT=0;

statement ok
SET BACKGROUND_DDL=false;

# close the server
system ok
pkill -i python

statement ok
DROP FUNCTION sleep_always_retry;

statement ok
DROP FUNCTION sleep_no_retry;

statement ok
DROP TABLE t CASCADE;
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ message Function {
optional string link = 8;
optional string identifier = 10;
optional string body = 14;
bool always_retry_on_network_error = 16;

oneof kind {
ScalarFunction scalar = 11;
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ message UserDefinedFunction {
optional string identifier = 6;
// For JavaScript UDF, it's the body of the function.
optional string body = 7;
bool always_retry_on_network_error = 9;
}

// Additional information for user defined table functions.
Expand Down
53 changes: 32 additions & 21 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub struct UserDefinedFunction {
/// On each successful call, the count will be decreased by 1.
/// See <https://github.com/risingwavelabs/risingwave/issues/13791>.
disable_retry_count: AtomicU8,
/// Always retry. Overrides `disable_retry_count`.
always_retry_on_network_error: bool,
}

const INITIAL_RETRY_COUNT: u8 = 16;
Expand Down Expand Up @@ -111,32 +113,40 @@ impl UserDefinedFunction {
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input)?,
UdfImpl::External(client) => {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if disable_retry_count != 0 {
let result = if self.always_retry_on_network_error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put all disable_retry_count related stuff in else branch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm no difference to me? Seems to add more nesting.

Copy link
Member

@xxchan xxchan Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think mixing them together can lead to confusion. Why do we still update disable_retry_count when always_retry_on_network_error?

If nesting doesn't look good, we can add a sth like call_with_ disable_retry_count...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I didn't get what you meant originally. Now I do. Updated it.

client
.call(&self.identifier, arrow_input)
.call_with_always_retry_on_network_error(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
let result = if disable_retry_count != 0 {
client
.call(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, arrow_input)
.instrument_await(self.span.clone())
.await
};
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let connection_error = matches!(&result, Err(e) if e.is_connection_error());
if connection_error && disable_retry_count != INITIAL_RETRY_COUNT {
// reset count on connection error
self.disable_retry_count
.store(INITIAL_RETRY_COUNT, Ordering::Relaxed);
} else if !connection_error && disable_retry_count != 0 {
// decrease count on success, ignore if exchange failed
_ = self.disable_retry_count.compare_exchange(
disable_retry_count,
disable_retry_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
result
};
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let connection_error = matches!(&result, Err(e) if e.is_connection_error());
if connection_error && disable_retry_count != INITIAL_RETRY_COUNT {
// reset count on connection error
self.disable_retry_count
.store(INITIAL_RETRY_COUNT, Ordering::Relaxed);
} else if !connection_error && disable_retry_count != 0 {
// decrease count on success, ignore if exchange failed
_ = self.disable_retry_count.compare_exchange(
disable_retry_count,
disable_retry_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
result?
}
};
Expand Down Expand Up @@ -234,6 +244,7 @@ impl Build for UserDefinedFunction {
identifier: identifier.clone(),
span: format!("udf_call({})", identifier).into(),
disable_retry_count: AtomicU8::new(0),
always_retry_on_network_error: udf.always_retry_on_network_error,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,25 @@ impl ArrowFlightUdfClient {
unreachable!()
}

/// Always retry on connection error
pub async fn call_with_always_retry_on_network_error(
&self,
id: &str,
input: RecordBatch,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
loop {
match self.call(id, input.clone()).await {
Err(err) if err.is_connection_error() => {
tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
}
ret => return ret,
}
tokio::time::sleep(backoff).await;
backoff *= 2;
}
}

/// Call a function with streaming input and output.
#[panic_return = "Result<stream::Empty<_>>"]
pub async fn call_stream(
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct FunctionCatalog {
pub identifier: Option<String>,
pub body: Option<String>,
pub link: Option<String>,
pub always_retry_on_network_error: bool,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ impl From<&PbFunction> for FunctionCatalog {
identifier: prost.identifier.clone(),
body: prost.body.clone(),
link: prost.link.clone(),
always_retry_on_network_error: prost.always_retry_on_network_error,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl UserDefinedFunction {
identifier: udf.identifier.clone(),
body: udf.body.clone(),
link: udf.link.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
};

Ok(Self {
Expand Down Expand Up @@ -92,6 +93,7 @@ impl Expr for UserDefinedFunction {
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),
body: self.catalog.body.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
})),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn handle_create_function(
args: Option<Vec<OperateFunctionArg>>,
returns: Option<CreateFunctionReturns>,
params: CreateFunctionBody,
with_options: CreateFunctionWithOptions,
) -> Result<RwPgResponse> {
if or_replace {
bail_not_implemented!("CREATE OR REPLACE FUNCTION");
Expand Down Expand Up @@ -285,6 +286,9 @@ pub async fn handle_create_function(
link,
body,
owner: session.user_id(),
always_retry_on_network_error: with_options
.always_retry_on_network_error
.unwrap_or_default(),
};

let catalog_writer = session.catalog_writer()?;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_sql_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ pub async fn handle_create_sql_function(
body: Some(body),
link: None,
owner: session.user_id(),
always_retry_on_network_error: false,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub async fn handle(
args,
returns,
params,
with_options,
} => {
// For general udf, `language` clause could be ignored
// refer: https://github.com/risingwavelabs/risingwave/pull/10608
Expand All @@ -266,6 +267,7 @@ pub async fn handle(
args,
returns,
params,
with_options,
)
.await
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::Identifier).string())
.col(ColumnDef::new(Function::Body).string())
.col(ColumnDef::new(Function::Kind).string().not_null())
.col(
ColumnDef::new(Function::AlwaysRetryOnNetworkError)
.boolean()
.not_null(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_function_object_id")
Expand Down Expand Up @@ -1124,6 +1129,7 @@ enum Function {
Identifier,
Body,
Kind,
AlwaysRetryOnNetworkError,
}

#[derive(DeriveIden)]
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Model {
pub identifier: Option<String>,
pub body: Option<String>,
pub kind: FunctionKind,
pub always_retry_on_network_error: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -100,6 +101,7 @@ impl From<PbFunction> for ActiveModel {
identifier: Set(function.identifier),
body: Set(function.body),
kind: Set(function.kind.unwrap().into()),
always_retry_on_network_error: Set(function.always_retry_on_network_error),
}
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ impl From<ObjectModel<function::Model>> for PbFunction {
identifier: value.0.identifier,
body: value.0.body,
kind: Some(value.0.kind.into()),
always_retry_on_network_error: value.0.always_retry_on_network_error,
}
}
}
Loading
Loading