From 901b743fe4ec8117450c46d1f63d2b3e7c4da1cb Mon Sep 17 00:00:00 2001 From: Neng Li Date: Fri, 1 Dec 2023 10:50:48 +0800 Subject: [PATCH] fix(interactive): fix unexpect result of operator aggregate() + iterate() (#3391) Fixes #3177 --- .../pegasus/src/operator/iteration/mod.rs | 7 +- .../engine/pegasus/pegasus/src/stream.rs | 4 ++ .../pegasus/pegasus/tests/iteration_test.rs | 69 ++++++++++++++++++- 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs index 39573479821e..40465ff63ee7 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/operator/iteration/mod.rs @@ -1,3 +1,5 @@ +use std::cmp::max; + use crate::api::{IterCondition, Iteration}; use crate::macros::filter::*; use crate::stream::Stream; @@ -53,7 +55,7 @@ where F: FnOnce(Stream) -> Result, BuildJobError>, { let max_iters = until.max_iters; - let (leave, enter) = stream + let (mut leave, enter) = stream .enter()? .binary_branch_notify("switch", |info| { SwitchOperator::::new(info.scope_level, emit_kind, until) @@ -65,7 +67,10 @@ where .transform_notify("feedback", move |info| { FeedbackOperator::::new(info.scope_level, max_iters) })?; + let feedback_partitions = feedback.get_partitions(); feedback.feedback_to(index)?; + let partition_update = max(feedback_partitions, leave.get_partitions()); + leave.set_partitions(partition_update); leave.leave() } diff --git a/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs b/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs index 87b97fbd6701..d0fa37b72c96 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/src/stream.rs @@ -108,6 +108,10 @@ impl Stream { pub fn get_partitions(&self) -> usize { self.partitions } + + pub fn set_partitions(&mut self, partitions: usize) { + self.partitions = partitions; + } } impl Stream { diff --git a/interactive_engine/executor/engine/pegasus/pegasus/tests/iteration_test.rs b/interactive_engine/executor/engine/pegasus/pegasus/tests/iteration_test.rs index 89a4061881fe..b1f18dc7e7b2 100644 --- a/interactive_engine/executor/engine/pegasus/pegasus/tests/iteration_test.rs +++ b/interactive_engine/executor/engine/pegasus/pegasus/tests/iteration_test.rs @@ -13,7 +13,9 @@ //! See the License for the specific language governing permissions and //! limitations under the License. // -use pegasus::api::{CorrelatedSubTask, Count, EmitKind, Fold, IterCondition, Iteration, Map, Reduce, Sink}; +use pegasus::api::{ + CorrelatedSubTask, Count, EmitKind, Fold, IterCondition, Iteration, Limit, Map, Reduce, Sink, +}; use pegasus::JobConf; #[test] @@ -119,6 +121,71 @@ fn iterate_emit_before_x_r_map_x_test() { assert_eq!(results, expected); } +#[test] +fn aggregate_iterate_count_test() { + let mut conf = JobConf::new("aggregate_iterate_count_test"); + conf.set_workers(2); + let mut result_stream = pegasus::run(conf, || { + |input, output| { + input + .input_from(0..10000u32)? + .limit(1000)? + .filter_map(|i| Ok(Some(i)))? + .iterate_emit_until(IterCondition::max_iters(1), EmitKind::Before, |start| { + Ok(start + .flat_map(|x| Ok(vec![x + 1].into_iter()))? + .repartition(|x| Ok((x % 32) as u64))) + })? + .flat_map(|x| Ok(vec![x + 1].into_iter()))? + .count()? + .sink_into(output) + } + }) + .expect("build job failure"); + + let mut count = 0; + let mut value = 0; + while let Some(Ok(d)) = result_stream.next() { + count += 1; + value = d; + } + assert_eq!(count, 1); + assert_eq!(value, 2000); +} + +#[test] +fn iterate_aggregate_count_test() { + let mut conf = JobConf::new("aggregate_iterate_count_test"); + conf.set_workers(2); + let mut result_stream = pegasus::run(conf, || { + |input, output| { + input + .input_from(0..10000u32)? + .limit(1000)? + .repartition(|x| Ok((x % 32) as u64)) + .filter_map(|i| Ok(Some(i)))? + .iterate_emit_until(IterCondition::max_iters(1), EmitKind::Before, |start| { + Ok(start + .flat_map(|x| Ok(vec![x + 1].into_iter()))? + .aggregate()) + })? + .flat_map(|x| Ok(vec![x + 1].into_iter()))? + .count()? + .sink_into(output) + } + }) + .expect("build job failure"); + + let mut count = 0; + let mut value = 0; + while let Some(Ok(d)) = result_stream.next() { + count += 1; + value = d; + } + assert_eq!(count, 1); + assert_eq!(value, 2000); +} + #[test] fn ping_pong_test_01() { let mut conf = JobConf::new("ping_pong_test_01");