From 0e5bb68d656aedd46d3d9195c46830bcf4550f52 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 5 Jun 2024 13:11:34 -0700 Subject: [PATCH] Add test --- .../execution/datafusion/shuffle_writer.rs | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index e6f66ac242..3b8baa4bf8 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -1394,6 +1394,11 @@ impl RecordBatchStream for EmptyStream { #[cfg(test)] mod test { use super::*; + use datafusion::physical_plan::common::collect; + use datafusion::physical_plan::memory::MemoryExec; + use datafusion::prelude::SessionContext; + use datafusion_physical_expr::expressions::Column; + use tokio::runtime::Runtime; #[test] fn test_slot_size() { @@ -1422,4 +1427,32 @@ mod test { assert_eq!(slot_size, *expected); }) } + + #[test] + fn test_insert_larger_batch() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Utf8, true)])); + let mut b = StringBuilder::new(); + for i in 0..10000 { + b.append_value(format!("{i}")); + } + let array = b.finish(); + let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap(); + + let mut batches = Vec::new(); + batches.push(batch.clone()); + + let partitions = &[batches]; + let exec = ShuffleWriterExec::try_new( + Arc::new(MemoryExec::try_new(partitions, batch.schema(), None).unwrap()), + Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), + "/tmp/data.out".to_string(), + "/tmp/index.out".to_string(), + ) + .unwrap(); + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + let rt = Runtime::new().unwrap(); + rt.block_on(collect(stream)).unwrap(); + } }