upmerge
andygrove committed Jan 1, 2025
2 parents 7c4e1a8 + 1c08a4b commit 573748a
Showing 8 changed files with 62 additions and 10 deletions.
2 changes: 2 additions & 0 deletions native/core/benches/row_columnar.rs
@@ -19,6 +19,7 @@ use arrow::datatypes::DataType as ArrowDataType;
use comet::execution::shuffle::row::{
process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow,
};
use comet::execution::shuffle::CompressionCodec;
use criterion::{criterion_group, criterion_main, Criterion};
use tempfile::Builder;

@@ -77,6 +78,7 @@ fn benchmark(c: &mut Criterion) {
false,
0,
None,
&CompressionCodec::Zstd(1),
)
.unwrap();
});
2 changes: 1 addition & 1 deletion native/core/benches/shuffle_writer.rs
@@ -89,7 +89,7 @@ fn create_shuffle_writer_exec(compression_codec: CompressionCodec) -> ShuffleWri
compression_codec,
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
true
true,
)
.unwrap()
}
17 changes: 16 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -58,7 +58,7 @@ use jni::{
use tokio::runtime::Runtime;

use crate::execution::operators::ScanExec;
use crate::execution::shuffle::read_ipc_compressed;
use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec};
use crate::execution::spark_plan::SparkPlan;
use log::info;

@@ -467,6 +467,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
compression_codec: jstring,
compression_level: jint,
) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
@@ -494,6 +496,18 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
Some(current_checksum as u32)
};

let compression_codec: String = env
.get_string(&JString::from_raw(compression_codec))
.unwrap()
.into();

let compression_codec = match compression_codec.as_str() {
"zstd" => CompressionCodec::Zstd(compression_level),
"lz4" => CompressionCodec::Lz4Frame,
"snappy" => CompressionCodec::Snappy,
_ => CompressionCodec::Lz4Frame,
};

let (written_bytes, checksum) = process_sorted_row_partition(
row_num,
batch_size as usize,
@@ -505,6 +519,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
checksum_enabled,
checksum_algo,
current_checksum,
&compression_codec,
)?;

let checksum = if let Some(checksum) = checksum {
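
Note: the codec-selection logic added to jni_api.rs above can be read on its own. Below is a minimal, standalone Rust sketch of that mapping (not part of this commit); the enum here is a stand-in for Comet's CompressionCodec, assumed to have the Zstd(level), Lz4Frame, and Snappy variants shown in this diff. Unknown codec names fall back to LZ4 frame compression, matching the `_ =>` arm above.

// Sketch (not part of this commit): standalone version of the codec mapping
// added in jni_api.rs. The enum is a stand-in for Comet's CompressionCodec.
#[derive(Debug, PartialEq)]
enum CompressionCodec {
    Zstd(i32),
    Lz4Frame,
    Snappy,
}

// Map the codec name and level received over JNI to the enum; unknown names
// fall back to LZ4 frame compression, as in the diff above.
fn parse_codec(name: &str, level: i32) -> CompressionCodec {
    match name {
        "zstd" => CompressionCodec::Zstd(level),
        "lz4" => CompressionCodec::Lz4Frame,
        "snappy" => CompressionCodec::Snappy,
        _ => CompressionCodec::Lz4Frame,
    }
}

fn main() {
    assert_eq!(parse_codec("zstd", 3), CompressionCodec::Zstd(3));
    assert_eq!(parse_codec("unknown", 0), CompressionCodec::Lz4Frame);
}
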
4 changes: 2 additions & 2 deletions native/core/src/execution/shuffle/row.rs
@@ -3297,6 +3297,7 @@ pub fn process_sorted_row_partition(
// this is the initial checksum for this method, as it also gets updated iteratively
// inside the loop within the method across batches.
initial_checksum: Option<u32>,
codec: &CompressionCodec,
) -> Result<(i64, Option<u32>), CometError> {
// TODO: We can tune this parameter automatically based on row size and cache size.
let row_step = 10;
@@ -3360,8 +3361,7 @@
// we do not collect metrics in Native_writeSortedFileNative
let ipc_time = Time::default();
// compression codec is not configurable for CometBypassMergeSortShuffleWriter
let codec = CompressionCodec::Zstd(1);
let block_writer = ShuffleBlockWriter::try_new(batch.schema().as_ref(), false, codec)?;
let block_writer = ShuffleBlockWriter::try_new(batch.schema().as_ref(), false, codec.clone())?;
written += block_writer.write_batch(&batch, &mut cursor, &ipc_time)?;

if let Some(checksum) = &mut current_checksum {
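
For context, the row.rs change above replaces the hard-coded Zstd(1) codec with one supplied by the caller, which is cloned per batch when constructing the ShuffleBlockWriter (cheap, since the codec is a small enum value). A minimal sketch of that caller/callee shape, using simplified stand-in types rather than Comet's real API:

// Sketch (not part of this commit): the codec is chosen once by the caller,
// passed by reference, and cloned for each per-batch block writer.
#[derive(Clone, Debug)]
enum CompressionCodec {
    Zstd(i32),
    Lz4Frame,
    Snappy,
}

// Stand-in for Comet's ShuffleBlockWriter; it takes ownership of a codec value.
struct ShuffleBlockWriter {
    codec: CompressionCodec,
}

impl ShuffleBlockWriter {
    fn try_new(codec: CompressionCodec) -> Result<Self, String> {
        Ok(Self { codec })
    }
}

// Stand-in for the per-partition loop in process_sorted_row_partition:
// one writer per batch, each built from a clone of the caller's codec.
fn process_batches(num_batches: usize, codec: &CompressionCodec) -> Result<(), String> {
    for _ in 0..num_batches {
        let _writer = ShuffleBlockWriter::try_new(codec.clone())?;
    }
    Ok(())
}

fn main() {
    // Same default the row_columnar.rs benchmark above passes: &CompressionCodec::Zstd(1).
    process_batches(4, &CompressionCodec::Zstd(1)).unwrap();
}
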
@@ -107,6 +107,8 @@ public final class CometShuffleExternalSorter implements CometShuffleChecksumSup
private final long[] partitionChecksums;

private final String checksumAlgorithm;
private final String compressionCodec;
private final int compressionLevel;

// The memory allocator for this sorter. It is used to allocate/free memory pages for this sorter.
// Because we need to allocate off-heap memory regardless of configured Spark memory mode
@@ -153,6 +155,8 @@ public CometShuffleExternalSorter(
this.peakMemoryUsedBytes = getMemoryUsage();
this.partitionChecksums = createPartitionChecksums(numPartitions, conf);
this.checksumAlgorithm = getChecksumAlgorithm(conf);
this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get();

this.initialSize = initialSize;

@@ -556,7 +560,9 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException {
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio);
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;

// Store the checksum for the current partition.
@@ -578,7 +584,13 @@ public void writeSortedFileNative(boolean isLastFile) throws IOException {
if (currentPartition != -1) {
long written =
doSpilling(
dataTypes, spillInfo.file, rowPartition, writeMetricsToUse, preferDictionaryRatio);
dataTypes,
spillInfo.file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
spillInfo.partitionLengths[currentPartition] = written;

synchronized (spills) {
@@ -103,6 +103,8 @@ public final class CometDiskBlockWriter {
private long totalWritten = 0L;
private boolean initialized = false;
private final int columnarBatchSize;
private final String compressionCodec;
private final int compressionLevel;
private final boolean isAsync;
private final int asyncThreadNum;
private final ExecutorService threadPool;
@@ -153,6 +155,8 @@ public final class CometDiskBlockWriter {
this.threadPool = threadPool;

this.columnarBatchSize = (int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_BATCH_SIZE().get();
this.compressionCodec = CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_CODEC().get();
this.compressionLevel = (int) CometConf$.MODULE$.COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL().get();

this.numElementsForSpillThreshold =
(int) CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_SPILL_THRESHOLD().get();
@@ -397,7 +401,14 @@ long doSpilling(boolean isLast) throws IOException {
synchronized (file) {
outputRecords += rowPartition.getNumRows();
written =
doSpilling(dataTypes, file, rowPartition, writeMetricsToUse, preferDictionaryRatio);
doSpilling(
dataTypes,
file,
rowPartition,
writeMetricsToUse,
preferDictionaryRatio,
compressionCodec,
compressionLevel);
}

// Update metrics
@@ -171,7 +171,9 @@ protected long doSpilling(
File file,
RowPartition rowPartition,
ShuffleWriteMetricsReporter writeMetricsToUse,
double preferDictionaryRatio) {
double preferDictionaryRatio,
String compressionCodec,
int compressionLevel) {
long[] addresses = rowPartition.getRowAddresses();
int[] sizes = rowPartition.getRowSizes();

@@ -190,7 +192,9 @@ protected long doSpilling(
batchSize,
checksumEnabled,
checksumAlgo,
currentChecksum);
currentChecksum,
compressionCodec,
compressionLevel);

long written = results[0];
checksum = results[1];
10 changes: 9 additions & 1 deletion spark/src/main/scala/org/apache/comet/Native.scala
@@ -118,9 +118,14 @@ class Native extends NativeBase {
* @param currentChecksum
* the current checksum of the file. As the checksum is computed incrementally, this is used
* to resume the computation of checksum for previous written data.
* @param compressionCodec
* the compression codec
* @param compressionLevel
* the compression level
* @return
* [the number of bytes written to disk, the checksum]
*/
// scalastyle:off
@native def writeSortedFileNative(
addresses: Array[Long],
rowSizes: Array[Int],
@@ -130,7 +135,10 @@ class Native extends NativeBase {
batchSize: Int,
checksumEnabled: Boolean,
checksumAlgo: Int,
currentChecksum: Long): Array[Long]
currentChecksum: Long,
compressionCodec: String,
compressionLevel: Int): Array[Long]
// scalastyle:on

/**
* Sorts partition ids of Spark unsafe rows in place. Used by Comet shuffle external sorter.
