Skip to content

Commit

Permalink
refine and fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 6, 2024
1 parent c1a44ab commit 29ddf58
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 229 deletions.
10 changes: 5 additions & 5 deletions src/stream/src/executor/backfill/snapshot_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use crate::executor::monitor::StreamingMetrics;
use crate::executor::prelude::{try_stream, StreamExt};
use crate::executor::{
expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream,
DispatcherBarrier, DispatcherMessage, Execute, InputExecutor, Message, Mutation,
DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, Mutation,
StreamExecutorError, StreamExecutorResult,
};
use crate::task::CreateMviewProgress;
Expand All @@ -50,7 +50,7 @@ pub struct SnapshotBackfillExecutor<S: StateStore> {
upstream_table: StorageTable<S>,

/// Upstream with the same schema as the upstream table.
upstream: InputExecutor,
upstream: MergeExecutorInput,

/// The column indices that need to be forwarded to the downstream from the upstream and table scan.
output_indices: Vec<usize>,
Expand All @@ -70,7 +70,7 @@ impl<S: StateStore> SnapshotBackfillExecutor<S> {
#[expect(clippy::too_many_arguments)]
pub(crate) fn new(
upstream_table: StorageTable<S>,
upstream: InputExecutor,
upstream: MergeExecutorInput,
output_indices: Vec<usize>,
actor_ctx: ActorContextRef,
progress: CreateMviewProgress,
Expand Down Expand Up @@ -403,7 +403,7 @@ impl<'a> UpstreamBufferState for StateOfConsumingLogStore<'a> {
}

struct UpstreamBuffer<'a, S> {
upstream: &'a mut InputExecutor,
upstream: &'a mut MergeExecutorInput,
state: S,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
upstream_table_id: TableId,
Expand All @@ -412,7 +412,7 @@ struct UpstreamBuffer<'a, S> {

impl<'a> UpstreamBuffer<'a, StateOfConsumingSnapshot> {
fn new(
upstream: &'a mut InputExecutor,
upstream: &'a mut MergeExecutorInput,
upstream_table_id: TableId,
current_subscriber_id: u32,
consume_upstream_row_count: LabelGuardedIntCounter<3>,
Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/executor/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,10 +1180,6 @@ mod tests {
let actor_id = 233;
let fragment_id = 666;
let barrier_test_env = LocalBarrierTestEnv::for_test().await;
let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(233, rx, barrier_test_env.shared_context.clone()).boxed(),
);
let ctx = Arc::new(SharedContext::for_test());
let metrics = Arc::new(StreamingMetrics::unused());

Expand Down Expand Up @@ -1252,6 +1248,11 @@ mod tests {
.flush_all_events()
.await;

let input = Executor::new(
Default::default(),
ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone())
.boxed(),
);
let executor = Box::new(DispatchExecutor::new(
input,
vec![broadcast_dispatcher, simple_dispatcher],
Expand Down
276 changes: 143 additions & 133 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,41 +64,45 @@ async fn test_merger_sum_aggr() {
let input_schema = Schema {
fields: vec![Field::unnamed(DataType::Int64)],
};
let input = Executor::new(
ExecutorInfo {
schema: input_schema,
pk_indices: PkIndices::new(),
identity: "ReceiverExecutor".to_string(),
},
ReceiverExecutor::for_test(actor_id, input_rx, barrier_test_env.shared_context.clone())
.boxed(),
);
let agg_calls = vec![
AggCall::from_pretty("(count:int8)"),
AggCall::from_pretty("(sum:int8 $0:int8)"),
];
let schema = generate_agg_schema(&input, &agg_calls, None);
// for the local aggregator, we need two states: row count and sum
let aggregator =
StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls).unwrap();
let shared_context = barrier_test_env.shared_context.clone();
let expr_context = expr_context.clone();
let (tx, rx) = channel_for_test();
let consumer = SenderConsumer {
input: aggregator.boxed(),
channel: Box::new(LocalOutput::new(233, tx)),
};
let actor_future = async move {
let input = Executor::new(
ExecutorInfo {
schema: input_schema,
pk_indices: PkIndices::new(),
identity: "ReceiverExecutor".to_string(),
},
ReceiverExecutor::for_test(actor_id, input_rx, shared_context.clone()).boxed(),
);
let agg_calls = vec![
AggCall::from_pretty("(count:int8)"),
AggCall::from_pretty("(sum:int8 $0:int8)"),
];
let schema = generate_agg_schema(&input, &agg_calls, None);
// for the local aggregator, we need two states: row count and sum
let aggregator =
StatelessSimpleAggExecutor::new(actor_ctx.clone(), input, schema, agg_calls)
.unwrap();
let consumer = SenderConsumer {
input: aggregator.boxed(),
channel: Box::new(LocalOutput::new(233, tx)),
};

let actor = Actor::new(
consumer,
vec![],
StreamingMetrics::unused().into(),
actor_ctx,
expr_context.clone(),
barrier_test_env
.shared_context
.local_barrier_manager
.clone(),
);
(actor, rx)
let actor = Actor::new(
consumer,
vec![],
StreamingMetrics::unused().into(),
actor_ctx,
expr_context,
shared_context.local_barrier_manager.clone(),
);

actor.run().await
}
.boxed();
(actor_future, rx)
};

// join handles of all actors
Expand All @@ -113,121 +117,127 @@ async fn test_merger_sum_aggr() {
// create 17 local aggregation actors
for _ in 0..17 {
let (tx, rx) = channel_for_test();
let (actor, channel) = make_actor(rx);
let (actor_future, channel) = make_actor(rx);
outputs.push(channel);
actor_futures.push(actor.run().boxed());
actor_futures.push(actor_future);
inputs.push(Box::new(LocalOutput::new(233, tx)) as BoxedOutput);
}

// create a round robin dispatcher, which dispatches messages to the actors

let actor_id = gen_next_actor_id();
let (input, rx) = channel_for_test();
let receiver_op = Executor::new(
ExecutorInfo {
// input schema of local simple agg
schema: Schema::new(vec![Field::unnamed(DataType::Int64)]),
pk_indices: PkIndices::new(),
identity: "ReceiverExecutor".to_string(),
},
ReceiverExecutor::for_test(actor_id, rx, barrier_test_env.shared_context.clone()).boxed(),
);
let dispatcher = DispatchExecutor::new(
receiver_op,
vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new(
inputs,
vec![0],
0,
))],
0,
0,
barrier_test_env.shared_context.clone(),
metrics,
config::default::developer::stream_chunk_size(),
);
let actor = Actor::new(
dispatcher,
vec![],
StreamingMetrics::unused().into(),
ActorContext::for_test(actor_id),
expr_context.clone(),
barrier_test_env
.shared_context
.local_barrier_manager
.clone(),
);
actor_futures.push(actor.run().boxed());
let actor_future = {
let shared_context = barrier_test_env.shared_context.clone();
let expr_context = expr_context.clone();
async move {
let receiver_op = Executor::new(
ExecutorInfo {
// input schema of local simple agg
schema: Schema::new(vec![Field::unnamed(DataType::Int64)]),
pk_indices: PkIndices::new(),
identity: "ReceiverExecutor".to_string(),
},
ReceiverExecutor::for_test(actor_id, rx, shared_context.clone()).boxed(),
);
let dispatcher = DispatchExecutor::new(
receiver_op,
vec![DispatcherImpl::RoundRobin(RoundRobinDataDispatcher::new(
inputs,
vec![0],
0,
))],
0,
0,
shared_context.clone(),
metrics,
config::default::developer::stream_chunk_size(),
);
let actor = Actor::new(
dispatcher,
vec![],
StreamingMetrics::unused().into(),
ActorContext::for_test(actor_id),
expr_context,
shared_context.local_barrier_manager.clone(),
);
actor.run().await
}
.boxed()
};
actor_futures.push(actor_future);

let actor_ctx = ActorContext::for_test(gen_next_actor_id());

// use a merge operator to collect data from dispatchers before sending them to aggregator
let merger = Executor::new(
ExecutorInfo {
// output schema of local simple agg
schema: Schema::new(vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
]),
pk_indices: PkIndices::new(),
identity: "MergeExecutor".to_string(),
},
MergeExecutor::for_test(
actor_ctx.id,
outputs,
barrier_test_env.shared_context.clone(),
)
.boxed(),
);
let items = Arc::new(Mutex::new(vec![]));
let actor_future = {
let shared_context = barrier_test_env.shared_context.clone();
let expr_context = expr_context.clone();
let items = items.clone();
async move {
// use a merge operator to collect data from dispatchers before sending them to aggregator
let merger = Executor::new(
ExecutorInfo {
// output schema of local simple agg
schema: Schema::new(vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
]),
pk_indices: PkIndices::new(),
identity: "MergeExecutor".to_string(),
},
MergeExecutor::for_test(actor_ctx.id, outputs, shared_context.clone()).boxed(),
);

// for global aggregator, we need to sum data and sum row count
let is_append_only = false;
let aggregator = new_boxed_simple_agg_executor(
actor_ctx.clone(),
MemoryStateStore::new(),
merger,
is_append_only,
vec![
AggCall::from_pretty("(sum0:int8 $0:int8)"),
AggCall::from_pretty("(sum:int8 $1:int8)"),
AggCall::from_pretty("(count:int8)"),
],
2, // row_count_index
vec![],
2,
false,
)
.await;
// for global aggregator, we need to sum data and sum row count
let is_append_only = false;
let aggregator = new_boxed_simple_agg_executor(
actor_ctx.clone(),
MemoryStateStore::new(),
merger,
is_append_only,
vec![
AggCall::from_pretty("(sum0:int8 $0:int8)"),
AggCall::from_pretty("(sum:int8 $1:int8)"),
AggCall::from_pretty("(count:int8)"),
],
2, // row_count_index
vec![],
2,
false,
)
.await;

let projection = ProjectExecutor::new(
actor_ctx.clone(),
aggregator,
vec![
// TODO: use the new streaming_if_null expression here, and add `None` tests
NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
],
MultiMap::new(),
vec![],
0.0,
false,
);
let projection = ProjectExecutor::new(
actor_ctx.clone(),
aggregator,
vec![
// TODO: use the new streaming_if_null expression here, and add `None` tests
NonStrictExpression::for_test(InputRefExpression::new(DataType::Int64, 1)),
],
MultiMap::new(),
vec![],
0.0,
false,
);

let items = Arc::new(Mutex::new(vec![]));
let consumer = MockConsumer {
input: projection.boxed(),
data: items.clone(),
let consumer = MockConsumer {
input: projection.boxed(),
data: items.clone(),
};
let actor = Actor::new(
consumer,
vec![],
StreamingMetrics::unused().into(),
actor_ctx.clone(),
expr_context,
shared_context.local_barrier_manager.clone(),
);
actor.run().await
}
.boxed()
};
let actor = Actor::new(
consumer,
vec![],
StreamingMetrics::unused().into(),
actor_ctx.clone(),
expr_context.clone(),
barrier_test_env
.shared_context
.local_barrier_manager
.clone(),
);
actor_futures.push(actor.run().boxed());
actor_futures.push(actor_future);

let mut epoch = test_epoch(1);
let b1 = Barrier::new_test_barrier(epoch);
Expand Down
Loading

0 comments on commit 29ddf58

Please sign in to comment.