Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su committed Sep 12, 2024
1 parent 19e5f74 commit 2c34c2f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 10 deletions.
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,11 @@ message AsOfJoinNode {
repeated uint32 output_indices = 8;
// Left deduped input pk indices. The pk of the left_table and
// The pk of the left_table is [left_join_key | left_inequality_key | left_deduped_input_pk_indices]
// left_inequality_key is not used but for forward compatibility.
repeated uint32 left_deduped_input_pk_indices = 9;
// Right deduped input pk indices.
// The pk of the right_table is [right_join_key | right_inequality_key | right_deduped_input_pk_indices]
// right_inequality_key is not used but for forward compatibility.
repeated uint32 right_deduped_input_pk_indices = 10;
repeated bool null_safe = 11;
optional plan_common.AsOfJoinDesc asof_desc = 12;
Expand Down
12 changes: 3 additions & 9 deletions src/stream/src/executor/asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,19 +619,13 @@ impl<K: HashKey, S: StateStore, const T: AsOfJoinTypePrimitive> AsOfJoinExecutor

let (side_update, side_match) = (side_l, side_r);

let CHUNK_BUILDER_T = match T {
AsOfJoinType::Inner => JoinType::Inner,
AsOfJoinType::LeftOuter => JoinType::LeftOuter,
};

let mut join_chunk_builder = JoinChunkBuilder::<CHUNK_BUILDER_T, { SideType::Left }>::new(
JoinStreamChunkBuilder::new(
let mut join_chunk_builder =
JoinChunkBuilder::<T, { SideType::Left }>::new(JoinStreamChunkBuilder::new(
chunk_size,
actual_output_data_types.to_vec(),
side_update.i2o_mapping.clone(),
side_match.i2o_mapping.clone(),
),
);
));

let keys = K::build_many(&side_update.join_key_indices, chunk.data_chunk());
for (r, key) in chunk.rows_with_holes().zip_eq_debug(keys.iter()) {
Expand Down
5 changes: 4 additions & 1 deletion src/stream/src/from_proto/asof_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::*;
use crate::common::table::state_table::StateTable;
use crate::executor::asof_join::*;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::{ActorContextRef, AsOfDesc, AsOfJoinType};
use crate::executor::{ActorContextRef, AsOfDesc, AsOfJoinType, JoinType};
use crate::task::AtomicU64Ref;

pub struct AsOfJoinExecutorBuilder;
Expand All @@ -36,6 +36,9 @@ impl ExecutorBuilder for AsOfJoinExecutorBuilder {
node: &Self::Node,
store: impl StateStore,
) -> StreamResult<Executor> {
// This assert is to make sure AsOf join can use `JoinChunkBuilder` as Hash join.
assert_eq!(AsOfJoinType::Inner, JoinType::Inner);
assert_eq!(AsOfJoinType::LeftOuter, JoinType::LeftOuter);
let vnodes = Arc::new(params.vnode_bitmap.expect("vnodes not set for AsOf join"));

let [source_l, source_r]: [_; 2] = params.input.try_into().unwrap();
Expand Down

0 comments on commit 2c34c2f

Please sign in to comment.