From 2c34c2f17de549a5aec83368de2ac8c2807a3e3c Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 12 Sep 2024 17:47:26 +0800 Subject: [PATCH] improve --- proto/stream_plan.proto | 2 ++ src/stream/src/executor/asof_join.rs | 12 +++--------- src/stream/src/from_proto/asof_join.rs | 5 ++++- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto index 385af9e8b3459..ca67737aeafe0 100644 --- a/proto/stream_plan.proto +++ b/proto/stream_plan.proto @@ -471,9 +471,11 @@ message AsOfJoinNode { repeated uint32 output_indices = 8; // Left deduped input pk indices. The pk of the left_table and // The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices] + // left_inequality_key is not used but for forward compatibility. repeated uint32 left_deduped_input_pk_indices = 9; // Right deduped input pk indices. // The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices] + // right_inequality_key is not used but for forward compatibility. repeated uint32 right_deduped_input_pk_indices = 10; repeated bool null_safe = 11; optional plan_common.AsOfJoinDesc asof_desc = 12; diff --git a/src/stream/src/executor/asof_join.rs b/src/stream/src/executor/asof_join.rs index 85dcd333b92a2..cb8a141481f28 100644 --- a/src/stream/src/executor/asof_join.rs +++ b/src/stream/src/executor/asof_join.rs @@ -619,19 +619,13 @@ impl AsOfJoinExecutor let (side_update, side_match) = (side_l, side_r); - let CHUNK_BUILDER_T = match T { - AsOfJoinType::Inner => JoinType::Inner, - AsOfJoinType::LeftOuter => JoinType::LeftOuter, - }; - - let mut join_chunk_builder = JoinChunkBuilder::::new( - JoinStreamChunkBuilder::new( + let mut join_chunk_builder = + JoinChunkBuilder::::new(JoinStreamChunkBuilder::new( chunk_size, actual_output_data_types.to_vec(), side_update.i2o_mapping.clone(), side_match.i2o_mapping.clone(), - ), - ); + )); let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk()); for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) { diff --git a/src/stream/src/from_proto/asof_join.rs b/src/stream/src/from_proto/asof_join.rs index 0e50bdfabd7b2..3d74ac884b4f0 100644 --- a/src/stream/src/from_proto/asof_join.rs +++ b/src/stream/src/from_proto/asof_join.rs @@ -23,7 +23,7 @@ use super::*; use crate::common::table::state_table::StateTable; use crate::executor::asof_join::*; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, AsOfDesc, AsOfJoinType}; +use crate::executor::{ActorContextRef, AsOfDesc, AsOfJoinType, JoinType}; use crate::task::AtomicU64Ref; pub struct AsOfJoinExecutorBuilder; @@ -36,6 +36,9 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder { node: &Self::Node, store: impl StateStore, ) -> StreamResult { + // This assert is to make sure AsOf join can use `JoinChunkBuilder` as Hash join. + assert_eq!(AsOfJoinType::Inner, JoinType::Inner); + assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter); let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join")); let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();