Skip to content

Commit

Permalink
feat(udf): support inlined Rust UDF (#14903)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Feb 4, 2024
1 parent 5f741c6 commit f5386ca
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 110 deletions.
166 changes: 84 additions & 82 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ arrow-flight = "50"
arrow-select = "50"
arrow-ord = "50"
arrow-row = "50"
arrow-udf-js = { git = "https://github.com/risingwavelabs/arrow-udf.git", rev = "7ba1c22" }
arrow-udf-wasm = "0.1"
arrow-udf-js = "0.1"
arrow-udf-wasm = { version = "0.1.2", features = ["build"] }
arrow-array-deltalake = { package = "arrow-array", version = "48.0.1" }
arrow-buffer-deltalake = { package = "arrow-buffer", version = "48.0.1" }
arrow-cast-deltalake = { package = "arrow-cast", version = "48.0.1" }
Expand Down
1 change: 1 addition & 0 deletions ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ RUN curl -sSL https://install.python-poetry.org | python3 -

# add required rustup components
RUN rustup component add rustfmt llvm-tools-preview clippy
RUN rustup target add wasm32-wasi

ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse

Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240124_1
export BUILD_ENV_VERSION=v20240204

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -81,7 +81,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
- mysql
- db
Expand All @@ -93,12 +93,12 @@ services:
- ..:/risingwave

rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -109,7 +109,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240124_1
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240204
depends_on:
db:
condition: service_healthy
Expand Down
1 change: 0 additions & 1 deletion ci/scripts/build-other.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ source ci/scripts/common.sh

echo "--- Build Rust UDF"
cd e2e_test/udf/wasm
rustup target add wasm32-wasi
cargo build --release
cd ../../..

Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ COPY rust-toolchain rust-toolchain
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo install flamegraph
# TODO: cargo-chef doesn't work well now, because we update Cargo.lock very often.
Expand Down
3 changes: 2 additions & 1 deletion docker/Dockerfile.hdfs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ RUN unzip dashboard-artifact.zip && mv risingwave-dashboard-artifact /risingwave
RUN rustup self update \
&& rustup set profile minimal \
&& rustup show \
&& rustup component add rustfmt
&& rustup component add rustfmt \
&& rustup target add wasm32-wasi

RUN cargo fetch

Expand Down
21 changes: 21 additions & 0 deletions e2e_test/udf/wasm_udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,24 @@ drop function jsonb_access;

statement ok
drop function series;

# inlined rust function
statement ok
create function gcd(int, int) returns int language rust as $$
fn gcd(mut a: i32, mut b: i32) -> i32 {
while b != 0 {
let t = b;
b = a % b;
a = t;
}
a
}
$$;

query I
select gcd(25, 15);
----
5

statement ok
drop function gcd;
2 changes: 1 addition & 1 deletion src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Build for UserDefinedFunction {

let identifier = udf.get_identifier()?;
let imp = match udf.language.as_str() {
"wasm" => {
"wasm" | "rust" => {
let link = udf.get_link()?;
// Use `block_in_place` as an escape hatch to run async code here in sync context.
// Calling `block_on` directly will panic.
Expand Down
70 changes: 54 additions & 16 deletions src/frontend/src/handler/create_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use risingwave_expr::expr::get_or_create_wasm_runtime;
use risingwave_object_store::object::{build_remote_object_store, ObjectStoreConfig};
use risingwave_pb::catalog::function::{Kind, ScalarFunction, TableFunction};
use risingwave_pb::catalog::Function;
use risingwave_sqlparser::ast::{
CreateFunctionBody, FunctionDefinition, ObjectName, OperateFunctionArg,
};
use risingwave_sqlparser::ast::{CreateFunctionBody, ObjectName, OperateFunctionArg};
use risingwave_storage::monitor::ObjectStoreMetrics;
use risingwave_udf::ArrowFlightUdfClient;

Expand Down Expand Up @@ -54,7 +52,7 @@ pub async fn handle_create_function(
Some(lang) => {
let lang = lang.real_value().to_lowercase();
match &*lang {
"python" | "java" | "wasm" | "javascript" => lang,
"python" | "java" | "wasm" | "rust" | "javascript" => lang,
_ => {
return Err(ErrorCode::InvalidParameterValue(format!(
"language {} is not supported",
Expand Down Expand Up @@ -134,12 +132,12 @@ pub async fn handle_create_function(
)
.into());
};
let Some(FunctionDefinition::SingleQuotedDef(id)) = params.as_ else {
let Some(as_) = params.as_ else {
return Err(
ErrorCode::InvalidParameterValue("AS must be specified".to_string()).into(),
);
};
identifier = id;
identifier = as_.into_string();

// check UDF server
{
Expand Down Expand Up @@ -173,16 +171,56 @@ pub async fn handle_create_function(
}
"javascript" => {
identifier = function_name.to_string();
body = Some(match params.as_ {
Some(FunctionDefinition::SingleQuotedDef(s)) => s,
Some(FunctionDefinition::DoubleDollarDef(s)) => s,
_ => {
return Err(ErrorCode::InvalidParameterValue(
"AS must be specified".to_string(),
)
.into())
}
});
body = Some(
params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string(),
);
}
"rust" => {
identifier = wasm_identifier(
&function_name,
&arg_types,
&return_type,
matches!(kind, Kind::Table(_)),
);
if params.using.is_some() {
return Err(ErrorCode::InvalidParameterValue(
"USING is not supported for rust function".to_string(),
)
.into());
}
let function_body = params
.as_
.ok_or_else(|| ErrorCode::InvalidParameterValue("AS must be specified".into()))?
.into_string();
let script = format!("#[arrow_udf::function(\"{identifier}\")]\n{function_body}");
body = Some(function_body.clone());

let wasm_binary =
tokio::task::spawn_blocking(move || arrow_udf_wasm::build::build("", &script))
.await?
.context("failed to build rust function")?;

// below is the same as `wasm` language
let runtime = arrow_udf_wasm::Runtime::new(&wasm_binary)?;
check_wasm_function(&runtime, &identifier)?;

let system_params = session.env().meta_client().get_system_params().await?;
let object_name = format!("{:?}.wasm", md5::compute(&wasm_binary));
upload_wasm_binary(
system_params.wasm_storage_url(),
&object_name,
wasm_binary.into(),
)
.await?;

link = Some(format!(
"{}/{}",
system_params.wasm_storage_url(),
object_name
));
}
"wasm" => {
identifier = wasm_identifier(
Expand Down
18 changes: 18 additions & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2642,6 +2642,24 @@ impl fmt::Display for FunctionDefinition {
}
}

impl FunctionDefinition {
/// Returns the function definition as a string slice.
pub fn as_str(&self) -> &str {
match self {
FunctionDefinition::SingleQuotedDef(s) => s,
FunctionDefinition::DoubleDollarDef(s) => s,
}
}

/// Returns the function definition as a string.
pub fn into_string(self) -> String {
match self {
FunctionDefinition::SingleQuotedDef(s) => s,
FunctionDefinition::DoubleDollarDef(s) => s,
}
}
}

/// Return types of a function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down

0 comments on commit f5386ca

Please sign in to comment.