Skip to content

Commit

Permalink
actually write output
Browse files (browse the repository at this point in the history)
  • Loading branch information
andygrove committed Dec 18, 2024
1 parent 0aab2a4 commit 738b30c
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions native/core/src/execution/shuffle/shuffle_writer.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -53,6 +53,8 @@ use datafusion_physical_expr::EquivalenceProperties;
use futures::executor::block_on;
use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use lz4::liblz4::BlockChecksum;
use lz4::{BlockSize, ContentChecksum};
use simd_adler32::Adler32;
use std::io::Error;
use std::{
Expand All @@ -65,8 +67,6 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use lz4::{BlockSize, ContentChecksum};
use lz4::liblz4::BlockChecksum;
use tokio::time::Instant;

/// The status of appending rows to a partition buffer.
Expand Down Expand Up @@ -1571,23 +1571,22 @@ pub(crate) fn write_ipc_compressed<W: Write + Seek>(

let output = match codec {
CompressionCodec::Lz4 => {

// write IPC first without compression
let mut buffer = vec![];
let mut arrow_writer = StreamWriter::try_new(&mut buffer, &batch.schema())?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;
let encoded = arrow_writer.into_inner()?;
let ipc_encoded = arrow_writer.into_inner()?;

let encoder = lz4::EncoderBuilder::new()
.content_size(encoded.len() as u64)
let mut encoder = lz4::EncoderBuilder::new()
.content_size(ipc_encoded.len() as u64)
.checksum(ContentChecksum::ChecksumEnabled)
.block_checksum(BlockChecksum::BlockChecksumEnabled)
.level(4)
.block_size(BlockSize::Default)
.auto_flush(true)
.build(output)?;

.build(&mut *output)?;
encoder.write_all(ipc_encoded.as_slice())?;
let (output, result) = encoder.finish();
result?;
output
Expand Down

0 comments on commit 738b30c

Please sign in to comment.