Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(udf): support always_retry_on_network_error config for udf functions (#15163) #15303

Merged
merged 2 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/scripts/run-e2e-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pkill python3

sqllogictest -p 4566 -d dev './e2e_test/udf/alter_function.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/graceful_shutdown_python.slt'
sqllogictest -p 4566 -d dev './e2e_test/udf/always_retry_python.slt'
# FIXME: flaky test
# sqllogictest -p 4566 -d dev './e2e_test/udf/retry_python.slt'

Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 16
timeout_in_minutes: 18
retry: *auto-retry

- label: "end-to-end test (parallel)"
Expand Down
75 changes: 75 additions & 0 deletions e2e_test/udf/always_retry_python.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
system ok
python3 e2e_test/udf/test.py &

# wait for server to start
sleep 10s

statement ok
CREATE FUNCTION sleep_always_retry(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815' WITH ( always_retry_on_network_error = true );

statement ok
CREATE FUNCTION sleep_no_retry(INT) RETURNS INT AS 'sleep' USING LINK 'http://localhost:8815';

# Create a table with 30 records
statement ok
CREATE TABLE t (v1 int);

statement ok
INSERT INTO t select 0 from generate_series(1, 30);

statement ok
flush;

statement ok
SET STREAMING_RATE_LIMIT=1;

statement ok
SET BACKGROUND_DDL=true;

statement ok
CREATE MATERIALIZED VIEW mv_no_retry AS SELECT sleep_no_retry(v1) as s1 from t;

# Create a Materialized View
statement ok
CREATE MATERIALIZED VIEW mv_always_retry AS SELECT sleep_always_retry(v1) as s1 from t;

# Immediately kill the server, sleep for 1minute.
system ok
pkill -9 -i python && sleep 60

# Restart the server
system ok
python3 e2e_test/udf/test.py &

# Wait for materialized view to be complete
statement ok
wait;

query I
SELECT count(*) FROM mv_always_retry where s1 is NULL;
----
0

query B
SELECT count(*) > 0 FROM mv_no_retry where s1 is NULL;
----
t

statement ok
SET STREAMING_RATE_LIMIT=0;

statement ok
SET BACKGROUND_DDL=false;

# close the server
system ok
pkill -i python

statement ok
DROP FUNCTION sleep_always_retry;

statement ok
DROP FUNCTION sleep_no_retry;

statement ok
DROP TABLE t CASCADE;
1 change: 1 addition & 0 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ message Function {
optional string link = 8;
optional string identifier = 10;
optional string body = 14;
bool always_retry_on_network_error = 16;

oneof kind {
ScalarFunction scalar = 11;
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ message UserDefinedFunction {
optional string identifier = 6;
// For JavaScript UDF, it's the body of the function.
optional string body = 7;
bool always_retry_on_network_error = 9;
}

// Additional information for user defined table functions.
Expand Down
53 changes: 32 additions & 21 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct UserDefinedFunction {
/// On each successful call, the count will be decreased by 1.
/// See <https://github.com/risingwavelabs/risingwave/issues/13791>.
disable_retry_count: AtomicU8,
/// Always retry. Overrides `disable_retry_count`.
always_retry_on_network_error: bool,
}

const INITIAL_RETRY_COUNT: u8 = 16;
Expand Down Expand Up @@ -128,32 +130,40 @@ impl UserDefinedFunction {
UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &input)?,
UdfImpl::External(client) => {
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if disable_retry_count != 0 {
let result = if self.always_retry_on_network_error {
client
.call(&self.identifier, input)
.call_with_always_retry_on_network_error(&self.identifier, input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await
let result = if disable_retry_count != 0 {
client
.call(&self.identifier, input)
.instrument_await(self.span.clone())
.await
} else {
client
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await
};
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let connection_error = matches!(&result, Err(e) if e.is_connection_error());
if connection_error && disable_retry_count != INITIAL_RETRY_COUNT {
// reset count on connection error
self.disable_retry_count
.store(INITIAL_RETRY_COUNT, Ordering::Relaxed);
} else if !connection_error && disable_retry_count != 0 {
// decrease count on success, ignore if exchange failed
_ = self.disable_retry_count.compare_exchange(
disable_retry_count,
disable_retry_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
result
};
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let connection_error = matches!(&result, Err(e) if e.is_connection_error());
if connection_error && disable_retry_count != INITIAL_RETRY_COUNT {
// reset count on connection error
self.disable_retry_count
.store(INITIAL_RETRY_COUNT, Ordering::Relaxed);
} else if !connection_error && disable_retry_count != 0 {
// decrease count on success, ignore if exchange failed
_ = self.disable_retry_count.compare_exchange(
disable_retry_count,
disable_retry_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
result?
}
};
Expand Down Expand Up @@ -248,6 +258,7 @@ impl Build for UserDefinedFunction {
identifier: identifier.clone(),
span: format!("udf_call({})", identifier).into(),
disable_retry_count: AtomicU8::new(0),
always_retry_on_network_error: udf.always_retry_on_network_error,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/expr/udf/src/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,25 @@ impl ArrowFlightUdfClient {
unreachable!()
}

/// Always retry on connection error
pub async fn call_with_always_retry_on_network_error(
&self,
id: &str,
input: RecordBatch,
) -> Result<RecordBatch> {
let mut backoff = Duration::from_millis(100);
loop {
match self.call(id, input.clone()).await {
Err(err) if err.is_connection_error() => {
tracing::error!(error = %err.as_report(), "UDF connection error. retry...");
}
ret => return ret,
}
tokio::time::sleep(backoff).await;
backoff *= 2;
}
}

/// Call a function with streaming input and output.
#[panic_return = "Result<stream::Empty<_>>"]
pub async fn call_stream(
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/catalog/function_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub struct FunctionCatalog {
pub identifier: Option<String>,
pub body: Option<String>,
pub link: Option<String>,
pub always_retry_on_network_error: bool,
}

#[derive(Clone, Display, PartialEq, Eq, Hash, Debug)]
Expand Down Expand Up @@ -68,6 +69,7 @@ impl From<&PbFunction> for FunctionCatalog {
identifier: prost.identifier.clone(),
body: prost.body.clone(),
link: prost.link.clone(),
always_retry_on_network_error: prost.always_retry_on_network_error,
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/expr/user_defined_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl UserDefinedFunction {
identifier: udf.identifier.clone(),
body: udf.body.clone(),
link: udf.link.clone(),
always_retry_on_network_error: udf.always_retry_on_network_error,
};

Ok(Self {
Expand Down Expand Up @@ -92,6 +93,7 @@ impl Expr for UserDefinedFunction {
identifier: self.catalog.identifier.clone(),
link: self.catalog.link.clone(),
body: self.catalog.body.clone(),
always_retry_on_network_error: self.catalog.always_retry_on_network_error,
})),
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub async fn handle_create_function(
args: Option<Vec<OperateFunctionArg>>,
returns: Option<CreateFunctionReturns>,
params: CreateFunctionBody,
with_options: CreateFunctionWithOptions,
) -> Result<RwPgResponse> {
if or_replace {
bail_not_implemented!("CREATE OR REPLACE FUNCTION");
Expand Down Expand Up @@ -247,6 +248,9 @@ pub async fn handle_create_function(
link,
body,
owner: session.user_id(),
always_retry_on_network_error: with_options
.always_retry_on_network_error
.unwrap_or_default(),
};

let catalog_writer = session.catalog_writer()?;
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/create_sql_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ pub async fn handle_create_sql_function(
body: Some(body),
link: None,
owner: session.user_id(),
always_retry_on_network_error: false,
};

let catalog_writer = session.catalog_writer()?;
Expand Down
2 changes: 2 additions & 0 deletions src/frontend/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ pub async fn handle(
args,
returns,
params,
with_options,
} => {
// For general udf, `language` clause could be ignored
// refer: https://github.com/risingwavelabs/risingwave/pull/10608
Expand All @@ -226,6 +227,7 @@ pub async fn handle(
args,
returns,
params,
with_options,
)
.await
} else {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,11 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Function::Identifier).string())
.col(ColumnDef::new(Function::Body).string())
.col(ColumnDef::new(Function::Kind).string().not_null())
.col(
ColumnDef::new(Function::AlwaysRetryOnNetworkError)
.boolean()
.not_null(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_function_object_id")
Expand Down Expand Up @@ -1113,6 +1118,7 @@ enum Function {
Identifier,
Body,
Kind,
AlwaysRetryOnNetworkError,
}

#[derive(DeriveIden)]
Expand Down
2 changes: 2 additions & 0 deletions src/meta/model_v2/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub struct Model {
pub identifier: Option<String>,
pub body: Option<String>,
pub kind: FunctionKind,
pub always_retry_on_network_error: bool,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -100,6 +101,7 @@ impl From<PbFunction> for ActiveModel {
identifier: Set(function.identifier),
body: Set(function.body),
kind: Set(function.kind.unwrap().into()),
always_retry_on_network_error: Set(function.always_retry_on_network_error),
}
}
}
1 change: 1 addition & 0 deletions src/meta/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ impl From<ObjectModel<function::Model>> for PbFunction {
identifier: value.0.identifier,
body: value.0.body,
kind: Some(value.0.kind.into()),
always_retry_on_network_error: value.0.always_retry_on_network_error,
}
}
}
Loading
Loading