Commit
Merge remote-tracking branch 'upstream/main' into spark-4.0-more-test
kazuyukitanimura committed Dec 6, 2024
2 parents 6817994 + 587c29b commit 5b90a61
Showing 11 changed files with 97 additions and 313 deletions.
21 changes: 15 additions & 6 deletions docs/source/user-guide/tuning.md
@@ -103,18 +103,27 @@ native shuffle currently only supports `HashPartitioning` and `SinglePartitioning`
To enable native shuffle, set `spark.comet.exec.shuffle.mode` to `native`. If this mode is explicitly set,
then any shuffle operations that cannot be supported in this mode will fall back to Spark.

## Metrics

### Spark SQL Metrics

In some cases, Comet metrics are not directly comparable to Spark metrics:

- `CometScanExec` uses nanoseconds for total scan time. Spark also measures scan time in nanoseconds but converts to
  milliseconds _per batch_, which can result in a large loss of precision, making it difficult to compare scan times
  between Spark and Comet.
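The precision loss from per-batch conversion is easy to see with a back-of-the-envelope calculation. The sketch below uses illustrative numbers (not real measurements) to compare summing nanoseconds once at the end against truncating to whole milliseconds per batch:

```rust
fn main() {
    // Illustrative numbers: 1_000 batches, each scanned in 400_000 ns (0.4 ms).
    let per_batch_ns: u64 = 400_000;
    let batches: u64 = 1_000;

    // Keeping nanoseconds and converting once at the end preserves the total.
    let total_ms = per_batch_ns * batches / 1_000_000;

    // Converting to whole milliseconds per batch truncates 0.4 ms to 0 ms,
    // so the summed total collapses to zero.
    let truncated_ms: u64 = (0..batches).map(|_| per_batch_ns / 1_000_000).sum();

    println!("nanosecond accounting: {total_ms} ms"); // 400 ms
    println!("per-batch ms accounting: {truncated_ms} ms"); // 0 ms
}
```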

Comet also adds some custom metrics:
### Native Metrics

Setting `spark.comet.explain.native.enabled=true` will cause native plans to be logged in each executor. Metrics are
logged for each native plan (and there is one plan per task, so this is very verbose).

Here is a guide to some of the native metrics.

### ShuffleWriterExec

| Metric           | Description                                                                                                                                                                        |
| ---------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `jvm_fetch_time` | Measures the time it takes for `ShuffleWriterExec` to fetch batches from the JVM. Note that this does not include the execution time of the query that produced the input batches. |

### ScanExec

| Metric            | Description                                                                                         |
| ----------------- | --------------------------------------------------------------------------------------------------- |
| `elapsed_compute` | Total time spent in this operator, fetching batches from a JVM iterator.                            |
| `jvm_fetch_time`  | Time spent in the JVM fetching input batches to be read by this `ScanExec` instance.                |
| `arrow_ffi_time`  | Time spent using Arrow FFI to create Arrow batches from the memory addresses returned from the JVM. |
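Metrics like `jvm_fetch_time` are cumulative timers: each timed region adds its elapsed time to a shared counter. The sketch below is a simplified stand-in for the `MetricBuilder`/`Time` types from DataFusion that appear in the diff further down, not the actual implementation:

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

/// Simplified model of a cumulative time metric such as `jvm_fetch_time`:
/// each timed region adds its elapsed nanoseconds to a shared counter.
struct TimeMetric {
    nanos: AtomicU64,
}

impl TimeMetric {
    fn new() -> Self {
        TimeMetric { nanos: AtomicU64::new(0) }
    }

    /// Time a closure and accumulate its duration into the metric.
    fn time<T>(&self, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        self.nanos.fetch_add(start.elapsed().as_nanos() as u64, Ordering::Relaxed);
        out
    }

    fn value(&self) -> u64 {
        self.nanos.load(Ordering::Relaxed)
    }
}

fn main() {
    let jvm_fetch_time = TimeMetric::new();
    // Pretend each call is one batch fetched from the JVM.
    for _ in 0..3 {
        jvm_fetch_time.time(|| std::thread::sleep(std::time::Duration::from_millis(1)));
    }
    println!("accumulated: {} ns", jvm_fetch_time.value());
}
```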
18 changes: 0 additions & 18 deletions native/core/src/common/mod.rs
@@ -17,23 +17,5 @@

#[macro_use]
pub mod bit;

use crate::TypeTrait;

/// Getter APIs for Comet vectors.
trait ValueGetter<T: TypeTrait> {
/// Gets the non-null value at `idx`.
///
/// Note that null check needs to be done before the call, to ensure the value at `idx` is
/// not null.
fn value(&self, idx: usize) -> T::Native;
}

/// Setter APIs for Comet mutable vectors.
trait ValueSetter<T: TypeTrait> {
/// Appends a non-null value `v` to the end of this vector.
fn append_value(&mut self, v: &T::Native);
}

mod buffer;
pub use buffer::*;
241 changes: 0 additions & 241 deletions native/core/src/data_type.rs

This file was deleted.

26 changes: 6 additions & 20 deletions native/core/src/execution/datafusion/shuffle_writer.rs
@@ -139,7 +139,6 @@ impl ExecutionPlan for ShuffleWriterExec {
) -> Result<SendableRecordBatchStream> {
let input = self.input.execute(partition, Arc::clone(&context))?;
let metrics = ShuffleRepartitionerMetrics::new(&self.metrics, 0);
let jvm_fetch_time = MetricBuilder::new(&self.metrics).subset_time("jvm_fetch_time", 0);

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
@@ -152,7 +151,6 @@
self.partitioning.clone(),
metrics,
context,
jvm_fetch_time,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -1085,7 +1083,6 @@ impl Debug for ShuffleRepartitioner {
}
}

#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -1094,7 +1091,6 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
jvm_fetch_time: Time,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -1108,23 +1104,13 @@
context.session_config().batch_size(),
);

loop {
let mut timer = jvm_fetch_time.timer();
let b = input.next().await;
timer.stop();

match b {
Some(batch_result) => {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pulling the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch_result?))?;
}
_ => break,
}
while let Some(batch) = input.next().await {
// Block on the repartitioner to insert the batch and shuffle the rows
// into the corresponding partition buffer.
// Otherwise, pulling the next batch from the input stream might overwrite the
// current batch in the repartitioner.
block_on(repartitioner.insert_batch(batch?))?;
}

repartitioner.shuffle_write().await
}
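The refactor above replaces a `loop`/`match` over `input.next().await` with a `while let`, which exits as soon as the stream yields `None`. The same simplification applies to any `Option`-yielding source; a minimal synchronous sketch:

```rust
fn main() {
    let mut input = vec![1, 2, 3].into_iter();
    let mut total = 0;

    // `while let` is the idiomatic form of
    // `loop { match input.next() { Some(x) => ..., None => break } }`.
    while let Some(batch) = input.next() {
        total += batch;
    }

    println!("{total}"); // 6
}
```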

