Merge remote-tracking branch 'apache/main' into rust-1.78
andygrove committed Jun 11, 2024
2 parents 51eaef0 + fa95f1b commit 9c31ff1
Showing 12 changed files with 232 additions and 41 deletions.
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,12 @@ object CometConf extends ShimCometConf {
      .booleanConf
      .createWithDefault(false)

+  val COMET_XXHASH64_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.xxhash64.enabled")
+      .doc("The xxhash64 implementation is not optimized yet and may cause performance issues.")
+      .booleanConf
+      .createWithDefault(false)
+
}

object ConfigHelpers {
36 changes: 24 additions & 12 deletions core/Cargo.lock

Some generated files are not rendered by default.

12 changes: 6 additions & 6 deletions core/Cargo.toml
@@ -71,12 +71,12 @@ itertools = "0.11.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.8" }
paste = "1.0.14"
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1" }
-datafusion = { default-features = false, git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["unicode_expressions", "crypto_expressions"] }
-datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", features = ["crypto_expressions"] }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
-datafusion-physical-expr-common = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", tag = "39.0.0-rc1", default-features = false }
+datafusion-common = { version = "39.0.0" }
+datafusion = { default-features = false, version = "39.0.0", features = ["unicode_expressions", "crypto_expressions"] }
+datafusion-functions = { version = "39.0.0", features = ["crypto_expressions"] }
+datafusion-expr = { version = "39.0.0", default-features = false }
+datafusion-physical-expr-common = { version = "39.0.0", default-features = false }
+datafusion-physical-expr = { version = "39.0.0", default-features = false }
unicode-segmentation = "^1.10.1"
once_cell = "1.18.0"
regex = "1.9.6"
16 changes: 13 additions & 3 deletions core/src/errors.rs
@@ -152,9 +152,10 @@ pub enum CometError {
#[error("{msg}")]
Panic { msg: String },

#[error(transparent)]
#[error("{msg}")]
DataFusion {
#[from]
msg: String,
#[source]
source: DataFusionError,
},

@@ -185,10 +186,19 @@ impl convert::From<Box<dyn Any + Send>> for CometError {
    }
}

+impl From<DataFusionError> for CometError {
+    fn from(value: DataFusionError) -> Self {
+        CometError::DataFusion {
+            msg: value.message().to_string(),
+            source: value,
+        }
+    }
+}
+
impl From<CometError> for DataFusionError {
    fn from(value: CometError) -> Self {
        match value {
-           CometError::DataFusion { source } => source,
+           CometError::DataFusion { msg: _, source } => source,
            _ => DataFusionError::Execution(value.to_string()),
        }
    }
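For illustration (not part of this commit's diff): a minimal sketch of the round trip the two impls above establish, assuming `CometError` and DataFusion 39's error API are in scope. The bare message is captured for display, while the original error survives as `source` and is handed back unchanged on conversion back.

use datafusion_common::DataFusionError;

fn round_trip_sketch() {
    let df_err = DataFusionError::Execution("exec error".to_string());
    // From<DataFusionError>: store the bare message plus the source error.
    let comet_err: CometError = df_err.into();
    assert_eq!(comet_err.to_string(), "exec error"); // Display uses "{msg}"
    // From<CometError>: recover the original DataFusionError intact.
    let back: DataFusionError = comet_err.into();
    assert_eq!(back.to_string(), "Execution error: exec error");
}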
7 changes: 7 additions & 0 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
@@ -55,6 +55,9 @@ use unhex::spark_unhex;
mod hex;
use hex::spark_hex;

+mod chr;
+use chr::spark_chr;
+
macro_rules! make_comet_scalar_udf {
    ($name:expr, $func:ident, $data_type:ident) => {{
        let scalar_func = CometScalarFunction::new(
@@ -123,6 +126,10 @@ pub fn create_comet_physical_fun(
            let func = Arc::new(spark_xxhash64);
            make_comet_scalar_udf!("xxhash64", func, without data_type)
        }
+       "chr" => {
+           let func = Arc::new(spark_chr);
+           make_comet_scalar_udf!("chr", func, without data_type)
+       }
        sha if sha2_functions.contains(&sha) => {
            // Spark requires hex string as the result of sha2 functions, we have to wrap the
            // result of digest functions as hex string
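For illustration (not part of this commit's diff): functions dispatched here with `without data_type`, such as `spark_xxhash64` and the new `spark_chr`, share one shape: a slice of `ColumnarValue` in, one `ColumnarValue` out. A hypothetical minimal function of that shape:

use datafusion::logical_expr::ColumnarValue;
use datafusion_common::DataFusionError;

// Hypothetical example of the signature the dispatcher expects.
fn spark_identity(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
    // Pass the first argument through unchanged.
    Ok(args[0].clone())
}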
121 changes: 121 additions & 0 deletions core/src/execution/datafusion/expressions/scalar_funcs/chr.rs
@@ -0,0 +1,121 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};

use arrow::{
    array::{ArrayRef, StringArray},
    datatypes::{
        DataType,
        DataType::{Int64, Utf8},
    },
};

use datafusion::logical_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion_common::{cast::as_int64_array, exec_err, DataFusionError, Result, ScalarValue};

/// Returns the ASCII character having the binary equivalent to the input expression.
/// E.g., chr(65) = 'A'.
/// Compatible with Apache Spark's Chr function
pub fn spark_chr(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
    let chr_func = ChrFunc::default();
    chr_func.invoke(args)
}

pub fn chr(args: &[ArrayRef]) -> Result<ArrayRef> {
    let integer_array = as_int64_array(&args[0])?;

    // The outer `map` walks the iterator; the inner `map` handles the
    // `Option<_>` so NULL entries propagate as NULL.
    let result = integer_array
        .iter()
        .map(|integer: Option<i64>| {
            integer
                .map(|integer| match core::char::from_u32(integer as u32) {
                    Some(integer) => Ok(integer.to_string()),
                    None => {
                        exec_err!("requested character too large for encoding.")
                    }
                })
                .transpose()
        })
        .collect::<Result<StringArray>>()?;

    Ok(Arc::new(result) as ArrayRef)
}

#[derive(Debug)]
pub struct ChrFunc {
    signature: Signature,
}

impl Default for ChrFunc {
    fn default() -> Self {
        Self::new()
    }
}

impl ChrFunc {
    pub fn new() -> Self {
        Self {
            signature: Signature::uniform(1, vec![Int64], Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for ChrFunc {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        "chr"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(Utf8)
    }

    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        handle_chr_fn(args)
    }
}

fn handle_chr_fn(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let array = args[0].clone();
    match array {
        ColumnarValue::Array(array) => {
            let array = chr(&[array])?;
            Ok(ColumnarValue::Array(array))
        }
        ColumnarValue::Scalar(ScalarValue::Int64(Some(value))) => {
            match core::char::from_u32(value as u32) {
                Some(ch) => Ok(ColumnarValue::Scalar(ScalarValue::Utf8(Some(
                    ch.to_string(),
                )))),
                None => exec_err!("requested character too large for encoding."),
            }
        }
        ColumnarValue::Scalar(ScalarValue::Int64(None)) => {
            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(None)))
        }
        _ => exec_err!("The argument must be an Int64 array or scalar."),
    }
}
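For illustration (not part of this commit's diff): a usage sketch for `spark_chr` above, assuming it is in scope. A scalar Int64 yields a one-character Utf8 scalar, and a NULL input propagates as a NULL Utf8, mirroring the branches of `handle_chr_fn`.

use datafusion::logical_expr::ColumnarValue;
use datafusion_common::ScalarValue;

fn chr_usage_sketch() -> datafusion_common::Result<()> {
    // chr(65) = 'A', as the doc comment above states.
    let out = spark_chr(&[ColumnarValue::Scalar(ScalarValue::Int64(Some(65)))])?;
    if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) = out {
        assert_eq!(s, "A");
    }
    // NULL in, NULL out.
    let null_out = spark_chr(&[ColumnarValue::Scalar(ScalarValue::Int64(None))])?;
    assert!(matches!(
        null_out,
        ColumnarValue::Scalar(ScalarValue::Utf8(None))
    ));
    Ok(())
}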
11 changes: 10 additions & 1 deletion core/src/execution/datafusion/planner.rs
@@ -1391,7 +1391,7 @@ impl PhysicalPlanner {

impl From<DataFusionError> for ExecutionError {
    fn from(value: DataFusionError) -> Self {
-       ExecutionError::DataFusionError(value.to_string())
+       ExecutionError::DataFusionError(value.message().to_string())
    }
}

@@ -1563,6 +1563,7 @@ mod tests {
        spark_operator,
    };

+   use crate::execution::operators::ExecutionError;
    use spark_expression::expr::ExprStruct::*;
    use spark_operator::{operator::OpStruct, Operator};

@@ -1752,6 +1753,14 @@
        assert!(output.is_empty());
    }

+   #[tokio::test()]
+   async fn from_datafusion_error_to_comet() {
+       let err_msg = "exec error";
+       let err = datafusion_common::DataFusionError::Execution(err_msg.to_string());
+       let comet_err: ExecutionError = err.into();
+       assert_eq!(comet_err.to_string(), "Error from DataFusion: exec error.");
+   }
+
    // Creates a filter operator which takes an `Int32Array` and selects rows that are equal to
    // `value`.
    fn create_filter(child_op: spark_operator::Operator, value: i32) -> spark_operator::Operator {
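For illustration (not part of this commit's diff): why the conversion above switched to `message()`. With DataFusion 39's error API, `to_string()` includes the variant label, which would be duplicated once `ExecutionError`'s "Error from DataFusion: {0}." format wraps it; `message()` yields only the description, as the new test asserts.

use datafusion_common::DataFusionError;

fn message_vs_display_sketch() {
    let err = DataFusionError::Execution("exec error".to_string());
    // Display prepends the variant label...
    assert_eq!(err.to_string(), "Execution error: exec error");
    // ...while message() is just the description, so the wrapped result is
    // "Error from DataFusion: exec error." rather than
    // "Error from DataFusion: Execution error: exec error."
    assert_eq!(err.message(), "exec error");
}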
8 changes: 4 additions & 4 deletions core/src/execution/operators/mod.rs
@@ -38,19 +38,19 @@ pub use copy::*;
pub enum ExecutionError {
    /// Simple error
    #[allow(dead_code)]
-   #[error("General execution error with reason {0}.")]
+   #[error("General execution error with reason: {0}.")]
    GeneralError(String),

    /// Error when deserializing an operator.
-   #[error("Fail to deserialize to native operator with reason {0}.")]
+   #[error("Fail to deserialize to native operator with reason: {0}.")]
    DeserializeError(String),

    /// Error when processing Arrow array.
-   #[error("Fail to process Arrow array with reason {0}.")]
+   #[error("Fail to process Arrow array with reason: {0}.")]
    ArrowError(String),

    /// DataFusion error
-   #[error("Error from DataFusion {0}.")]
+   #[error("Error from DataFusion: {0}.")]
    DataFusionError(String),
}

1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -48,3 +48,4 @@ Comet provides the following configuration settings.
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when 'spark.comet.columnar.shuffle.enabled' is true. | 10.0 |
+| spark.comet.xxhash64.enabled | The xxhash64 implementation is not optimized yet and may cause performance issues. | false |