feat: make Top-N sort spillable (#14131)
RinChanNOWWW authored Dec 25, 2023
1 parent d6f7262 commit 37ff2a4
Showing 7 changed files with 359 additions and 182 deletions.
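The hunks below strip the spill decision out of `TransformSortMerge` and instead expose its buffered size to whatever drives spilling. As a reading aid, here is a minimal sketch of the `MergeSort` surface as it looks after this commit, reconstructed only from the impl hunks shown below; the real trait lives in transform_sort_merge_base.rs, which is not part of this excerpt, and `DataBlock`, `Cursor`, and `Rows` are stand-ins for the databend types.

```rust
// Sketch of the MergeSort surface after this commit, reconstructed from the
// impl hunks shown below. DataBlock, Cursor, and Rows are stand-ins for the
// databend types; only the method shapes are taken from the diff.
use std::marker::PhantomData;

type Result<T> = std::result::Result<T, String>; // stand-in for databend's Result
struct DataBlock;                                // stand-in
struct Cursor<R>(PhantomData<R>);                // stand-in
trait Rows {}                                    // stand-in

trait MergeSort<R: Rows> {
    const NAME: &'static str;

    /// Buffer one sorted block; the spill decision no longer happens here.
    fn add_block(&mut self, block: DataBlock, init_cursor: Cursor<R>) -> Result<()>;

    /// Buffered size, read by the caller to decide when to spill.
    fn num_bytes(&self) -> usize;
    fn num_rows(&self) -> usize;

    /// Drain the buffer into sorted blocks of `spill_batch_size` rows each.
    fn prepare_spill(&mut self, spill_batch_size: usize) -> Result<Vec<DataBlock>>;

    /// Emit the remaining data, either as one big block or in `block_size` chunks.
    fn on_finish(&mut self, all_in_one_block: bool) -> Result<Vec<DataBlock>>;

    fn interrupt(&self);
}

fn main() {}
```

The key change visible in the diff is that `add_block` no longer returns a `Status` telling the pipeline to spill; the caller now polls `num_bytes()`/`num_rows()` and invokes `prepare_spill` with a batch size it computes itself.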
@@ -18,11 +18,9 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use databend_common_base::runtime::GLOBAL_MEM_STAT;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::row::RowConverter as CommonConverter;
use databend_common_expression::BlockMetaInfo;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::DataSchemaRef;
@@ -40,14 +38,8 @@ use super::sort::StringRows;
use super::sort::TimestampConverter;
use super::sort::TimestampRows;
use super::transform_sort_merge_base::MergeSort;
use super::transform_sort_merge_base::Status;
use super::transform_sort_merge_base::TransformSortMergeBase;
use super::AccumulatingTransform;
use crate::processors::sort::SortSpillMeta;
use crate::processors::sort::SortSpillMetaWithParams;

/// A spilled block file is at most 8MB.
const SPILL_BATCH_BYTES_SIZE: usize = 8 * 1024 * 1024;

/// Merge sort blocks without limit.
///
@@ -60,21 +52,11 @@ pub struct TransformSortMerge<R: Rows> {
buffer: Vec<Option<(DataBlock, Column)>>,

aborting: Arc<AtomicBool>,
// The following fields are used for spilling.
may_spill: bool,
max_memory_usage: usize,
spilling_bytes_threshold: usize,

/// Record current memory usage.
num_bytes: usize,
num_rows: usize,

// The following two fields will be passed to the spill processor.
// If these two fields are not zero, it means we need to spill.
/// The number of rows of each spilled block.
spill_batch_size: usize,
/// The number of spilled blocks in each merge of the spill processor.
spill_num_merge: usize,

_r: PhantomData<R>,
}

@@ -83,23 +65,15 @@ impl<R: Rows> TransformSortMerge<R> {
schema: DataSchemaRef,
sort_desc: Arc<Vec<SortColumnDescription>>,
block_size: usize,
max_memory_usage: usize,
spilling_bytes_threshold: usize,
) -> Self {
let may_spill = max_memory_usage != 0 && spilling_bytes_threshold != 0;
TransformSortMerge {
schema,
sort_desc,
block_size,
buffer: vec![],
aborting: Arc::new(AtomicBool::new(false)),
may_spill,
max_memory_usage,
spilling_bytes_threshold,
num_bytes: 0,
num_rows: 0,
spill_batch_size: 0,
spill_num_merge: 0,
_r: PhantomData,
}
}
@@ -108,37 +82,27 @@ impl<R: Rows> TransformSortMerge<R> {
impl<R: Rows> MergeSort<R> for TransformSortMerge<R> {
const NAME: &'static str = "TransformSortMerge";

fn add_block(&mut self, block: DataBlock, init_cursor: Cursor<R>) -> Result<Status> {
fn add_block(&mut self, block: DataBlock, init_cursor: Cursor<R>) -> Result<()> {
if unlikely(self.aborting.load(Ordering::Relaxed)) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}

if unlikely(block.is_empty()) {
return Ok(Status::Continue);
return Ok(());
}

self.num_bytes += block.memory_size();
self.num_rows += block.num_rows();
self.buffer.push(Some((block, init_cursor.to_column())));

if self.may_spill
&& (self.num_bytes >= self.spilling_bytes_threshold
|| GLOBAL_MEM_STAT.get_memory_usage() as usize >= self.max_memory_usage)
{
let blocks = self.prepare_spill()?;
return Ok(Status::Spill(blocks));
}

Ok(Status::Continue)
Ok(())
}

fn on_finish(&mut self) -> Result<Vec<DataBlock>> {
if self.spill_num_merge > 0 {
debug_assert!(self.spill_batch_size > 0);
// Make the last block as a big memory block.
self.merge_sort(usize::MAX)
fn on_finish(&mut self, all_in_one_block: bool) -> Result<Vec<DataBlock>> {
if all_in_one_block {
self.merge_sort(self.num_rows)
} else {
self.merge_sort(self.block_size)
}
@@ -147,41 +111,30 @@ impl<R: Rows> MergeSort<R> for TransformSortMerge<R> {
fn interrupt(&self) {
self.aborting.store(true, Ordering::Release);
}
}

impl<R: Rows> TransformSortMerge<R> {
fn prepare_spill(&mut self) -> Result<Vec<DataBlock>> {
let mut spill_meta = Box::new(SortSpillMeta {}) as Box<dyn BlockMetaInfo>;
if self.spill_batch_size == 0 {
debug_assert_eq!(self.spill_num_merge, 0);
// We use the first memory calculation to estimate the batch size and the number of merge.
self.spill_num_merge = self.num_bytes.div_ceil(SPILL_BATCH_BYTES_SIZE).max(2);
self.spill_batch_size = self.num_rows.div_ceil(self.spill_num_merge);
// The first block to spill will contain the parameters of spilling.
// Later blocks just contain a empty struct `SortSpillMeta` to save memory.
spill_meta = Box::new(SortSpillMetaWithParams {
batch_size: self.spill_batch_size,
num_merge: self.spill_num_merge,
}) as Box<dyn BlockMetaInfo>;
} else {
debug_assert!(self.spill_num_merge > 0);
}
#[inline(always)]
fn num_bytes(&self) -> usize {
self.num_bytes
}

let mut blocks = self.merge_sort(self.spill_batch_size)?;
if let Some(b) = blocks.first_mut() {
b.replace_meta(spill_meta);
}
for b in blocks.iter_mut().skip(1) {
b.replace_meta(Box::new(SortSpillMeta {}));
}
#[inline(always)]
fn num_rows(&self) -> usize {
self.num_rows
}

fn prepare_spill(&mut self, spill_batch_size: usize) -> Result<Vec<DataBlock>> {
let blocks = self.merge_sort(spill_batch_size)?;

self.num_rows = 0;
self.num_bytes = 0;
self.buffer.clear();

debug_assert!(self.buffer.is_empty());

Ok(blocks)
}
}

impl<R: Rows> TransformSortMerge<R> {
fn merge_sort(&mut self, batch_size: usize) -> Result<Vec<DataBlock>> {
if self.buffer.is_empty() {
return Ok(vec![]);
@@ -266,7 +219,9 @@ pub fn sort_merge(
sort_desc.clone(),
false,
false,
MergeSortCommonImpl::create(schema, sort_desc, block_size, 0, 0),
0,
0,
MergeSortCommonImpl::create(schema, sort_desc, block_size),
)?;
for block in data_blocks {
processor.transform(block)?;
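For reference, the spill sizing that the deleted `prepare_spill` computed inline is small enough to reproduce standalone. Below is a minimal sketch of that arithmetic, reusing the 8 MB `SPILL_BATCH_BYTES_SIZE` constant removed above; where this calculation lives after the commit (presumably the spill-aware caller that now receives `max_memory_usage` and `spilling_bytes_threshold`) is not visible in this excerpt, and the driver function name here is illustrative only.

```rust
// The spill sizing arithmetic removed from TransformSortMerge::prepare_spill in
// this commit. Constants and formulas follow the deleted code; the surrounding
// driver is assumed, not taken from the diff.

/// A spilled block file is at most 8MB (the deleted constant).
const SPILL_BATCH_BYTES_SIZE: usize = 8 * 1024 * 1024;

/// Estimate, from the first buffered batch, how many spilled blocks each merge
/// of the spill processor reads back (num_merge) and how many rows each
/// spilled block holds (batch_size).
fn estimate_spill_params(num_bytes: usize, num_rows: usize) -> (usize, usize) {
    let num_merge = num_bytes.div_ceil(SPILL_BATCH_BYTES_SIZE).max(2);
    let batch_size = num_rows.div_ceil(num_merge);
    (batch_size, num_merge)
}

fn main() {
    // Example: 100 MiB buffered across 1_000_000 rows.
    let (batch_size, num_merge) = estimate_spill_params(100 * 1024 * 1024, 1_000_000);
    assert_eq!(num_merge, 13); // ceil(100 MiB / 8 MiB)
    assert_eq!(batch_size, 76_924); // ceil(1_000_000 / 13)
    println!("spill batch_size = {batch_size}, num_merge = {num_merge}");
}
```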