From a4b1f79dbdb50c82c3a0c79a3d11f66121c7f16d Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Tue, 19 Mar 2024 14:38:23 +0800 Subject: [PATCH] refactor(stream): use `params.eval_error_report` instead of creating a new one (#15714) Signed-off-by: Richard Chien --- src/stream/src/executor/dynamic_filter.rs | 25 +++++++++---------- src/stream/src/executor/watermark_filter.rs | 20 +++++++-------- src/stream/src/from_proto/dynamic_filter.rs | 6 +++-- src/stream/src/from_proto/watermark_filter.rs | 8 +++--- 4 files changed, 31 insertions(+), 28 deletions(-) diff --git a/src/stream/src/executor/dynamic_filter.rs b/src/stream/src/executor/dynamic_filter.rs index 6fa51b9fe7d2..f6ca469fb162 100644 --- a/src/stream/src/executor/dynamic_filter.rs +++ b/src/stream/src/executor/dynamic_filter.rs @@ -38,7 +38,7 @@ use risingwave_storage::StateStore; use super::barrier_align::*; use super::error::StreamExecutorError; use super::monitor::StreamingMetrics; -use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message}; +use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message}; use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable}; use crate::common::StreamChunkBuilder; use crate::executor::expect_first_barrier_from_aligned_stream; @@ -70,7 +70,8 @@ impl DynamicFilterExecutor DynamicFilterExecutor Self { - let eval_error_report = ActorEvalErrorReport { - actor_context: ctx.clone(), - identity: Arc::from(info.identity.as_str()), - }; Self { ctx, eval_error_report, - schema: info.schema.clone(), + schema, source_l: Some(source_l), source_r: Some(source_r), key_l, @@ -556,13 +553,15 @@ mod tests { let (tx_r, source_r) = MockSource::channel(); let source_r = source_r.into_executor(schema, vec![]); + let ctx = ActorContext::for_test(123); + let eval_error_report = ActorEvalErrorReport { + actor_context: ctx.clone(), + identity: "DynamicFilterExecutor".into(), + }; let executor = DynamicFilterExecutor::::new( - ActorContext::for_test(123), - &ExecutorInfo { - schema: source_l.schema().clone(), - pk_indices: vec![0], - identity: "DynamicFilterExecutor".to_string(), - }, + ctx, + eval_error_report, + source_l.schema().clone(), source_l, source_r, 0, diff --git a/src/stream/src/executor/watermark_filter.rs b/src/stream/src/executor/watermark_filter.rs index 0d77ad49fd32..fc2f92237115 100644 --- a/src/stream/src/executor/watermark_filter.rs +++ b/src/stream/src/executor/watermark_filter.rs @@ -36,7 +36,7 @@ use risingwave_storage::StateStore; use super::error::StreamExecutorError; use super::filter::FilterExecutor; -use super::{ActorContextRef, Execute, Executor, ExecutorInfo, Message, StreamExecutorResult}; +use super::{ActorContextRef, Execute, Executor, Message, StreamExecutorResult}; use crate::common::table::state_table::StateTable; use crate::executor::{expect_first_barrier, Watermark}; use crate::task::ActorEvalErrorReport; @@ -61,18 +61,13 @@ pub struct WatermarkFilterExecutor { impl WatermarkFilterExecutor { pub fn new( ctx: ActorContextRef, - info: &ExecutorInfo, input: Executor, watermark_expr: NonStrictExpression, event_time_col_idx: usize, table: StateTable, global_watermark_table: StorageTable, + eval_error_report: ActorEvalErrorReport, ) -> Self { - let eval_error_report = ActorEvalErrorReport { - actor_context: ctx.clone(), - identity: Arc::from(info.identity.as_ref()), - }; - Self { ctx, input, @@ -377,7 +372,7 @@ mod tests { use super::*; use crate::executor::test_utils::expr::build_from_pretty; use crate::executor::test_utils::{MessageSender, MockSource}; - use crate::executor::ActorContext; + use crate::executor::{ActorContext, ExecutorInfo}; const WATERMARK_TYPE: DataType = DataType::Timestamp; @@ -463,21 +458,26 @@ mod tests { let (tx, source) = MockSource::channel(); let source = source.into_executor(schema, vec![0]); + let ctx = ActorContext::for_test(123); let info = ExecutorInfo { schema: source.schema().clone(), pk_indices: source.pk_indices().to_vec(), identity: "WatermarkFilterExecutor".to_string(), }; + let eval_error_report = ActorEvalErrorReport { + actor_context: ctx.clone(), + identity: info.identity.clone().into(), + }; ( WatermarkFilterExecutor::new( - ActorContext::for_test(123), - &info, + ctx, source, watermark_expr, 1, table, storage_table, + eval_error_report, ) .boxed(), tx, diff --git a/src/stream/src/from_proto/dynamic_filter.rs b/src/stream/src/from_proto/dynamic_filter.rs index be744354c3d1..c6ccd7038254 100644 --- a/src/stream/src/from_proto/dynamic_filter.rs +++ b/src/stream/src/from_proto/dynamic_filter.rs @@ -65,7 +65,8 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { DynamicFilterExecutor::new( params.actor_context, - ¶ms.info, + params.eval_error_report, + params.info.schema.clone(), source_l, source_r, key_l, @@ -84,7 +85,8 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder { DynamicFilterExecutor::new( params.actor_context, - ¶ms.info, + params.eval_error_report, + params.info.schema.clone(), source_l, source_r, key_l, diff --git a/src/stream/src/from_proto/watermark_filter.rs b/src/stream/src/from_proto/watermark_filter.rs index f01695e99148..0081f00cc39e 100644 --- a/src/stream/src/from_proto/watermark_filter.rs +++ b/src/stream/src/from_proto/watermark_filter.rs @@ -38,8 +38,10 @@ impl ExecutorBuilder for WatermarkFilterBuilder { let [input]: [_; 1] = params.input.try_into().unwrap(); let watermark_descs = node.get_watermark_descs().clone(); let [watermark_desc]: [_; 1] = watermark_descs.try_into().unwrap(); - let watermark_expr = - build_non_strict_from_prost(&watermark_desc.expr.unwrap(), params.eval_error_report)?; + let watermark_expr = build_non_strict_from_prost( + &watermark_desc.expr.unwrap(), + params.eval_error_report.clone(), + )?; let event_time_col_idx = watermark_desc.watermark_idx as usize; let vnodes = Arc::new( params @@ -65,12 +67,12 @@ impl ExecutorBuilder for WatermarkFilterBuilder { let exec = WatermarkFilterExecutor::new( params.actor_context, - ¶ms.info, input, watermark_expr, event_time_col_idx, table, global_watermark_table, + params.eval_error_report, ); Ok((params.info, exec).into()) }