diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_coordinator.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_coordinator.rs index 5f841b5f6d55..f3434b8e7395 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_coordinator.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/build_spill/build_spill_coordinator.rs @@ -40,8 +40,6 @@ pub struct BuildSpillCoordinator { pub(crate) total_builder_count: usize, /// Spill tasks, the size is the same as the total active processor count. pub(crate) spill_tasks: Mutex>>, - /// If send partition set to probe - pub(crate) send_partition_set: AtomicBool, /// When a build processor won't trigger spill, the field will plus one pub(crate) non_spill_processors: AtomicUsize, /// If there is the last active processor, send true to watcher channel @@ -57,7 +55,6 @@ impl BuildSpillCoordinator { waiting_spill_count: Default::default(), total_builder_count, spill_tasks: Default::default(), - send_partition_set: Default::default(), non_spill_processors: Default::default(), ready_spill_watcher, dummy_ready_spill_receiver, diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 8991ab017e1d..363660e0097b 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -220,7 +220,7 @@ impl HashJoinState { pub fn set_spilled_partition(&self, partitions: &HashSet) { let mut spill_partition = self.build_spilled_partitions.write(); - *spill_partition = partitions.clone(); + spill_partition.extend(partitions); } #[async_backtrace::framed] diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs index 200fe944c69b..44fbfba659d5 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_build.rs @@ -60,6 +60,8 @@ pub struct TransformHashJoinBuild { from_spill: bool, spill_state: Option>, spill_data: Option, + // If send partition set to probe + send_partition_set: bool, } impl TransformHashJoinBuild { @@ -79,6 +81,7 @@ impl TransformHashJoinBuild { finalize_finished: false, from_spill: false, processor_id, + send_partition_set: false, })) } @@ -201,18 +204,11 @@ impl Processor for TransformHashJoinBuild { if let Some(spill_state) = &mut self.spill_state { // Send spilled partition to `HashJoinState`, used by probe spill. // The method should be called only once. - if !spill_state - .spill_coordinator - .send_partition_set - .load(Ordering::Relaxed) - { + if !self.send_partition_set { self.build_state .hash_join_state .set_spilled_partition(&spill_state.spiller.spilled_partition_set); - spill_state - .spill_coordinator - .send_partition_set - .store(true, Ordering::Relaxed); + self.send_partition_set = true; } self.step = HashJoinBuildStep::WaitProbe; Ok(Event::Async) @@ -321,9 +317,10 @@ impl Processor for TransformHashJoinBuild { let spill_state = self.spill_state.as_mut().unwrap(); let mut hashes = Vec::with_capacity(data.num_rows()); spill_state.get_hashes(&data, &mut hashes)?; + let spilled_partition_set = spill_state.spiller.spilled_partition_set.clone(); let unspilled_data = spill_state .spiller - .spill_input(data, &hashes, self.processor_id) + .spill_input(data, &hashes, &spilled_partition_set, self.processor_id) .await?; if !unspilled_data.is_empty() { self.build_state.build(unspilled_data)?; diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs index 50c1c7adc572..edf56c633e47 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/transform_hash_join_probe.rs @@ -34,6 +34,7 @@ use crate::pipelines::processors::transforms::hash_join::HashJoinProbeState; use crate::pipelines::processors::transforms::hash_join::ProbeState; use crate::pipelines::processors::Processor; +#[derive(Debug)] enum HashJoinProbeStep { // The step is to wait build phase finished. WaitBuild, @@ -63,13 +64,12 @@ pub struct TransformHashJoinProbe { outer_scan_finished: bool, processor_id: usize, - // If it's first round, after last processor finish spill - // We need to read corresponding spilled data to probe with build hash table. - // Only need to use diff partitions data to probe first-round's hash table - diff_partitions: VecDeque, // If the processor has finished spill, set it to true. spill_done: bool, spill_state: Option>, + // If input data can't find proper partitions to spill, + // directly probe them with hashtable. + need_spill: bool, } impl TransformHashJoinProbe { @@ -97,10 +97,10 @@ impl TransformHashJoinProbe { probe_state: ProbeState::create(max_block_size, join_type, with_conjunct, func_ctx), max_block_size, outer_scan_finished: false, - diff_partitions: Default::default(), spill_done: false, spill_state: probe_spill_state, processor_id: id, + need_spill: true, })) } @@ -173,6 +173,11 @@ impl TransformHashJoinProbe { if let Some(remain) = remain_block { self.input_data.push_back(remain); } + if self.spill_state.is_some() { + self.need_spill = true; + self.step = HashJoinProbeStep::Spill; + return Ok(Event::Async); + } return Ok(Event::Sync); } @@ -183,11 +188,6 @@ impl TransformHashJoinProbe { self.join_probe_state.probe_done()?; Ok(Event::Async) } else { - if !self.diff_partitions.is_empty() { - // Continue to the first round - self.step = HashJoinProbeStep::AsyncRunning; - return Ok(Event::Async); - } if !self.join_probe_state.spill_partitions.read().is_empty() { self.join_probe_state.finish_final_probe()?; self.step = HashJoinProbeStep::WaitBuild; @@ -242,27 +242,6 @@ impl TransformHashJoinProbe { self.outer_scan_finished = false; Ok(()) } - - fn set_diff_partitions(&mut self) { - let spill_state = self.spill_state.as_ref().unwrap(); - let probe_spilled_partitions = &spill_state.spiller.spilled_partition_set; - let build_spilled_partitions = self - .join_probe_state - .hash_join_state - .build_spilled_partitions - .read() - .clone(); - let partitions_diff = probe_spilled_partitions - .difference(&build_spilled_partitions) - .cloned() - .collect(); - self.diff_partitions.extend(&partitions_diff); - let mut spill_partitions = self.join_probe_state.spill_partitions.write(); - *spill_partitions = spill_partitions - .difference(&partitions_diff) - .cloned() - .collect(); - } } #[async_trait::async_trait] @@ -280,6 +259,10 @@ impl Processor for TransformHashJoinProbe { HashJoinProbeStep::WaitBuild => Ok(Event::Async), HashJoinProbeStep::Spill => { if !self.input_data.is_empty() { + if !self.need_spill { + self.step = HashJoinProbeStep::Running; + return Ok(Event::Sync); + } return Ok(Event::Async); } @@ -303,17 +286,6 @@ impl Processor for TransformHashJoinProbe { spill_partitions.extend(spilled_partition_set); } - if !spilled_partition_set.is_empty() - && unsafe { &*self.join_probe_state.hash_join_state.build_num_rows.get() } - != &(0_usize) - { - self.set_diff_partitions(); - self.spill_done = true; - self.join_probe_state.finish_spill()?; - self.step = HashJoinProbeStep::AsyncRunning; - return Ok(Event::Async); - } - self.spill_done = true; self.join_probe_state.finish_spill()?; // Wait build side to build hash table @@ -397,7 +369,7 @@ impl Processor for TransformHashJoinProbe { HashJoinProbeStep::FastReturn | HashJoinProbeStep::WaitBuild | HashJoinProbeStep::Spill - | HashJoinProbeStep::AsyncRunning => unreachable!(), + | HashJoinProbeStep::AsyncRunning => unreachable!("{:?}", self.step), } } @@ -488,20 +460,29 @@ impl Processor for TransformHashJoinProbe { let spill_state = self.spill_state.as_mut().unwrap(); let mut hashes = Vec::with_capacity(data.num_rows()); spill_state.get_hashes(&data, &mut hashes)?; - // FIXME: we can directly discard `_non_matched_data`, because there is no matched data with build side. - let _non_matched_data = spill_state + // Pass build spilled partition set, we only need to spill data in build spilled partition set + let build_spilled_partitions = self + .join_probe_state + .hash_join_state + .build_spilled_partitions + .read() + .clone(); + let non_matched_data = spill_state .spiller - .spill_input(data, &hashes, self.processor_id) + .spill_input(data, &hashes, &build_spilled_partitions, self.processor_id) .await?; + // Use `non_matched_data` to probe the first round hashtable (if the hashtable isn't empty) + if !non_matched_data.is_empty() + && unsafe { &*self.join_probe_state.hash_join_state.build_num_rows.get() } + != &(0_usize) + { + self.input_data.push_back(non_matched_data); + self.need_spill = false; + } } } HashJoinProbeStep::AsyncRunning => { let spill_state = self.spill_state.as_ref().unwrap(); - if let Some(p_id) = self.diff_partitions.pop_back() { - let spilled_data = spill_state.spiller.read_spilled_data(&p_id).await?; - self.input_data.extend(spilled_data); - return Ok(()); - } let p_id = self .join_probe_state .hash_join_state diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 68a1b50de8a4..9e056df9f354 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -217,6 +217,7 @@ impl Spiller { &mut self, data_block: DataBlock, hashes: &[u64], + spilled_partition_set: &HashSet, worker_id: usize, ) -> Result { // Save the row index which is not spilled. @@ -226,9 +227,7 @@ impl Spiller { // Classify rows to spill or not spill. for (row_idx, hash) in hashes.iter().enumerate() { let partition_id = *hash as u8 & 0b0000_0011; - if self.spilled_partition_set.contains(&partition_id) - || self.spiller_type == SpillerType::HashJoinProbe - { + if spilled_partition_set.contains(&partition_id) { // the row can be directly spilled to corresponding partition partition_rows .entry(partition_id) diff --git a/tests/sqllogictests/suites/query/spill.test b/tests/sqllogictests/suites/query/spill.test index 0f14d278c563..05bf050ff1a2 100644 --- a/tests/sqllogictests/suites/query/spill.test +++ b/tests/sqllogictests/suites/query/spill.test @@ -58,19 +58,7 @@ select a from t3 inner join numbers(100000) on t3.a = number order by a; 0 0 -statement ok -drop table t3; - -statement ok -create table t3 as select number as a from numbers(1000000); -statement ok -set join_spilling_threshold = 100; - -query I -select count() from t3 inner join numbers(1000000) on t3.a = number; ----- -1000000 statement ok drop table t3;