
Commit

upmerge
andygrove committed Dec 18, 2024
2 parents 0a737b1 + e297d23 commit 2d698c3
Showing 5 changed files with 142 additions and 11 deletions.
103 changes: 102 additions & 1 deletion dev/diffs/4.0.0-preview1.diff
@@ -146,6 +146,77 @@ index 698ca009b4f..57d774a3617 100644

-- Test tables
CREATE table explain_temp1 (key int, val int) USING PARQUET;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
index 3a409eea348..26e9aaf215c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int4.sql
@@ -6,6 +6,9 @@
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int4.sql
--

+-- TODO: https://github.com/apache/datafusion-comet/issues/551
+--SET spark.comet.enabled = false
+
CREATE TABLE INT4_TBL(f1 int) USING parquet;

-- [SPARK-28023] Trim the string when cast string type to other types
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
index fac23b4a26f..98b12ae5ccc 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
@@ -6,6 +6,10 @@
-- Test int8 64-bit integers.
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/int8.sql
--
+
+-- TODO: https://github.com/apache/datafusion-comet/issues/551
+--SET spark.comet.enabled = false
+
CREATE TABLE INT8_TBL(q1 bigint, q2 bigint) USING parquet;

-- PostgreSQL implicitly casts string literals to data with integral types, but
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
index 0efe0877e9b..f9df0400c99 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/select_having.sql
@@ -6,6 +6,9 @@
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
--

+-- TODO: https://github.com/apache/datafusion-comet/issues/551
+--SET spark.comet.enabled = false
+
-- load test data
CREATE TABLE test_having (a int, b int, c string, d string) USING parquet;
INSERT INTO test_having VALUES (0, 1, 'XXXX', 'A');
diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
index e803254ea64..74db78aee38 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-binding-config.sql
@@ -1,6 +1,9 @@
-- This test suits check the spark.sql.viewSchemaBindingMode configuration.
-- It can be DISABLED and COMPENSATION

+-- TODO: https://github.com/apache/datafusion-comet/issues/551
+--SET spark.comet.enabled = false
+
-- Verify the default binding is true
SET spark.sql.legacy.viewSchemaBindingMode;

diff --git a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
index 21a3ce1e122..f4762ab98f0 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/view-schema-compensation.sql
@@ -1,5 +1,9 @@
-- This test suite checks the WITH SCHEMA COMPENSATION clause
-- Disable ANSI mode to ensure we are forcing it explicitly in the CASTS
+
+-- TODO: https://github.com/apache/datafusion-comet/issues/551
+--SET spark.comet.enabled = false
+
SET spark.sql.ansi.enabled = false;

-- In COMPENSATION views get invalidated if the type can't cast
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d023fb82185..0f4f03bda6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -917,7 +988,7 @@ index 34c6c49bc49..f5dea07a213 100644
protected val baseResourcePath = {
// use the same way as `SQLQueryTestSuite` to get the resource path
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
-index 56c364e2084..a00a50e020a 100644
+index 56c364e2084..fc3abd7cdc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1510,7 +1510,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
@@ -930,6 +1001,36 @@ index 56c364e2084..a00a50e020a 100644
AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC").collect()
}
@@ -4454,7 +4455,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39166: Query context of binary arithmetic should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4475,7 +4477,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39175: Query context of Cast should be serialized to executors" +
- " when WSCG is off") {
+ " when WSCG is off",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
@@ -4502,7 +4505,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}

test("SPARK-39190,SPARK-39208,SPARK-39210: Query context of decimal overflow error should " +
- "be serialized to executors when WSCG is off") {
+ "be serialized to executors when WSCG is off",
+ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/551")) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.ANSI_ENABLED.key -> "true") {
withTable("t") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 68f14f13bbd..174636cefb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
26 changes: 22 additions & 4 deletions native/core/src/execution/jni_api.rs
@@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_node: JObject,
comet_task_memory_manager_obj: JObject,
batch_size: jint,
+    use_unified_memory_manager: jboolean,
+    memory_limit: jlong,
+    memory_fraction: jdouble,
debug_native: jboolean,
explain_native: jboolean,
worker_threads: jint,
@@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// We need to keep the session context alive. Some session state like temporary
// dictionaries are stored in session context. If it is dropped, the temporary
// dictionaries will be dropped as well.
-    let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
+    let session = prepare_datafusion_session_context(
+        batch_size as usize,
+        use_unified_memory_manager == 1,
+        memory_limit as usize,
+        memory_fraction,
+        task_memory_manager,
+    )?;

let plan_creation_time = start.elapsed();

@@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
/// Configure DataFusion session context.
fn prepare_datafusion_session_context(
batch_size: usize,
+    use_unified_memory_manager: bool,
+    memory_limit: usize,
+    memory_fraction: f64,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

-    // Set Comet memory pool for native
-    let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
-    rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    // Check if we are using unified memory manager integrated with Spark.
+    if use_unified_memory_manager {
+        // Set Comet memory pool for native
+        let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
+        rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
+    } else {
+        // Use the memory pool from DF
+        rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
+    }

// Get Datafusion configuration from Spark Execution context
// can be configured in Comet Spark JVM using Spark --conf parameters
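
For readers unfamiliar with this API: the RuntimeConfig assembled above is what carries the memory-pool choice into the query engine. Below is a minimal sketch of how such a config typically becomes a SessionContext in DataFusion. finish_session is a hypothetical helper, not code from this commit, and constructor names vary across DataFusion versions:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

// Hypothetical helper, not part of this commit.
fn finish_session(rt_config: RuntimeConfig, batch_size: usize) -> Result<SessionContext> {
    // The chosen memory pool (unified Comet pool or DataFusion's own)
    // lives inside the RuntimeEnv built from rt_config.
    let runtime = Arc::new(RuntimeEnv::new(rt_config)?);
    // Spark-derived settings such as batch size go on the session config.
    let session_config = SessionConfig::new().with_batch_size(batch_size);
    Ok(SessionContext::new_with_config_rt(session_config, runtime))
}
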
9 changes: 8 additions & 1 deletion spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

-import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
+import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
@@ -60,13 +60,20 @@ class CometExecIterator(
new CometBatchIterator(iterator, nativeUtil)
}.toArray
private val plan = {
+    val conf = SparkEnv.get.conf
+    // Only enable unified memory manager when off-heap mode is enabled. Otherwise,
+    // we'll use the built-in memory pool from DF, and initializes with `memory_limit`
+    // and `memory_fraction` below.
nativeLib.createPlan(
id,
cometBatchIterators,
protobufQueryPlan,
nativeMetrics,
new CometTaskMemoryManager(id),
batchSize = COMET_BATCH_SIZE.get(),
+      use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
+      memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
+      memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
workerThreads = COMET_WORKER_THREADS.get(),
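
The CometTaskMemoryManager handed to createPlan above backs CometMemoryPool on the native side when the unified manager is enabled. CometMemoryPool itself is not part of this commit; as background, a DataFusion pool implements the MemoryPool trait, roughly as in the fixed-limit sketch below. This is illustrative only, assuming the datafusion::execution::memory_pool API; Comet's real pool instead forwards these calls to Spark's task memory manager over JNI:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use datafusion::common::{DataFusionError, Result};
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

// Illustrative fixed-limit pool, standing in for CometMemoryPool.
#[derive(Debug)]
struct FixedLimitPool {
    limit: usize,
    used: AtomicUsize,
}

impl MemoryPool for FixedLimitPool {
    fn grow(&self, _r: &MemoryReservation, additional: usize) {
        // Infallible grow: used when the caller cannot tolerate failure.
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    fn shrink(&self, _r: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn try_grow(&self, _r: &MemoryReservation, additional: usize) -> Result<()> {
        // Fallible grow: refuse the request once the limit would be exceeded.
        let prev = self.used.fetch_add(additional, Ordering::Relaxed);
        if prev + additional > self.limit {
            self.used.fetch_sub(additional, Ordering::Relaxed);
            return Err(DataFusionError::ResourcesExhausted(format!(
                "cannot reserve {additional} bytes: pool limit {} reached",
                self.limit
            )));
        }
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}

fn demo() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(FixedLimitPool {
        limit: 512 * 1024 * 1024, // 512 MiB, arbitrary for the sketch
        used: AtomicUsize::new(0),
    });
    // Operators acquire memory via reservations registered against the pool.
    let mut reservation = MemoryConsumer::new("external_sort").register(&pool);
    reservation.try_grow(64 * 1024 * 1024)?; // request 64 MiB; may fail
    reservation.free(); // return the bytes when the operator is done
    Ok(())
}

A rejected try_grow is the signal DataFusion operators use to spill, which pairs with the DiskManagerConfig::NewOs setting seen in jni_api.rs above.
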
10 changes: 5 additions & 5 deletions spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types.{DoubleType, FloatType}

import org.apache.comet.CometConf._
import org.apache.comet.CometExplainInfo.getActualPlan
-import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isSpark34Plus, isSpark40Plus, shouldApplySparkToColumnar, withInfo, withInfos}
+import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometNativeShuffleMode, isCometScan, isCometScanEnabled, isCometShuffleEnabled, isOffHeapEnabled, isSpark34Plus, isSpark40Plus, isTesting, shouldApplySparkToColumnar, withInfo, withInfos}
import org.apache.comet.parquet.{CometParquetScan, SupportsComet}
import org.apache.comet.rules.RewriteJoin
import org.apache.comet.serde.OperatorOuterClass.Operator
@@ -921,8 +921,9 @@ class CometSparkSessionExtensions
override def apply(plan: SparkPlan): SparkPlan = {

// Comet required off-heap memory to be enabled
if ("true" != conf.getConfString("spark.memory.offHeap.enabled", "false")) {
logInfo("Comet extension disabled because spark.memory.offHeap.enabled=false")
if (!isOffHeapEnabled(conf) && !isTesting) {
logWarning("Comet native exec disabled because spark.memory.offHeap.enabled=false")
withInfo(plan, "Comet native exec disabled because spark.memory.offHeap.enabled=false")
return plan
}

@@ -1174,8 +1175,7 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isOffHeapEnabled(conf: SQLConf): Boolean =
conf.contains("spark.memory.offHeap.enabled") &&
conf.getConfString("spark.memory.offHeap.enabled").toBoolean
conf.getConfString("spark.memory.offHeap.enabled", "false").toBoolean

// Copied from org.apache.spark.util.Utils which is private to Spark.
private[comet] def isTesting: Boolean = {
5 changes: 5 additions & 0 deletions spark/src/main/scala/org/apache/comet/Native.scala
@@ -43,17 +43,22 @@ class Native extends NativeBase {
* @return
* the address to native query plan.
*/
+  // scalastyle:off
@native def createPlan(
id: Long,
iterators: Array[CometBatchIterator],
plan: Array[Byte],
metrics: CometMetricNode,
taskMemoryManager: CometTaskMemoryManager,
batchSize: Int,
+      use_unified_memory_manager: Boolean,
+      memory_limit: Long,
+      memory_fraction: Double,
debug: Boolean,
explain: Boolean,
workerThreads: Int,
blockingThreads: Int): Long
+  // scalastyle:on

/**
* Execute a native query plan based on given input Arrow arrays.
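
How the two halves line up: the JVM resolves a @native method to a symbol named Java_ followed by the fully qualified class and method name with dots replaced by underscores, which is why the Rust entry point in jni_api.rs is Java_org_apache_comet_Native_createPlan. A hedged stub of that shape using the jni crate, with the parameter list elided:

use jni::objects::JClass;
use jni::sys::jlong;
use jni::JNIEnv;

// Symbol name = "Java_" + package/class with '.' -> '_' + "_" + method name.
// Scala Boolean/Long/Double/Int map to jboolean/jlong/jdouble/jint.
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_createPlan(
    _env: JNIEnv,
    _class: JClass,
    // ... remaining parameters mirror the Scala signature above
) -> jlong {
    unimplemented!("illustrative stub; the real implementation is in jni_api.rs")
}
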
