From 37ff2a4cce01d2e60da80d6caa566b262d0ee870 Mon Sep 17 00:00:00 2001 From: RinChanNOW Date: Mon, 25 Dec 2023 13:27:04 +0800 Subject: [PATCH] feat: make Top-N sort can be spilled. (#14131) --- .../transforms/transform_sort_merge.rs | 95 +++------ .../transforms/transform_sort_merge_base.rs | 183 ++++++++++++------ .../transforms/transform_sort_merge_limit.rs | 85 ++++++-- .../src/pipelines/builders/builder_sort.rs | 1 + .../transforms/transform_sort_spill.rs | 124 ++++++++---- .../20+_others/20_0014_sort_spill.result | 24 +++ .../20+_others/20_0014_sort_spill.sql | 29 ++- 7 files changed, 359 insertions(+), 182 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index cc2bee874477..05a9b1ca5776 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -18,11 +18,9 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::row::RowConverter as CommonConverter; -use databend_common_expression::BlockMetaInfo; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; @@ -40,14 +38,8 @@ use super::sort::StringRows; use super::sort::TimestampConverter; use super::sort::TimestampRows; use super::transform_sort_merge_base::MergeSort; -use super::transform_sort_merge_base::Status; use super::transform_sort_merge_base::TransformSortMergeBase; use super::AccumulatingTransform; -use crate::processors::sort::SortSpillMeta; -use crate::processors::sort::SortSpillMetaWithParams; - -/// A spilled block file is at most 8MB. -const SPILL_BATCH_BYTES_SIZE: usize = 8 * 1024 * 1024; /// Merge sort blocks without limit. /// @@ -60,21 +52,11 @@ pub struct TransformSortMerge { buffer: Vec>, aborting: Arc, - // The following fields are used for spilling. - may_spill: bool, - max_memory_usage: usize, - spilling_bytes_threshold: usize, + /// Record current memory usage. num_bytes: usize, num_rows: usize, - // The following two fields will be passed to the spill processor. - // If these two fields are not zero, it means we need to spill. - /// The number of rows of each spilled block. - spill_batch_size: usize, - /// The number of spilled blocks in each merge of the spill processor. 
- spill_num_merge: usize, - _r: PhantomData, } @@ -83,23 +65,15 @@ impl TransformSortMerge { schema: DataSchemaRef, sort_desc: Arc>, block_size: usize, - max_memory_usage: usize, - spilling_bytes_threshold: usize, ) -> Self { - let may_spill = max_memory_usage != 0 && spilling_bytes_threshold != 0; TransformSortMerge { schema, sort_desc, block_size, buffer: vec![], aborting: Arc::new(AtomicBool::new(false)), - may_spill, - max_memory_usage, - spilling_bytes_threshold, num_bytes: 0, num_rows: 0, - spill_batch_size: 0, - spill_num_merge: 0, _r: PhantomData, } } @@ -108,7 +82,7 @@ impl TransformSortMerge { impl MergeSort for TransformSortMerge { const NAME: &'static str = "TransformSortMerge"; - fn add_block(&mut self, block: DataBlock, init_cursor: Cursor) -> Result { + fn add_block(&mut self, block: DataBlock, init_cursor: Cursor) -> Result<()> { if unlikely(self.aborting.load(Ordering::Relaxed)) { return Err(ErrorCode::AbortedQuery( "Aborted query, because the server is shutting down or the query was killed.", @@ -116,29 +90,19 @@ impl MergeSort for TransformSortMerge { } if unlikely(block.is_empty()) { - return Ok(Status::Continue); + return Ok(()); } self.num_bytes += block.memory_size(); self.num_rows += block.num_rows(); self.buffer.push(Some((block, init_cursor.to_column()))); - if self.may_spill - && (self.num_bytes >= self.spilling_bytes_threshold - || GLOBAL_MEM_STAT.get_memory_usage() as usize >= self.max_memory_usage) - { - let blocks = self.prepare_spill()?; - return Ok(Status::Spill(blocks)); - } - - Ok(Status::Continue) + Ok(()) } - fn on_finish(&mut self) -> Result> { - if self.spill_num_merge > 0 { - debug_assert!(self.spill_batch_size > 0); - // Make the last block as a big memory block. - self.merge_sort(usize::MAX) + fn on_finish(&mut self, all_in_one_block: bool) -> Result> { + if all_in_one_block { + self.merge_sort(self.num_rows) } else { self.merge_sort(self.block_size) } @@ -147,41 +111,30 @@ impl MergeSort for TransformSortMerge { fn interrupt(&self) { self.aborting.store(true, Ordering::Release); } -} -impl TransformSortMerge { - fn prepare_spill(&mut self) -> Result> { - let mut spill_meta = Box::new(SortSpillMeta {}) as Box; - if self.spill_batch_size == 0 { - debug_assert_eq!(self.spill_num_merge, 0); - // We use the first memory calculation to estimate the batch size and the number of merge. - self.spill_num_merge = self.num_bytes.div_ceil(SPILL_BATCH_BYTES_SIZE).max(2); - self.spill_batch_size = self.num_rows.div_ceil(self.spill_num_merge); - // The first block to spill will contain the parameters of spilling. - // Later blocks just contain a empty struct `SortSpillMeta` to save memory. 
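            // In other words: `SortSpillMetaWithParams { batch_size, num_merge }`
            // rides on the first spilled block only, sizing the spill processor's
            // external merge; every later block carries the zero-sized
            // `SortSpillMeta` marker. (The same logic reappears in
            // `TransformSortMergeBase::prepare_spill` below.)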
- spill_meta = Box::new(SortSpillMetaWithParams { - batch_size: self.spill_batch_size, - num_merge: self.spill_num_merge, - }) as Box; - } else { - debug_assert!(self.spill_num_merge > 0); - } + #[inline(always)] + fn num_bytes(&self) -> usize { + self.num_bytes + } - let mut blocks = self.merge_sort(self.spill_batch_size)?; - if let Some(b) = blocks.first_mut() { - b.replace_meta(spill_meta); - } - for b in blocks.iter_mut().skip(1) { - b.replace_meta(Box::new(SortSpillMeta {})); - } + #[inline(always)] + fn num_rows(&self) -> usize { + self.num_rows + } + + fn prepare_spill(&mut self, spill_batch_size: usize) -> Result> { + let blocks = self.merge_sort(spill_batch_size)?; self.num_rows = 0; self.num_bytes = 0; - self.buffer.clear(); + + debug_assert!(self.buffer.is_empty()); Ok(blocks) } +} +impl TransformSortMerge { fn merge_sort(&mut self, batch_size: usize) -> Result> { if self.buffer.is_empty() { return Ok(vec![]); @@ -266,7 +219,9 @@ pub fn sort_merge( sort_desc.clone(), false, false, - MergeSortCommonImpl::create(schema, sort_desc, block_size, 0, 0), + 0, + 0, + MergeSortCommonImpl::create(schema, sort_desc, block_size), )?; for block in data_blocks { processor.transform(block)?; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs index 3a636a284080..b83675734928 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs @@ -15,12 +15,14 @@ use std::marker::PhantomData; use std::sync::Arc; +use databend_common_base::runtime::GLOBAL_MEM_STAT; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::NumberType; use databend_common_expression::with_number_mapped_type; use databend_common_expression::BlockEntry; +use databend_common_expression::BlockMetaInfo; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; use databend_common_expression::SortColumnDescription; @@ -34,6 +36,7 @@ use super::sort::RowConverter; use super::sort::Rows; use super::sort::SimpleRowConverter; use super::sort::SimpleRows; +use super::sort::SortSpillMeta; use super::AccumulatingTransform; use super::AccumulatingTransformer; use super::MergeSortCommon; @@ -55,13 +58,10 @@ use super::MergeSortTimestampImpl; use super::TransformSortMerge; use super::TransformSortMergeLimit; use crate::processors::sort::utils::ORDER_COL_NAME; +use crate::processors::sort::SortSpillMetaWithParams; -pub enum Status { - /// Continue to add blocks. - Continue, - // Need to spill blocks. - Spill(Vec), -} +/// A spilled block file is at most 8MB. +const SPILL_BATCH_BYTES_SIZE: usize = 8 * 1024 * 1024; pub trait MergeSort { const NAME: &'static str; @@ -69,9 +69,21 @@ pub trait MergeSort { /// Add a block to the merge sort processor. /// `block` is the input data block. /// `init_cursor` is the initial sorting cursor of this `block`. - fn add_block(&mut self, block: DataBlock, init_cursor: Cursor) -> Result; + fn add_block(&mut self, block: DataBlock, init_cursor: Cursor) -> Result<()>; + + /// Return buffered data size. + fn num_bytes(&self) -> usize; + + /// Return buffered rows. + fn num_rows(&self) -> usize; - fn on_finish(&mut self) -> Result>; + /// Prepare the blocks to spill. 
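    ///
    /// Contract, with illustrative numbers (not part of the patch): the sorter
    /// drains its buffer into sorted blocks of at most `spill_batch_size` rows
    /// and resets its counters, so `num_bytes()` and `num_rows()` return 0
    /// afterwards. E.g. with 100 MiB / 1,000,000 rows buffered, the base
    /// transform picks `spill_num_merge = ceil(100 / 8).max(2) = 13` (one
    /// 8 MiB file per block) and `spill_batch_size = ceil(1_000_000 / 13) =
    /// 76_924`, so this call yields 13 sorted blocks for the spill processor
    /// to write out.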
+    fn prepare_spill(&mut self, spill_batch_size: usize) -> Result<Vec<DataBlock>>;
+
+    /// Finish the merge sorter and output the remaining data.
+    ///
+    /// If `all_in_one_block`, the return value is a single block.
+    fn on_finish(&mut self, all_in_one_block: bool) -> Result<Vec<DataBlock>>;
 
     fn interrupt(&self) {}
 }
@@ -94,6 +106,17 @@ pub struct TransformSortMergeBase<M, R, Converter> {
     /// The index for the next input block.
     next_index: usize,
 
+    // The following fields are used for spilling.
+    may_spill: bool,
+    max_memory_usage: usize,
+    spilling_bytes_threshold: usize,
+    // The following two fields will be passed to the spill processor.
+    // If these two fields are not zero, it means we need to spill.
+    /// The number of rows of each spilled block.
+    spill_batch_size: usize,
+    /// The number of spilled blocks in each merge of the spill processor.
+    spill_num_merge: usize,
+
     _r: PhantomData<R>,
 }
 
@@ -108,8 +131,11 @@ where
         sort_desc: Arc<Vec<SortColumnDescription>>,
         order_col_generated: bool,
         output_order_col: bool,
+        max_memory_usage: usize,
+        spilling_bytes_threshold: usize,
         inner: M,
     ) -> Result<Self> {
+        let may_spill = max_memory_usage != 0 && spilling_bytes_threshold != 0;
         let row_converter = Converter::create(&sort_desc, schema)?;
 
         Ok(Self {
@@ -119,9 +145,53 @@ where
             output_order_col,
             order_col_generated,
             next_index: 0,
+            max_memory_usage,
+            spilling_bytes_threshold,
+            spill_batch_size: 0,
+            spill_num_merge: 0,
+            may_spill,
             _r: PhantomData,
         })
     }
+
+    fn prepare_spill(&mut self) -> Result<Vec<DataBlock>> {
+        let mut spill_meta = Box::new(SortSpillMeta {}) as Box<dyn BlockMetaInfo>;
+        if self.spill_batch_size == 0 {
+            debug_assert_eq!(self.spill_num_merge, 0);
+            // We use the first memory calculation to estimate the batch size and the number of merges.
+            self.spill_num_merge = self
+                .inner
+                .num_bytes()
+                .div_ceil(SPILL_BATCH_BYTES_SIZE)
+                .max(2);
+            self.spill_batch_size = self.inner.num_rows().div_ceil(self.spill_num_merge);
+            // The first block to spill will contain the parameters of spilling.
+            // Later blocks just contain an empty struct `SortSpillMeta` to save memory.
+            spill_meta = Box::new(SortSpillMetaWithParams {
+                batch_size: self.spill_batch_size,
+                num_merge: self.spill_num_merge,
+            }) as Box<dyn BlockMetaInfo>;
+        } else {
+            debug_assert!(self.spill_num_merge > 0);
+        }
+
+        let mut blocks = self.inner.prepare_spill(self.spill_batch_size)?;
+
+        // Fill the spill meta.
+        if let Some(b) = blocks.first_mut() {
+            b.replace_meta(spill_meta);
+        }
+        for b in blocks.iter_mut().skip(1) {
+            b.replace_meta(Box::new(SortSpillMeta {}));
+        }
+
+        debug_assert_eq!(self.inner.num_bytes(), 0);
+        debug_assert_eq!(self.inner.num_rows(), 0);
+        // Reset the block index counting.
+        self.next_index = 0;
+
+        Ok(blocks)
+    }
 }
 
 impl<M, R, Converter> AccumulatingTransform for TransformSortMergeBase<M, R, Converter>
@@ -163,17 +233,24 @@ where
         let cursor = Cursor::new(self.next_index, rows);
         self.next_index += 1;
 
-        match self.inner.add_block(block, cursor)? {
-            Status::Continue => Ok(vec![]),
-            Status::Spill(to_spill) => {
-                self.next_index = 0;
-                Ok(to_spill)
-            }
-        }
+        self.inner.add_block(block, cursor)?;
+
+        let blocks = if self.may_spill
+            && (self.inner.num_bytes() >= self.spilling_bytes_threshold
+                || GLOBAL_MEM_STAT.get_memory_usage() as usize >= self.max_memory_usage)
+        {
+            self.prepare_spill()?
+        } else {
+            vec![]
+        };
+
+        Ok(blocks)
     }
 
     fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
-        self.inner.on_finish()
+        // If the processor has started to spill blocks,
+        // gather the remaining data into a single block.
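        // In effect: `spill_num_merge > 0` means some blocks were already
        // spilled, so the downstream spill processor performs the final merge;
        // handing it the remaining rows as one big sorted block lets it treat
        // them as simply one more sorted stream.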
+ self.inner.on_finish(self.spill_num_merge > 0) } } @@ -284,13 +361,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - TransformSortMerge::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + TransformSortMerge::create(schema, sort_desc, block_size), )?, ), }), @@ -302,13 +375,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - MergeSortDateImpl::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + MergeSortDateImpl::create(schema, sort_desc, block_size), )?, ), DataType::Timestamp => AccumulatingTransformer::create( @@ -319,13 +388,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - MergeSortTimestampImpl::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + MergeSortTimestampImpl::create(schema, sort_desc, block_size), )?, ), DataType::String => AccumulatingTransformer::create( @@ -336,13 +401,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - MergeSortStringImpl::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + MergeSortStringImpl::create(schema, sort_desc, block_size), )?, ), _ => AccumulatingTransformer::create( @@ -353,13 +414,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - MergeSortCommonImpl::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + MergeSortCommonImpl::create(schema, sort_desc, block_size), )?, ), } @@ -372,13 +429,9 @@ impl TransformSortMergeBuilder { sort_desc.clone(), order_col_generated, output_order_col, - MergeSortCommonImpl::create( - schema, - sort_desc, - block_size, - max_memory_usage, - spilling_bytes_threshold_per_core, - ), + max_memory_usage, + spilling_bytes_threshold_per_core, + MergeSortCommonImpl::create(schema, sort_desc, block_size), )?, ) }; @@ -396,6 +449,8 @@ impl TransformSortMergeBuilder { order_col_generated, output_order_col, limit, + spilling_bytes_threshold_per_core, + max_memory_usage, .. 
} = self;
         let limit = limit.unwrap();
 
@@ -416,6 +471,8 @@ impl TransformSortMergeBuilder {
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     TransformSortMergeLimit::create(block_size, limit),
                 )?,
             ),
@@ -428,6 +485,8 @@
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     MergeSortLimitDateImpl::create(block_size, limit),
                 )?,
             ),
@@ -439,6 +498,8 @@
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     MergeSortLimitTimestampImpl::create(block_size, limit),
                 )?,
             ),
@@ -450,6 +511,8 @@
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     MergeSortLimitStringImpl::create(block_size, limit),
                 )?,
             ),
@@ -461,6 +524,8 @@
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     MergeSortLimitCommonImpl::create(block_size, limit),
                 )?,
             ),
@@ -474,6 +539,8 @@
                     sort_desc,
                     order_col_generated,
                     output_order_col,
+                    max_memory_usage,
+                    spilling_bytes_threshold_per_core,
                     MergeSortLimitCommonImpl::create(block_size, limit),
                 )?,
             )
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs
index 61ab2f4aa6f7..87a39396016c 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs
@@ -31,7 +31,6 @@ use super::sort::StringRows;
 use super::sort::TimestampConverter;
 use super::sort::TimestampRows;
 use super::transform_sort_merge_base::MergeSort;
-use super::transform_sort_merge_base::Status;
 use super::transform_sort_merge_base::TransformSortMergeBase;
 
 /// This is a specific version of [`super::transform_sort_merge::TransformSortMerge`] which sorts blocks with a limit.
@@ -39,18 +38,24 @@ pub struct TransformSortMergeLimit<R: Rows> {
     heap: FixedHeap<Reverse<Cursor<R>>>,
     buffer: HashMap<usize, DataBlock>,
 
+    /// Record current memory usage.
+    num_bytes: usize,
+    num_rows: usize,
+
     block_size: usize,
 }
 
 impl<R: Rows> MergeSort<R> for TransformSortMergeLimit<R> {
     const NAME: &'static str = "TransformSortMergeLimit";
 
-    fn add_block(&mut self, block: DataBlock, mut cursor: Cursor<R>) -> Result<Status> {
+    fn add_block(&mut self, block: DataBlock, mut cursor: Cursor<R>) -> Result<()> {
         if unlikely(self.heap.cap() == 0 || block.is_empty()) {
             // limit is 0 or block is empty.
-            return Ok(Status::Continue);
+            return Ok(());
         }
 
+        self.num_bytes += block.memory_size();
+        self.num_rows += block.num_rows();
         let cur_index = cursor.input_index;
         self.buffer.insert(cur_index, block);
 
@@ -59,7 +64,10 @@ impl<R: Rows> MergeSort<R> for TransformSortMergeLimit<R> {
             if evict.row_index == 0 {
                 // Evict the first row of the block,
                 // which means the block must not appear in the Top-N result.
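                // Dropping the evicted block must also be reflected in
                // `num_bytes`/`num_rows`, so the spill-threshold check in the
                // base transform only counts memory still referenced by the heap.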
-                self.buffer.remove(&evict.input_index);
+                if let Some(block) = self.buffer.remove(&evict.input_index) {
+                    self.num_bytes -= block.memory_size();
+                    self.num_rows -= block.num_rows();
+                }
             }
 
             if evict.input_index == cur_index {
@@ -70,12 +78,55 @@ impl<R: Rows> MergeSort<R> for TransformSortMergeLimit<R> {
             cursor.advance();
         }
 
-        Ok(Status::Continue)
+        Ok(())
+    }
+
+    fn on_finish(&mut self, all_in_one_block: bool) -> Result<Vec<DataBlock>> {
+        if all_in_one_block {
+            Ok(self.drain_heap(self.num_rows))
+        } else {
+            Ok(self.drain_heap(self.block_size))
+        }
+    }
+
+    #[inline(always)]
+    fn num_bytes(&self) -> usize {
+        self.num_bytes
+    }
+
+    #[inline(always)]
+    fn num_rows(&self) -> usize {
+        self.num_rows
     }
 
-    fn on_finish(&mut self) -> Result<Vec<DataBlock>> {
+    fn prepare_spill(&mut self, spill_batch_size: usize) -> Result<Vec<DataBlock>> {
+        // TBD: whether it would be better to add the blocks back to the heap.
+        // Reason: the output `blocks` is a Top-N result,
+        // so its memory usage will be less than that of the original buffered data.
+        // If the reduced memory usage does not reach the spilling threshold,
+        // we can avoid one spill.
+        let blocks = self.drain_heap(spill_batch_size);
+
+        debug_assert!(self.buffer.is_empty());
+
+        Ok(blocks)
+    }
+}
+
+impl<R: Rows> TransformSortMergeLimit<R> {
+    pub fn create(block_size: usize, limit: usize) -> Self {
+        TransformSortMergeLimit {
+            heap: FixedHeap::new(limit),
+            buffer: HashMap::with_capacity(limit),
+            block_size,
+            num_bytes: 0,
+            num_rows: 0,
+        }
+    }
+
+    fn drain_heap(&mut self, batch_size: usize) -> Vec<DataBlock> {
         if self.heap.is_empty() {
-            return Ok(vec![]);
+            return vec![];
         }
 
         let output_size = self.heap.len();
@@ -91,12 +142,12 @@ impl<R: Rows> MergeSort<R> for TransformSortMergeLimit<R> {
             output_indices.push((block_index, cursor.row_index));
         }
 
-        let output_block_num = output_size.div_ceil(self.block_size);
+        let output_block_num = output_size.div_ceil(batch_size);
         let mut output_blocks = Vec::with_capacity(output_block_num);
 
         for i in 0..output_block_num {
-            let start = i * self.block_size;
-            let end = (start + self.block_size).min(output_indices.len());
+            let start = i * batch_size;
+            let end = (start + batch_size).min(output_indices.len());
             // Convert indices to merge slice.
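            // A slice is assumed here to be a (block index, start row, length)
            // run: adjacent output rows coming from the same input block are
            // coalesced so they can be copied out in one batch rather than
            // row by row.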
let mut merge_slices = Vec::with_capacity(output_indices.len()); let (block_idx, row_idx) = output_indices[start]; @@ -113,17 +164,11 @@ impl MergeSort for TransformSortMergeLimit { output_blocks.push(block); } - Ok(output_blocks) - } -} + self.buffer.clear(); + self.num_bytes = 0; + self.num_rows = 0; -impl TransformSortMergeLimit { - pub fn create(block_size: usize, limit: usize) -> Self { - TransformSortMergeLimit { - heap: FixedHeap::new(limit), - buffer: HashMap::with_capacity(limit), - block_size, - } + output_blocks } } diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index 47f7a087ae75..68edb97bf100 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -334,6 +334,7 @@ impl SortPipelineBuilder { output, schema.clone(), self.sort_desc.clone(), + self.limit, spiller, output_order_col, ); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 5362d02fac3a..c29a193f766f 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -71,6 +71,7 @@ pub struct TransformSortSpill { output: Arc, schema: DataSchemaRef, output_order_col: bool, + limit: Option, input_data: Option, output_data: Option, @@ -240,6 +241,7 @@ where R: Rows + Sync + Send + 'static output: Arc, schema: DataSchemaRef, sort_desc: Arc>, + limit: Option, spiller: Spiller, output_order_col: bool, ) -> Self { @@ -247,6 +249,7 @@ where R: Rows + Sync + Send + 'static input, output, schema, + limit, output_order_col, input_data: None, output_data: None, @@ -313,7 +316,7 @@ where R: Rows + Sync + Send + 'static streams, self.sort_desc.clone(), self.batch_size, - None, + self.limit, ) } @@ -428,6 +431,7 @@ pub fn create_transform_sort_spill( output: Arc, schema: DataSchemaRef, sort_desc: Arc>, + limit: Option, spiller: Spiller, output_order_col: bool, ) -> Box { @@ -442,6 +446,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col )), @@ -451,6 +456,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col, )), @@ -459,6 +465,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col, )), @@ -467,6 +474,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col, )), @@ -475,6 +483,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col, )), @@ -485,6 +494,7 @@ pub fn create_transform_sort_spill( output, schema, sort_desc, + limit, spiller, output_order_col, )) @@ -523,6 +533,7 @@ mod tests { async fn create_test_transform( ctx: Arc, + limit: Option, ) -> Result>> { let op = DataOperator::instance().operator(); let spiller = Spiller::create( @@ -547,6 +558,7 @@ mod tests { DataType::Number(NumberDataType::Int32), )]), sort_desc, + limit, spiller, false, ); @@ -555,7 +567,7 @@ mod tests { } /// Returns (input, expected) - fn basic_test_data() -> (Vec, DataBlock) { + fn basic_test_data(limit: Option) -> (Vec, DataBlock) { let data = vec![ vec![1, 3, 5, 7], vec![1, 2, 3, 4], @@ -570,13 +582,21 @@ mod tests { .map(|v| DataBlock::new_from_columns(vec![Int32Type::from_data(v)])) .collect::>(); let result = 
data.into_iter().flatten().sorted().collect::>(); + let result = if let Some(limit) = limit { + result.into_iter().take(limit).collect::>() + } else { + result + }; let result = DataBlock::new_from_columns(vec![Int32Type::from_data(result)]); (input, result) } /// Returns (input, expected, batch_size, num_merge) - fn random_test_data(rng: &mut ThreadRng) -> (Vec, DataBlock, usize, usize) { + fn random_test_data( + rng: &mut ThreadRng, + limit: Option, + ) -> (Vec, DataBlock, usize, usize) { let random_batch_size = rng.gen_range(1..=10); let random_num_streams = rng.gen_range(5..=10); let random_num_merge = rng.gen_range(2..=10); @@ -601,6 +621,11 @@ mod tests { .flatten() .sorted() .collect::>(); + let result = if let Some(limit) = limit { + result.into_iter().take(limit).collect::>() + } else { + result + }; let result = DataBlock::new_from_columns(vec![Int32Type::from_data(result)]); (input, result, random_batch_size, random_num_merge) @@ -613,8 +638,9 @@ mod tests { batch_size: usize, num_merge: usize, has_memory_block: bool, + limit: Option, ) -> Result<()> { - let mut transform = create_test_transform(ctx).await?; + let mut transform = create_test_transform(ctx, limit).await?; transform.num_merge = num_merge; transform.batch_size = batch_size; @@ -651,28 +677,48 @@ mod tests { async fn test_two_way_merge_sort() -> Result<()> { let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(); + let (input, expected) = basic_test_data(None); - test(ctx, input, expected, 4, 2, false).await + test(ctx, input, expected, 4, 2, false, None).await } #[tokio::test(flavor = "multi_thread")] async fn test_two_way_merge_sort_with_memory_block() -> Result<()> { let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(); + let (input, expected) = basic_test_data(None); + + test(ctx, input, expected, 4, 2, true, None).await + } - test(ctx, input, expected, 4, 2, true).await + async fn basic_test( + ctx: Arc, + batch_size: usize, + num_merge: usize, + limit: Option, + ) -> Result<()> { + let (input, expected) = basic_test_data(limit); + + test( + ctx.clone(), + input.clone(), + expected.clone(), + batch_size, + num_merge, + false, + limit, + ) + .await?; + test(ctx, input, expected, batch_size, num_merge, true, limit).await } #[tokio::test(flavor = "multi_thread")] async fn test_three_way_merge_sort() -> Result<()> { let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(); - test(ctx.clone(), input.clone(), expected.clone(), 4, 3, false).await?; - test(ctx, input, expected, 4, 3, true).await + basic_test(ctx.clone(), 4, 3, None).await?; + basic_test(ctx, 4, 3, Some(2)).await } #[tokio::test(flavor = "multi_thread")] @@ -680,10 +726,37 @@ mod tests { // Test if `num_merge` is bigger than the number of streams. 
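        // In that case a single final merge pass covers all spilled streams
        // (plus the optional in-memory block), so no intermediate rounds of
        // re-spilling are needed.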
let fixture = TestFixture::setup().await?; let ctx = fixture.new_query_ctx().await?; - let (input, expected) = basic_test_data(); - test(ctx.clone(), input.clone(), expected.clone(), 4, 10, false).await?; - test(ctx, input, expected, 4, 10, true).await + basic_test(ctx.clone(), 4, 10, None).await?; + basic_test(ctx, 4, 10, Some(2)).await + } + + async fn random_test( + ctx: Arc, + rng: &mut ThreadRng, + limit: Option, + ) -> Result<()> { + let (input, expected, batch_size, num_merge) = random_test_data(rng, limit); + test( + ctx.clone(), + input.clone(), + expected.clone(), + batch_size, + num_merge, + false, + limit, + ) + .await?; + test( + ctx.clone(), + input.clone(), + expected.clone(), + batch_size, + num_merge, + true, + limit, + ) + .await } #[tokio::test(flavor = "multi_thread")] @@ -693,25 +766,10 @@ mod tests { let mut rng = rand::thread_rng(); for _ in 0..10 { - let (input, expected, batch_size, num_merge) = random_test_data(&mut rng); - test( - ctx.clone(), - input.clone(), - expected.clone(), - batch_size, - num_merge, - false, - ) - .await?; - test( - ctx.clone(), - input.clone(), - expected.clone(), - batch_size, - num_merge, - true, - ) - .await?; + random_test(ctx.clone(), &mut rng, None).await?; + + let limit = rng.gen_range(1..=5); + random_test(ctx.clone(), &mut rng, Some(limit)).await?; } Ok(()) diff --git a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result index 0f168f39ce7e..e36a1101c1c2 100644 --- a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result +++ b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.result @@ -1,3 +1,4 @@ +==TEST GLOBAL SORT== 0 1 NULL @@ -60,3 +61,26 @@ NULL NULL 2 5 2 NULL 4 8 +==TEST TOP-N SORT== +=================== +0 +=================== +transform_spill_write_count_total {"spill":"sort_spill"} 1.0 +transform_spill_write_bytes_total {"spill":"sort_spill"} 1168.0 +transform_spill_read_count_total {"spill":"sort_spill"} 1.0 +transform_spill_read_bytes_total {"spill":"sort_spill"} 1168.0 +=================== +2 NULL +2 5 +4 8 +2 NULL +2 5 +4 8 +NULL 6 +NULL NULL +2 5 +=================== +transform_spill_write_count_total {"spill":"sort_spill"} 7.0 +transform_spill_write_bytes_total {"spill":"sort_spill"} 11968.0 +transform_spill_read_count_total {"spill":"sort_spill"} 7.0 +transform_spill_read_bytes_total {"spill":"sort_spill"} 11968.0 diff --git a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql index a51a557bebee..1a2bdfe81972 100644 --- a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql +++ b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql @@ -1,3 +1,4 @@ +SELECT '==TEST GLOBAL SORT=='; set sort_spilling_bytes_threshold_per_proc = 8; drop table if exists t; CREATE TABLE t (a INT, b INT, c BOOLEAN NULL); @@ -52,5 +53,31 @@ SELECT x, y FROM xy ORDER BY x NULLS LAST, y DESC NULLS FIRST; SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC NULLS LAST; SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC; -set sort_spilling_bytes_threshold_per_proc = 0; + set max_threads = 16; + +-- Test spill in Top-N scenario. 
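-- The LIMIT queries below go through TransformSortMergeLimit; with
-- sort_spilling_bytes_threshold_per_proc set this low, even the bounded Top-N
-- heap crosses the threshold and spills. The system.metrics queries then
-- check that spill reads match spill writes, both in count and in bytes.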
+SELECT '==TEST TOP-N SORT=='; + +truncate table system.metrics; + +SELECT '==================='; + +SELECT c FROM t ORDER BY c limit 1; + +SELECT '==================='; + +set sort_spilling_bytes_threshold_per_proc = 0; +select metric, labels, sum(value::float) from system.metrics where metric like '%spill%total' group by metric, labels order by metric desc; +set sort_spilling_bytes_threshold_per_proc = 60; + +SELECT '==================='; + +SELECT x, y FROM xy ORDER BY x, y DESC NULLS FIRST LIMIT 3; +SELECT x, y FROM xy ORDER BY x NULLS LAST, y DESC NULLS FIRST LIMIT 3; +SELECT x, y FROM xy ORDER BY x NULLS FIRST, y DESC NULLS LAST LIMIT 3; + +SELECT '==================='; + +set sort_spilling_bytes_threshold_per_proc = 0; +select metric, labels, sum(value::float) from system.metrics where metric like '%spill%total' group by metric, labels order by metric desc; \ No newline at end of file