Skip to content

Commit

Permalink
chore: improve spiller buffer (#15904)
Browse files Browse the repository at this point in the history
chore: improve spill buffer
  • Loading branch information
xudong963 authored Jun 26, 2024
1 parent ccd7a61 commit 17355f4
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
10 changes: 5 additions & 5 deletions src/query/service/src/spillers/spiller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ impl Spiller {
) -> Result<Self> {
let join_spilling_partition_bits = ctx.get_settings().get_join_spilling_partition_bits()?;
Ok(Self {
ctx,
ctx: ctx.clone(),
operator,
config,
_spiller_type: spiller_type,
spiller_buffer: SpillerBuffer::create(),
spiller_buffer: SpillerBuffer::create(ctx)?,
join_spilling_partition_bits,
partition_location: Default::default(),
columns_layout: Default::default(),
Expand Down Expand Up @@ -273,11 +273,11 @@ impl Spiller {
&block_row_indexes,
row_indexes.len(),
);
if let Some((p_id, block)) = self
if let Some(block) = self
.spiller_buffer
.add_partition_unspilled_data(*p_id, block)?
{
self.spill_with_partition(p_id, block).await?;
self.spill_with_partition(*p_id, block).await?;
}
}
if !left_related_join {
Expand Down Expand Up @@ -317,7 +317,7 @@ impl Spiller {
}

pub(crate) fn format_spill_info(&self) -> String {
// Using a single line to print how many bytes have been spilled and how many files have been spiled for each partition.
// Using a single line to print how many bytes have been spilled and how many files have been spilled for each partition.
let mut info = String::new();
for (p_id, bytes) in self.partition_spilled_bytes.iter() {
// Covert bytes to GB
Expand Down
63 changes: 36 additions & 27 deletions src/query/service/src/spillers/spiller_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,64 +13,73 @@
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use databend_common_exception::ErrorCode;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;

use crate::sessions::QueryContext;

// The spiller buffer will record each partition's unspilled data.
// When the buffer is full(>=8MB), it will pick the partition with the most unspilled data to spill.
#[derive(Clone)]
pub(crate) struct SpillerBuffer {
partition_unspilled_data: HashMap<u8, Vec<DataBlock>>,
buffer_size: usize,
buffer_size: HashMap<u8, usize>,
partition_buffer_threshold: usize,
}

impl SpillerBuffer {
pub(crate) fn create() -> Self {
SpillerBuffer {
pub(crate) fn create(ctx: Arc<QueryContext>) -> Result<Self> {
let partition_number = 1_u32 << ctx.get_settings().get_join_spilling_partition_bits()?;
let buffer_threshold = ctx
.get_settings()
.get_join_spilling_buffer_threshold_per_proc()?;
let partition_buffer_threshold = buffer_threshold / partition_number as usize;
Ok(SpillerBuffer {
partition_unspilled_data: HashMap::new(),
buffer_size: 0,
}
buffer_size: HashMap::new(),
partition_buffer_threshold,
})
}

// Add a partition's unspilled data to the buffer
// The method will check if the buffer is full, if so, it will spill the partition with the most unspilled data
// After spilling, the buffer will be clear the spilled partition's unspilled data
// The method will check if the partition's buffer is full, if so, it will spill the partition
// The return value is the partition id and the spilled data
pub(crate) fn add_partition_unspilled_data(
&mut self,
partition_id: u8,
data: DataBlock,
) -> Result<Option<(u8, DataBlock)>> {
fn rows(data: &[DataBlock]) -> usize {
data.iter().map(|block| block.num_rows()).sum()
}
) -> Result<Option<DataBlock>> {
let data_size = data.memory_size();
self.buffer_size += data_size;
self.buffer_size
.entry(partition_id)
.and_modify(|e| *e += data_size)
.or_insert(data_size);
self.partition_unspilled_data
.entry(partition_id)
.or_default()
.push(data);
if self.buffer_size >= 8 * 1024 * 1024 {
// Pick the partition with the most unspilled data in `partition_unspilled_data`
let (partition_id, blocks) = self
if *self.buffer_size.get(&partition_id).unwrap()
>= self.partition_buffer_threshold * 1000 * 1000
{
let blocks = self
.partition_unspilled_data
.iter()
.max_by_key(|(_, v)| rows(v))
.map(|(k, v)| (*k, v.clone()))
.ok_or_else(|| ErrorCode::Internal("No unspilled data in the buffer"))?;
.get_mut(&partition_id)
.unwrap();
debug_assert!(!blocks.is_empty());
self.partition_unspilled_data.remove(&partition_id);
let merged_block = DataBlock::concat(&blocks)?;
self.buffer_size -= merged_block.memory_size();
return Ok(Some((partition_id, merged_block)));
let merged_block = DataBlock::concat(blocks)?;
blocks.clear();
let old_size = self.buffer_size.get_mut(&partition_id).unwrap();
*old_size = 0;
return Ok(Some(merged_block));
}
Ok(None)
}

pub(crate) fn empty(&self) -> bool {
self.buffer_size == 0
// Check if each partition's buffer is empty
self.buffer_size.iter().all(|(_, v)| *v == 0)
}

pub(crate) fn partition_unspilled_data(&self) -> HashMap<u8, Vec<DataBlock>> {
Expand All @@ -79,6 +88,6 @@ impl SpillerBuffer {

pub(crate) fn reset(&mut self) {
self.partition_unspilled_data.clear();
self.buffer_size = 0;
self.buffer_size.clear();
}
}
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,12 @@ impl DefaultSettings {
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("join_spilling_buffer_threshold_per_proc_mb", DefaultSettingValue {
value: UserSettingValue::UInt64(1024),
desc: "Set the spilling buffer threshold (MB) for each join processor.",
mode: SettingMode::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
("disable_merge_into_join_reorder", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Disable merge into join reorder optimization.",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ impl Settings {
Ok(self.try_get_u64("join_spilling_partition_bits")? as usize)
}

pub fn get_join_spilling_buffer_threshold_per_proc(&self) -> Result<usize> {
Ok(self.try_get_u64("join_spilling_buffer_threshold_per_proc_mb")? as usize)
}

pub fn get_inlist_to_join_threshold(&self) -> Result<usize> {
Ok(self.try_get_u64("inlist_to_join_threshold")? as usize)
}
Expand Down

0 comments on commit 17355f4

Please sign in to comment.