diff --git a/docs/source/user-guide/tuning.md b/docs/source/user-guide/tuning.md index e0d7c1bc9..af722494f 100644 --- a/docs/source/user-guide/tuning.md +++ b/docs/source/user-guide/tuning.md @@ -103,7 +103,9 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitionin To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set, then any shuffle operations that cannot be supported in this mode will fall back to Spark. -## Metrics +## Metrics + +### Spark SQL Metrics Some Comet metrics are not directly comparable to Spark metrics in some cases: @@ -111,10 +113,17 @@ Some Comet metrics are not directly comparable to Spark metrics in some cases: milliseconds _per batch_ which can result in a large loss of precision, making it difficult to compare scan times between Spark and Comet. -Comet also adds some custom metrics: +### Native Metrics + +Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are +logged for each native plan (and there is one plan per task, so this is very verbose). + +Here is a guide to some of the native metrics. -### ShuffleWriterExec +### ScanExec -| Metric | Description | -| ---------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `jvm_fetch_time` | Measure the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. | +| Metric | Description | +| ----------------- | --------------------------------------------------------------------------------------------------- | +| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator. | +| `jvm_fetch_time` | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance. | +| `arrow_ffi_time` | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. | diff --git a/native/core/src/common/mod.rs b/native/core/src/common/mod.rs index 1b7dfad28..dc539879f 100644 --- a/native/core/src/common/mod.rs +++ b/native/core/src/common/mod.rs @@ -17,23 +17,5 @@ #[macro_use] pub mod bit; - -use crate::TypeTrait; - -/// Getter APIs for Comet vectors. -trait ValueGetter { - /// Gets the non-null value at `idx`. - /// - /// Note that null check needs to be done before the call, to ensure the value at `idx` is - /// not null. - fn value(&self, idx: usize) -> T::Native; -} - -/// Setter APIs for Comet mutable vectors. -trait ValueSetter { - /// Appends a non-null value `v` to the end of this vector. - fn append_value(&mut self, v: &T::Native); -} - mod buffer; pub use buffer::*; diff --git a/native/core/src/data_type.rs b/native/core/src/data_type.rs deleted file mode 100644 index b275de1c6..000000000 --- a/native/core/src/data_type.rs +++ /dev/null @@ -1,241 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use arrow::datatypes::DataType as ArrowDataType; -use arrow_schema::TimeUnit; -use std::{cmp, fmt::Debug}; - -#[derive(Debug, PartialEq)] -pub enum DataType { - Boolean, - Byte, - Short, - Integer, - Long, - Float, - Double, - Decimal(u8, i8), - String, - Binary, - Timestamp, - Date, -} - -impl From<&ArrowDataType> for DataType { - fn from(dt: &ArrowDataType) -> Self { - match dt { - ArrowDataType::Boolean => DataType::Boolean, - ArrowDataType::Int8 => DataType::Byte, - ArrowDataType::Int16 => DataType::Short, - ArrowDataType::Int32 => DataType::Integer, - ArrowDataType::Int64 => DataType::Long, - ArrowDataType::Float32 => DataType::Float, - ArrowDataType::Float64 => DataType::Double, - ArrowDataType::Decimal128(precision, scale) => DataType::Decimal(*precision, *scale), - ArrowDataType::Utf8 => DataType::String, - ArrowDataType::Binary => DataType::Binary, - // Spark always store timestamp in micro seconds - ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => DataType::Timestamp, - ArrowDataType::Date32 => DataType::Date, - ArrowDataType::Dictionary(key_dt, value_dt) if is_valid_key_type(key_dt) => { - Self::from(value_dt.as_ref()) - } - dt => panic!("unsupported Arrow data type: {:?}", dt), - } - } -} - -impl DataType { - pub fn kind(&self) -> TypeKind { - match self { - DataType::Boolean => TypeKind::Boolean, - DataType::Byte => TypeKind::Byte, - DataType::Short => TypeKind::Short, - DataType::Integer => TypeKind::Integer, - DataType::Long => TypeKind::Long, - DataType::Float => TypeKind::Float, - DataType::Double => TypeKind::Double, - DataType::Decimal(_, _) => TypeKind::Decimal, - DataType::String => TypeKind::String, - DataType::Binary => TypeKind::Binary, - DataType::Timestamp => TypeKind::Timestamp, - DataType::Date => TypeKind::Date, - } - } -} - -/// Comet only use i32 as dictionary key -fn is_valid_key_type(dt: &ArrowDataType) -> bool { - matches!(dt, ArrowDataType::Int32) -} - -/// Unlike [`DataType`], [`TypeKind`] doesn't carry extra information about the type itself, such as -/// decimal precision & scale. Instead, it is merely a token that is used to do runtime case -/// analysis depending on the actual type. It can be obtained from a `TypeTrait` generic parameter. -#[derive(Debug, PartialEq)] -pub enum TypeKind { - Boolean, - Byte, - Short, - Integer, - Long, - Float, - Double, - Decimal, - String, - Binary, - Timestamp, - Date, -} - -pub const BITS_PER_BYTE: usize = 8; - -impl TypeKind { - /// Returns the size of this type, in number of bits. 
- pub fn type_size(&self) -> usize { - match self { - TypeKind::Boolean => 1, - TypeKind::Byte => BITS_PER_BYTE, - TypeKind::Short => BITS_PER_BYTE * 2, - TypeKind::Integer | TypeKind::Float => BITS_PER_BYTE * 4, - TypeKind::Long | TypeKind::Double => BITS_PER_BYTE * 8, - TypeKind::Decimal => BITS_PER_BYTE * 16, - TypeKind::String | TypeKind::Binary => BITS_PER_BYTE * 16, - TypeKind::Timestamp => BITS_PER_BYTE * 8, - TypeKind::Date => BITS_PER_BYTE * 4, - } - } -} - -pub const STRING_VIEW_LEN: usize = 16; // StringView is stored using 16 bytes -pub const STRING_VIEW_PREFIX_LEN: usize = 4; // String prefix in StringView is stored using 4 bytes - -#[repr(C, align(16))] -#[derive(Clone, Copy, Debug)] -pub struct StringView { - pub len: u32, - pub prefix: [u8; STRING_VIEW_PREFIX_LEN], - pub ptr: usize, -} - -impl StringView { - pub fn as_utf8_str(&self) -> &str { - unsafe { - let slice = std::slice::from_raw_parts(self.ptr as *const u8, self.len as usize); - std::str::from_utf8_unchecked(slice) - } - } -} - -impl Default for StringView { - fn default() -> Self { - Self { - len: 0, - prefix: [0; STRING_VIEW_PREFIX_LEN], - ptr: 0, - } - } -} - -impl PartialEq for StringView { - fn eq(&self, other: &Self) -> bool { - if self.len != other.len { - return false; - } - if self.prefix != other.prefix { - return false; - } - self.as_utf8_str() == other.as_utf8_str() - } -} - -pub trait NativeEqual { - fn is_equal(&self, other: &Self) -> bool; -} - -macro_rules! make_native_equal { - ($native_ty:ty) => { - impl NativeEqual for $native_ty { - fn is_equal(&self, other: &Self) -> bool { - self == other - } - } - }; -} - -make_native_equal!(bool); -make_native_equal!(i8); -make_native_equal!(i16); -make_native_equal!(i32); -make_native_equal!(i64); -make_native_equal!(i128); -make_native_equal!(StringView); - -impl NativeEqual for f32 { - fn is_equal(&self, other: &Self) -> bool { - self.total_cmp(other) == cmp::Ordering::Equal - } -} - -impl NativeEqual for f64 { - fn is_equal(&self, other: &Self) -> bool { - self.total_cmp(other) == cmp::Ordering::Equal - } -} -pub trait NativeType: Debug + Default + Copy + NativeEqual {} - -impl NativeType for bool {} -impl NativeType for i8 {} -impl NativeType for i16 {} -impl NativeType for i32 {} -impl NativeType for i64 {} -impl NativeType for i128 {} -impl NativeType for f32 {} -impl NativeType for f64 {} -impl NativeType for StringView {} - -/// A trait for Comet data type. This should only be used as generic parameter during method -/// invocations. -pub trait TypeTrait: 'static { - type Native: NativeType; - fn type_kind() -> TypeKind; -} - -macro_rules! 
make_type_trait { - ($name:ident, $native_ty:ty, $kind:path) => { - pub struct $name {} - impl TypeTrait for $name { - type Native = $native_ty; - fn type_kind() -> TypeKind { - $kind - } - } - }; -} - -make_type_trait!(BoolType, bool, TypeKind::Boolean); -make_type_trait!(ByteType, i8, TypeKind::Byte); -make_type_trait!(ShortType, i16, TypeKind::Short); -make_type_trait!(IntegerType, i32, TypeKind::Integer); -make_type_trait!(LongType, i64, TypeKind::Long); -make_type_trait!(FloatType, f32, TypeKind::Float); -make_type_trait!(DoubleType, f64, TypeKind::Double); -make_type_trait!(DecimalType, i128, TypeKind::Decimal); -make_type_trait!(StringType, StringView, TypeKind::String); -make_type_trait!(BinaryType, StringView, TypeKind::Binary); -make_type_trait!(TimestampType, i64, TypeKind::Timestamp); -make_type_trait!(DateType, i32, TypeKind::Date); diff --git a/native/core/src/execution/datafusion/shuffle_writer.rs b/native/core/src/execution/datafusion/shuffle_writer.rs index c79eeeb4a..7587ff06d 100644 --- a/native/core/src/execution/datafusion/shuffle_writer.rs +++ b/native/core/src/execution/datafusion/shuffle_writer.rs @@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec { ) -> Result { let input = self.input.execute(partition, Arc::clone(&context))?; let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0); - let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0); Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), @@ -152,7 +151,6 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, - jvm_fetch_time, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner { } } -#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, @@ -1094,7 +1091,6 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, - jvm_fetch_time: Time, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -1108,23 +1104,13 @@ async fn external_shuffle( context.session_config().batch_size(), ); - loop { - let mut timer = jvm_fetch_time.timer(); - let b = input.next().await; - timer.stop(); - - match b { - Some(batch_result) => { - // Block on the repartitioner to insert the batch and shuffle the rows - // into the corresponding partition buffer. - // Otherwise, pull the next batch from the input stream might overwrite the - // current batch in the repartitioner. - block_on(repartitioner.insert_batch(batch_result?))?; - } - _ => break, - } + while let Some(batch) = input.next().await { + // Block on the repartitioner to insert the batch and shuffle the rows + // into the corresponding partition buffer. + // Otherwise, pull the next batch from the input stream might overwrite the + // current batch in the repartitioner. 
+ block_on(repartitioner.insert_batch(batch?))?; } - repartitioner.shuffle_write().await } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index 2cb8a84d9..a97caf0db 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -77,6 +77,10 @@ pub struct ScanExec { metrics: ExecutionPlanMetricsSet, /// Baseline metrics baseline_metrics: BaselineMetrics, + /// Time waiting for JVM input plan to execute and return batches + jvm_fetch_time: Time, + /// Time spent in FFI + arrow_ffi_time: Time, } impl ScanExec { @@ -88,6 +92,8 @@ impl ScanExec { ) -> Result { let metrics_set = ExecutionPlanMetricsSet::default(); let baseline_metrics = BaselineMetrics::new(&metrics_set, 0); + let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0); + let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0); // Scan's schema is determined by the input batch, so we need to set it before execution. // Note that we determine if arrays are dictionary-encoded based on the @@ -97,8 +103,13 @@ impl ScanExec { // Dictionary-encoded primitive arrays are always unpacked. let first_batch = if let Some(input_source) = input_source.as_ref() { let mut timer = baseline_metrics.elapsed_compute().timer(); - let batch = - ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?; + let batch = ScanExec::get_next( + exec_context_id, + input_source.as_obj(), + data_types.len(), + &jvm_fetch_time, + &arrow_ffi_time, + )?; timer.stop(); batch } else { @@ -124,6 +135,8 @@ impl ScanExec { cache, metrics: metrics_set, baseline_metrics, + jvm_fetch_time, + arrow_ffi_time, schema, }) } @@ -171,6 +184,8 @@ impl ScanExec { self.exec_context_id, self.input_source.as_ref().unwrap().as_obj(), self.data_types.len(), + &self.jvm_fetch_time, + &self.arrow_ffi_time, )?; *current_batch = Some(next_batch); } @@ -185,6 +200,8 @@ impl ScanExec { exec_context_id: i64, iter: &JObject, num_cols: usize, + jvm_fetch_time: &Time, + arrow_ffi_time: &Time, ) -> Result { if exec_context_id == TEST_EXEC_CONTEXT_ID { // This is a unit test. We don't need to call JNI. @@ -200,6 +217,21 @@ impl ScanExec { let mut env = JVMClasses::get_env()?; + let mut timer = jvm_fetch_time.timer(); + + let num_rows: i32 = unsafe { + jni_call!(&mut env, + comet_batch_iterator(iter).has_next() -> i32)? + }; + + timer.stop(); + + if num_rows == -1 { + return Ok(InputBatch::EOF); + } + + let mut timer = arrow_ffi_time.timer(); + let mut array_addrs = Vec::with_capacity(num_cols); let mut schema_addrs = Vec::with_capacity(num_cols); @@ -233,9 +265,9 @@ impl ScanExec { comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)? }; - if num_rows == -1 { - return Ok(InputBatch::EOF); - } + // we already checked for end of results on call to has_next() so should always + // have a valid row count when calling next() + assert!(num_rows != -1); let mut inputs: Vec = Vec::with_capacity(num_cols); @@ -255,6 +287,8 @@ impl ScanExec { } } + timer.stop(); + Ok(InputBatch::new(inputs, Some(num_rows as usize))) } } diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs index 4870624d2..45b10cf20 100644 --- a/native/core/src/jvm_bridge/batch_iterator.rs +++ b/native/core/src/jvm_bridge/batch_iterator.rs @@ -26,6 +26,8 @@ use jni::{ /// A struct that holds all the JNI methods and fields for JVM `CometBatchIterator` class. 
pub struct CometBatchIterator<'a> { pub class: JClass<'a>, + pub method_has_next: JMethodID, + pub method_has_next_ret: ReturnType, pub method_next: JMethodID, pub method_next_ret: ReturnType, } @@ -38,6 +40,8 @@ impl<'a> CometBatchIterator<'a> { Ok(CometBatchIterator { class, + method_has_next: env.get_method_id(Self::JVM_CLASS, "hasNext", "()I")?, + method_has_next_ret: ReturnType::Primitive(Primitive::Int), method_next: env.get_method_id(Self::JVM_CLASS, "next", "([J[J)I")?, method_next_ret: ReturnType::Primitive(Primitive::Int), }) diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index 36e63e39c..c6a7a4143 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -40,15 +40,12 @@ use log4rs::{ use mimalloc::MiMalloc; use once_cell::sync::OnceCell; -pub use data_type::*; - use errors::{try_unwrap_or_throw, CometError, CometResult}; #[macro_use] mod errors; #[macro_use] pub mod common; -mod data_type; pub mod execution; mod jvm_bridge; pub mod parquet; diff --git a/spark/src/main/java/org/apache/comet/CometBatchIterator.java b/spark/src/main/java/org/apache/comet/CometBatchIterator.java index accd57c20..e05bea1df 100644 --- a/spark/src/main/java/org/apache/comet/CometBatchIterator.java +++ b/spark/src/main/java/org/apache/comet/CometBatchIterator.java @@ -33,12 +33,31 @@ public class CometBatchIterator { final Iterator input; final NativeUtil nativeUtil; + private ColumnarBatch currentBatch = null; CometBatchIterator(Iterator input, NativeUtil nativeUtil) { this.input = input; this.nativeUtil = nativeUtil; } + /** + * Fetch the next input batch. + * + * @return Number of rows in next batch or -1 if no batches left. + */ + public int hasNext() { + if (currentBatch == null) { + if (input.hasNext()) { + currentBatch = input.next(); + } + } + if (currentBatch == null) { + return -1; + } else { + return currentBatch.numRows(); + } + } + /** * Get the next batches of Arrow arrays. * @@ -47,12 +66,11 @@ public class CometBatchIterator { * @return the number of rows of the current batch. -1 if there is no more batch. 
*/ public int next(long[] arrayAddrs, long[] schemaAddrs) { - boolean hasBatch = input.hasNext(); - - if (!hasBatch) { + if (currentBatch == null) { return -1; } - - return nativeUtil.exportBatch(arrayAddrs, schemaAddrs, input.next()); + int numRows = nativeUtil.exportBatch(arrayAddrs, schemaAddrs, currentBatch); + currentBatch = null; + return numRows; } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 2bb467af5..b33f6b5a6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2909,7 +2909,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case op if isCometSink(op) && op.output.forall(a => supportedDataType(a.dataType, true)) => // These operators are source of Comet native execution chain val scanBuilder = OperatorOuterClass.Scan.newBuilder() - scanBuilder.setSource(op.simpleStringWithNodeId()) + val source = op.simpleStringWithNodeId() + if (source.isEmpty) { + scanBuilder.setSource(op.getClass.getSimpleName) + } else { + scanBuilder.setSource(source) + } val scanTypes = op.output.flatten { attr => serializeDataType(attr.dataType) diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala index 9698dc98b..2fc73bb7c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/CometExecUtils.scala @@ -88,7 +88,7 @@ object CometExecUtils { * child partition */ def getLimitNativePlan(outputAttributes: Seq[Attribute], limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("LimitInput") val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => @@ -118,7 +118,7 @@ object CometExecUtils { sortOrder: Seq[SortOrder], child: SparkPlan, limit: Int): Option[Operator] = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("TopKInput") val scanOpBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr => diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala index a7a33c40d..b1dd9ac83 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala @@ -77,9 +77,6 @@ case class CometShuffleExchangeExec( SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext) override lazy val metrics: Map[String, SQLMetric] = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "jvm_fetch_time" -> SQLMetrics.createNanoTimingMetric( - sparkContext, - "time fetching batches from JVM"), "numPartitions" -> SQLMetrics.createMetric( sparkContext, "number of partitions")) ++ readMetrics ++ writeMetrics @@ -485,14 +482,7 @@ class CometShuffleWriteProcessor( "output_rows" -> metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN), "data_size" -> metrics("dataSize"), "elapsed_compute" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) - - val nativeMetrics = if (metrics.contains("jvm_fetch_time")) { - CometMetricNode( - nativeSQLMetrics ++ Map("jvm_fetch_time" -> - metrics("jvm_fetch_time"))) - } else { - CometMetricNode(nativeSQLMetrics) - } + val nativeMetrics = CometMetricNode(nativeSQLMetrics) // Getting rid of the fake partitionId val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, Any]]].map(_._2) @@ -538,7 +528,7 @@ class CometShuffleWriteProcessor( } def getNativePlan(dataFile: String, indexFile: String): Operator = { - val scanBuilder = OperatorOuterClass.Scan.newBuilder() + val scanBuilder = OperatorOuterClass.Scan.newBuilder().setSource("ShuffleWriterInput") val opBuilder = OperatorOuterClass.Operator.newBuilder() val scanTypes = outputAttributes.flatten { attr =>
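
Reviewer note (illustrative, not part of the patch): the `hasNext()`/`next()` split introduced above lets the native `ScanExec` time a fetch in two phases, attributing the wait on the upstream JVM plan to `jvm_fetch_time` and only the Arrow FFI import to `arrow_ffi_time`. Below is a minimal, self-contained sketch of that DataFusion metrics pattern; the `has_next_rows` and `import_batch` closures are hypothetical stand-ins for the real JNI call and FFI import.

```rust
// A simplified sketch (not the patch itself) of the timing pattern used in
// `ScanExec::get_next`: register named `subset_time` metrics and wrap the JVM
// `hasNext()` wait and the Arrow FFI import in separate scoped timers.
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

struct ScanTimings {
    jvm_fetch_time: Time,
    arrow_ffi_time: Time,
}

impl ScanTimings {
    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        Self {
            jvm_fetch_time: MetricBuilder::new(metrics).subset_time("jvm_fetch_time", partition),
            arrow_ffi_time: MetricBuilder::new(metrics).subset_time("arrow_ffi_time", partition),
        }
    }

    /// Returns the number of imported rows, or `None` when the JVM iterator is exhausted.
    /// `has_next_rows` and `import_batch` are placeholders for the real JNI/FFI calls.
    fn timed_fetch(
        &self,
        has_next_rows: impl FnOnce() -> i32,
        import_batch: impl FnOnce() -> usize,
    ) -> Option<usize> {
        // Phase 1: block on the JVM side until a batch is ready (-1 signals end of input).
        let mut timer = self.jvm_fetch_time.timer();
        let num_rows = has_next_rows();
        timer.stop();
        if num_rows == -1 {
            return None;
        }

        // Phase 2: import the already-produced batch through Arrow FFI.
        let mut timer = self.arrow_ffi_time.timer();
        let imported = import_batch();
        timer.stop();
        Some(imported)
    }
}

fn main() {
    let metrics = ExecutionPlanMetricsSet::default();
    let timings = ScanTimings::new(&metrics, 0);

    // Simulate one 1024-row batch followed by end of input.
    assert_eq!(timings.timed_fetch(|| 1024, || 1024), Some(1024));
    assert_eq!(timings.timed_fetch(|| -1, || unreachable!()), None);

    println!(
        "jvm_fetch_time={}ns arrow_ffi_time={}ns",
        timings.jvm_fetch_time.value(),
        timings.arrow_ffi_time.value()
    );
}
```

Because `hasNext()` is consulted before any `ArrowArray`/`ArrowSchema` structures are allocated, the exhausted-iterator case incurs no FFI allocations, and time spent blocked on the JVM iterator is reported under `jvm_fetch_time` rather than being folded into the FFI import cost.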