diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs
index d12117d74e0a..63bc653863c2 100644
--- a/src/stream/src/executor/hash_join.rs
+++ b/src/stream/src/executor/hash_join.rs
@@ -11,9 +11,10 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-use std::collections::{BTreeMap, HashSet};
+use std::collections::{BTreeMap, Bound, HashSet};
 use std::time::Duration;
 
+use anyhow::Context;
 use itertools::Itertools;
 use multimap::MultiMap;
 use risingwave_common::array::Op;
@@ -23,6 +24,7 @@ use risingwave_common::util::epoch::EpochPair;
 use risingwave_common::util::iter_util::ZipEqDebug;
 use risingwave_expr::expr::NonStrictExpression;
 use risingwave_expr::ExprError;
+use risingwave_storage::store::PrefetchOptions;
 use tokio::time::Instant;
 
 use self::builder::JoinChunkBuilder;
@@ -735,6 +737,17 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T>
+    /// Look up the cached join state for `key`. Returns `None` if the key can
+    /// never match (its null bitmap is not covered by the null-safe bitmap),
+    /// `Some(None)` on a cache miss, and `Some(Some(state))` on a cache hit.
+    fn hash_eq_match_opt(key: &K, ht: &mut JoinHashMap<K, S>) -> Option<Option<HashValueType>> {
+        if !key.null_bitmap().is_subset(ht.null_matched()) {
+            tracing::trace!("no match for join key: {:?}", key);
+            None
+        } else {
+            Some(ht.take_state_opt(key))
+        }
+    }
+
     fn row_concat(
         row_update: &RowRef<'_>,
         update_start_pos: usize,
@@ -756,14 +769,440 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K, S, T>
         args: EqJoinArgs<'_, K, S>,
     ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
-        Self::eq_join_oneside::<{ SideType::Left }>(args)
+        Self::eq_join_oneside_opt::<{ SideType::Left }>(args)
     }
 
     /// Used to forward `eq_join_oneside` to show join side in stack.
     fn eq_join_right(
         args: EqJoinArgs<'_, K, S>,
     ) -> impl Stream<Item = Result<StreamChunk, StreamExecutorError>> + '_ {
-        Self::eq_join_oneside::<{ SideType::Right }>(args)
+        Self::eq_join_oneside_opt::<{ SideType::Right }>(args)
     }
 
+    /// Lazily fetch the rows matching `key` from the state store and join them
+    /// with `row`, rebuilding the in-memory cache entry on the side when the
+    /// state is small enough to cache.
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn handle_fetch_matched_rows<'a, const SIDE: SideTypePrimitive>(
+        row: RowRef<'a>,
+        key: &'a K,
+        op: Op,
+        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
+        side_match: &'a mut JoinSide<K, S>,
+        side_update: &'a mut JoinSide<K, S>,
+        // Not yet used on the lazy-fetch path; kept to mirror
+        // `handle_cached_matched_rows`.
+        _useful_state_clean_columns: &'a [(usize, &'a Watermark)],
+        _cond: &'a mut Option<NonStrictExpression>,
+        _append_only_optimize: bool,
+    ) {
+        let mut entry_state = JoinEntryState::default();
+        let mut entry_state_count = 0;
+        let entry_state_max_rows = 30000;
+        #[for_await]
+        for result in side_match.ht.fetch_matched_rows(key) {
+            let (encoded_pk, join_row) = result?;
+            // Insert up to `entry_state_max_rows + 1` rows: overshooting by one
+            // is how we detect below that the state is too large to cache.
+            if entry_state_count <= entry_state_max_rows {
+                entry_state
+                    .insert(encoded_pk, join_row.encode())
+                    .with_context(|| side_match.ht.error_context(&join_row.row))?;
+                entry_state_count += 1;
+            }
+            match op {
+                Op::Insert | Op::UpdateInsert => {
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.with_match_on_insert(&row, &join_row)
+                    {
+                        yield chunk;
+                    }
+                }
+                Op::Delete | Op::UpdateDelete => {
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.with_match_on_delete(&row, &join_row)
+                    {
+                        yield chunk;
+                    }
+                }
+            }
+        }
+        match op {
+            Op::Insert | Op::UpdateInsert => {
+                tracing::trace!("insert row: {:?}", row);
+                side_update.ht.insert_row(key, row).await?;
+            }
+            Op::Delete | Op::UpdateDelete => {
+                side_update.ht.delete_row(key, row)?;
+            }
+        }
+        // Cache the state only if it is complete, i.e. the loop above did not
+        // overflow the row limit.
+        if entry_state_count <= entry_state_max_rows {
+            side_match.ht.update_state(key, Box::new(entry_state));
+        }
+    }
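+
+    /// Join `row` against the matched rows already cached in memory for `key`,
+    /// evaluating the non-equi condition, maintaining degrees, and applying
+    /// watermark-based state cleaning and the append-only optimization.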
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn handle_cached_matched_rows<'a, const SIDE: SideTypePrimitive>(
+        matched_rows: Option<HashValueType>,
+        row: RowRef<'a>,
+        op: Op,
+        key: &'a K,
+        hashjoin_chunk_builder: &'a mut JoinChunkBuilder<T, SIDE>,
+        side_match: &'a mut JoinSide<K, S>,
+        side_update: &'a mut JoinSide<K, S>,
+        useful_state_clean_columns: &'a [(usize, &'a Watermark)],
+        cond: &'a mut Option<NonStrictExpression>,
+        append_only_optimize: bool,
+    ) {
+        match op {
+            Op::Insert | Op::UpdateInsert => {
+                let mut degree = 0;
+                let mut append_only_matched_row: Option<JoinRow<OwnedRow>> = None;
+                if let Some(mut matched_rows) = matched_rows {
+                    let mut matched_rows_to_clean = vec![];
+                    for (matched_row_ref, matched_row) in
+                        matched_rows.values_mut(&side_match.all_data_types)
+                    {
+                        let mut matched_row = matched_row?;
+                        // Evaluate the non-equi condition, if any.
+                        // TODO(yuhao-su): We should find a better way to eval the
+                        // expression without concatenating the two rows.
+                        let check_join_condition = if let Some(ref mut cond) = cond {
+                            let new_row = Self::row_concat(
+                                &row,
+                                side_update.start_pos,
+                                &matched_row.row,
+                                side_match.start_pos,
+                            );
+
+                            cond.eval_row_infallible(&new_row)
+                                .await
+                                .map(|s| *s.as_bool())
+                                .unwrap_or(false)
+                        } else {
+                            true
+                        };
+                        let mut need_state_clean = false;
+                        if check_join_condition {
+                            degree += 1;
+                            if !forward_exactly_once(T, SIDE) {
+                                if let Some(chunk) = hashjoin_chunk_builder
+                                    .with_match_on_insert(&row, &matched_row)
+                                {
+                                    yield chunk;
+                                }
+                            }
+                            if side_match.need_degree_table {
+                                side_match.ht.inc_degree(matched_row_ref, &mut matched_row);
+                            }
+                        } else {
+                            for (column_idx, watermark) in useful_state_clean_columns {
+                                if matched_row
+                                    .row
+                                    .datum_at(*column_idx)
+                                    .map_or(false, |scalar| {
+                                        scalar
+                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                            .is_lt()
+                                    })
+                                {
+                                    need_state_clean = true;
+                                    break;
+                                }
+                            }
+                        }
+                        // If the stream is append-only and the join key covers the pk
+                        // on both sides, we can remove matched rows: the pk is unique
+                        // and will never be inserted again.
+                        if append_only_optimize {
+                            // Since the join key contains the pk and the pk is unique,
+                            // there is at most one matched row.
+                            assert!(append_only_matched_row.is_none());
+                            append_only_matched_row = Some(matched_row);
+                        } else if need_state_clean {
+                            // `append_only_optimize` and `need_state_clean` are never
+                            // both true; the `else` only avoids moving `matched_row`
+                            // twice.
+                            matched_rows_to_clean.push(matched_row);
+                        }
+                    }
+                    if degree == 0 {
+                        if let Some(chunk) =
+                            hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
+                        {
+                            yield chunk;
+                        }
+                    } else if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Insert, row)
+                    {
+                        yield chunk;
+                    }
+                    // Put the state taken from the hash table back.
+                    side_match.ht.update_state(key, matched_rows);
+                    for matched_row in matched_rows_to_clean {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, matched_row)?;
+                        } else {
+                            side_match.ht.delete_row(key, matched_row.row)?;
+                        }
+                    }
+
+                    if append_only_optimize && let Some(row) = append_only_matched_row {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, row)?;
+                        } else {
+                            side_match.ht.delete_row(key, row.row)?;
+                        }
+                    } else if side_update.need_degree_table {
+                        side_update.ht.insert(key, JoinRow::new(row, degree)).await?;
+                    } else {
+                        side_update.ht.insert_row(key, row).await?;
+                    }
+                } else {
+                    // A row whose join key contains nulls that can never match
+                    // (per the null-safe bitmap) is not stored, so just forward it.
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Insert, row)
+                    {
+                        yield chunk;
+                    }
+                }
+            }
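+            // The delete path mirrors the insert path: re-evaluate the non-equi
+            // condition for each cached match, decrement degrees instead of
+            // incrementing them, and emit retractions.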
+            Op::Delete | Op::UpdateDelete => {
+                let mut degree = 0;
+                if let Some(mut matched_rows) = matched_rows {
+                    let mut matched_rows_to_clean = vec![];
+                    for (matched_row_ref, matched_row) in
+                        matched_rows.values_mut(&side_match.all_data_types)
+                    {
+                        let mut matched_row = matched_row?;
+                        // Evaluate the non-equi condition, if any.
+                        // TODO(yuhao-su): We should find a better way to eval the
+                        // expression without concatenating the two rows.
+                        let check_join_condition = if let Some(ref mut cond) = cond {
+                            let new_row = Self::row_concat(
+                                &row,
+                                side_update.start_pos,
+                                &matched_row.row,
+                                side_match.start_pos,
+                            );
+
+                            cond.eval_row_infallible(&new_row)
+                                .await
+                                .map(|s| *s.as_bool())
+                                .unwrap_or(false)
+                        } else {
+                            true
+                        };
+                        let mut need_state_clean = false;
+                        if check_join_condition {
+                            degree += 1;
+                            if side_match.need_degree_table {
+                                side_match.ht.dec_degree(matched_row_ref, &mut matched_row);
+                            }
+                            if !forward_exactly_once(T, SIDE) {
+                                if let Some(chunk) = hashjoin_chunk_builder
+                                    .with_match_on_delete(&row, &matched_row)
+                                {
+                                    yield chunk;
+                                }
+                            }
+                        } else {
+                            for (column_idx, watermark) in useful_state_clean_columns {
+                                if matched_row
+                                    .row
+                                    .datum_at(*column_idx)
+                                    .map_or(false, |scalar| {
+                                        scalar
+                                            .default_cmp(&watermark.val.as_scalar_ref_impl())
+                                            .is_lt()
+                                    })
+                                {
+                                    need_state_clean = true;
+                                    break;
+                                }
+                            }
+                        }
+                        if need_state_clean {
+                            matched_rows_to_clean.push(matched_row);
+                        }
+                    }
+                    if degree == 0 {
+                        if let Some(chunk) =
+                            hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
+                        {
+                            yield chunk;
+                        }
+                    } else if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_exactly_once_if_matched(Op::Delete, row)
+                    {
+                        yield chunk;
+                    }
+                    // Put the state taken from the hash table back.
+                    side_match.ht.update_state(key, matched_rows);
+                    for matched_row in matched_rows_to_clean {
+                        if side_match.need_degree_table {
+                            side_match.ht.delete(key, matched_row)?;
+                        } else {
+                            side_match.ht.delete_row(key, matched_row.row)?;
+                        }
+                    }
+
+                    if append_only_optimize {
+                        // An append-only stream never produces deletes.
+                        unreachable!();
+                    } else if side_update.need_degree_table {
+                        side_update.ht.delete(key, JoinRow::new(row, degree))?;
+                    } else {
+                        side_update.ht.delete_row(key, row)?;
+                    }
+                } else {
+                    // As on the insert path, rows with unmatchable null keys are
+                    // not stored, so just forward the delete.
+                    if let Some(chunk) =
+                        hashjoin_chunk_builder.forward_if_not_matched(Op::Delete, row)
+                    {
+                        yield chunk;
+                    }
+                }
+            }
+        }
+    }
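+
+    /// Cache-aware variant of `eq_join_oneside`: look the join key up in the
+    /// in-memory cache first and fall back to streaming matched rows from
+    /// storage (`handle_fetch_matched_rows`) only on a cache miss.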
+    #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
+    async fn eq_join_oneside_opt<const SIDE: SideTypePrimitive>(args: EqJoinArgs<'_, K, S>) {
+        let EqJoinArgs {
+            ctx,
+            side_l,
+            side_r,
+            actual_output_data_types,
+            cond,
+            inequality_watermarks,
+            chunk,
+            append_only_optimize,
+            chunk_size,
+            cnt_rows_received,
+            high_join_amplification_threshold,
+            ..
+        } = args;
+
+        let (side_update, side_match) = if SIDE == SideType::Left {
+            (side_l, side_r)
+        } else {
+            (side_r, side_l)
+        };
+
+        let useful_state_clean_columns = side_match
+            .state_clean_columns
+            .iter()
+            .filter_map(|(column_idx, inequality_index)| {
+                inequality_watermarks[*inequality_index]
+                    .as_ref()
+                    .map(|watermark| (*column_idx, watermark))
+            })
+            .collect_vec();
+
+        let mut hashjoin_chunk_builder =
+            JoinChunkBuilder::<T, SIDE>::new(JoinStreamChunkBuilder::new(
+                chunk_size,
+                actual_output_data_types.to_vec(),
+                side_update.i2o_mapping.clone(),
+                side_match.i2o_mapping.clone(),
+            ));
+
+        let join_matched_join_keys = ctx
+            .streaming_metrics
+            .join_matched_join_keys
+            .with_label_values(&[
+                &ctx.id.to_string(),
+                &ctx.fragment_id.to_string(),
+                &side_update.ht.table_id().to_string(),
+            ]);
+
+        let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
+        for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
+            let Some((op, row)) = r else {
+                continue;
+            };
+            Self::evict_cache(side_update, side_match, cnt_rows_received);
+
+            // Only look the key up if all columns that must be non-null for a
+            // match actually are.
+            let matched_rows: Option<Option<HashValueType>> = if side_update
+                .non_null_fields
+                .iter()
+                .all(|column_idx| unsafe { row.datum_at_unchecked(*column_idx).is_some() })
+            {
+                Self::hash_eq_match_opt(key, &mut side_match.ht)
+            } else {
+                None
+            };
+
+            // TODO: re-enable the join amplification metrics. With lazy fetching,
+            // the number of matched rows is no longer known up front on a cache
+            // miss.
+            // if let Some(rows) = &matched_rows {
+            //     join_matched_join_keys.observe(rows.len() as _);
+            //     if rows.len() > high_join_amplification_threshold {
+            //         let join_key_data_types = side_update.ht.join_key_data_types();
+            //         let key = key.deserialize(join_key_data_types)?;
+            //         tracing::warn!(target: "high_join_amplification",
+            //             matched_rows_len = rows.len(),
+            //             update_table_id = side_update.ht.table_id(),
+            //             match_table_id = side_match.ht.table_id(),
+            //             join_key = ?key,
+            //             actor_id = ctx.id,
+            //             fragment_id = ctx.fragment_id,
+            //             "large rows matched for join key"
+            //         );
+            //     }
+            // } else {
+            //     join_matched_join_keys.observe(0.0)
+            // }
+
+            match matched_rows {
+                // Cache miss: stream the matched rows directly from storage.
+                Some(None) => {
+                    #[for_await]
+                    for chunk in Self::handle_fetch_matched_rows(
+                        row,
+                        key,
+                        op,
+                        &mut hashjoin_chunk_builder,
+                        side_match,
+                        side_update,
+                        &useful_state_clean_columns,
+                        cond,
+                        append_only_optimize,
+                    ) {
+                        yield chunk?;
+                    }
+                }
+                // Cache hit (`Some(Some(_))`) or an unmatchable null key (`None`):
+                // both are handled against the in-memory state.
+                cached => {
+                    #[for_await]
+                    for chunk in Self::handle_cached_matched_rows(
+                        cached.flatten(),
+                        row,
+                        op,
+                        key,
+                        &mut hashjoin_chunk_builder,
+                        side_match,
+                        side_update,
+                        &useful_state_clean_columns,
+                        cond,
+                        append_only_optimize,
+                    ) {
+                        yield chunk?;
+                    }
+                }
+            }
+        }
+        if let Some(chunk) = hashjoin_chunk_builder.take() {
+            yield chunk;
+        }
+    }
 
     #[try_stream(ok = StreamChunk, error = StreamExecutorError)]
diff --git a/src/stream/src/executor/join/hash_join.rs b/src/stream/src/executor/join/hash_join.rs
index 10e9e26f784c..ec7bb1fb93f7 100644
--- a/src/stream/src/executor/join/hash_join.rs
+++ b/src/stream/src/executor/join/hash_join.rs
@@ -11,7 +11,6 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-
 use std::alloc::Global;
 use std::cmp::Ordering;
 use std::ops::{Bound, Deref, DerefMut};
@@ -43,6 +42,7 @@ use crate::consistency::{consistency_error, consistency_panic, enable_strict_con
 use crate::executor::error::StreamExecutorResult;
 use crate::executor::join::row::JoinRow;
 use crate::executor::monitor::StreamingMetrics;
+use crate::executor::StreamExecutorError;
 use crate::task::{ActorId, AtomicU64Ref, FragmentId};
 
 /// Memcomparable encoding.
@@ -314,6 +314,28 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
         self.degree_state.table.update_watermark(watermark);
     }
 
+    /// Take the state for the given `key` out of the hash table and return it.
+    /// One **MUST** call `update_state` afterwards to put the state back.
+    ///
+    /// Unlike `take_state`, this does NOT fall back to remote storage on a
+    /// cache miss: it simply returns `None`, leaving the fetch to the caller
+    /// (see `fetch_matched_rows`).
+    pub fn take_state_opt(&mut self, key: &K) -> Option<HashValueType> {
+        self.metrics.total_lookup_count += 1;
+        if self.inner.contains(key) {
+            tracing::trace!("hit cache for join key: {:?}", key);
+            // Do not update the LRU statistics here with `peek_mut` since we
+            // will put the state back.
+            let mut state = self.inner.peek_mut(key).unwrap();
+            Some(state.take())
+        } else {
+            tracing::trace!("miss cache for join key: {:?}", key);
+            None
+        }
+    }
+
     /// Take the state for the given `key` out of the hash table and return it. One **MUST** call
     /// `update_state` after some operations to put the state back.
     ///
@@ -336,6 +358,33 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
         Ok(state)
     }
 
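+    /// Stream all rows matching `key` from the state table, yielding each row
+    /// together with its memcmp-encoded pk so that the caller can rebuild an
+    /// in-memory `JoinEntryState` while iterating.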
+    #[try_stream(ok = (Vec<u8>, JoinRow<OwnedRow>), error = StreamExecutorError)]
+    pub async fn fetch_matched_rows<'a>(&'a self, key: &'a K) {
+        tracing::debug!("fetching matched rows for join key: {:?}", key);
+        let key = key.deserialize(&self.join_key_data_types)?;
+        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) =
+            &(Bound::Unbounded, Bound::Unbounded);
+        let table_iter = self
+            .state
+            .table
+            .iter_with_prefix(&key, sub_range, PrefetchOptions::default())
+            .await?;
+
+        #[for_await]
+        for entry in table_iter {
+            let row = entry?;
+            let pk = row
+                .as_ref()
+                .project(&self.state.pk_indices)
+                .memcmp_serialize(&self.pk_serializer);
+            tracing::trace!("found matching row for join key {:?}: {:?}", key, row);
+            yield (pk, JoinRow::new(row.into_owned_row(), 0));
+        }
+    }
+
     /// Fetch cache from the state store. Should only be called if the key does not exist in memory.
     /// Will return an empty `JoinEntryState` even when the state does not exist in remote storage.
     async fn fetch_cached_state(&self, key: &K) -> StreamExecutorResult<JoinEntryState> {
@@ -495,6 +544,10 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
         Ok(entry_state)
     }
 
+    /// Describe `row` for error reporting, delegating to the underlying state.
+    pub fn error_context(&self, row: &impl Row) -> String {
+        self.state.error_context(row)
+    }
+
     pub async fn flush(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
         self.metrics.flush();
         self.state.table.commit(epoch).await?;
@@ -682,10 +735,15 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S>
     }
 }
 
+use risingwave_common::array::{Op, RowRef};
 use risingwave_common_estimate_size::KvSize;
+use risingwave_expr::expr::NonStrictExpression;
 use thiserror::Error;
 
 use super::*;
+use crate::executor::join::builder::JoinChunkBuilder;
+use crate::executor::prelude::try_stream;
+use crate::executor::Watermark;
 
 /// We manage a `HashMap` in memory for all entries belonging to a join key.
 /// When evicted, `cached` does not hold any entries.
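
---
Illustrative only, not part of the patch: a self-contained toy model of the
two-level `Option` dispatch introduced above. The outer `None` stands for a
join key containing nulls that can never match, `Some(None)` for a cache miss
that falls back to storage, and `Some(Some(_))` for a cache hit. `Cache`,
`u64` keys, and `Vec<String>` states are stand-ins, not RisingWave types.

    use std::collections::HashMap;

    /// Stand-in for the LRU-backed `JoinHashMap`.
    struct Cache {
        inner: HashMap<u64, Vec<String>>,
    }

    impl Cache {
        /// Mirrors `take_state_opt`: `None` on a cache miss, `Some(state)` on a hit.
        fn take_state_opt(&mut self, key: &u64) -> Option<Vec<String>> {
            self.inner.remove(key)
        }
    }

    /// Mirrors `hash_eq_match_opt`: outer `None` means the key can never match.
    fn hash_eq_match_opt(
        key_can_match: bool,
        key: &u64,
        cache: &mut Cache,
    ) -> Option<Option<Vec<String>>> {
        if !key_can_match {
            None
        } else {
            Some(cache.take_state_opt(key))
        }
    }

    fn main() {
        let mut cache = Cache {
            inner: HashMap::from([(1u64, vec!["cached row".to_string()])]),
        };
        match hash_eq_match_opt(true, &1, &mut cache) {
            Some(Some(state)) => println!("cache hit: {state:?}"),    // in-memory path
            Some(None) => println!("cache miss: fetch from storage"), // lazy-fetch path
            None => println!("null join key: forward as unmatched"),  // no lookup at all
        }
    }

Note that the real patch also puts the taken state back with `update_state`
after processing; the toy `remove`-based cache elides that step.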