diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs index dc29f00d75d5..bc0e38e00080 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/range_join_state.rs @@ -17,6 +17,7 @@ use std::sync::atomic::AtomicU64; use std::sync::Arc; use common_base::base::tokio::sync::Notify; +use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::types::DataType; use common_expression::types::NumberDataType; @@ -171,8 +172,29 @@ impl RangeJoinState { } pub(crate) fn partition(&self) -> Result<()> { + let max_threads = self.ctx.get_settings().get_max_threads()? as usize; let left_table = self.left_table.read(); - let right_table = self.right_table.read(); + // Right table is bigger than left table + let mut right_table = self.right_table.write(); + if !left_table.is_empty() + && !right_table.is_empty() + && left_table.len() * right_table.len() < max_threads + { + let num_parts = max_threads / left_table.len() + 1; + // Spit right_table to num_parts equally + let merged_right_table = DataBlock::concat(&right_table)?; + let mut indices = Vec::with_capacity(merged_right_table.num_rows()); + for idx in 0..merged_right_table.num_rows() { + indices.push((idx % num_parts) as u32); + } + let scatter_blocks = DataBlock::scatter(&merged_right_table, &indices, num_parts)?; + right_table.clear(); + for block in scatter_blocks.iter() { + if !block.is_empty() { + right_table.push(block.clone()); + } + } + } let mut left_sorted_blocks = self.left_sorted_blocks.write(); let mut right_sorted_blocks = self.right_sorted_blocks.write();