From f2afca50660d62ced86d044da0f27afe8c816b15 Mon Sep 17 00:00:00 2001 From: Noel Kwan Date: Tue, 10 Dec 2024 21:35:57 +0800 Subject: [PATCH] same for delete --- src/stream/src/executor/hash_join.rs | 73 +++++++--------------------- 1 file changed, 18 insertions(+), 55 deletions(-) diff --git a/src/stream/src/executor/hash_join.rs b/src/stream/src/executor/hash_join.rs index 4997bf31a0ee0..1f5a18b95fef1 100644 --- a/src/stream/src/executor/hash_join.rs +++ b/src/stream/src/executor/hash_join.rs @@ -1488,6 +1488,8 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor( @@ -1512,6 +1516,7 @@ impl HashJoinExecutor HashJoinExecutor HashJoinExecutor { let mut degree = 0; let mut matched_rows_to_clean = vec![]; - let ( - order_key_indices, - join_key_data_types, - pk_indices, - pk_serializer, - state_table, - mut degree_table, - ) = side_match.ht.get_table_mut_refs(); - let decoded_key = key.deserialize(join_key_data_types)?; - let degrees = if side_match.need_degree_table { - let degrees = Self::handle_fetch_degrees( - key, - join_key_data_types, - degree_table.as_ref().unwrap(), - ) - .await?; - Some(degrees) - } else { - None - }; - let table_iter = state_table - .iter_with_prefix(&decoded_key, sub_range, PrefetchOptions::default()) + let (matched_rows, mut degree_table) = side_match + .ht + .fetch_matched_rows_and_get_degree_table_ref(key) .await?; - #[for_await] - for (i, entry) in table_iter.enumerate() { - let encoded_row = entry?; - let encoded_pk = encoded_row - .as_ref() - .project(pk_indices) - .memcmp_serialize(pk_serializer); - let mut matched_row = JoinRow::new( - encoded_row.into_owned_row(), - degrees.as_ref().map_or(0, |d| d[i]), - ); + for matched_row in matched_rows { + let (encoded_pk, mut matched_row) = matched_row?; + + // cache refill if entry_state_count <= entry_state_max_rows { entry_state .insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join. - .with_context(|| { - let pk = row.project(pk_indices); - format!( - "pk: {}, row: {}, state_table_id: {}", - pk.display(), - row.display(), - state_table.table_id() - ) - })?; + .with_context(|| format!("row: {}", row.display(),))?; entry_state_count += 1; } let join_condition_satisfied = Self::check_join_condition( @@ -1648,19 +1620,10 @@ impl HashJoinExecutor( + degree_table.as_mut().unwrap(), + &mut matched_row, + ); } if !forward_exactly_once(T, SIDE) { if let Some(chunk) =