Skip to content

Commit

Permalink
chore: make the final merge sort round output blocks directly.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Dec 20, 2023
1 parent 37e04b9 commit 70c9416
Showing 1 changed file with 54 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ enum State {
Spill,
/// This state means the processor is doing external merge sort.
Merging,
/// Merge finished, we can output the sorted data now.
MergeFinished,
/// This state is used to merge the last few sorted streams and output directly.
MergeFinal,
/// Finish the process.
Finish,
}

pub struct TransformSortSpill<R> {
pub struct TransformSortSpill<R: Rows> {
input: Arc<InputPort>,
output: Arc<OutputPort>,
schema: DataSchemaRef,
Expand All @@ -84,6 +84,10 @@ pub struct TransformSortSpill<R> {
/// Unmerged list of blocks. Each list are sorted.
unmerged_blocks: VecDeque<VecDeque<String>>,

/// If `unmerged_blocks.len()` < `num_merge`,
/// we can use a final merger to merge the last few sorted streams to reduce IO.
final_merger: Option<HeapMerger<R, BlockStream>>,

sort_desc: Arc<Vec<SortColumnDescription>>,

_r: PhantomData<R>,
Expand Down Expand Up @@ -131,7 +135,7 @@ where R: Rows + Send + Sync + 'static
}

if let Some(block) = self.output_data.take() {
debug_assert!(matches!(self.state, State::MergeFinished));
debug_assert!(matches!(self.state, State::MergeFinal));
self.output_block(block);
return Ok(Event::NeedConsume);
}
Expand Down Expand Up @@ -193,8 +197,7 @@ where R: Rows + Send + Sync + 'static
self.state = State::Merging;
Ok(Event::Async)
}
State::Merging => Ok(Event::Async),
State::MergeFinished => Ok(Event::Async),
State::Merging | State::MergeFinal => Ok(Event::Async),
};
}

Expand All @@ -213,21 +216,11 @@ where R: Rows + Send + Sync + 'static
let block = self.input_data.take();
self.merge_sort(block).await?;
}
State::MergeFinished => {
debug_assert_eq!(self.unmerged_blocks.len(), 1);
// TODO: pass the spilled locations to next processor directly.
// The next processor will read and process the spilled files.
if let Some(file) = self.unmerged_blocks[0].pop_front() {
let ins = Instant::now();
let (block, bytes) = self.spiller.read_spilled(&file).await?;

// perf
{
metrics_inc_sort_spill_read_count();
metrics_inc_sort_spill_read_bytes(bytes);
metrics_inc_sort_spill_read_milliseconds(ins.elapsed().as_millis() as u64);
}

State::MergeFinal => {
debug_assert!(self.final_merger.is_some());
debug_assert!(self.unmerged_blocks.is_empty());
let merger = self.final_merger.as_mut().unwrap();
if let Some(block) = merger.async_next_block().await? {
self.output_data = Some(block);
} else {
self.state = State::Finish;
Expand Down Expand Up @@ -261,6 +254,7 @@ where R: Rows + Sync + Send + 'static
state: State::Init,
num_merge: 0,
unmerged_blocks: VecDeque::new(),
final_merger: None,
batch_size: 0,
sort_desc,
_r: PhantomData,
Expand Down Expand Up @@ -292,25 +286,15 @@ where R: Rows + Sync + Send + 'static
Ok(())
}

/// Do an external merge sort until there is only one sorted stream.
/// If `block` is not [None], we need to merge it with spilled files.
async fn merge_sort(&mut self, mut block: Option<DataBlock>) -> Result<()> {
while (self.unmerged_blocks.len() + block.is_some() as usize) > 1 {
let b = block.take();
self.merge_sort_one_round(b).await?;
}
self.state = State::MergeFinished;
Ok(())
}

/// Merge certain number of sorted streams to one sorted stream.
async fn merge_sort_one_round(&mut self, block: Option<DataBlock>) -> Result<()> {
let num_streams =
(self.unmerged_blocks.len() + block.is_some() as usize).min(self.num_merge);
debug_assert!(num_streams > 1);
fn create_merger(
&mut self,
memory_block: Option<DataBlock>,
num_streams: usize,
) -> HeapMerger<R, BlockStream> {
debug_assert!(num_streams <= self.unmerged_blocks.len() + memory_block.is_some() as usize);

let mut streams = Vec::with_capacity(num_streams);
if let Some(block) = block {
if let Some(block) = memory_block {
streams.push(BlockStream::Block(Some(block)));
}

Expand All @@ -324,13 +308,38 @@ where R: Rows + Sync + Send + 'static
streams.push(stream);
}

let mut merger = HeapMerger::<R, BlockStream>::create(
HeapMerger::<R, BlockStream>::create(
self.schema.clone(),
streams,
self.sort_desc.clone(),
self.batch_size,
None,
);
)
}

/// Do an external merge sort until the number of remaining sorted streams
/// is at most `num_merge`, then hand the last few streams to a final
/// in-memory merger instead of spilling one more round (reduces IO).
/// If `block` is not [None], we need to merge it with spilled files.
async fn merge_sort(&mut self, mut block: Option<DataBlock>) -> Result<()> {
// Keep running full merge rounds while there are more streams
// (spilled lists plus the optional in-memory block) than one
// merger can take at once.
while (self.unmerged_blocks.len() + block.is_some() as usize) > self.num_merge {
let b = block.take();
self.merge_sort_one_round(b).await?;
}

// Remaining streams now fit into a single merger. There must be more
// than one, otherwise a merge round would have been unnecessary.
let num_streams = self.unmerged_blocks.len() + block.is_some() as usize;
debug_assert!(num_streams <= self.num_merge && num_streams > 1);

// Store the final merger; the `MergeFinal` state drains it and emits
// sorted blocks directly, without writing them back to spill storage.
self.final_merger = Some(self.create_merger(block, num_streams));
self.state = State::MergeFinal;

Ok(())
}

/// Merge certain number of sorted streams to one sorted stream.
async fn merge_sort_one_round(&mut self, block: Option<DataBlock>) -> Result<()> {
let num_streams =
(self.unmerged_blocks.len() + block.is_some() as usize).min(self.num_merge);
debug_assert!(num_streams > 1);
let mut merger = self.create_merger(block, num_streams);

let mut spilled = VecDeque::new();
while let Some(block) = merger.async_next_block().await? {
Expand Down Expand Up @@ -478,14 +487,12 @@ mod tests {
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_transforms::processors::sort::SimpleRows;
use databend_common_pipeline_transforms::processors::sort::SortedStream;
use databend_common_storage::DataOperator;
use itertools::Itertools;
use rand::rngs::ThreadRng;
use rand::Rng;

use super::TransformSortSpill;
use crate::pipelines::processors::transforms::transform_sort_spill::BlockStream;
use crate::sessions::QueryContext;
use crate::spillers::Spiller;
use crate::spillers::SpillerConfig;
Expand Down Expand Up @@ -597,17 +604,15 @@ mod tests {
}
transform.merge_sort(memory_block).await?;

debug_assert_eq!(transform.unmerged_blocks.len(), 1);
let mut block_stream = BlockStream::Spilled((
transform.unmerged_blocks[0].clone(),
Arc::new(transform.spiller.clone()),
));

let mut result = Vec::new();

while let (Some((block, _)), _) = block_stream.async_next().await? {
debug_assert!(transform.final_merger.is_some());
debug_assert!(transform.unmerged_blocks.is_empty());
let merger = transform.final_merger.as_mut().unwrap();
while let Some(block) = merger.async_next_block().await? {
result.push(block);
}
debug_assert!(merger.is_finished());

let result = pretty_format_blocks(&result).unwrap();
let expected = pretty_format_blocks(&[expected]).unwrap();
Expand Down

0 comments on commit 70c9416

Please sign in to comment.