diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index e0501e5bf..91ea4c926 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -53,6 +53,8 @@ use datafusion_physical_expr::EquivalenceProperties; use futures::executor::block_on; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; +use lz4::liblz4::BlockChecksum; +use lz4::{BlockSize, ContentChecksum}; use simd_adler32::Adler32; use std::io::Error; use std::{ @@ -65,8 +67,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use lz4::{BlockSize, ContentChecksum}; -use lz4::liblz4::BlockChecksum; use tokio::time::Instant; /// The status of appending rows to a partition buffer. @@ -1571,23 +1571,22 @@ pub(crate) fn write_ipc_compressed( let output = match codec { CompressionCodec::Lz4 => { - // write IPC first without compression let mut buffer = vec![]; let mut arrow_writer = StreamWriter::try_new(&mut buffer, &batch.schema())?; arrow_writer.write(batch)?; arrow_writer.finish()?; - let encoded = arrow_writer.into_inner()?; + let ipc_encoded = arrow_writer.into_inner()?; - let encoder = lz4::EncoderBuilder::new() - .content_size(encoded.len() as u64) + let mut encoder = lz4::EncoderBuilder::new() + .content_size(ipc_encoded.len() as u64) .checksum(ContentChecksum::ChecksumEnabled) .block_checksum(BlockChecksum::BlockChecksumEnabled) .level(4) .block_size(BlockSize::Default) .auto_flush(true) - .build(output)?; - + .build(&mut *output)?; + encoder.write_all(ipc_encoded.as_slice())?; let (output, result) = encoder.finish(); result?; output