From 29ddf583f3533bbb51a9b761c018d47fcb14f0df Mon Sep 17 00:00:00 2001 From: William Wen Date: Fri, 6 Sep 2024 18:46:36 +0800 Subject: [PATCH] refine and fix ut --- .../executor/backfill/snapshot_backfill.rs | 10 +- src/stream/src/executor/dispatch.rs | 9 +- src/stream/src/executor/integration_tests.rs | 276 +++++++++--------- src/stream/src/executor/merge.rs | 25 +- src/stream/src/executor/mod.rs | 2 +- src/stream/src/executor/test_utils.rs | 16 +- src/stream/src/from_proto/merge.rs | 14 +- src/stream/src/task/stream_manager.rs | 98 +++---- 8 files changed, 221 insertions(+), 229 deletions(-) diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill.rs index 77ed8473d7848..449af5c900a67 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill.rs @@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, InputExecutor, Message, Mutation, + DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation, StreamExecutorError, StreamExecutorResult, }; use crate::task::CreateMviewProgress; @@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor { upstream_table: StorageTable, /// Upstream with the same schema with the upstream table. - upstream: InputExecutor, + upstream: MergeExecutorInput, /// The column indices need to be forwarded to the downstream from the upstream and table scan. output_indices: Vec, @@ -70,7 +70,7 @@ impl SnapshotBackfillExecutor { #[expect(clippy::too_many_arguments)] pub(crate) fn new( upstream_table: StorageTable, - upstream: InputExecutor, + upstream: MergeExecutorInput, output_indices: Vec, actor_ctx: ActorContextRef, progress: CreateMviewProgress, @@ -403,7 +403,7 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> { } struct UpstreamBuffer<'a, S> { - upstream: &'a mut InputExecutor, + upstream: &'a mut MergeExecutorInput, state: S, consume_upstream_row_count: LabelGuardedIntCounter<3>, upstream_table_id: TableId, @@ -412,7 +412,7 @@ struct UpstreamBuffer<'a, S> { impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> { fn new( - upstream: &'a mut InputExecutor, + upstream: &'a mut MergeExecutorInput, upstream_table_id: TableId, current_subscriber_id: u32, consume_upstream_row_count: LabelGuardedIntCounter<3>, diff --git a/src/stream/src/executor/dispatch.rs b/src/stream/src/executor/dispatch.rs index 0fc9da0e5ab23..b3fbc22294fd1 100644 --- a/src/stream/src/executor/dispatch.rs +++ b/src/stream/src/executor/dispatch.rs @@ -1180,10 +1180,6 @@ mod tests { let actor_id = 233; let fragment_id = 666; let barrier_test_env = LocalBarrierTestEnv::for_test().await; - let input = Executor::new( - Default::default(), - ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(), - ); let ctx = Arc::new(SharedContext::for_test()); let metrics = Arc::new(StreamingMetrics::unused()); @@ -1252,6 +1248,11 @@ mod tests { .flush_all_events() .await; + let input = Executor::new( + Default::default(), + ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()) + .boxed(), + ); let executor = Box::new(DispatchExecutor::new( input, vec![broadcast_dispatcher, simple_dispatcher], diff --git a/src/stream/src/executor/integration_tests.rs b/src/stream/src/executor/integration_tests.rs index 13e9a67d1c525..6ea34de857219 100644 --- a/src/stream/src/executor/integration_tests.rs +++ b/src/stream/src/executor/integration_tests.rs @@ -64,41 +64,45 @@ async fn test_merger_sum_aggr() { let input_schema = Schema { fields: vec![Field::unnamed(DataType::Int64)], }; - let input = Executor::new( - ExecutorInfo { - schema: input_schema, - pk_indices: PkIndices::new(), - identity: "ReceiverExecutor".to_string(), - }, - ReceiverExecutor::for_test(actor_id, input_rx, barrier_test_env.shared_context.clone()) - .boxed(), - ); - let agg_calls = vec![ - AggCall::from_pretty("(count:int8)"), - AggCall::from_pretty("(sum:int8 $0:int8)"), - ]; - let schema = generate_agg_schema(&input, &agg_calls, None); - // for the local aggregator, we need two states: row count and sum - let aggregator = - StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls).unwrap(); + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); let (tx, rx) = channel_for_test(); - let consumer = SenderConsumer { - input: aggregator.boxed(), - channel: Box::new(LocalOutput::new(233, tx)), - }; + let actor_future = async move { + let input = Executor::new( + ExecutorInfo { + schema: input_schema, + pk_indices: PkIndices::new(), + identity: "ReceiverExecutor".to_string(), + }, + ReceiverExecutor::for_test(actor_id, input_rx, shared_context.clone()).boxed(), + ); + let agg_calls = vec![ + AggCall::from_pretty("(count:int8)"), + AggCall::from_pretty("(sum:int8 $0:int8)"), + ]; + let schema = generate_agg_schema(&input, &agg_calls, None); + // for the local aggregator, we need two states: row count and sum + let aggregator = + StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls) + .unwrap(); + let consumer = SenderConsumer { + input: aggregator.boxed(), + channel: Box::new(LocalOutput::new(233, tx)), + }; - let actor = Actor::new( - consumer, - vec![], - StreamingMetrics::unused().into(), - actor_ctx, - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - (actor, rx) + let actor = Actor::new( + consumer, + vec![], + StreamingMetrics::unused().into(), + actor_ctx, + expr_context, + shared_context.local_barrier_manager.clone(), + ); + + actor.run().await + } + .boxed(); + (actor_future, rx) }; // join handles of all actors @@ -113,9 +117,9 @@ async fn test_merger_sum_aggr() { // create 17 local aggregation actors for _ in 0..17 { let (tx, rx) = channel_for_test(); - let (actor, channel) = make_actor(rx); + let (actor_future, channel) = make_actor(rx); outputs.push(channel); - actor_futures.push(actor.run().boxed()); + actor_futures.push(actor_future); inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput); } @@ -123,111 +127,117 @@ async fn test_merger_sum_aggr() { let actor_id = gen_next_actor_id(); let (input, rx) = channel_for_test(); - let receiver_op = Executor::new( - ExecutorInfo { - // input schema of local simple agg - schema: Schema::new(vec![Field::unnamed(DataType::Int64)]), - pk_indices: PkIndices::new(), - identity: "ReceiverExecutor".to_string(), - }, - ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(), - ); - let dispatcher = DispatchExecutor::new( - receiver_op, - vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new( - inputs, - vec![0], - 0, - ))], - 0, - 0, - barrier_test_env.shared_context.clone(), - metrics, - config::default::developer::stream_chunk_size(), - ); - let actor = Actor::new( - dispatcher, - vec![], - StreamingMetrics::unused().into(), - ActorContext::for_test(actor_id), - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - actor_futures.push(actor.run().boxed()); + let actor_future = { + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); + async move { + let receiver_op = Executor::new( + ExecutorInfo { + // input schema of local simple agg + schema: Schema::new(vec![Field::unnamed(DataType::Int64)]), + pk_indices: PkIndices::new(), + identity: "ReceiverExecutor".to_string(), + }, + ReceiverExecutor::for_test(actor_id, rx, shared_context.clone()).boxed(), + ); + let dispatcher = DispatchExecutor::new( + receiver_op, + vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new( + inputs, + vec![0], + 0, + ))], + 0, + 0, + shared_context.clone(), + metrics, + config::default::developer::stream_chunk_size(), + ); + let actor = Actor::new( + dispatcher, + vec![], + StreamingMetrics::unused().into(), + ActorContext::for_test(actor_id), + expr_context, + shared_context.local_barrier_manager.clone(), + ); + actor.run().await + } + .boxed() + }; + actor_futures.push(actor_future); let actor_ctx = ActorContext::for_test(gen_next_actor_id()); - // use a merge operator to collect data from dispatchers before sending them to aggregator - let merger = Executor::new( - ExecutorInfo { - // output schema of local simple agg - schema: Schema::new(vec![ - Field::unnamed(DataType::Int64), - Field::unnamed(DataType::Int64), - ]), - pk_indices: PkIndices::new(), - identity: "MergeExecutor".to_string(), - }, - MergeExecutor::for_test( - actor_ctx.id, - outputs, - barrier_test_env.shared_context.clone(), - ) - .boxed(), - ); + let items = Arc::new(Mutex::new(vec![])); + let actor_future = { + let shared_context = barrier_test_env.shared_context.clone(); + let expr_context = expr_context.clone(); + let items = items.clone(); + async move { + // use a merge operator to collect data from dispatchers before sending them to aggregator + let merger = Executor::new( + ExecutorInfo { + // output schema of local simple agg + schema: Schema::new(vec![ + Field::unnamed(DataType::Int64), + Field::unnamed(DataType::Int64), + ]), + pk_indices: PkIndices::new(), + identity: "MergeExecutor".to_string(), + }, + MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone()).boxed(), + ); - // for global aggregator, we need to sum data and sum row count - let is_append_only = false; - let aggregator = new_boxed_simple_agg_executor( - actor_ctx.clone(), - MemoryStateStore::new(), - merger, - is_append_only, - vec![ - AggCall::from_pretty("(sum0:int8 $0:int8)"), - AggCall::from_pretty("(sum:int8 $1:int8)"), - AggCall::from_pretty("(count:int8)"), - ], - 2, // row_count_index - vec![], - 2, - false, - ) - .await; + // for global aggregator, we need to sum data and sum row count + let is_append_only = false; + let aggregator = new_boxed_simple_agg_executor( + actor_ctx.clone(), + MemoryStateStore::new(), + merger, + is_append_only, + vec![ + AggCall::from_pretty("(sum0:int8 $0:int8)"), + AggCall::from_pretty("(sum:int8 $1:int8)"), + AggCall::from_pretty("(count:int8)"), + ], + 2, // row_count_index + vec![], + 2, + false, + ) + .await; - let projection = ProjectExecutor::new( - actor_ctx.clone(), - aggregator, - vec![ - // TODO: use the new streaming_if_null expression here, and add `None` tests - NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), - ], - MultiMap::new(), - vec![], - 0.0, - false, - ); + let projection = ProjectExecutor::new( + actor_ctx.clone(), + aggregator, + vec![ + // TODO: use the new streaming_if_null expression here, and add `None` tests + NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)), + ], + MultiMap::new(), + vec![], + 0.0, + false, + ); - let items = Arc::new(Mutex::new(vec![])); - let consumer = MockConsumer { - input: projection.boxed(), - data: items.clone(), + let consumer = MockConsumer { + input: projection.boxed(), + data: items.clone(), + }; + let actor = Actor::new( + consumer, + vec![], + StreamingMetrics::unused().into(), + actor_ctx.clone(), + expr_context, + shared_context.local_barrier_manager.clone(), + ); + actor.run().await + } + .boxed() }; - let actor = Actor::new( - consumer, - vec![], - StreamingMetrics::unused().into(), - actor_ctx.clone(), - expr_context.clone(), - barrier_test_env - .shared_context - .local_barrier_manager - .clone(), - ); - actor_futures.push(actor.run().boxed()); + actor_futures.push(actor_future); let mut epoch = test_epoch(1); let b1 = Barrier::new_test_barrier(epoch); diff --git a/src/stream/src/executor/merge.rs b/src/stream/src/executor/merge.rs index 1f33830e2a576..0316f7cf36796 100644 --- a/src/stream/src/executor/merge.rs +++ b/src/stream/src/executor/merge.rs @@ -33,13 +33,13 @@ use crate::executor::exchange::input::{ use crate::executor::prelude::*; use crate::task::SharedContext; -pub(crate) enum InputExecutorUpstream { +pub(crate) enum MergeExecutorUpstream { Singleton(BoxedInput), Merge(SelectReceivers), } -pub(crate) struct InputExecutor { - upstream: InputExecutorUpstream, +pub(crate) struct MergeExecutorInput { + upstream: MergeExecutorUpstream, actor_context: ActorContextRef, upstream_fragment_id: UpstreamFragmentId, shared_context: Arc, @@ -47,9 +47,9 @@ pub(crate) struct InputExecutor { info: ExecutorInfo, } -impl InputExecutor { +impl MergeExecutorInput { pub(crate) fn new( - upstream: InputExecutorUpstream, + upstream: MergeExecutorUpstream, actor_context: ActorContextRef, upstream_fragment_id: UpstreamFragmentId, shared_context: Arc, @@ -69,7 +69,7 @@ impl InputExecutor { pub(crate) fn into_executor(self, barrier_rx: mpsc::UnboundedReceiver) -> Executor { let fragment_id = self.actor_context.fragment_id; let executor = match self.upstream { - InputExecutorUpstream::Singleton(input) => ReceiverExecutor::new( + MergeExecutorUpstream::Singleton(input) => ReceiverExecutor::new( self.actor_context, fragment_id, self.upstream_fragment_id, @@ -79,7 +79,7 @@ impl InputExecutor { barrier_rx, ) .boxed(), - InputExecutorUpstream::Merge(inputs) => MergeExecutor::new( + MergeExecutorUpstream::Merge(inputs) => MergeExecutor::new( self.actor_context, fragment_id, self.upstream_fragment_id, @@ -94,13 +94,13 @@ impl InputExecutor { } } -impl Stream for InputExecutor { +impl Stream for MergeExecutorInput { type Item = DispatcherMessageStreamItem; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut self.get_mut().upstream { - InputExecutorUpstream::Singleton(input) => input.poll_next_unpin(cx), - InputExecutorUpstream::Merge(inputs) => inputs.poll_next_unpin(cx), + MergeExecutorUpstream::Singleton(input) => input.poll_next_unpin(cx), + MergeExecutorUpstream::Merge(inputs) => inputs.poll_next_unpin(cx), } } } @@ -605,8 +605,7 @@ mod tests { rxs.push(rx); } let barrier_test_env = LocalBarrierTestEnv::for_test().await; - let merger = MergeExecutor::for_test(233, rxs, barrier_test_env.shared_context.clone()); - let actor_id = merger.actor_context.id; + let actor_id = 233; let mut handles = Vec::with_capacity(CHANNEL_NUMBER); let epochs = (10..1000u64) @@ -664,6 +663,8 @@ mod tests { handles.push(handle); } + let merger = + MergeExecutor::for_test(actor_id, rxs, barrier_test_env.shared_context.clone()); let mut merger = merger.boxed().execute(); for (idx, epoch) in epochs { // expect n chunks diff --git a/src/stream/src/executor/mod.rs b/src/stream/src/executor/mod.rs index e6f8530c8fdb4..05688474f5bd9 100644 --- a/src/stream/src/executor/mod.rs +++ b/src/stream/src/executor/mod.rs @@ -137,7 +137,7 @@ pub use join::JoinType; pub use lookup::*; pub use lookup_union::LookupUnionExecutor; pub use merge::MergeExecutor; -pub(crate) use merge::{InputExecutor, InputExecutorUpstream}; +pub(crate) use merge::{MergeExecutorInput, MergeExecutorUpstream}; pub use mview::*; pub use no_op::NoOpExecutor; pub use now::*; diff --git a/src/stream/src/executor/test_utils.rs b/src/stream/src/executor/test_utils.rs index 4744bae374bfb..36534ace12b05 100644 --- a/src/stream/src/executor/test_utils.rs +++ b/src/stream/src/executor/test_utils.rs @@ -324,13 +324,11 @@ pub mod agg_executor { agg_call: &AggCall, group_key_indices: &[usize], pk_indices: &[usize], - input_ref: &Executor, + input_fields: Vec, is_append_only: bool, ) -> AggStateStorage { match agg_call.kind { AggKind::Builtin(PbAggKind::Min | PbAggKind::Max) if !is_append_only => { - let input_fields = input_ref.schema().fields(); - let mut column_descs = Vec::new(); let mut order_types = Vec::new(); let mut upstream_columns = Vec::new(); @@ -397,10 +395,8 @@ pub mod agg_executor { table_id: TableId, agg_calls: &[AggCall], group_key_indices: &[usize], - input_ref: &Executor, + input_fields: Vec, ) -> StateTable { - let input_fields = input_ref.schema().fields(); - let mut column_descs = Vec::new(); let mut order_types = Vec::new(); @@ -455,7 +451,7 @@ pub mod agg_executor { agg_call, &group_key_indices, &pk_indices, - &input, + input.info.schema.fields.clone(), is_append_only, ) .await, @@ -467,7 +463,7 @@ pub mod agg_executor { TableId::new(agg_calls.len() as u32), &agg_calls, &group_key_indices, - &input, + input.info.schema.fields.clone(), ) .await; @@ -524,7 +520,7 @@ pub mod agg_executor { agg_call, &[], &pk_indices, - &input, + input.info.schema.fields.clone(), is_append_only, ) })) @@ -535,7 +531,7 @@ pub mod agg_executor { TableId::new(agg_calls.len() as u32), &agg_calls, &[], - &input, + input.info.schema.fields.clone(), ) .await; diff --git a/src/stream/src/from_proto/merge.rs b/src/stream/src/from_proto/merge.rs index e60079170bcef..d6c7ce157a931 100644 --- a/src/stream/src/from_proto/merge.rs +++ b/src/stream/src/from_proto/merge.rs @@ -19,19 +19,19 @@ use risingwave_pb::stream_plan::{DispatcherType, MergeNode}; use super::*; use crate::executor::exchange::input::new_input; use crate::executor::monitor::StreamingMetrics; -use crate::executor::{ActorContextRef, InputExecutor, InputExecutorUpstream, MergeExecutor}; +use crate::executor::{ActorContextRef, MergeExecutor, MergeExecutorInput, MergeExecutorUpstream}; use crate::task::SharedContext; pub struct MergeExecutorBuilder; impl MergeExecutorBuilder { - pub(crate) fn new_input_executor( + pub(crate) fn new_input( shared_context: Arc, executor_stats: Arc, actor_context: ActorContextRef, info: ExecutorInfo, node: &MergeNode, - ) -> StreamResult { + ) -> StreamResult { let upstreams = node.get_upstream_actor_id(); let upstream_fragment_id = node.get_upstream_fragment_id(); @@ -61,15 +61,15 @@ impl MergeExecutorBuilder { }; let upstreams = if always_single_input { - InputExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap()) + MergeExecutorUpstream::Singleton(inputs.into_iter().exactly_one().unwrap()) } else { - InputExecutorUpstream::Merge(MergeExecutor::new_select_receiver( + MergeExecutorUpstream::Merge(MergeExecutor::new_select_receiver( inputs, &executor_stats, &actor_context, )) }; - Ok(InputExecutor::new( + Ok(MergeExecutorInput::new( upstreams, actor_context, upstream_fragment_id, @@ -92,7 +92,7 @@ impl ExecutorBuilder for MergeExecutorBuilder { .shared_context .local_barrier_manager .subscribe_barrier(params.actor_context.id); - Ok(Self::new_input_executor( + Ok(Self::new_input( params.shared_context, params.executor_stats, params.actor_context, diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 08acb353c739c..b853aed3628b9 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -53,7 +53,8 @@ use crate::executor::monitor::StreamingMetrics; use crate::executor::subtask::SubtaskHandle; use crate::executor::{ Actor, ActorContext, ActorContextRef, DispatchExecutor, DispatcherImpl, Execute, Executor, - ExecutorInfo, InputExecutor, SnapshotBackfillExecutor, TroublemakerExecutor, WrapperExecutor, + ExecutorInfo, MergeExecutorInput, SnapshotBackfillExecutor, TroublemakerExecutor, + WrapperExecutor, }; use crate::from_proto::{create_executor, MergeExecutorBuilder}; use crate::task::barrier_manager::{ @@ -292,35 +293,45 @@ impl StreamActorManager { )) } - fn create_snapshot_backfill_input( - &self, - upstream_node: &StreamNode, - actor_context: &ActorContextRef, - shared_context: &Arc, - ) -> StreamResult { - let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => { - upstream_merge - }); - let executor_id = unique_executor_id(actor_context.id, upstream_node.operator_id); - let schema: Schema = upstream_node.fields.iter().map(Field::from).collect(); + fn get_executor_id(actor_context: &ActorContext, node: &StreamNode) -> u64 { + // We assume that the operator_id of different instances from the same RelNode will be the + // same. + unique_executor_id(actor_context.id, node.operator_id) + } - let pk_indices = upstream_node + fn get_executor_info(node: &StreamNode, executor_id: u64) -> ExecutorInfo { + let schema: Schema = node.fields.iter().map(Field::from).collect(); + + let pk_indices = node .get_stream_key() .iter() .map(|idx| *idx as usize) .collect::>(); - let identity = format!( - "{} {:X}", - upstream_node.get_node_body().unwrap(), - executor_id - ); - let info = ExecutorInfo { + let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id); + ExecutorInfo { schema, pk_indices, identity, - }; - MergeExecutorBuilder::new_input_executor( + } + } + + fn create_snapshot_backfill_input( + &self, + upstream_node: &StreamNode, + actor_context: &ActorContextRef, + shared_context: &Arc, + ) -> StreamResult { + let info = Self::get_executor_info( + upstream_node, + Self::get_executor_id(actor_context, upstream_node), + ); + + let upstream_merge = must_match!(upstream_node.get_node_body().unwrap(), NodeBody::Merge(upstream_merge) => { + upstream_merge + }); + + MergeExecutorBuilder::new_input( shared_context.clone(), self.streaming_metrics.clone(), actor_context.clone(), @@ -340,12 +351,9 @@ impl StreamActorManager { env: StreamEnvironment, state_store: impl StateStore, ) -> StreamResult { - assert_eq!(2, stream_node.input.len()); - let upstream = self.create_snapshot_backfill_input( - &stream_node.input[0], - actor_context, - shared_context, - )?; + let [upstream_node, _]: &[_; 2] = stream_node.input.as_slice().try_into().unwrap(); + let upstream = + self.create_snapshot_backfill_input(upstream_node, actor_context, shared_context)?; let table_desc: &StorageTableDesc = node.get_table_desc()?; @@ -386,23 +394,10 @@ impl StreamActorManager { ) .boxed(); - let pk_indices = stream_node - .get_stream_key() - .iter() - .map(|idx| *idx as usize) - .collect::>(); - - // We assume that the operator_id of different instances from the same RelNode will be the - // same. - let executor_id = unique_executor_id(actor_context.id, stream_node.operator_id); - let schema: Schema = stream_node.fields.iter().map(Field::from).collect(); - - let identity = format!("{} {:X}", stream_node.get_node_body().unwrap(), executor_id); - let info = ExecutorInfo { - schema, - pk_indices, - identity, - }; + let info = Self::get_executor_info( + stream_node, + Self::get_executor_id(actor_context, stream_node), + ); if crate::consistency::insane() { let mut troubled_info = info.clone(); @@ -489,24 +484,13 @@ impl StreamActorManager { } let op_info = node.get_identity().clone(); - let pk_indices = node - .get_stream_key() - .iter() - .map(|idx| *idx as usize) - .collect::>(); // We assume that the operator_id of different instances from the same RelNode will be the // same. - let executor_id = unique_executor_id(actor_context.id, node.operator_id); + let executor_id = Self::get_executor_id(actor_context, node); let operator_id = unique_operator_id(fragment_id, node.operator_id); - let schema: Schema = node.fields.iter().map(Field::from).collect(); - let identity = format!("{} {:X}", node.get_node_body().unwrap(), executor_id); - let info = ExecutorInfo { - schema, - pk_indices, - identity, - }; + let info = Self::get_executor_info(node, executor_id); let eval_error_report = ActorEvalErrorReport { actor_context: actor_context.clone(),