Skip to content

Commit

Permalink
add loser tree merge
Browse files Browse the repository at this point in the history
feat: reorganise

remove

format

format

fix index out of bound bug

fix index out of bound bug

approve organization

test
  • Loading branch information
陈侗 authored and 陈侗 committed Oct 9, 2023
1 parent 5c188c5 commit 3ce59af
Showing 1 changed file with 72 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::intrinsics::unlikely;
use std::marker::PhantomData;
use std::sync::atomic::AtomicBool;
Expand Down Expand Up @@ -48,6 +47,9 @@ use super::sort::SimpleRows;
use super::Compactor;
use super::TransformCompact;

const MIN_LOSER_TREE_MARK: i32 = -1;
const MAX_LOSER_TREE_MARK: i32 = -2;

/// Merge sort blocks without limit.
///
/// For merge sort with limit, see [`super::transform_sort_merge_limit`]
Expand Down Expand Up @@ -89,6 +91,52 @@ where
_r: PhantomData,
})
}

fn adjust(
loser_tree: &mut [(i32, i32)],
loser_tree_leaf: &mut [Reverse<Cursor<R>>],
index: usize,
len: usize,
) {
let mut father = (index + len) / 2;
let mut cursor_index = if loser_tree_leaf[index].0.is_finished() {
(MAX_LOSER_TREE_MARK, MAX_LOSER_TREE_MARK)
} else {
(index as i32, loser_tree_leaf[index].0.row_index as i32)
};
while father > 0 {
match (cursor_index.0, loser_tree[father].0) {
(_, MAX_LOSER_TREE_MARK) => {}
(MIN_LOSER_TREE_MARK, _) => {
break;
}
(MAX_LOSER_TREE_MARK, MIN_LOSER_TREE_MARK) => {
loser_tree[father] = (MAX_LOSER_TREE_MARK, MAX_LOSER_TREE_MARK);
cursor_index = (MIN_LOSER_TREE_MARK, MIN_LOSER_TREE_MARK);
}
(MAX_LOSER_TREE_MARK, _) => {
cursor_index = loser_tree[father];
loser_tree[father] = (MAX_LOSER_TREE_MARK, MAX_LOSER_TREE_MARK);
}
(_, MIN_LOSER_TREE_MARK) => {
loser_tree[father] = cursor_index;
cursor_index = (MIN_LOSER_TREE_MARK, MIN_LOSER_TREE_MARK);
}
_ => {
if loser_tree_leaf[cursor_index.0 as usize]
.0
.gt(&loser_tree_leaf[loser_tree[father].0 as usize].0)
{
std::mem::swap(&mut loser_tree[father], &mut cursor_index);
}
}
}

father /= 2;
}

loser_tree[0] = cursor_index;
}
}

impl<R, Converter> Compactor for SortMergeCompactor<R, Converter>
Expand All @@ -109,17 +157,19 @@ where
return Ok(vec![]);
}

let mut blocks = blocks
.into_iter()
.filter(|b| !b.is_empty())
.collect::<Vec<_>>();

let output_size = blocks.iter().map(|b| b.num_rows()).sum::<usize>();
if output_size == 0 {
return Ok(vec![]);
}

let mut blocks = blocks
.into_iter()
.filter(|b| !b.is_empty())
.collect::<Vec<_>>();
let len = blocks.len();

if blocks.len() == 1 {
if len == 1 {
if self.gen_order_col {
let block = blocks.get_mut(0).ok_or(ErrorCode::Internal("It's a bug"))?;
let columns = self
Expand All @@ -140,9 +190,10 @@ where
let output_block_num = output_size.div_ceil(self.block_size);
let mut output_blocks = Vec::with_capacity(output_block_num);
let mut output_indices = Vec::with_capacity(output_size);
let mut heap: BinaryHeap<Reverse<Cursor<R>>> = BinaryHeap::with_capacity(blocks.len());
let mut loser_tree = vec![(MIN_LOSER_TREE_MARK, MIN_LOSER_TREE_MARK); len];
let mut loser_tree_leaf = Vec::with_capacity(len);

// 1. Put all blocks into a min-heap.
// push all blocks into loser tree leaf.
for (i, block) in blocks.iter_mut().enumerate() {
let columns = self
.order_by_cols
Expand All @@ -159,44 +210,29 @@ where
});
}
let cursor = Cursor::new(i, rows);
heap.push(Reverse(cursor));
loser_tree_leaf.push(Reverse(cursor));
}

for i in 0..len {
Self::adjust(&mut loser_tree, &mut loser_tree_leaf, i, len);
}

// 2. Drain the heap
while let Some(Reverse(mut cursor)) = heap.pop() {
while loser_tree[0].0 != MAX_LOSER_TREE_MARK {
if unlikely(self.aborting.load(Ordering::Relaxed)) {
return Err(ErrorCode::AbortedQuery(
"Aborted query, because the server is shutting down or the query was killed.",
));
}
output_indices.push((
loser_tree_leaf[loser_tree[0].0 as usize].0.input_index,
loser_tree[0].1 as usize,
));
let index = loser_tree[0].0 as usize;
loser_tree_leaf[index].0.advance();

let block_idx = cursor.input_index;
if heap.is_empty() {
// If there is no other block in the heap, we can drain the whole block.
while !cursor.is_finished() {
output_indices.push((block_idx, cursor.advance()));
}
} else {
let next_cursor = &heap.peek().unwrap().0;
// If the last row of current block is smaller than the next cursor,
// we can drain the whole block.
if cursor.last().le(&next_cursor.current()) {
while !cursor.is_finished() {
output_indices.push((block_idx, cursor.advance()));
}
} else {
while !cursor.is_finished() && cursor.le(next_cursor) {
// If the cursor is smaller than the next cursor, don't need to push the cursor back to the heap.
output_indices.push((block_idx, cursor.advance()));
}
if !cursor.is_finished() {
heap.push(Reverse(cursor));
}
}
}
Self::adjust(&mut loser_tree, &mut loser_tree_leaf, index, len);
}

// 3. Build final blocks from `output_indices`.
for i in 0..output_block_num {
if unlikely(self.aborting.load(Ordering::Relaxed)) {
return Err(ErrorCode::AbortedQuery(
Expand Down

0 comments on commit 3ce59af

Please sign in to comment.