diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs
index b7f5d702e74e..58f8f3433bb5 100644
--- a/src/query/service/src/spillers/spiller.rs
+++ b/src/query/service/src/spillers/spiller.rs
@@ -99,11 +99,11 @@ impl Spiller {
     ) -> Result<Self> {
         let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?;
         Ok(Self {
-            ctx,
+            ctx: ctx.clone(),
             operator,
             config,
             _spiller_type: spiller_type,
-            spiller_buffer: SpillerBuffer::create(),
+            spiller_buffer: SpillerBuffer::create(ctx)?,
             join_spilling_partition_bits,
             partition_location: Default::default(),
             columns_layout: Default::default(),
@@ -273,11 +273,11 @@ impl Spiller {
                     &block_row_indexes,
                     row_indexes.len(),
                 );
-                if let Some((p_id, block)) = self
+                if let Some(block) = self
                     .spiller_buffer
                     .add_partition_unspilled_data(*p_id, block)?
                 {
-                    self.spill_with_partition(p_id, block).await?;
+                    self.spill_with_partition(*p_id, block).await?;
                 }
             }
             if !left_related_join {
@@ -317,7 +317,7 @@ impl Spiller {
     }
 
     pub(crate) fn format_spill_info(&self) -> String {
-        // Using a single line to print how many bytes have been spilled and how many files have been spiled for each partition.
+        // Using a single line to print how many bytes have been spilled and how many files have been spilled for each partition.
         let mut info = String::new();
         for (p_id, bytes) in self.partition_spilled_bytes.iter() {
             // Covert bytes to GB
diff --git a/src/query/service/src/spillers/spiller_buffer.rs b/src/query/service/src/spillers/spiller_buffer.rs
index 9e64b3429f58..dd263ca2067b 100644
--- a/src/query/service/src/spillers/spiller_buffer.rs
+++ b/src/query/service/src/spillers/spiller_buffer.rs
@@ -13,64 +13,74 @@
 // limitations under the License.
 
 use std::collections::HashMap;
+use std::sync::Arc;
 
-use databend_common_exception::ErrorCode;
+use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 
+use crate::sessions::QueryContext;
+
 // The spiller buffer will record each partition's unspilled data.
-// When the buffer is full(>=8MB), it will pick the partition with the most unspilled data to spill.
 #[derive(Clone)]
 pub(crate) struct SpillerBuffer {
     partition_unspilled_data: HashMap<u8, Vec<DataBlock>>,
-    buffer_size: usize,
+    // Bytes of unspilled data currently buffered for each partition.
+    buffer_size: HashMap<u8, usize>,
+    // Number of buffered bytes at which a single partition's data is handed back for spilling.
+    partition_buffer_threshold: usize,
 }
 
 impl SpillerBuffer {
-    pub(crate) fn create() -> Self {
-        SpillerBuffer {
+    // Create an empty buffer whose per-partition threshold is derived from the session settings.
+    pub(crate) fn create(ctx: Arc<QueryContext>) -> Result<Self> {
+        let partition_number = 1_u32 << ctx.get_settings().get_join_spilling_partition_bits()?;
+        let buffer_threshold = ctx
+            .get_settings()
+            .get_join_spilling_buffer_threshold_per_proc()?;
+        // The setting is in MB: convert it to bytes before splitting it among the partitions,
+        // otherwise a threshold smaller than the partition number truncates to zero.
+        let partition_buffer_threshold =
+            buffer_threshold.saturating_mul(1000 * 1000) / partition_number as usize;
+        Ok(SpillerBuffer {
             partition_unspilled_data: HashMap::new(),
-            buffer_size: 0,
-        }
+            buffer_size: HashMap::new(),
+            partition_buffer_threshold,
+        })
     }
 
     // Add a partition's unspilled data to the buffer
-    // The method will check if the buffer is full, if so, it will spill the partition with the most unspilled data
-    // After spilling, the buffer will be clear the spilled partition's unspilled data
-    // The return value is the partition id and the spilled data
+    // The method will check if the partition's buffer is full, if so, it will spill the partition
+    // The return value is the partition's merged unspilled data if its buffer is full, otherwise `None`
     pub(crate) fn add_partition_unspilled_data(
         &mut self,
         partition_id: u8,
         data: DataBlock,
-    ) -> Result<Option<(u8, DataBlock)>> {
-        fn rows(data: &[DataBlock]) -> usize {
-            data.iter().map(|block| block.num_rows()).sum()
-        }
+    ) -> Result<Option<DataBlock>> {
         let data_size = data.memory_size();
-        self.buffer_size += data_size;
+        let partition_size = self.buffer_size.entry(partition_id).or_insert(0);
+        *partition_size += data_size;
         self.partition_unspilled_data
             .entry(partition_id)
             .or_default()
             .push(data);
-        if self.buffer_size >= 8 * 1024 * 1024 {
-            // Pick the partition with the most unspilled data in `partition_unspilled_data`
-            let (partition_id, blocks) = self
+        if *partition_size >= self.partition_buffer_threshold {
+            let blocks = self
                 .partition_unspilled_data
-                .iter()
-                .max_by_key(|(_, v)| rows(v))
-                .map(|(k, v)| (*k, v.clone()))
-                .ok_or_else(|| ErrorCode::Internal("No unspilled data in the buffer"))?;
+                .get_mut(&partition_id)
+                .unwrap();
             debug_assert!(!blocks.is_empty());
-            self.partition_unspilled_data.remove(&partition_id);
-            let merged_block = DataBlock::concat(&blocks)?;
-            self.buffer_size -= merged_block.memory_size();
-            return Ok(Some((partition_id, merged_block)));
+            let merged_block = DataBlock::concat(blocks)?;
+            blocks.clear();
+            *partition_size = 0;
+            return Ok(Some(merged_block));
         }
         Ok(None)
     }
 
     pub(crate) fn empty(&self) -> bool {
-        self.buffer_size == 0
+        // Check if each partition's buffer is empty
+        self.buffer_size.values().all(|size| *size == 0)
     }
 
     pub(crate) fn partition_unspilled_data(&self) -> HashMap<u8, Vec<DataBlock>> {
@@ -79,6 +89,6 @@ impl SpillerBuffer {
 
     pub(crate) fn reset(&mut self) {
         self.partition_unspilled_data.clear();
-        self.buffer_size = 0;
+        self.buffer_size.clear();
     }
 }
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 7e9582733900..ea505698670c 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -296,6 +296,12 @@ impl DefaultSettings {
                     mode: SettingMode::Both,
                     range: Some(SettingRange::Numeric(0..=u64::MAX)),
                 }),
+                ("join_spilling_buffer_threshold_per_proc_mb", DefaultSettingValue {
+                    value: UserSettingValue::UInt64(1024),
+                    desc: "Set the spilling buffer threshold (MB) for each join processor.",
+                    mode: SettingMode::Both,
+                    range: Some(SettingRange::Numeric(0..=u64::MAX)),
+                }),
                 ("disable_merge_into_join_reorder", DefaultSettingValue {
                     value: UserSettingValue::UInt64(0),
                     desc: "Disable merge into join reorder optimization.",
diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs
index df2d9e63d21b..f6a78f2de3af 100644
--- a/src/query/settings/src/settings_getter_setter.rs
+++ b/src/query/settings/src/settings_getter_setter.rs
@@ -286,6 +286,10 @@ impl Settings {
         Ok(self.try_get_u64("join_spilling_partition_bits")? as usize)
     }
 
+    pub fn get_join_spilling_buffer_threshold_per_proc(&self) -> Result<usize> {
+        Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize)
+    }
+
     pub fn get_inlist_to_join_threshold(&self) -> Result<usize> {
         Ok(self.try_get_u64("inlist_to_join_threshold")? as usize)
     }