Skip to content

Commit

Permalink
same for delete
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel committed Dec 10, 2024
1 parent 5e25fa7 commit f2afca5
Showing 1 changed file with 18 additions and 55 deletions.
73 changes: 18 additions & 55 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1488,13 +1488,16 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
#[for_await]
for matched_row in matched_rows {
let (encoded_pk, mut matched_row) = matched_row?;

// cache refill
if entry_state_count <= entry_state_max_rows {
entry_state
.insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
.with_context(|| format!("row: {}", row.display(),))?;
entry_state_count += 1;
}

// check join cond
let join_condition_satisfied = Self::check_join_condition(
row,
side_update.start_pos,
Expand All @@ -1505,13 +1508,15 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
.await;
let mut need_state_clean = false;
if join_condition_satisfied {
// update degree
degree += 1;
if side_match.need_degree_table {
update_degree::<S, true>(
degree_table.as_mut().unwrap(),
&mut matched_row,
);
}
// send matched row downstream
if !forward_exactly_once(T, SIDE) {
if let Some(chunk) =
hashjoin_chunk_builder.with_match_on_insert(&row, &matched_row)
Expand All @@ -1520,6 +1525,7 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
}
}
} else {
// state cleaning
for (column_idx, watermark) in useful_state_clean_columns {
if matched_row
.row
Expand Down Expand Up @@ -1587,53 +1593,19 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
Op::Delete | Op::UpdateDelete => {
let mut degree = 0;
let mut matched_rows_to_clean = vec![];
let (
order_key_indices,
join_key_data_types,
pk_indices,
pk_serializer,
state_table,
mut degree_table,
) = side_match.ht.get_table_mut_refs();
let decoded_key = key.deserialize(join_key_data_types)?;
let degrees = if side_match.need_degree_table {
let degrees = Self::handle_fetch_degrees(
key,
join_key_data_types,
degree_table.as_ref().unwrap(),
)
.await?;
Some(degrees)
} else {
None
};
let table_iter = state_table
.iter_with_prefix(&decoded_key, sub_range, PrefetchOptions::default())
let (matched_rows, mut degree_table) = side_match
.ht
.fetch_matched_rows_and_get_degree_table_ref(key)
.await?;

#[for_await]
for (i, entry) in table_iter.enumerate() {
let encoded_row = entry?;
let encoded_pk = encoded_row
.as_ref()
.project(pk_indices)
.memcmp_serialize(pk_serializer);
let mut matched_row = JoinRow::new(
encoded_row.into_owned_row(),
degrees.as_ref().map_or(0, |d| d[i]),
);
for matched_row in matched_rows {
let (encoded_pk, mut matched_row) = matched_row?;

// cache refill
if entry_state_count <= entry_state_max_rows {
entry_state
.insert(encoded_pk, matched_row.encode(), None) // TODO(kwannoel): handle ineq key for asof join.
.with_context(|| {
let pk = row.project(pk_indices);
format!(
"pk: {}, row: {}, state_table_id: {}",
pk.display(),
row.display(),
state_table.table_id()
)
})?;
.with_context(|| format!("row: {}", row.display(),))?;
entry_state_count += 1;
}
let join_condition_satisfied = Self::check_join_condition(
Expand All @@ -1648,19 +1620,10 @@ impl<K: HashKey, S: StateStore, const T: JoinTypePrimitive> HashJoinExecutor<K,
if join_condition_satisfied {
degree += 1;
if side_match.need_degree_table {
// TODO: no need to `into_owned_row` here due to partial borrow.
let old_degree = matched_row
.to_table_rows(order_key_indices)
.1
.into_owned_row();

matched_row.degree += 1;

let new_degree = matched_row.to_table_rows(order_key_indices).1;
degree_table
.as_mut()
.unwrap()
.update(old_degree, new_degree);
update_degree::<S, false>(
degree_table.as_mut().unwrap(),
&mut matched_row,
);
}
if !forward_exactly_once(T, SIDE) {
if let Some(chunk) =
Expand Down

0 comments on commit f2afca5

Please sign in to comment.