Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adjust heap without any popping or pushing. #13211

Merged
merged 3 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@
#![deny(unused_crate_dependencies)]
#![feature(core_intrinsics)]
#![feature(int_roundings)]
#![feature(binary_heap_as_slice)]

pub mod processors;
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::cmp::Ordering;
use super::rows::Rows;

/// A cursor point to a certain row in a data block.
#[derive(Clone)]
pub struct Cursor<R: Rows> {
pub input_index: usize,
pub row_index: usize,
Expand Down Expand Up @@ -85,14 +86,3 @@ impl<R: Rows> PartialOrd for Cursor<R> {
Some(self.cmp(other))
}
}

impl<R: Clone + Rows> Clone for Cursor<R> {
fn clone(&self) -> Self {
Self {
input_index: self.input_index,
row_index: self.row_index,
num_rows: self.num_rows,
rows: self.rows.clone(),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where Self: Sized

/// Rows can be compared.
pub trait Rows
where Self: Sized
where Self: Sized + Clone
{
type Item<'a>: Ord
where Self: 'a;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct SimpleRow<T: ValueType> {
}

/// Rows structure for single simple types. (numbers, date, timestamp)
#[derive(Clone)]
pub struct SimpleRows<T: ValueType> {
inner: T::Column,
desc: bool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,19 +261,25 @@ where R: Rows
// Need to pop data to in_progress_rows.
// Use `>=` because some of the input ports may be finished, but the data is still in the heap.
while self.heap.len() >= nums_active_inputs && !need_output {
match self.heap.pop() {
Some(Reverse(mut cursor)) => {
match self.heap.peek() {
Some(Reverse(cursor)) => {
let input_index = cursor.input_index;
if self.heap.is_empty() {
if self.heap.len() == 1 {
let cursor = self.heap.pop().unwrap().0;
// If there is no other block in the heap, we can drain the whole block.
need_output = self.drain_cursor(cursor);
} else {
let next_cursor = &self.heap.peek().unwrap().0;
// If the last row of current block is smaller than the next cursor,
// we can drain the whole block.
let next_cursor = &find_bigger_child_of_root(&self.heap).0;
if cursor.last().le(&next_cursor.current()) {
// If the last row of current block is smaller than the next cursor,
// we can drain the whole block.
let cursor = self.heap.pop().unwrap().0;
need_output = self.drain_cursor(cursor);
} else {
// We copy current cursor for advancing,
// and we will use this copied cursor to update the top of the heap at last
// (let heap adjust itself without popping and pushing any element).
let mut cursor = cursor.clone();
let block_index = self.blocks[input_index].len() - 1;
while !cursor.is_finished() && cursor.le(next_cursor) {
// If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap.
Expand All @@ -289,9 +295,15 @@ where R: Rows
}
}
}

if !cursor.is_finished() {
self.heap.push(Reverse(cursor));
// Update the top of the heap.
// `self.heap.peek_mut` will return a `PeekMut` object which allows us to modify the top element of the heap.
// The heap will adjust itself automatically when the `PeekMut` object is dropped (RAII).
self.heap.peek_mut().unwrap().0 = cursor;
} else {
// Pop the current `cursor`.
self.heap.pop();
// We have read all rows of this block, need to read a new one.
self.cursor_finished[input_index] = true;
}
Expand Down Expand Up @@ -513,3 +525,15 @@ enum ProcessorState {
// Need to generate output block.
Generated(DataBlock), // Need to push output block to output port.
}

/// Find the bigger child of the root of the heap.
#[inline(always)]
fn find_bigger_child_of_root<T: Ord>(heap: &BinaryHeap<T>) -> &T {
debug_assert!(heap.len() >= 2);
let slice = heap.as_slice();
if heap.len() == 2 {
&slice[1]
} else {
(&slice[1]).max(&slice[2])
}
}
Loading