Skip to content

Commit

Permalink
refactor(stream): use params.eval_error_report instead of creating …
Browse files Browse the repository at this point in the history
…a new one (#15714)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Mar 19, 2024
1 parent e4f5eb4 commit a4b1f79
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 28 deletions.
25 changes: 12 additions & 13 deletions src/stream/src/executor/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use risingwave_storage::StateStore;
use super::barrier_align::*;
use super::error::StreamExecutorError;
use super::monitor::StreamingMetrics;
use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, ExecutorInfo, Message};
use super::{ActorContextRef, BoxedMessageStream, Execute, Executor, Message};
use crate::common::table::state_table::{StateTable, WatermarkCacheParameterizedStateTable};
use crate::common::StreamChunkBuilder;
use crate::executor::expect_first_barrier_from_aligned_stream;
Expand Down Expand Up @@ -70,7 +70,8 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
#[allow(clippy::too_many_arguments)]
pub fn new(
ctx: ActorContextRef,
info: &ExecutorInfo,
eval_error_report: ActorEvalErrorReport,
schema: Schema,
source_l: Executor,
source_r: Executor,
key_l: usize,
Expand All @@ -82,14 +83,10 @@ impl<S: StateStore, const USE_WATERMARK_CACHE: bool> DynamicFilterExecutor<S, US
condition_always_relax: bool,
cleaned_by_watermark: bool,
) -> Self {
let eval_error_report = ActorEvalErrorReport {
actor_context: ctx.clone(),
identity: Arc::from(info.identity.as_str()),
};
Self {
ctx,
eval_error_report,
schema: info.schema.clone(),
schema,
source_l: Some(source_l),
source_r: Some(source_r),
key_l,
Expand Down Expand Up @@ -556,13 +553,15 @@ mod tests {
let (tx_r, source_r) = MockSource::channel();
let source_r = source_r.into_executor(schema, vec![]);

let ctx = ActorContext::for_test(123);
let eval_error_report = ActorEvalErrorReport {
actor_context: ctx.clone(),
identity: "DynamicFilterExecutor".into(),
};
let executor = DynamicFilterExecutor::<MemoryStateStore, false>::new(
ActorContext::for_test(123),
&ExecutorInfo {
schema: source_l.schema().clone(),
pk_indices: vec![0],
identity: "DynamicFilterExecutor".to_string(),
},
ctx,
eval_error_report,
source_l.schema().clone(),
source_l,
source_r,
0,
Expand Down
20 changes: 10 additions & 10 deletions src/stream/src/executor/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use risingwave_storage::StateStore;

use super::error::StreamExecutorError;
use super::filter::FilterExecutor;
use super::{ActorContextRef, Execute, Executor, ExecutorInfo, Message, StreamExecutorResult};
use super::{ActorContextRef, Execute, Executor, Message, StreamExecutorResult};
use crate::common::table::state_table::StateTable;
use crate::executor::{expect_first_barrier, Watermark};
use crate::task::ActorEvalErrorReport;
Expand All @@ -61,18 +61,13 @@ pub struct WatermarkFilterExecutor<S: StateStore> {
impl<S: StateStore> WatermarkFilterExecutor<S> {
pub fn new(
ctx: ActorContextRef,
info: &ExecutorInfo,
input: Executor,
watermark_expr: NonStrictExpression,
event_time_col_idx: usize,
table: StateTable<S>,
global_watermark_table: StorageTable<S>,
eval_error_report: ActorEvalErrorReport,
) -> Self {
let eval_error_report = ActorEvalErrorReport {
actor_context: ctx.clone(),
identity: Arc::from(info.identity.as_ref()),
};

Self {
ctx,
input,
Expand Down Expand Up @@ -377,7 +372,7 @@ mod tests {
use super::*;
use crate::executor::test_utils::expr::build_from_pretty;
use crate::executor::test_utils::{MessageSender, MockSource};
use crate::executor::ActorContext;
use crate::executor::{ActorContext, ExecutorInfo};

const WATERMARK_TYPE: DataType = DataType::Timestamp;

Expand Down Expand Up @@ -463,21 +458,26 @@ mod tests {
let (tx, source) = MockSource::channel();
let source = source.into_executor(schema, vec![0]);

let ctx = ActorContext::for_test(123);
let info = ExecutorInfo {
schema: source.schema().clone(),
pk_indices: source.pk_indices().to_vec(),
identity: "WatermarkFilterExecutor".to_string(),
};
let eval_error_report = ActorEvalErrorReport {
actor_context: ctx.clone(),
identity: info.identity.clone().into(),
};

(
WatermarkFilterExecutor::new(
ActorContext::for_test(123),
&info,
ctx,
source,
watermark_expr,
1,
table,
storage_table,
eval_error_report,
)
.boxed(),
tx,
Expand Down
6 changes: 4 additions & 2 deletions src/stream/src/from_proto/dynamic_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {

DynamicFilterExecutor::new(
params.actor_context,
&params.info,
params.eval_error_report,
params.info.schema.clone(),
source_l,
source_r,
key_l,
Expand All @@ -84,7 +85,8 @@ impl ExecutorBuilder for DynamicFilterExecutorBuilder {

DynamicFilterExecutor::new(
params.actor_context,
&params.info,
params.eval_error_report,
params.info.schema.clone(),
source_l,
source_r,
key_l,
Expand Down
8 changes: 5 additions & 3 deletions src/stream/src/from_proto/watermark_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ impl ExecutorBuilder for WatermarkFilterBuilder {
let [input]: [_; 1] = params.input.try_into().unwrap();
let watermark_descs = node.get_watermark_descs().clone();
let [watermark_desc]: [_; 1] = watermark_descs.try_into().unwrap();
let watermark_expr =
build_non_strict_from_prost(&watermark_desc.expr.unwrap(), params.eval_error_report)?;
let watermark_expr = build_non_strict_from_prost(
&watermark_desc.expr.unwrap(),
params.eval_error_report.clone(),
)?;
let event_time_col_idx = watermark_desc.watermark_idx as usize;
let vnodes = Arc::new(
params
Expand All @@ -65,12 +67,12 @@ impl ExecutorBuilder for WatermarkFilterBuilder {

let exec = WatermarkFilterExecutor::new(
params.actor_context,
&params.info,
input,
watermark_expr,
event_time_col_idx,
table,
global_watermark_table,
params.eval_error_report,
);
Ok((params.info, exec).into())
}
Expand Down

0 comments on commit a4b1f79

Please sign in to comment.