Skip to content

Commit

Permalink
test: enabling Spark tests with offHeap requirement
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuyukitanimura committed Dec 18, 2024
1 parent dd27c5f commit 8b3d6fa
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 5 deletions.
26 changes: 22 additions & 4 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_node: JObject,
comet_task_memory_manager_obj: JObject,
batch_size: jint,
use_unified_memory_manager: jboolean,
memory_limit: jlong,
memory_fraction: jdouble,
debug_native: jboolean,
explain_native: jboolean,
worker_threads: jint,
Expand Down Expand Up @@ -147,7 +150,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
// We need to keep the session context alive. Some session state like temporary
// dictionaries are stored in session context. If it is dropped, the temporary
// dictionaries will be dropped as well.
let session = prepare_datafusion_session_context(batch_size as usize, task_memory_manager)?;
let session = prepare_datafusion_session_context(
batch_size as usize,
use_unified_memory_manager == 1,
memory_limit as usize,
memory_fraction as f64,
task_memory_manager,
)?;

let plan_creation_time = start.elapsed();

Expand All @@ -174,13 +183,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
/// Configure DataFusion session context.
fn prepare_datafusion_session_context(
batch_size: usize,
use_unified_memory_manager: bool,
memory_limit: usize,
memory_fraction: f64,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);

// Set Comet memory pool for native
let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
// Check if we are using unified memory manager integrated with Spark.
if use_unified_memory_manager {
// Set Comet memory pool for native
let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
rt_config = rt_config.with_memory_pool(Arc::new(memory_pool));
} else {
// Use DataFusion's built-in memory pool, sized from `memory_limit` and `memory_fraction`
rt_config = rt_config.with_memory_limit(memory_limit, memory_fraction)
}

// Get the DataFusion configuration from the Spark execution context.
// It can be configured in the Comet Spark JVM using Spark --conf parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
import org.apache.comet.vector.NativeUtil

/**
Expand Down Expand Up @@ -60,13 +60,20 @@ class CometExecIterator(
new CometBatchIterator(iterator, nativeUtil)
}.toArray
private val plan = {
val conf = SparkEnv.get.conf
// Only enable the unified memory manager when off-heap mode is enabled. Otherwise,
// we use the built-in memory pool from DataFusion, initialized with the
// `memory_limit` and `memory_fraction` passed below.
nativeLib.createPlan(
id,
cometBatchIterators,
protobufQueryPlan,
nativeMetrics,
new CometTaskMemoryManager(id),
batchSize = COMET_BATCH_SIZE.get(),
use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),
memory_limit = CometSparkSessionExtensions.getCometMemoryOverhead(conf),
memory_fraction = COMET_EXEC_MEMORY_FRACTION.get(),
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
workerThreads = COMET_WORKER_THREADS.get(),
Expand Down
3 changes: 3 additions & 0 deletions spark/src/main/scala/org/apache/comet/Native.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class Native extends NativeBase {
metrics: CometMetricNode,
taskMemoryManager: CometTaskMemoryManager,
batchSize: Int,
use_unified_memory_manager: Boolean,
memory_limit: Long,
memory_fraction: Double,
debug: Boolean,
explain: Boolean,
workerThreads: Int,
Expand Down

0 comments on commit 8b3d6fa

Please sign in to comment.