Skip to content

Commit

Permalink
feat: speed up iejoin (#13229)
Browse files Browse the repository at this point in the history
  • Loading branch information
xudong963 authored Oct 12, 2023
1 parent be5d33d commit e048513
Showing 1 changed file with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use common_base::base::tokio::sync::Notify;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType;
Expand Down Expand Up @@ -171,8 +172,29 @@ impl RangeJoinState {
}

pub(crate) fn partition(&self) -> Result<()> {
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let left_table = self.left_table.read();
let right_table = self.right_table.read();
// Right table is bigger than left table
let mut right_table = self.right_table.write();
if !left_table.is_empty()
&& !right_table.is_empty()
&& left_table.len() * right_table.len() < max_threads
{
let num_parts = max_threads / left_table.len() + 1;
// Spit right_table to num_parts equally
let merged_right_table = DataBlock::concat(&right_table)?;
let mut indices = Vec::with_capacity(merged_right_table.num_rows());
for idx in 0..merged_right_table.num_rows() {
indices.push((idx % num_parts) as u32);
}
let scatter_blocks = DataBlock::scatter(&merged_right_table, &indices, num_parts)?;
right_table.clear();
for block in scatter_blocks.iter() {
if !block.is_empty() {
right_table.push(block.clone());
}
}
}

let mut left_sorted_blocks = self.left_sorted_blocks.write();
let mut right_sorted_blocks = self.right_sorted_blocks.write();
Expand Down

1 comment on commit e048513

@vercel
Copy link

@vercel vercel bot commented on e048513 Oct 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.