Skip to content

Commit

Permalink
chore: adjust heap without any popping or pushing. (databendlabs#13211)
Browse files Browse the repository at this point in the history
* chore: adjust heap without pop or push.

* Fix.
  • Loading branch information
RinChanNOWWW authored and andylokandy committed Nov 27, 2023
1 parent 08bd72d commit db86904
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![deny(unused_crate_dependencies)]
#![feature(core_intrinsics)]
#![feature(int_roundings)]
#![feature(binary_heap_as_slice)]

pub mod processors;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::cmp::Ordering;
use super::rows::Rows;

/// A cursor pointing to a certain row in a data block.
#[derive(Clone)]
pub struct Cursor<R: Rows> {
pub input_index: usize,
pub row_index: usize,
Expand Down Expand Up @@ -85,14 +86,3 @@ impl<R: Rows> PartialOrd for Cursor<R> {
Some(self.cmp(other))
}
}

impl<R: Clone + Rows> Clone for Cursor<R> {
fn clone(&self) -> Self {
Self {
input_index: self.input_index,
row_index: self.row_index,
num_rows: self.num_rows,
rows: self.rows.clone(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where Self: Sized

/// Rows can be compared.
pub trait Rows
where Self: Sized
where Self: Sized + Clone
{
type Item<'a>: Ord
where Self: 'a;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct SimpleRow<T: ValueType> {
}

/// Rows structure for single simple types (numbers, date, timestamp).
#[derive(Clone)]
pub struct SimpleRows<T: ValueType> {
inner: T::Column,
desc: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,25 @@ where R: Rows
// Need to pop data to in_progress_rows.
// Use `>=` because some of the input ports may be finished, but the data is still in the heap.
while self.heap.len() >= nums_active_inputs && !need_output {
match self.heap.pop() {
Some(Reverse(mut cursor)) => {
match self.heap.peek() {
Some(Reverse(cursor)) => {
let input_index = cursor.input_index;
if self.heap.is_empty() {
if self.heap.len() == 1 {
let cursor = self.heap.pop().unwrap().0;
// If there is no other block in the heap, we can drain the whole block.
need_output = self.drain_cursor(cursor);
} else {
let next_cursor = &self.heap.peek().unwrap().0;
// If the last row of current block is smaller than the next cursor,
// we can drain the whole block.
let next_cursor = &find_bigger_child_of_root(&self.heap).0;
if cursor.last().le(&next_cursor.current()) {
// If the last row of current block is smaller than the next cursor,
// we can drain the whole block.
let cursor = self.heap.pop().unwrap().0;
need_output = self.drain_cursor(cursor);
} else {
// We copy current cursor for advancing,
// and we will use this copied cursor to update the top of the heap at last
// (let heap adjust itself without popping and pushing any element).
let mut cursor = cursor.clone();
let block_index = self.blocks[input_index].len() - 1;
while !cursor.is_finished() && cursor.le(next_cursor) {
// If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap.
Expand All @@ -289,9 +295,15 @@ where R: Rows
}
}
}

if !cursor.is_finished() {
self.heap.push(Reverse(cursor));
// Update the top of the heap.
// `self.heap.peek_mut` will return a `PeekMut` object which allows us to modify the top element of the heap.
// The heap will adjust itself automatically when the `PeekMut` object is dropped (RAII).
self.heap.peek_mut().unwrap().0 = cursor;
} else {
// Pop the current `cursor`.
self.heap.pop();
// We have read all rows of this block, need to read a new one.
self.cursor_finished[input_index] = true;
}
Expand Down Expand Up @@ -513,3 +525,15 @@ enum ProcessorState {
// Need to generate output block.
Generated(DataBlock), // Need to push output block to output port.
}

/// Returns a reference to the larger of the two elements stored right after
/// the root in the heap's backing slice (indices 1 and 2).
///
/// With exactly two elements the only candidate (index 1) is returned. When
/// both candidates compare equal, the one at index 2 is returned.
///
/// NOTE(review): this treats `as_slice()` as an implicit binary-heap layout
/// with the root at index 0 and its children at 1 and 2 — confirm that this
/// ordering is something the standard library is willing to guarantee.
///
/// # Panics
///
/// Panics if the heap holds fewer than two elements (via the debug assertion
/// in debug builds, via the slice index otherwise).
#[inline(always)]
fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
    debug_assert!(heap.len() >= 2);
    let nodes = heap.as_slice();
    let left = &nodes[1];
    match nodes.get(2) {
        // Two children: pick the greater one (the right one on ties).
        Some(right) => std::cmp::max(left, right),
        // Only one child exists.
        None => left,
    }
}

0 comments on commit db86904

Please sign in to comment.