Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support ANSI mode in CAST from String to Bool #290

Merged
merged 19 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,14 @@ object CometConf {
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan"))

val COMET_ANSI_MODE_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.ansi.enabled")
.doc(
"Comet does not respect ANSI mode in most cases and by default will not accelerate " +
"queries when ansi mode is enabled. Enable this setting to test Comet's experimental " +
"support for ANSI mode. This should not be used in production.")
.booleanConf
.createWithDefault(false)

}

object ConfigHelpers {
Expand Down
16 changes: 16 additions & 0 deletions core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ pub enum CometError {
#[error("Comet Internal Error: {0}")]
Internal(String),

// TODO this error message is likely to change between Spark versions and it would be better
// to have the full error in Scala and just pass the invalid value back here
#[error("[[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
because it is malformed. Correct the value as per the syntax, or change its target type. \
Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error")]
CastInvalidValue {
value: String,
from_type: String,
to_type: String,
},

#[error(transparent)]
Arrow {
#[from]
Expand Down Expand Up @@ -183,6 +195,10 @@ impl jni::errors::ToException for CometError {
class: "java/lang/NullPointerException".to_string(),
msg: self.to_string(),
},
CometError::CastInvalidValue { .. } => Exception {
class: "org/apache/spark/SparkException".to_string(),
msg: self.to_string(),
},
CometError::NumberIntFormat { source: s } => Exception {
class: "java/lang/NumberFormatException".to_string(),
msg: s.to_string(),
Expand Down
48 changes: 37 additions & 11 deletions core/src/execution/datafusion/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
sync::Arc,
};

use crate::errors::{CometError, CometResult};
use arrow::{
compute::{cast_with_options, CastOptions},
record_batch::RecordBatch,
Expand Down Expand Up @@ -49,26 +50,38 @@ static CAST_OPTIONS: CastOptions = CastOptions {
pub struct Cast {
pub child: Arc<dyn PhysicalExpr>,
pub data_type: DataType,
pub ansi_mode: bool,

/// When cast from/to timezone related types, we need timezone, which will be resolved with
/// session local timezone by an analyzer in Spark.
pub timezone: String,
}

impl Cast {
pub fn new(child: Arc<dyn PhysicalExpr>, data_type: DataType, timezone: String) -> Self {
pub fn new(
child: Arc<dyn PhysicalExpr>,
data_type: DataType,
ansi_mode: bool,
timezone: String,
) -> Self {
Self {
child,
data_type,
timezone,
ansi_mode,
}
}

pub fn new_without_timezone(child: Arc<dyn PhysicalExpr>, data_type: DataType) -> Self {
pub fn new_without_timezone(
child: Arc<dyn PhysicalExpr>,
data_type: DataType,
ansi_mode: bool,
) -> Self {
Self {
child,
data_type,
timezone: "".to_string(),
ansi_mode,
}
}

Expand All @@ -77,17 +90,22 @@ impl Cast {
let array = array_with_timezone(array, self.timezone.clone(), Some(to_type));
let from_type = array.data_type();
let cast_result = match (from_type, to_type) {
(DataType::Utf8, DataType::Boolean) => Self::spark_cast_utf8_to_boolean::<i32>(&array),
(DataType::Utf8, DataType::Boolean) => {
Self::spark_cast_utf8_to_boolean::<i32>(&array, self.ansi_mode)?
}
(DataType::LargeUtf8, DataType::Boolean) => {
Self::spark_cast_utf8_to_boolean::<i64>(&array)
Self::spark_cast_utf8_to_boolean::<i64>(&array, self.ansi_mode)?
}
_ => cast_with_options(&array, to_type, &CAST_OPTIONS)?,
};
let result = spark_cast(cast_result, from_type, to_type);
Ok(result)
}

fn spark_cast_utf8_to_boolean<OffsetSize>(from: &dyn Array) -> ArrayRef
fn spark_cast_utf8_to_boolean<OffsetSize>(
from: &dyn Array,
ansi_mode: bool,
) -> CometResult<ArrayRef>
where
OffsetSize: OffsetSizeTrait,
{
Expand All @@ -100,15 +118,22 @@ impl Cast {
.iter()
.map(|value| match value {
Some(value) => match value.to_ascii_lowercase().trim() {
"t" | "true" | "y" | "yes" | "1" => Some(true),
"f" | "false" | "n" | "no" | "0" => Some(false),
_ => None,
"t" | "true" | "y" | "yes" | "1" => Ok(Some(true)),
"f" | "false" | "n" | "no" | "0" => Ok(Some(false)),
other if ansi_mode => {
Err(CometError::CastInvalidValue {
value: other.to_string(),
from_type: "STRING".to_string(),
to_type: "BOOLEAN".to_string(),
})
}
_ => Ok(None),
},
_ => None,
_ => Ok(None),
})
.collect::<BooleanArray>();
.collect::<Result<BooleanArray, _>>()?;

Arc::new(output_array)
Ok(Arc::new(output_array))
}
}

Expand Down Expand Up @@ -174,6 +199,7 @@ impl PhysicalExpr for Cast {
Ok(Arc::new(Cast::new(
andygrove marked this conversation as resolved.
Show resolved Hide resolved
children[0].clone(),
self.data_type.clone(),
self.ansi_mode,
self.timezone.clone(),
)))
}
Expand Down
50 changes: 34 additions & 16 deletions core/src/execution/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,17 @@ pub struct PhysicalPlanner {
exec_context_id: i64,
execution_props: ExecutionProps,
session_ctx: Arc<SessionContext>,
}

impl Default for PhysicalPlanner {
andygrove marked this conversation as resolved.
Show resolved Hide resolved
fn default() -> Self {
let session_ctx = Arc::new(SessionContext::new());
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
}
}
ansi_mode: bool,
}

impl PhysicalPlanner {
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
pub fn new(session_ctx: Arc<SessionContext>, ansi_mode: bool) -> Self {
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
ansi_mode,
}
}

Expand All @@ -141,6 +131,7 @@ impl PhysicalPlanner {
exec_context_id,
execution_props: self.execution_props,
session_ctx: self.session_ctx.clone(),
ansi_mode: self.ansi_mode,
}
}

Expand Down Expand Up @@ -343,7 +334,12 @@ impl PhysicalPlanner {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
let datatype = to_arrow_datatype(expr.datatype.as_ref().unwrap());
let timezone = expr.timezone.clone();
Ok(Arc::new(Cast::new(child, datatype, timezone)))
Ok(Arc::new(Cast::new(
child,
datatype,
self.ansi_mode,
timezone,
)))
}
ExprStruct::Hour(expr) => {
let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?;
Expand Down Expand Up @@ -638,13 +634,19 @@ impl PhysicalPlanner {
let left = Arc::new(Cast::new_without_timezone(
left,
DataType::Decimal256(p1, s1),
self.ansi_mode,
));
let right = Arc::new(Cast::new_without_timezone(
right,
DataType::Decimal256(p2, s2),
self.ansi_mode,
));
let child = Arc::new(BinaryExpr::new(left, op, right));
Ok(Arc::new(Cast::new_without_timezone(child, data_type)))
Ok(Arc::new(Cast::new_without_timezone(
child,
data_type,
self.ansi_mode,
)))
}
(
DataFusionOperator::Divide,
Expand Down Expand Up @@ -1432,7 +1434,9 @@ mod tests {

use arrow_array::{DictionaryArray, Int32Array, StringArray};
use arrow_schema::DataType;
use datafusion::{physical_plan::common::collect, prelude::SessionContext};
use datafusion::{
execution::context::ExecutionProps, physical_plan::common::collect, prelude::SessionContext,
};
use tokio::sync::mpsc;

use crate::execution::{
Expand All @@ -1442,9 +1446,23 @@ mod tests {
spark_operator,
};

use crate::execution::datafusion::planner::TEST_EXEC_CONTEXT_ID;
use spark_expression::expr::ExprStruct::*;
use spark_operator::{operator::OpStruct, Operator};

impl Default for PhysicalPlanner {
fn default() -> Self {
let session_ctx = Arc::new(SessionContext::new());
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
ansi_mode: false,
}
}
}

#[test]
fn test_unpack_dictionary_primitive() {
let op_scan = Operator {
Expand Down
5 changes: 4 additions & 1 deletion core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(

let exec_context_id = exec_context.id;

let ansi_mode =
matches!(exec_context.conf.get("ansi_mode"), Some(value) if value == "true");

// Initialize the execution stream.
// Because we don't know if input arrays are dictionary-encoded when we create
// query plan, we need to defer stream initialization to first time execution.
if exec_context.root_op.is_none() {
let planner = PhysicalPlanner::new(exec_context.session_ctx.clone())
let planner = PhysicalPlanner::new(exec_context.session_ctx.clone(), ansi_mode)
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.apache.comet

import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION}
Expand All @@ -44,7 +46,8 @@ class CometExecIterator(
val id: Long,
inputs: Seq[Iterator[ColumnarBatch]],
protobufQueryPlan: Array[Byte],
nativeMetrics: CometMetricNode)
nativeMetrics: CometMetricNode,
ansiEnabled: Boolean)
extends Iterator[ColumnarBatch] {

private val nativeLib = new Native()
Expand Down Expand Up @@ -99,6 +102,7 @@ class CometExecIterator(
result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("ansi_mode", String.valueOf(ansiEnabled))

// Strip mandatory prefix spark. which is not required for DataFusion session params
conf.getAll.foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,12 @@ class CometSparkSessionExtensions
// DataFusion doesn't have ANSI mode. For now we just disable CometExec if ANSI mode is
// enabled.
if (isANSIEnabled(conf)) {
logInfo("Comet extension disabled for ANSI mode")
return plan
if (COMET_ANSI_MODE_ENABLED.get()) {
logWarning("Using Comet's experimental support for ANSI mode.")
} else {
logInfo("Comet extension disabled for ANSI mode")
return plan
}
}

// We shouldn't transform Spark query plan if Comet is disabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ package org.apache.spark.sql.comet

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.comet.execution.shuffle.{CometShuffledBatchRDD, CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnaryExecNode, UnsafeRowSerializer}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.google.common.base.Objects
Expand Down Expand Up @@ -73,7 +75,11 @@ case class CometCollectLimitExec(
childRDD
} else {
val localLimitedRDD = if (limit >= 0) {
CometExecUtils.getNativeLimitRDD(childRDD, output, limit)
CometExecUtils.getNativeLimitRDD(
childRDD,
output,
limit,
SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
} else {
childRDD
}
Expand All @@ -88,7 +94,11 @@ case class CometCollectLimitExec(

new CometShuffledBatchRDD(dep, readMetrics)
}
CometExecUtils.getNativeLimitRDD(singlePartitionRDD, output, limit)
CometExecUtils.getNativeLimitRDD(
singlePartitionRDD,
output,
limit,
SparkSession.active.conf.get(SQLConf.ANSI_ENABLED))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ object CometExecUtils {
def getNativeLimitRDD(
childPlan: RDD[ColumnarBatch],
outputAttribute: Seq[Attribute],
limit: Int): RDD[ColumnarBatch] = {
limit: Int,
ansiMode: Boolean): RDD[ColumnarBatch] = {
childPlan.mapPartitionsInternal { iter =>
val limitOp = CometExecUtils.getLimitNativePlan(outputAttribute, limit).get
CometExec.getCometIterator(Seq(iter), limitOp)
CometExec.getCometIterator(Seq(iter), limitOp, ansiMode)
}
}

Expand Down
Loading
Loading