From 79f5c823105e4d928cdd6d838b047a7f7e40cf9b Mon Sep 17 00:00:00 2001 From: hezheyu Date: Wed, 11 Oct 2023 19:33:03 +0800 Subject: [PATCH 1/2] chore: adjust heap without pop or push. --- src/query/pipeline/transforms/src/lib.rs | 1 + .../src/processors/transforms/sort/cursor.rs | 12 +----- .../processors/transforms/sort/rows/mod.rs | 2 +- .../processors/transforms/sort/rows/simple.rs | 1 + .../transforms/transform_multi_sort_merge.rs | 42 +++++++++++++++---- 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs index 5fe5de0523a1..d46ce23f2c50 100644 --- a/src/query/pipeline/transforms/src/lib.rs +++ b/src/query/pipeline/transforms/src/lib.rs @@ -15,5 +15,6 @@ #![deny(unused_crate_dependencies)] #![feature(core_intrinsics)] #![feature(int_roundings)] +#![feature(binary_heap_as_slice)] pub mod processors; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs index 02208cf3245a..e564fee1e019 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/cursor.rs @@ -17,6 +17,7 @@ use std::cmp::Ordering; use super::rows::Rows; /// A cursor point to a certain row in a data block. +#[derive(Clone)] pub struct Cursor { pub input_index: usize, pub row_index: usize, @@ -85,14 +86,3 @@ impl PartialOrd for Cursor { Some(self.cmp(other)) } } - -impl Clone for Cursor { - fn clone(&self) -> Self { - Self { - input_index: self.input_index, - row_index: self.row_index, - num_rows: self.num_rows, - rows: self.rows.clone(), - } - } -} diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs index e65792666ce5..4ccafe7782f9 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs @@ -37,7 +37,7 @@ where Self: Sized /// Rows can be compared. pub trait Rows -where Self: Sized +where Self: Sized + Clone { type Item<'a>: Ord where Self: 'a; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs index 81756ee414ce..ae31d1711de6 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs @@ -37,6 +37,7 @@ pub struct SimpleRow { } /// Rows structure for single simple types. (numbers, date, timestamp) +#[derive(Clone)] pub struct SimpleRows { inner: T::Column, desc: bool, diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index cebcf7dfdb2b..a93637f7fa21 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -261,26 +261,32 @@ where R: Rows // Need to pop data to in_progress_rows. // Use `>=` because some of the input ports may be finished, but the data is still in the heap. while self.heap.len() >= nums_active_inputs && !need_output { - match self.heap.pop() { - Some(Reverse(mut cursor)) => { + match self.heap.peek() { + Some(Reverse(cursor)) => { let input_index = cursor.input_index; - if self.heap.is_empty() { + if self.heap.len() == 1 { + let cursor = self.heap.pop().unwrap().0; // If there is no other block in the heap, we can drain the whole block. need_output = self.drain_cursor(cursor); } else { - let next_cursor = &self.heap.peek().unwrap().0; - // If the last row of current block is smaller than the next cursor, - // we can drain the whole block. + let next_cursor = &find_bigger_child_of_root(&self.heap).0; if cursor.last().le(&next_cursor.current()) { + // If the last row of current block is smaller than the next cursor, + // we can drain the whole block. + let cursor = self.heap.pop().unwrap().0; need_output = self.drain_cursor(cursor); } else { + // We copy current cursor for advancing, + // and we will use this copied cursor to update the top of the heap at last + // (let heap adjust itself without popping and pushing any element). + let mut copied_cursor = cursor.clone(); let block_index = self.blocks[input_index].len() - 1; while !cursor.is_finished() && cursor.le(next_cursor) { // If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap. self.in_progress_rows.push(( input_index, block_index, - cursor.advance(), + copied_cursor.advance(), )); if let Some(limit) = self.limit { if self.in_progress_rows.len() == limit { @@ -289,9 +295,15 @@ where R: Rows } } } - if !cursor.is_finished() { - self.heap.push(Reverse(cursor)); + + if !copied_cursor.is_finished() { + // Update the top of the heap. + // `self.heap.peek_mut` will return a `PeekMut` object which allows us to modify the top element of the heap. + // The heap will adjust itself automatically when the `PeekMut` object is dropped (RAII). + self.heap.peek_mut().unwrap().0 = copied_cursor; } else { + // Pop the current `cursor`. + self.heap.pop(); // We have read all rows of this block, need to read a new one. self.cursor_finished[input_index] = true; } @@ -513,3 +525,15 @@ enum ProcessorState { // Need to generate output block. Generated(DataBlock), // Need to push output block to output port. } + +/// Get the bigger child of the root of the heap. +#[inline(always)] +fn find_bigger_child_of_root(heap: &BinaryHeap) -> &T { + debug_assert!(heap.len() >= 2); + let slice = heap.as_slice(); + if heap.len() == 2 { + &slice[1] + } else { + (&slice[1]).max(&slice[2]) + } +} From 906e66efa2fea2ee9f5893797c7bb9520e62d26b Mon Sep 17 00:00:00 2001 From: hezheyu Date: Wed, 11 Oct 2023 20:49:27 +0800 Subject: [PATCH 2/2] Fix. --- .../transforms/transform_multi_sort_merge.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index a93637f7fa21..e5eb00d85a08 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -279,14 +279,14 @@ where R: Rows // We copy current cursor for advancing, // and we will use this copied cursor to update the top of the heap at last // (let heap adjust itself without popping and pushing any element). - let mut copied_cursor = cursor.clone(); + let mut cursor = cursor.clone(); let block_index = self.blocks[input_index].len() - 1; while !cursor.is_finished() && cursor.le(next_cursor) { // If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap. self.in_progress_rows.push(( input_index, block_index, - copied_cursor.advance(), + cursor.advance(), )); if let Some(limit) = self.limit { if self.in_progress_rows.len() == limit { @@ -296,11 +296,11 @@ where R: Rows } } - if !copied_cursor.is_finished() { + if !cursor.is_finished() { // Update the top of the heap. // `self.heap.peek_mut` will return a `PeekMut` object which allows us to modify the top element of the heap. // The heap will adjust itself automatically when the `PeekMut` object is dropped (RAII). - self.heap.peek_mut().unwrap().0 = copied_cursor; + self.heap.peek_mut().unwrap().0 = cursor; } else { // Pop the current `cursor`. self.heap.pop(); @@ -526,7 +526,7 @@ enum ProcessorState { Generated(DataBlock), // Need to push output block to output port. } -/// Get the bigger child of the root of the heap. +/// Find the bigger child of the root of the heap. #[inline(always)] fn find_bigger_child_of_root(heap: &BinaryHeap) -> &T { debug_assert!(heap.len() >= 2);