feat: reduce probe spilled data and I/O (#13035)
* feat: reduce probe spilled data

* fix

* fix
xudong963 authored Sep 27, 2023
1 parent d53524f commit c130d26
Showing 6 changed files with 42 additions and 80 deletions.
@@ -40,8 +40,6 @@ pub struct BuildSpillCoordinator {
     pub(crate) total_builder_count: usize,
     /// Spill tasks, the size is the same as the total active processor count.
     pub(crate) spill_tasks: Mutex<VecDeque<Vec<(u8, DataBlock)>>>,
-    /// If send partition set to probe
-    pub(crate) send_partition_set: AtomicBool,
     /// When a build processor won't trigger spill, the field will plus one
     pub(crate) non_spill_processors: AtomicUsize,
     /// If there is the last active processor, send true to watcher channel
@@ -57,7 +55,6 @@ impl BuildSpillCoordinator {
             waiting_spill_count: Default::default(),
             total_builder_count,
             spill_tasks: Default::default(),
-            send_partition_set: Default::default(),
             non_spill_processors: Default::default(),
             ready_spill_watcher,
             dummy_ready_spill_receiver,
@@ -220,7 +220,7 @@ impl HashJoinState {
 
     pub fn set_spilled_partition(&self, partitions: &HashSet<u8>) {
         let mut spill_partition = self.build_spilled_partitions.write();
-        *spill_partition = partitions.clone();
+        spill_partition.extend(partitions);
     }
 
     #[async_backtrace::framed]
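Note on the hunk above: `set_spilled_partition` now merges into the shared set instead of overwriting it, since every build processor reports its own spilled partitions. A minimal standalone sketch of that behaviour (the type and `std::sync::RwLock` here are stand-ins, not the real `HashJoinState`):

```rust
// Stand-in for the shared state: each build processor calls
// set_spilled_partition once, and the partition sets must accumulate.
use std::collections::HashSet;
use std::sync::RwLock;

struct SpilledPartitions {
    build_spilled_partitions: RwLock<HashSet<u8>>,
}

impl SpilledPartitions {
    fn set_spilled_partition(&self, partitions: &HashSet<u8>) {
        let mut spill_partition = self.build_spilled_partitions.write().unwrap();
        // Merge instead of assign, so earlier callers' partitions survive.
        spill_partition.extend(partitions);
    }
}

fn main() {
    let state = SpilledPartitions {
        build_spilled_partitions: RwLock::new(HashSet::new()),
    };
    // Two build processors report different spilled partitions.
    state.set_spilled_partition(&HashSet::from([0, 1]));
    state.set_spilled_partition(&HashSet::from([2]));
    // The merged set contains all reported partitions.
    assert_eq!(
        *state.build_spilled_partitions.read().unwrap(),
        HashSet::from([0, 1, 2])
    );
}
```

With the old assignment (`*spill_partition = partitions.clone();`), the second call would have erased partitions 0 and 1.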
@@ -60,6 +60,8 @@ pub struct TransformHashJoinBuild {
     from_spill: bool,
     spill_state: Option<Box<BuildSpillState>>,
     spill_data: Option<DataBlock>,
+    // If send partition set to probe
+    send_partition_set: bool,
 }
 
 impl TransformHashJoinBuild {
@@ -79,6 +81,7 @@ impl TransformHashJoinBuild {
             finalize_finished: false,
             from_spill: false,
             processor_id,
+            send_partition_set: false,
         }))
     }

@@ -201,18 +204,11 @@ impl Processor for TransformHashJoinBuild {
                 if let Some(spill_state) = &mut self.spill_state {
                     // Send spilled partition to `HashJoinState`, used by probe spill.
                     // The method should be called only once.
-                    if !spill_state
-                        .spill_coordinator
-                        .send_partition_set
-                        .load(Ordering::Relaxed)
-                    {
+                    if !self.send_partition_set {
                         self.build_state
                             .hash_join_state
                             .set_spilled_partition(&spill_state.spiller.spilled_partition_set);
-                        spill_state
-                            .spill_coordinator
-                            .send_partition_set
-                            .store(true, Ordering::Relaxed);
+                        self.send_partition_set = true;
                     }
                     self.step = HashJoinBuildStep::WaitProbe;
                     Ok(Event::Async)
@@ -321,9 +317,10 @@
                 let spill_state = self.spill_state.as_mut().unwrap();
                 let mut hashes = Vec::with_capacity(data.num_rows());
                 spill_state.get_hashes(&data, &mut hashes)?;
+                let spilled_partition_set = spill_state.spiller.spilled_partition_set.clone();
                 let unspilled_data = spill_state
                     .spiller
-                    .spill_input(data, &hashes, self.processor_id)
+                    .spill_input(data, &hashes, &spilled_partition_set, self.processor_id)
                     .await?;
                 if !unspilled_data.is_empty() {
                     self.build_state.build(unspilled_data)?;
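Note on the hunks above: the once-only guard for reporting spilled partitions moves from a shared `AtomicBool` on the coordinator to a per-processor `bool`, since each build processor now has to report its own partition set exactly once. An illustrative stand-in (not the real `TransformHashJoinBuild`):

```rust
// Illustrative-only: a per-processor "report once" flag replaces a shared atomic.
use std::collections::HashSet;

struct BuildProcessor {
    spilled_partitions: HashSet<u8>,
    send_partition_set: bool,
}

impl BuildProcessor {
    /// Returns this processor's partition set the first time it is called,
    /// `None` on every later call.
    fn report_once(&mut self) -> Option<&HashSet<u8>> {
        if self.send_partition_set {
            return None;
        }
        self.send_partition_set = true;
        Some(&self.spilled_partitions)
    }
}

fn main() {
    let mut p = BuildProcessor {
        spilled_partitions: HashSet::from([0, 2]),
        send_partition_set: false,
    };
    assert!(p.report_once().is_some());
    assert!(p.report_once().is_none());
}
```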
@@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::hash_join::HashJoinProbeState;
 use crate::pipelines::processors::transforms::hash_join::ProbeState;
 use crate::pipelines::processors::Processor;
 
+#[derive(Debug)]
 enum HashJoinProbeStep {
     // The step is to wait build phase finished.
     WaitBuild,
@@ -63,13 +64,12 @@ pub struct TransformHashJoinProbe {
     outer_scan_finished: bool,
     processor_id: usize,
 
-    // If it's first round, after last processor finish spill
-    // We need to read corresponding spilled data to probe with build hash table.
-    // Only need to use diff partitions data to probe first-round's hash table
-    diff_partitions: VecDeque<u8>,
     // If the processor has finished spill, set it to true.
     spill_done: bool,
     spill_state: Option<Box<ProbeSpillState>>,
+    // If input data can't find proper partitions to spill,
+    // directly probe them with hashtable.
+    need_spill: bool,
 }
 
 impl TransformHashJoinProbe {
@@ -97,10 +97,10 @@ impl TransformHashJoinProbe {
             probe_state: ProbeState::create(max_block_size, join_type, with_conjunct, func_ctx),
             max_block_size,
             outer_scan_finished: false,
-            diff_partitions: Default::default(),
             spill_done: false,
             spill_state: probe_spill_state,
             processor_id: id,
+            need_spill: true,
         }))
     }

@@ -173,6 +173,11 @@ impl TransformHashJoinProbe {
         if let Some(remain) = remain_block {
             self.input_data.push_back(remain);
         }
+        if self.spill_state.is_some() {
+            self.need_spill = true;
+            self.step = HashJoinProbeStep::Spill;
+            return Ok(Event::Async);
+        }
         return Ok(Event::Sync);
     }

@@ -183,11 +188,6 @@
             self.join_probe_state.probe_done()?;
             Ok(Event::Async)
         } else {
-            if !self.diff_partitions.is_empty() {
-                // Continue to the first round
-                self.step = HashJoinProbeStep::AsyncRunning;
-                return Ok(Event::Async);
-            }
             if !self.join_probe_state.spill_partitions.read().is_empty() {
                 self.join_probe_state.finish_final_probe()?;
                 self.step = HashJoinProbeStep::WaitBuild;
@@ -242,27 +242,6 @@
         self.outer_scan_finished = false;
         Ok(())
     }
-
-    fn set_diff_partitions(&mut self) {
-        let spill_state = self.spill_state.as_ref().unwrap();
-        let probe_spilled_partitions = &spill_state.spiller.spilled_partition_set;
-        let build_spilled_partitions = self
-            .join_probe_state
-            .hash_join_state
-            .build_spilled_partitions
-            .read()
-            .clone();
-        let partitions_diff = probe_spilled_partitions
-            .difference(&build_spilled_partitions)
-            .cloned()
-            .collect();
-        self.diff_partitions.extend(&partitions_diff);
-        let mut spill_partitions = self.join_probe_state.spill_partitions.write();
-        *spill_partitions = spill_partitions
-            .difference(&partitions_diff)
-            .cloned()
-            .collect();
-    }
 }
 
 #[async_trait::async_trait]
@@ -280,6 +259,10 @@ impl Processor for TransformHashJoinProbe {
             HashJoinProbeStep::WaitBuild => Ok(Event::Async),
             HashJoinProbeStep::Spill => {
                 if !self.input_data.is_empty() {
+                    if !self.need_spill {
+                        self.step = HashJoinProbeStep::Running;
+                        return Ok(Event::Sync);
+                    }
                     return Ok(Event::Async);
                 }

@@ -303,17 +286,6 @@
                     spill_partitions.extend(spilled_partition_set);
                 }
 
-                if !spilled_partition_set.is_empty()
-                    && unsafe { &*self.join_probe_state.hash_join_state.build_num_rows.get() }
-                        != &(0_usize)
-                {
-                    self.set_diff_partitions();
-                    self.spill_done = true;
-                    self.join_probe_state.finish_spill()?;
-                    self.step = HashJoinProbeStep::AsyncRunning;
-                    return Ok(Event::Async);
-                }
-
                 self.spill_done = true;
                 self.join_probe_state.finish_spill()?;
                 // Wait build side to build hash table
@@ -397,7 +369,7 @@
             HashJoinProbeStep::FastReturn
             | HashJoinProbeStep::WaitBuild
             | HashJoinProbeStep::Spill
-            | HashJoinProbeStep::AsyncRunning => unreachable!(),
+            | HashJoinProbeStep::AsyncRunning => unreachable!("{:?}", self.step),
         }
     }

@@ -488,20 +460,29 @@
                     let spill_state = self.spill_state.as_mut().unwrap();
                     let mut hashes = Vec::with_capacity(data.num_rows());
                     spill_state.get_hashes(&data, &mut hashes)?;
-                    // FIXME: we can directly discard `_non_matched_data`, because there is no matched data with build side.
-                    let _non_matched_data = spill_state
+                    // Pass build spilled partition set, we only need to spill data in build spilled partition set
+                    let build_spilled_partitions = self
+                        .join_probe_state
+                        .hash_join_state
+                        .build_spilled_partitions
+                        .read()
+                        .clone();
+                    let non_matched_data = spill_state
                         .spiller
-                        .spill_input(data, &hashes, self.processor_id)
+                        .spill_input(data, &hashes, &build_spilled_partitions, self.processor_id)
                         .await?;
+                    // Use `non_matched_data` to probe the first round hashtable (if the hashtable isn't empty)
+                    if !non_matched_data.is_empty()
+                        && unsafe { &*self.join_probe_state.hash_join_state.build_num_rows.get() }
+                            != &(0_usize)
+                    {
+                        self.input_data.push_back(non_matched_data);
+                        self.need_spill = false;
+                    }
                 }
             }
             HashJoinProbeStep::AsyncRunning => {
                 let spill_state = self.spill_state.as_ref().unwrap();
-                if let Some(p_id) = self.diff_partitions.pop_back() {
-                    let spilled_data = spill_state.spiller.read_spilled_data(&p_id).await?;
-                    self.input_data.extend(spilled_data);
-                    return Ok(());
-                }
                 let p_id = self
                     .join_probe_state
                     .hash_join_state
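Note on the probe-side hunks above: instead of the removed diff-partition round trip (spill everything, then read the difference back in `AsyncRunning`), rows whose partition was never spilled on the build side now stay in `input_data` and are probed against the first-round hash table right away, but only when that hash table is non-empty. A hypothetical sketch of just that decision (illustrative names, not the real processor):

```rust
// Hypothetical sketch: after spilling an input block, rows that could not be
// spilled (their partition was not spilled on the build side) are kept in
// memory; if the first-round hash table has rows, probe them synchronously
// instead of writing them out and reading them back later.
#[derive(Debug, PartialEq)]
enum ProbeStep {
    Spill,
    Running,
}

fn next_step_after_spill(non_matched_rows: usize, build_num_rows: usize) -> ProbeStep {
    if non_matched_rows > 0 && build_num_rows != 0 {
        // Probe the retained rows with the in-memory (first-round) hash table.
        ProbeStep::Running
    } else {
        // Nothing left in memory for this block; continue with the spill flow.
        ProbeStep::Spill
    }
}

fn main() {
    assert_eq!(next_step_after_spill(128, 10_000), ProbeStep::Running);
    assert_eq!(next_step_after_spill(0, 10_000), ProbeStep::Spill);
    assert_eq!(next_step_after_spill(128, 0), ProbeStep::Spill);
}
```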
5 changes: 2 additions & 3 deletions src/query/service/src/spillers/spiller.rs
@@ -217,6 +217,7 @@ impl Spiller {
         &mut self,
         data_block: DataBlock,
         hashes: &[u64],
+        spilled_partition_set: &HashSet<u8>,
         worker_id: usize,
     ) -> Result<DataBlock> {
         // Save the row index which is not spilled.
@@ -226,9 +227,7 @@
         // Classify rows to spill or not spill.
         for (row_idx, hash) in hashes.iter().enumerate() {
             let partition_id = *hash as u8 & 0b0000_0011;
-            if self.spilled_partition_set.contains(&partition_id)
-                || self.spiller_type == SpillerType::HashJoinProbe
-            {
+            if spilled_partition_set.contains(&partition_id) {
                 // the row can be directly spilled to corresponding partition
                 partition_rows
                     .entry(partition_id)
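Note on the `spill_input` change above: the partition set to spill into is now supplied by the caller, so only rows hashing into those partitions are queued for spilling and the rest come back unspilled. A simplified, self-contained sketch of the classification step (hypothetical helper, not the real `Spiller`, which also buffers and writes the partitioned rows):

```rust
// Simplified sketch of the row classification in spill_input: rows whose hash
// maps into the caller-supplied spilled partition set are grouped per
// partition for spilling; everything else is returned as "unspilled".
use std::collections::{HashMap, HashSet};

fn classify_rows(
    hashes: &[u64],
    spilled_partition_set: &HashSet<u8>,
) -> (HashMap<u8, Vec<usize>>, Vec<usize>) {
    let mut partition_rows: HashMap<u8, Vec<usize>> = HashMap::new();
    let mut unspilled_rows = Vec::new();
    for (row_idx, hash) in hashes.iter().enumerate() {
        // Same 2-bit partitioning as the patch: 4 partitions, ids 0..=3.
        let partition_id = *hash as u8 & 0b0000_0011;
        if spilled_partition_set.contains(&partition_id) {
            // The row can be spilled directly to its partition.
            partition_rows.entry(partition_id).or_default().push(row_idx);
        } else {
            // The row stays in memory and is built/probed directly.
            unspilled_rows.push(row_idx);
        }
    }
    (partition_rows, unspilled_rows)
}

fn main() {
    let hashes = [0u64, 1, 2, 3, 4, 5, 6, 7];
    let spilled: HashSet<u8> = HashSet::from([1, 3]);
    let (to_spill, keep) = classify_rows(&hashes, &spilled);
    // Rows hashing to partitions 1 and 3 are spilled; the rest are kept.
    assert_eq!(keep, vec![0, 2, 4, 6]);
    assert_eq!(to_spill[&1], vec![1, 5]);
    assert_eq!(to_spill[&3], vec![3, 7]);
}
```

On the probe side this is what lets rows outside the build's spilled partitions skip the disk entirely.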
12 changes: 0 additions & 12 deletions tests/sqllogictests/suites/query/spill.test
@@ -58,19 +58,7 @@ select a from t3 inner join numbers(100000) on t3.a = number order by a;
 0
 0
 
-statement ok
-drop table t3;
 
-statement ok
-create table t3 as select number as a from numbers(1000000);
-
-statement ok
-set join_spilling_threshold = 100;
-
-query I
-select count() from t3 inner join numbers(1000000) on t3.a = number;
-----
-1000000
 
 statement ok
 drop table t3;
