From cab4a6ad9cbc1be23c364795ac8b62c07060b445 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 30 May 2024 15:07:14 -0600 Subject: [PATCH 1/3] coalesce batches after scan --- core/src/execution/datafusion/planner.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 7a37e3aae..3bf81b386 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -39,6 +39,7 @@ use datafusion::{ }, physical_plan::{ aggregates::{AggregateMode as DFAggregateMode, PhysicalGroupBy}, + coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::{utils::JoinFilter, HashJoinExec, PartitionMode, SortMergeJoinExec}, limit::LocalLimitExec, @@ -725,7 +726,11 @@ impl PhysicalPlanner { .map(|r| (r, format!("col_{}", idx))) }) .collect(); - Ok((scans, Arc::new(ProjectionExec::try_new(exprs?, child)?))) + + let projection = Arc::new(ProjectionExec::try_new(exprs?, child)?); + let coalesced: Arc<dyn ExecutionPlan> = + Arc::new(CoalesceBatchesExec::new(projection, 4196)); + Ok((scans, coalesced)) } OpStruct::Filter(filter) => { assert!(children.len() == 1); From 312562e82b67ea091e8245978381d6b52515a572 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 4 Jun 2024 16:49:32 -0600 Subject: [PATCH 2/3] add copyexec --- core/src/execution/datafusion/planner.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 3bf81b386..b4843704f 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -728,8 +728,12 @@ impl PhysicalPlanner { .collect(); let projection = Arc::new(ProjectionExec::try_new(exprs?, child)?); + // the scan reuses batches so we need to copy them before coalescing ... 
if + // this approach looks like it has benefit then we may want to build a custom + // version of CoalesceBatchesExec that copies when needed + let copied = Arc::new(CopyExec::new(projection)); let coalesced: Arc<dyn ExecutionPlan> = - Arc::new(CoalesceBatchesExec::new(projection, 4196)); + Arc::new(CoalesceBatchesExec::new(copied, 4196)); Ok((scans, coalesced)) } OpStruct::Filter(filter) => { From f076e96388fb038e3444b84719874d0c004ccf84 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 4 Jun 2024 18:10:01 -0600 Subject: [PATCH 3/3] save --- core/src/execution/datafusion/planner.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index 6e1e68b61..dd476205b 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -731,8 +731,24 @@ impl PhysicalPlanner { // this approach looks like it has benefit then we may want to build a custom // version of CoalesceBatchesExec that copies when needed let copied = Arc::new(CopyExec::new(projection)); + + // keep this small to avoid error: + let target_batch_size = 1024; + + /* + org.apache.comet.CometNativeException: range end index 8504 out of range for slice of length 8192 + at comet::errors::init::{{closure}}(__internal__:0) + at std::panicking::rust_panic_with_hook(__internal__:0) + at std::panicking::begin_panic_handler::{{closure}}(__internal__:0) + at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0) + at rust_begin_unwind(__internal__:0) + at core::panicking::panic_fmt(__internal__:0) + at core::slice::index::slice_end_index_len_fail(__internal__:0) + at comet::execution::datafusion::shuffle_writer::external_shuffle::{{closure}}(__internal__:0) + */ + let coalesced: Arc<dyn ExecutionPlan> = - Arc::new(CoalesceBatchesExec::new(copied, 4196)); + Arc::new(CoalesceBatchesExec::new(copied, target_batch_size)); Ok((scans, coalesced)) } OpStruct::Filter(filter) => {