From 799406947c0250807695d447d41c0875b944238c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 07:18:30 -0600 Subject: [PATCH 01/13] Add test and proto support for RLike --- .../scala/org/apache/comet/CometConf.scala | 8 ++ .../execution/datafusion/expressions/mod.rs | 1 + .../datafusion/expressions/regexp.rs | 94 +++++++++++++++++++ core/src/execution/datafusion/planner.rs | 7 ++ core/src/execution/proto/expr.proto | 10 +- docs/source/user-guide/configs.md | 1 + .../apache/comet/serde/QueryPlanSerde.scala | 48 ++++++---- .../apache/comet/CometExpressionSuite.scala | 17 ++++ 8 files changed, 163 insertions(+), 23 deletions(-) create mode 100644 core/src/execution/datafusion/expressions/regexp.rs diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 5aee02f11..be6df52f3 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -398,6 +398,14 @@ object CometConf { .booleanConf .createWithDefault(false) + val COMET_REGEXP_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] = + conf("spark.comet.regexp.allowIncompatible") + .doc("Comet is not currently fully compatible with Spark for all regular expressions. " + + "Set this config to true to allow them anyway using Rust's regular expression engine. " + + "See compatibility guide for more information.") + .booleanConf + .createWithDefault(false) + } object ConfigHelpers { diff --git a/core/src/execution/datafusion/expressions/mod.rs b/core/src/execution/datafusion/expressions/mod.rs index 9db4b65b3..56908dc46 100644 --- a/core/src/execution/datafusion/expressions/mod.rs +++ b/core/src/execution/datafusion/expressions/mod.rs @@ -29,6 +29,7 @@ pub mod avg_decimal; pub mod bloom_filter_might_contain; pub mod correlation; pub mod covariance; +pub mod regexp; pub mod stats; pub mod stddev; pub mod strings; diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs new file mode 100644 index 000000000..683d0a4ae --- /dev/null +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::execution::datafusion::expressions::{ + strings::{StringSpaceExec, SubstringExec}, + utils::down_cast_any_ref, +}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Schema}; +use datafusion::logical_expr::ColumnarValue; +use datafusion_physical_expr::PhysicalExpr; +use std::{ + any::Any, + fmt::{Display, Formatter}, + hash::Hasher, + sync::Arc, +}; + +#[derive(Debug, Hash)] +pub struct RLike { + child: Arc, + pattern: Arc, +} + +impl RLike { + pub fn new(child: Arc, pattern: Arc) -> Self { + Self { child, pattern } + } +} + +impl Display for RLike { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RLike [child: {}, pattern: {}] ", + self.child, self.pattern + ) + } +} + +impl PartialEq for RLike { + fn eq(&self, other: &dyn Any) -> bool { + todo!() + } +} + +impl PhysicalExpr for RLike { + fn as_any(&self) -> &dyn Any { + todo!() + } + + fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result { + todo!() + } + + fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result { + todo!() + } + + fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result { + todo!() + } + + fn children(&self) -> Vec> { + todo!() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> datafusion_common::Result> { + todo!() + } + + fn dyn_hash(&self, _state: &mut dyn Hasher) { + todo!() + } +} diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 01d892381..a39bc1615 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -70,6 +70,7 @@ use crate::{ correlation::Correlation, covariance::Covariance, if_expr::IfExpr, + regexp::RLike, scalar_funcs::create_comet_physical_fun, stats::StatsType, stddev::Stddev, @@ -432,6 +433,12 @@ impl PhysicalPlanner { Ok(Arc::new(Like::new(left, right))) } + ExprStruct::Rlike(expr) => { + let left = self.create_expr(expr.left.as_ref().unwrap(), input_schema.clone())?; + let right = self.create_expr(expr.right.as_ref().unwrap(), input_schema)?; + + Ok(Arc::new(RLike::new(left, right))) + } ExprStruct::CheckOverflow(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; let data_type = to_arrow_datatype(expr.datatype.as_ref().unwrap()); diff --git a/core/src/execution/proto/expr.proto b/core/src/execution/proto/expr.proto index be85e8a92..3608e90a5 100644 --- a/core/src/execution/proto/expr.proto +++ b/core/src/execution/proto/expr.proto @@ -54,7 +54,7 @@ message Expr { StartsWith startsWith = 27; EndsWith endsWith = 28; Contains contains = 29; - // RLike rlike = 30; + RLike rlike = 30; ScalarFunc scalarFunc = 31; EqualNullSafe eqNullSafe = 32; NotEqualNullSafe neqNullSafe = 33; @@ -368,10 +368,10 @@ message Like { Expr right = 2; } -// message RLike { -// Expr left = 1; -// Expr right = 2; -// } + message RLike { + Expr left = 1; + Expr right = 2; + } message StartsWith { Expr left = 1; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index eb349b349..4cd96338b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -44,6 +44,7 @@ Comet provides the following configuration settings. | spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b | | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false | +| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | | spark.comet.rowToColumnar.supportedOperatorList | A comma-separated list of row-based operators that will be converted to columnar format when 'spark.comet.rowToColumnar.enabled' is true | Range,InMemoryTableScan | | spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 6333650dd..63e804fa1 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1094,24 +1094,36 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } - // TODO waiting for arrow-rs update -// case RLike(left, right) => -// val leftExpr = exprToProtoInternal(left, inputs) -// val rightExpr = exprToProtoInternal(right, inputs) -// -// if (leftExpr.isDefined && rightExpr.isDefined) { -// val builder = ExprOuterClass.RLike.newBuilder() -// builder.setLeft(leftExpr.get) -// builder.setRight(rightExpr.get) -// -// Some( -// ExprOuterClass.Expr -// .newBuilder() -// .setRlike(builder) -// .build()) -// } else { -// None -// } + case RLike(left, right) => + // for now, we assume that all regular expressions are incompatible with Spark but + // later we can add logic to determine if a pattern will produce the same results + // in Rust, or even transpile the pattern to work around differences between the JVM + // and Rust regular expression engines + if (CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { + val leftExpr = exprToProtoInternal(left, inputs) + val rightExpr = exprToProtoInternal(right, inputs) + + if (leftExpr.isDefined && rightExpr.isDefined) { + val builder = ExprOuterClass.RLike.newBuilder() + builder.setLeft(leftExpr.get) + builder.setRight(rightExpr.get) + + Some( + ExprOuterClass.Expr + .newBuilder() + .setRlike(builder) + .build()) + } else { + withInfo(expr, left, right) + None + } + } else { + withInfo( + expr, + "Regular expressions are disabled. " + + s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true to enable them.") + None + } case StartsWith(left, right) => val leftExpr = exprToProtoInternal(left, inputs) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 6ca4baf60..44c3ca6b4 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -588,6 +588,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("rlike") { + val table = "rlike_names" + withTable(table) { + sql(s"create table $table(id int, name varchar(20)) using parquet") + sql(s"insert into $table values(1,'James Smith')") + sql(s"insert into $table values(2,'Michael Rose')") + sql(s"insert into $table values(3,'Robert Williams')") + sql(s"insert into $table values(4,'Rames Rose')") + sql(s"insert into $table values(5,'Rames rose')") + + withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { + val query = sql(s"select id from $table where name rlike 'R[a-z]+s [Rr]ose'") + checkSparkAnswerAndOperator(query) + } + } + } + test("contains") { assume(!isSpark32) From 2d31bc861e5022cc7cbfb70d03dc4c4dd2e798ad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 07:29:55 -0600 Subject: [PATCH 02/13] boilerplate --- .../datafusion/expressions/regexp.rs | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index 683d0a4ae..67c2d6c35 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -56,21 +56,24 @@ impl Display for RLike { impl PartialEq for RLike { fn eq(&self, other: &dyn Any) -> bool { - todo!() + down_cast_any_ref(other) + .downcast_ref::() + .map(|x| self.child.eq(&x.child) && self.pattern.eq(&x.pattern)) + .unwrap_or(false) } } impl PhysicalExpr for RLike { fn as_any(&self) -> &dyn Any { - todo!() + self } fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result { - todo!() + self.child.data_type(input_schema) } fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result { - todo!() + self.child.nullable(input_schema) } fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result { @@ -78,17 +81,23 @@ impl PhysicalExpr for RLike { } fn children(&self) -> Vec> { - todo!() + vec![self.child.clone()] } fn with_new_children( self: Arc, children: Vec>, ) -> datafusion_common::Result> { - todo!() + assert!(children.len() == 2); + Ok(Arc::new(RLike::new( + children[0].clone(), + children[1].clone(), + ))) } - fn dyn_hash(&self, _state: &mut dyn Hasher) { - todo!() + fn dyn_hash(&self, state: &mut dyn Hasher) { + use std::hash::Hash; + let mut s = state; + self.hash(&mut s); } } From 28169e74d84bb8ad13f07178336d275a025f3fb6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 07:43:39 -0600 Subject: [PATCH 03/13] test passes --- .../datafusion/expressions/regexp.rs | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index 67c2d6c35..0b035d5da 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -17,14 +17,13 @@ * under the License. */ -use crate::execution::datafusion::expressions::{ - strings::{StringSpaceExec, SubstringExec}, - utils::down_cast_any_ref, -}; -use arrow_array::RecordBatch; +use crate::execution::datafusion::expressions::utils::down_cast_any_ref; +use arrow_array::{builder::BooleanBuilder, Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; +use datafusion_common::ScalarValue; use datafusion_physical_expr::PhysicalExpr; +use regex::Regex; use std::{ any::Any, fmt::{Display, Formatter}, @@ -68,8 +67,8 @@ impl PhysicalExpr for RLike { self } - fn data_type(&self, input_schema: &Schema) -> datafusion_common::Result { - self.child.data_type(input_schema) + fn data_type(&self, _input_schema: &Schema) -> datafusion_common::Result { + Ok(DataType::Boolean) } fn nullable(&self, input_schema: &Schema) -> datafusion_common::Result { @@ -77,7 +76,37 @@ impl PhysicalExpr for RLike { } fn evaluate(&self, batch: &RecordBatch) -> datafusion_common::Result { - todo!() + if let ColumnarValue::Array(v) = self.child.evaluate(batch)? { + if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(pattern))) = + self.pattern.evaluate(batch)? + { + // TODO cache Regex across invocations of evaluate or create it in constructor + let re = Regex::new(&pattern).unwrap(); // TODO error handling + let inputs = v + .as_any() + .downcast_ref::() + .expect("string array"); + let mut builder = BooleanBuilder::with_capacity(inputs.len()); + if inputs.is_nullable() { + for i in 0..inputs.len() { + if inputs.is_null(i) { + builder.append_null(); + } else { + builder.append_value(re.is_match(inputs.value(i))); + } + } + } else { + for i in 0..inputs.len() { + builder.append_value(re.is_match(inputs.value(i))); + } + } + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) + } else { + todo!() + } + } else { + todo!() + } } fn children(&self) -> Vec> { From b3f47b78a102f0d91060219971d2db83bb9b3994 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 07:54:23 -0600 Subject: [PATCH 04/13] error handling --- .../datafusion/expressions/regexp.rs | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index 0b035d5da..cd6f6cd58 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -17,7 +17,7 @@ * under the License. */ -use crate::execution::datafusion::expressions::utils::down_cast_any_ref; +use crate::{errors::CometError, execution::datafusion::expressions::utils::down_cast_any_ref}; use arrow_array::{builder::BooleanBuilder, Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Schema}; use datafusion::logical_expr::ColumnarValue; @@ -80,32 +80,42 @@ impl PhysicalExpr for RLike { if let ColumnarValue::Scalar(ScalarValue::Utf8(Some(pattern))) = self.pattern.evaluate(batch)? { - // TODO cache Regex across invocations of evaluate or create it in constructor - let re = Regex::new(&pattern).unwrap(); // TODO error handling - let inputs = v - .as_any() - .downcast_ref::() - .expect("string array"); - let mut builder = BooleanBuilder::with_capacity(inputs.len()); - if inputs.is_nullable() { - for i in 0..inputs.len() { - if inputs.is_null(i) { - builder.append_null(); + // TODO cache Regex across invocations of evaluate() or create it in constructor + match Regex::new(&pattern) { + Ok(re) => { + let inputs = v + .as_any() + .downcast_ref::() + .expect("string array"); + let mut builder = BooleanBuilder::with_capacity(inputs.len()); + if inputs.is_nullable() { + for i in 0..inputs.len() { + if inputs.is_null(i) { + builder.append_null(); + } else { + builder.append_value(re.is_match(inputs.value(i))); + } + } } else { - builder.append_value(re.is_match(inputs.value(i))); + for i in 0..inputs.len() { + builder.append_value(re.is_match(inputs.value(i))); + } } + Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } - } else { - for i in 0..inputs.len() { - builder.append_value(re.is_match(inputs.value(i))); - } + Err(e) => Err(CometError::Internal(format!( + "Failed to compile regular expression: {e:?}" + )) + .into()), } - Ok(ColumnarValue::Array(Arc::new(builder.finish()))) } else { - todo!() + Err( + CometError::Internal("Only scalar regex patterns are supported".to_string()) + .into(), + ) } } else { - todo!() + Err(CometError::Internal("Only columnar inputs are supported".to_string()).into()) } } From 79796343399fa74d7c9c2c1677a1c5db2f6a8bea Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 08:10:19 -0600 Subject: [PATCH 05/13] improve test --- core/src/execution/datafusion/expressions/regexp.rs | 2 ++ .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 10 ++++++++++ .../scala/org/apache/comet/CometExpressionSuite.scala | 6 ++++++ 3 files changed, 18 insertions(+) diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index cd6f6cd58..1026e3ad4 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -115,6 +115,8 @@ impl PhysicalExpr for RLike { ) } } else { + // this should be unreachable because Spark will evaluate regex expressions against + // literal strings as part of query planning Err(CometError::Internal("Only columnar inputs are supported".to_string()).into()) } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 63e804fa1..088025cc2 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1100,6 +1100,16 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim // in Rust, or even transpile the pattern to work around differences between the JVM // and Rust regular expression engines if (CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) { + + // we currently only support scalar regex patterns + right match { + case Literal(_, DataTypes.StringType) => + // supported + case _ => + withInfo(expr, "Only scalar patterns are supported") + return None + } + val leftExpr = exprToProtoInternal(left, inputs) val rightExpr = exprToProtoInternal(right, inputs) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 44c3ca6b4..435a64162 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -601,6 +601,12 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { withSQLConf(CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key -> "true") { val query = sql(s"select id from $table where name rlike 'R[a-z]+s [Rr]ose'") checkSparkAnswerAndOperator(query) + + // test that we fall back to Spark if the pattern is not a scalar value + val query2 = sql(s"select id from $table where name rlike name") + val (sparkPlan, cometPlan) = checkSparkAnswer(query2) + val explain = new ExtendedExplainInfo().generateExtendedInfo(cometPlan) + assert(explain == "Only scalar patterns are supported") } } } From b2c9688795b2f09ca7208752e9aef53b4fcd5b73 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 08:17:08 -0600 Subject: [PATCH 06/13] add to supported expressions --- docs/source/user-guide/expressions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 521699d34..c99dfb3e5 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -48,7 +48,7 @@ The following Spark expressions are currently available: - Substring - Coalesce - StringSpace - - Like + - Like/RLike - Contains - Startswith - Endswith From 93daf41c32c53b219eb94ea6f2a355fc0c579238 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 24 May 2024 08:20:19 -0600 Subject: [PATCH 07/13] add notes to compatibility guide --- docs/source/user-guide/compatibility.md | 7 +++++++ docs/templates/compatibility-template.md | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/docs/source/user-guide/compatibility.md b/docs/source/user-guide/compatibility.md index a16fd1b21..9791a169c 100644 --- a/docs/source/user-guide/compatibility.md +++ b/docs/source/user-guide/compatibility.md @@ -32,6 +32,13 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. +## Regular Expressions + +Comet uses the [regex](https://crates.io/crates/regex) crate to evaluate regular expressions, and it is expected that +this will produce different results to Java's regular expression engine in some cases. It also lacks support for +features such as backreferences. For these reasons, regular expression support is disabled by default and can be +enabled by setting `spark.comet.regexp.allowIncompatible=true`. + ## Cast Cast operations in Comet fall into three levels of support: diff --git a/docs/templates/compatibility-template.md b/docs/templates/compatibility-template.md index 64f871354..9aecef1c8 100644 --- a/docs/templates/compatibility-template.md +++ b/docs/templates/compatibility-template.md @@ -32,6 +32,13 @@ be used in production. There is an [epic](https://github.com/apache/datafusion-comet/issues/313) where we are tracking the work to fully implement ANSI support. +## Regular Expressions + +Comet uses the [regex](https://crates.io/crates/regex) crate to evaluate regular expressions, and it is expected that +this will produce different results to Java's regular expression engine in some cases. It also lacks support for +features such as backreferences. For these reasons, regular expression support is disabled by default and can be +enabled by setting `spark.comet.regexp.allowIncompatible=true`. + ## Cast Cast operations in Comet fall into three levels of support: From 204dd590a02b597737e7b28bac3c4097797e25f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 7 Jun 2024 09:11:46 -0600 Subject: [PATCH 08/13] fix merge conflict --- core/src/execution/datafusion/expressions/mod.rs | 3 --- core/src/execution/datafusion/planner.rs | 3 --- 2 files changed, 6 deletions(-) diff --git a/core/src/execution/datafusion/expressions/mod.rs b/core/src/execution/datafusion/expressions/mod.rs index 5f127c0d4..496c6c48d 100644 --- a/core/src/execution/datafusion/expressions/mod.rs +++ b/core/src/execution/datafusion/expressions/mod.rs @@ -29,11 +29,8 @@ pub mod avg_decimal; pub mod bloom_filter_might_contain; pub mod correlation; pub mod covariance; -<<<<<<< HEAD pub mod regexp; -======= pub mod negative; ->>>>>>> apache/main pub mod stats; pub mod stddev; pub mod strings; diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 36c29e6dd..240eca3c4 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -70,11 +70,8 @@ use crate::{ correlation::Correlation, covariance::Covariance, if_expr::IfExpr, -<<<<<<< HEAD regexp::RLike, -======= negative, ->>>>>>> apache/main scalar_funcs::create_comet_physical_fun, stats::StatsType, stddev::Stddev, From 390fd0fb60b37f2b3867909c5d84e0e8a9bc4129 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 7 Jun 2024 09:17:46 -0600 Subject: [PATCH 09/13] lint --- core/src/execution/datafusion/expressions/mod.rs | 2 +- core/src/execution/datafusion/planner.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/execution/datafusion/expressions/mod.rs b/core/src/execution/datafusion/expressions/mod.rs index 496c6c48d..f90ddb54e 100644 --- a/core/src/execution/datafusion/expressions/mod.rs +++ b/core/src/execution/datafusion/expressions/mod.rs @@ -29,8 +29,8 @@ pub mod avg_decimal; pub mod bloom_filter_might_contain; pub mod correlation; pub mod covariance; -pub mod regexp; pub mod negative; +pub mod regexp; pub mod stats; pub mod stddev; pub mod strings; diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 240eca3c4..beecb4272 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -70,8 +70,8 @@ use crate::{ correlation::Correlation, covariance::Covariance, if_expr::IfExpr, - regexp::RLike, negative, + regexp::RLike, scalar_funcs::create_comet_physical_fun, stats::StatsType, stddev::Stddev, From 0b1b2c4a0a1a12c09b98117b68a7475f6fd30b95 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 7 Jun 2024 14:47:01 -0600 Subject: [PATCH 10/13] update for DataFusion 39 API change --- core/src/execution/datafusion/expressions/regexp.rs | 4 ++-- core/src/execution/datafusion/expressions/strings.rs | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index 1026e3ad4..aa4db724b 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -121,8 +121,8 @@ impl PhysicalExpr for RLike { } } - fn children(&self) -> Vec> { - vec![self.child.clone()] + fn children(&self) -> Vec<&Arc> { + vec![&self.child] } fn with_new_children( diff --git a/core/src/execution/datafusion/expressions/strings.rs b/core/src/execution/datafusion/expressions/strings.rs index cbbd4cfa4..39b7f7b93 100644 --- a/core/src/execution/datafusion/expressions/strings.rs +++ b/core/src/execution/datafusion/expressions/strings.rs @@ -143,8 +143,6 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn); make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn); -// make_predicate_function!(RLike, rlike_dyn, rlike_utf8_scalar_dyn); - #[derive(Debug, Hash)] pub struct SubstringExec { pub child: Arc, From b74394fe808b1c34f23476071c167be81fa0e540 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 7 Jun 2024 15:19:19 -0600 Subject: [PATCH 11/13] update expressions doc --- docs/source/user-guide/expressions.md | 54 +++++++++++++-------------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 44e3129e8..53f419a4c 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -66,32 +66,33 @@ The following Spark expressions are currently available. Any known compatibility ## String Functions -| Expression | Notes | -| --------------- | ----------------------------------------------------------------------------------------------------------- | -| Ascii | | -| BitLength | | -| Chr | | -| ConcatWs | | -| Contains | | -| EndsWith | | -| InitCap | | -| Instr | | -| Length | | -| Like | | -| Lower | | -| OctetLength | | -| Repeat | Negative argument for number of times to repeat causes exception | -| Replace | | -| Reverse | | -| StartsWith | | -| StringSpace | | -| StringTrim | | -| StringTrimBoth | | -| StringTrimLeft | | -| StringTrimRight | | -| Substring | | -| Translate | | -| Upper | | +| Expression | Notes | +| --------------- | ------------------------------------------------------------------ | +| Ascii | | +| BitLength | | +| Chr | | +| ConcatWs | | +| Contains | | +| EndsWith | | +| InitCap | | +| Instr | | +| Length | | +| Like | | +| Lower | | +| OctetLength | | +| Repeat | Negative argument for number of times to repeat causes exception | +| Replace | | +| Reverse | | +| RLike | RLike is disabled by default. See compatibility guide for details. | +| StartsWith | | +| StringSpace | | +| StringTrim | | +| StringTrimBoth | | +| StringTrimLeft | | +| StringTrimRight | | +| Substring | | +| Translate | | +| Upper | | ## Date/Time Functions @@ -190,4 +191,3 @@ The following Spark expressions are currently available. Any known compatibility | ScalarSubquery | | | Coalesce | | | NormalizeNaNAndZero | | - From a6db1a749d39329f0907671dc4c6c3adc009ba53 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 09:39:03 -0600 Subject: [PATCH 12/13] regexp benchmarks and tests --- core/Cargo.toml | 4 + core/benches/regexp.rs | 75 +++++++++++++++++++ .../datafusion/expressions/regexp.rs | 61 +++++++++++++++ 3 files changed, 140 insertions(+) create mode 100644 core/benches/regexp.rs diff --git a/core/Cargo.toml b/core/Cargo.toml index fe74b3554..c345bfece 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -136,3 +136,7 @@ harness = false [[bench]] name = "shuffle_writer" harness = false + +[[regexp]] +name = "regexp" +harness = false diff --git a/core/benches/regexp.rs b/core/benches/regexp.rs new file mode 100644 index 000000000..ba5e7002e --- /dev/null +++ b/core/benches/regexp.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +use std::sync::Arc; +use arrow::datatypes::Int32Type; +use arrow::error::ArrowError; +use arrow_array::{builder::StringBuilder, builder::StringDictionaryBuilder, RecordBatch}; +use arrow_schema::{DataType, Field, Schema}; +use comet::execution::datafusion::expressions::regexp::RLike; +use criterion::{criterion_group, criterion_main, Criterion}; +use datafusion::common::ScalarValue; +use datafusion_physical_expr::{expressions::Column, expressions::Literal, PhysicalExpr, expressions::LikeExpr}; + +fn criterion_benchmark(c: &mut Criterion) { + let batch = create_utf8_batch().unwrap(); + let child_expr = Arc::new(Column::new("foo", 0)); + let pattern_expr = Arc::new(Literal::new(ScalarValue::Utf8(Some("5[0-9]5".to_string())))); + let rlike = RLike::new(child_expr.clone(), pattern_expr.clone()); + let df_rlike = LikeExpr::new(false, false, child_expr, pattern_expr); + + let mut group = c.benchmark_group("regexp"); + group.bench_function("regexp_comet_rlike", |b| { + b.iter(|| rlike.evaluate(&batch).unwrap()); + }); + group.bench_function("regexp_datafusion_rlike", |b| { + b.iter(|| df_rlike.evaluate(&batch).unwrap()); + }); +} + +fn create_utf8_batch() -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true) + ])); + let mut string_builder = StringBuilder::new(); + let mut string_dict_builder = StringDictionaryBuilder::::new(); + for i in 0..1000 { + if i % 10 == 0 { + string_builder.append_null(); + string_dict_builder.append_null(); + } else { + string_builder.append_value(format!("{}", i)); + string_dict_builder.append_value(format!("{}", i)); + } + } + let string_array = string_builder.finish(); + let string_dict_array2 = string_dict_builder.finish(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(string_array), Arc::new(string_dict_array2)]) +} + +fn config() -> Criterion { + Criterion::default() +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/core/src/execution/datafusion/expressions/regexp.rs b/core/src/execution/datafusion/expressions/regexp.rs index aa4db724b..8226bf148 100644 --- a/core/src/execution/datafusion/expressions/regexp.rs +++ b/core/src/execution/datafusion/expressions/regexp.rs @@ -142,3 +142,64 @@ impl PhysicalExpr for RLike { self.hash(&mut s); } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + use arrow_array::builder::{StringBuilder, StringDictionaryBuilder}; + use arrow_array::{Array, BooleanArray, RecordBatch}; + use arrow_array::types::Int32Type; + use arrow_schema::{ArrowError, DataType, Field, Schema}; + use datafusion_common::{DataFusionError, ScalarValue}; + use datafusion_expr::ColumnarValue; + use datafusion_physical_expr::expressions::Literal; + use datafusion_physical_expr_common::expressions::column::Column; + use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + use super::*; + + #[test] + fn test_string_input() -> Result<(), DataFusionError> { + do_test(0, "5[0-9]5", 10) + } + + #[test] + fn test_dict_encoded_string_input() -> Result<(), DataFusionError> { + do_test(1, "5[0-9]5", 10) + } + + fn do_test(column: usize, pattern: &str, expected_count: usize) -> Result<(), DataFusionError> { + let batch = create_utf8_batch()?; + let child_expr = Arc::new(Column::new("foo", column)); + let pattern_expr = Arc::new(Literal::new(ScalarValue::Utf8(Some(pattern.to_string())))); + let rlike = RLike::new(child_expr, pattern_expr); + if let ColumnarValue::Array(array) = rlike.evaluate(&batch).unwrap() { + let array = array.as_any().downcast_ref::().expect("boolean array"); + assert_eq!(expected_count, array.true_count()); + } else { + unreachable!() + } + Ok(()) + } + + fn create_utf8_batch() -> Result { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), true) + ])); + let mut string_builder = StringBuilder::new(); + let mut string_dict_builder = StringDictionaryBuilder::::new(); + for i in 0..1000 { + if i % 10 == 0 { + string_builder.append_null(); + string_dict_builder.append_null(); + } else { + string_builder.append_value(format!("{}", i)); + string_dict_builder.append_value(format!("{}", i)); + } + } + let string_array = string_builder.finish(); + let string_dict_array2 = string_dict_builder.finish(); + RecordBatch::try_new(schema.clone(), vec![Arc::new(string_array), Arc::new(string_dict_array2)]) + } + +} \ No newline at end of file From 6177118049fcb8207c9d957b336516e66a0b6e46 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 13 Jun 2024 19:48:29 -0600 Subject: [PATCH 13/13] fix error --- core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index c345bfece..7e6017027 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -137,6 +137,6 @@ harness = false name = "shuffle_writer" harness = false -[[regexp]] +[[bench]] name = "regexp" harness = false