From db869042c78c729a32a919c1db43891009a8e77f Mon Sep 17 00:00:00 2001 From: RinChanNOW Date: Thu, 12 Oct 2023 11:54:20 +0800 Subject: [PATCH] chore: adjust heap without any popping or pushing. (#13211) * chore: adjust heap without pop or push. * Fix. --- src/query/pipeline/transforms/src/lib.rs | 1 + .../src/processors/transforms/sort/cursor.rs | 12 +----- .../processors/transforms/sort/rows/mod.rs | 2 +- .../processors/transforms/sort/rows/simple.rs | 1 + .../transforms/transform_multi_sort_merge.rs | 38 +++++++++++++++---- 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs index 5fe5de0523a19..d46ce23f2c504 100644 --- a/src/query/pipeline/transforms/src/lib.rs +++ b/src/query/pipeline/transforms/src/lib.rs @@ -15,5 +15,6 @@ #![deny(unused_crate_dependencies)] #![feature(core_intrinsics)] #![feature(int_roundings)] +#![feature(binary_heap_as_slice)] pub mod processors; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs index 02208cf3245aa..e564fee1e0191 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs @@ -17,6 +17,7 @@ use std::cmp::Ordering; use super::rows::Rows; /// A cursor point to a certain row in a data block. +#[derive(Clone)] pub struct Cursor { pub input_index: usize, pub row_index: usize, @@ -85,14 +86,3 @@ impl PartialOrd for Cursor { Some(self.cmp(other)) } } - -impl Clone for Cursor { - fn clone(&self) -> Self { - Self { - input_index: self.input_index, - row_index: self.row_index, - num_rows: self.num_rows, - rows: self.rows.clone(), - } - } -} diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs index e65792666ce5c..4ccafe7782f92 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs @@ -37,7 +37,7 @@ where Self: Sized /// Rows can be compared. pub trait Rows -where Self: Sized +where Self: Sized + Clone { type Item<'a>: Ord where Self: 'a; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs index 81756ee414ce1..ae31d1711de6d 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs @@ -37,6 +37,7 @@ pub struct SimpleRow { } /// Rows structure for single simple types. (numbers, date, timestamp) +#[derive(Clone)] pub struct SimpleRows { inner: T::Column, desc: bool, diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index cebcf7dfdb2b7..e5eb00d85a084 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -261,19 +261,25 @@ where R: Rows // Need to pop data to in_progress_rows. // Use `>=` because some of the input ports may be finished, but the data is still in the heap. while self.heap.len() >= nums_active_inputs && !need_output { - match self.heap.pop() { - Some(Reverse(mut cursor)) => { + match self.heap.peek() { + Some(Reverse(cursor)) => { let input_index = cursor.input_index; - if self.heap.is_empty() { + if self.heap.len() == 1 { + let cursor = self.heap.pop().unwrap().0; // If there is no other block in the heap, we can drain the whole block. need_output = self.drain_cursor(cursor); } else { - let next_cursor = &self.heap.peek().unwrap().0; - // If the last row of current block is smaller than the next cursor, - // we can drain the whole block. + let next_cursor = &find_bigger_child_of_root(&self.heap).0; if cursor.last().le(&next_cursor.current()) { + // If the last row of current block is smaller than the next cursor, + // we can drain the whole block. + let cursor = self.heap.pop().unwrap().0; need_output = self.drain_cursor(cursor); } else { + // We copy current cursor for advancing, + // and we will use this copied cursor to update the top of the heap at last + // (let heap adjust itself without popping and pushing any element). + let mut cursor = cursor.clone(); let block_index = self.blocks[input_index].len() - 1; while !cursor.is_finished() && cursor.le(next_cursor) { // If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap. @@ -289,9 +295,15 @@ where R: Rows } } } + if !cursor.is_finished() { - self.heap.push(Reverse(cursor)); + // Update the top of the heap. + // `self.heap.peek_mut` will return a `PeekMut` object which allows us to modify the top element of the heap. + // The heap will adjust itself automatically when the `PeekMut` object is dropped (RAII). + self.heap.peek_mut().unwrap().0 = cursor; } else { + // Pop the current `cursor`. + self.heap.pop(); // We have read all rows of this block, need to read a new one. self.cursor_finished[input_index] = true; } @@ -513,3 +525,15 @@ enum ProcessorState { // Need to generate output block. Generated(DataBlock), // Need to push output block to output port. } + +/// Find the bigger child of the root of the heap. +#[inline(always)] +fn find_bigger_child_of_root(heap: &BinaryHeap) -> &T { + debug_assert!(heap.len() >= 2); + let slice = heap.as_slice(); + if heap.len() == 2 { + &slice[1] + } else { + (&slice[1]).max(&slice[2]) + } +}