From 0248358f24896e6267aa17da50c7ff8c3e2f32a5 Mon Sep 17 00:00:00 2001 From: imDema Date: Wed, 13 Mar 2024 17:16:34 +0100 Subject: [PATCH] Switch to stream_iter in examples --- src/operator/iteration/iterate.rs | 2 +- src/operator/iteration/replay.rs | 20 ++--- src/operator/join/mod.rs | 20 ++--- src/operator/merge.rs | 4 +- src/operator/mod.rs | 120 +++++++++++++++--------------- src/operator/window/aggr/fold.rs | 2 +- src/operator/window/mod.rs | 4 +- tests/iteration/iterate.rs | 2 +- tests/join.rs | 13 ++-- 9 files changed, 93 insertions(+), 94 deletions(-) diff --git a/src/operator/iteration/iterate.rs b/src/operator/iteration/iterate.rs index fc38da0c..b2cff6b9 100644 --- a/src/operator/iteration/iterate.rs +++ b/src/operator/iteration/iterate.rs @@ -342,7 +342,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new(0..3)).shuffle(); + /// let s = env.stream_iter(0..3).shuffle(); /// let (state, items) = s.iterate( /// 3, // at most 3 iterations /// 0, // the initial state is zero diff --git a/src/operator/iteration/replay.rs b/src/operator/iteration/replay.rs index d060681f..a6453bc2 100644 --- a/src/operator/iteration/replay.rs +++ b/src/operator/iteration/replay.rs @@ -239,7 +239,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new(0..3)).shuffle(); + /// let s = env.stream_iter(0..3).shuffle(); /// let state = s.replay( /// 3, // at most 3 iterations /// 0, // the initial state is zero @@ -300,14 +300,14 @@ where Default::default(), self.block.iteration_ctx.clone(), ); - let leader_block_id = output_block.id; + let output_id = output_block.id; // the output stream is outside this loop, so it doesn't have the lock for this state // the lock for synchronizing the access to the state of this iteration let state_lock = Arc::new(IterationStateLock::default()); let mut iter_start = - self.add_operator(|prev| Replay::new(prev, state, leader_block_id, state_lock.clone())); + self.add_operator(|prev| Replay::new(prev, state, output_id, state_lock.clone())); let replay_block_id = iter_start.block.id; // save the stack of the iteration for checking the stream returned by the body @@ -325,24 +325,24 @@ where } iter_end.block.iteration_ctx.pop().unwrap(); - let iter_end = iter_end.add_operator(|prev| IterationEnd::new(prev, leader_block_id)); + let iter_end = iter_end.add_operator(|prev| IterationEnd::new(prev, output_id)); let iteration_end_block_id = iter_end.block.id; - let mut env_lock = iter_end.ctx.lock(); - let scheduler = env_lock.scheduler_mut(); - scheduler.schedule_block(iter_end.block); + let mut ctx_lock = iter_end.ctx.lock(); + let scheduler = ctx_lock.scheduler_mut(); // connect the IterationEnd to the IterationLeader scheduler.connect_blocks( iteration_end_block_id, - leader_block_id, + output_id, TypeId::of::(), ); scheduler.connect_blocks( - leader_block_id, + output_id, replay_block_id, TypeId::of::>(), ); - drop(env_lock); + scheduler.schedule_block(iter_end.block); + drop(ctx_lock); // store the id of the block containing the IterationEnd feedback_block_id.store(iteration_end_block_id as usize, Ordering::Release); diff --git a/src/operator/join/mod.rs b/src/operator/join/mod.rs index 58734b20..d3d3585e 100644 --- a/src/operator/join/mod.rs +++ b/src/operator/join/mod.rs @@ -102,8 +102,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(0..5u8)); - /// let s2 = env.stream(IteratorSource::new(0..5i32)); + /// let s1 = env.stream_iter(0..5u8); + /// let s2 = env.stream_iter(0..5i32); /// let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); /// /// env.execute_blocking(); @@ -150,8 +150,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(0..5u8)); - /// let s2 = env.stream(IteratorSource::new(0..5i32)); + /// let s1 = env.stream_iter(0..5u8); + /// let s2 = env.stream_iter(0..5i32); /// let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); /// /// env.execute_blocking(); @@ -199,8 +199,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(0..5u8)); - /// let s2 = env.stream(IteratorSource::new(0..5i32)); + /// let s1 = env.stream_iter(0..5u8); + /// let s2 = env.stream_iter(0..5i32); /// let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); /// /// env.execute_blocking(); @@ -251,8 +251,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(0..5u8)); - /// let s2 = env.stream(IteratorSource::new(0..5i32)); + /// let s1 = env.stream_iter(0..5u8); + /// let s2 = env.stream_iter(0..5i32); /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash(); /// ``` /// @@ -260,8 +260,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(0..5u8)); - /// let s2 = env.stream(IteratorSource::new(0..5i32)); + /// let s1 = env.stream_iter(0..5u8); + /// let s2 = env.stream_iter(0..5i32); /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right(); /// ``` pub fn join_with( diff --git a/src/operator/merge.rs b/src/operator/merge.rs index 7a919e77..880d77fb 100644 --- a/src/operator/merge.rs +++ b/src/operator/merge.rs @@ -28,8 +28,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new((0..10))); - /// let s2 = env.stream(IteratorSource::new((10..20))); + /// let s1 = env.stream_iter(0..10); + /// let s2 = env.stream_iter(10..20); /// let res = s1.merge(s2).collect_vec(); /// /// env.execute_blocking(); diff --git a/src/operator/mod.rs b/src/operator/mod.rs index db941851..3ef54c52 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -314,7 +314,7 @@ where /// use noir_compute::operator::Timestamp; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); /// - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// s.add_timestamps( /// |&n| n, /// |&n, &ts| if n % 2 == 0 { Some(ts) } else { None } @@ -350,7 +350,7 @@ where /// use noir_compute::BatchMode; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); /// - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// s.batch_mode(BatchMode::fixed(1024)); /// ``` pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self { @@ -369,7 +369,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.filter_map(|n| if n % 2 == 0 { Some(n * 3) } else { None }).collect_vec(); /// /// env.execute_blocking(); @@ -394,7 +394,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.filter(|&n| n % 2 == 0).collect_vec(); /// /// env.execute_blocking(); @@ -436,7 +436,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new(std::array::IntoIter::new([1, 2, -5, 3, 1]))); + /// let s = env.stream_iter((std::array::IntoIter::new([1, 2, -5, 3, 1]))); /// let res = s.rich_filter_map({ /// let mut sum = 0; /// move |x| { @@ -482,7 +482,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((1..=5))); + /// let s = env.stream_iter(1..=5); /// let res = s.rich_map({ /// let mut sum = 0; /// move |x| { @@ -503,7 +503,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((1..=5))); + /// let s = env.stream_iter(1..=5); /// let res = s.rich_map({ /// let mut id = 0; /// move |x| { @@ -536,7 +536,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.map(|n| n * 10).collect_vec(); /// /// env.execute_blocking(); @@ -663,7 +663,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((5..15))); + /// let s = env.stream_iter(5..15); /// let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec(); /// /// env.execute_blocking(); @@ -711,7 +711,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.fold(0, |acc, value| *acc += value).collect_vec(); /// /// env.execute_blocking(); @@ -757,7 +757,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec(); /// /// env.execute_blocking(); @@ -804,7 +804,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_fold(|&n| n % 2, 0, |acc, value| *acc += value, |acc, value| *acc += value) /// .collect_vec(); @@ -895,7 +895,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.key_by(|&n| n % 2).collect_vec(); /// /// env.execute_blocking(); @@ -920,7 +920,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// s.inspect(|n| println!("Item: {}", n)).for_each(std::mem::drop); /// /// env.execute_blocking(); @@ -952,7 +952,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..=3))); + /// let s = env.stream_iter(0..=3); /// let res = s.rich_flat_map({ /// let mut elements = Vec::new(); /// move |y| { @@ -1016,7 +1016,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))); + /// let s = env.stream_iter(0..3); /// let res = s.flat_map(|n| vec![n, n]).collect_vec(); /// /// env.execute_blocking(); @@ -1041,7 +1041,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// s.for_each(|n| println!("Item: {}", n)); /// /// env.execute_blocking(); @@ -1064,7 +1064,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new(vec![ + /// let s = env.stream_iter((vec![ /// vec![1, 2, 3], /// vec![], /// vec![4, 5], @@ -1103,7 +1103,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// s.broadcast(); /// ``` pub fn broadcast(self) -> Stream> { @@ -1129,7 +1129,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let keyed = s.group_by(|&n| n % 2); // partition even and odd elements /// ``` pub fn group_by(self, keyer: Fk) -> KeyedStream> @@ -1162,7 +1162,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_max_element(|&n| n % 2, |&n| n) /// .collect_vec(); @@ -1211,7 +1211,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_sum(|&n| n % 2, |n| n) /// .collect_vec(); @@ -1275,7 +1275,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_avg(|&n| n % 2, |&n| n as f64) /// .collect_vec(); @@ -1338,7 +1338,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_count(|&n| n % 2) /// .collect_vec(); @@ -1380,7 +1380,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_min_element(|&n| n % 2, |&n| n) /// .collect_vec(); @@ -1437,7 +1437,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_reduce(|&n| n % 2, |acc, value| *acc += value) /// .collect_vec(); @@ -1623,7 +1623,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.reduce_assoc(|a, b| a + b).collect_vec(); /// /// env.execute_blocking(); @@ -1694,7 +1694,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.shuffle(); /// ``` pub fn shuffle(self) -> Stream> { @@ -1713,7 +1713,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let mut splits = s.split(3); /// let a = splits.pop().unwrap(); /// let b = splits.pop().unwrap(); @@ -1749,8 +1749,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new(vec!['A', 'B', 'C', 'D'].into_iter())); - /// let s2 = env.stream(IteratorSource::new(vec![1, 2, 3].into_iter())); + /// let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter())); + /// let s2 = env.stream_iter((vec![1, 2, 3].into_iter())); /// let res = s1.zip(s2).collect_vec(); /// /// env.execute_blocking(); @@ -1788,7 +1788,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10u32))); + /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// /// env.execute_blocking(); @@ -1819,7 +1819,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10u32))); + /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// /// env.execute_blocking(); @@ -1851,7 +1851,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -1882,7 +1882,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -1912,7 +1912,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -1942,7 +1942,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -1971,7 +1971,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -2093,7 +2093,7 @@ where /// use noir_compute::operator::Timestamp; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); /// - /// let s = env.stream(IteratorSource::new((0..10))); + /// let s = env.stream_iter(0..10); /// s /// .group_by(|i| i % 2) /// .add_timestamps( @@ -2132,7 +2132,7 @@ where /// use noir_compute::BatchMode; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); /// - /// let s = env.stream(IteratorSource::new((0..10))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// s.batch_mode(BatchMode::fixed(1024)); /// ``` pub fn batch_mode(mut self, batch_mode: BatchMode) -> Self { @@ -2151,7 +2151,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec(); /// /// env.execute_blocking(); @@ -2180,7 +2180,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec(); /// /// env.execute_blocking(); @@ -2207,7 +2207,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec(); /// /// env.execute_blocking(); @@ -2235,7 +2235,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop); /// /// env.execute_blocking(); @@ -2273,7 +2273,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s /// .fold(0, |acc, value| *acc += value) /// .collect_vec(); @@ -2318,7 +2318,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s /// .reduce(|acc, value| *acc += value) /// .collect_vec(); @@ -2351,7 +2351,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s.map(|(_key, n)| 10 * n).collect_vec(); /// /// env.execute_blocking(); @@ -2442,7 +2442,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let stream = env.stream(IteratorSource::new((0..4))).group_by(|&n| n % 2); + /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2); /// let res = stream.unkey().collect_vec(); /// /// env.execute_blocking(); @@ -2464,7 +2464,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let stream = env.stream(IteratorSource::new((0..4))).group_by(|&n| n % 2); + /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2); /// let res = stream.drop_key().collect_vec(); /// /// env.execute_blocking(); @@ -2485,7 +2485,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key)); /// /// env.execute_blocking(); @@ -2545,8 +2545,8 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s1 = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); - /// let s2 = env.stream(IteratorSource::new((3..5))).group_by(|&n| n % 2); + /// let s1 = env.stream_iter(0..3).group_by(|&n| n % 2); + /// let s2 = env.stream_iter(3..5).group_by(|&n| n % 2); /// let res = s1.merge(s2).collect_vec(); /// /// env.execute_blocking(); @@ -2590,7 +2590,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s.shuffle(); /// ``` @@ -2613,7 +2613,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10u32))); + /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// /// env.execute_blocking(); @@ -2640,7 +2640,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..10u32))); + /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// /// env.execute_blocking(); @@ -2671,7 +2671,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -2701,7 +2701,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec_all(); /// /// env.execute_blocking(); @@ -2730,7 +2730,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -2758,7 +2758,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..3))).group_by(|&n| n % 2); + /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// /// env.execute_blocking(); @@ -2792,7 +2792,7 @@ where /// # use noir_compute::operator::source::IteratorSource; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); /// let s = env - /// .stream(IteratorSource::new(vec![ + /// .stream_iter((vec![ /// vec![0, 1, 2], /// vec![3, 4, 5], /// vec![6, 7] diff --git a/src/operator/window/aggr/fold.rs b/src/operator/window/aggr/fold.rs index 0a7b3f81..8b3cdc58 100644 --- a/src/operator/window/aggr/fold.rs +++ b/src/operator/window/aggr/fold.rs @@ -110,7 +110,7 @@ where /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5))); + /// let s = env.stream_iter(0..5); /// let res = s /// .group_by(|&n| n % 2) /// .window(CountWindow::tumbling(2)) diff --git a/src/operator/window/mod.rs b/src/operator/window/mod.rs index 1f15d0e4..29d163a8 100644 --- a/src/operator/window/mod.rs +++ b/src/operator/window/mod.rs @@ -295,7 +295,7 @@ where /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..9))); + /// let s = env.stream_iter(0..9); /// let res = s /// .group_by(|&n| n % 2) /// .window(CountWindow::sliding(3, 2)) @@ -338,7 +338,7 @@ where /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); - /// let s = env.stream(IteratorSource::new((0..5usize))); + /// let s = env.stream_iter(0..5usize); /// let res = s /// .window_all(CountWindow::tumbling(2)) /// .sum::() diff --git a/tests/iteration/iterate.rs b/tests/iteration/iterate.rs index 40685b51..ad867942 100644 --- a/tests/iteration/iterate.rs +++ b/tests/iteration/iterate.rs @@ -67,7 +67,7 @@ fn test_iterate_side_input() { let n_iter = 5; let source = IteratorSource::new(0..n); - let side = env.stream(IteratorSource::new(0..n)); + let side = env.stream_iter(0..n); let (state, res) = env.stream(source).map(|x| (x, x)).shuffle().iterate( n_iter, 0u64, diff --git a/tests/join.rs b/tests/join.rs index 8b39b8a2..ad64561a 100644 --- a/tests/join.rs +++ b/tests/join.rs @@ -5,15 +5,14 @@ use std::time::Duration; use itertools::Itertools; -use noir_compute::operator::source::IteratorSource; use noir_compute::BatchMode; use utils::TestHelper; mod utils; macro_rules! run_test { ($env:expr, $n1:expr, $n2:expr, $m:expr, $ship:tt, $local:tt, $variant:tt) => {{ - let s1 = $env.stream(IteratorSource::new(0..$n1)); - let s2 = $env.stream(IteratorSource::new(0..$n2)); + let s1 = $env.stream_iter(0..$n1); + let s2 = $env.stream_iter(0..$n2); let join = s1 .batch_mode(BatchMode::adaptive(100, Duration::from_millis(100))) .join_with(s2, |x| *x as u8 % $m, |x| *x as u8 % $m); @@ -54,8 +53,8 @@ macro_rules! run_test { macro_rules! run_test_shortcut { ($env:expr, $n1:expr, $n2:expr, $m:expr, $variant:tt) => {{ - let s1 = $env.stream(IteratorSource::new(0..$n1)); - let s2 = $env.stream(IteratorSource::new(0..$n2)); + let s1 = $env.stream_iter(0..$n1); + let s2 = $env.stream_iter(0..$n2); let join = s1 .batch_mode(BatchMode::adaptive(100, Duration::from_millis(100))); let res = run_test_shortcut!(@variant, $variant, join, s2, |x: &u16| *x as u8 % $m, |x: &u32| *x as u8 % $m); @@ -242,7 +241,7 @@ fn self_join() { TestHelper::local_remote_env(|env| { let n = 200u32; let s1 = env - .stream(IteratorSource::new(0..n)) + .stream_iter(0..n) .batch_mode(BatchMode::adaptive(100, Duration::from_millis(100))); let mut splits = s1.split(2).into_iter(); @@ -273,7 +272,7 @@ fn join_in_loop() { let n = 200u32; let n_iter = 10; let s = env - .stream(IteratorSource::new(0..n)) + .stream_iter(0..n) .shuffle() .batch_mode(BatchMode::adaptive(100, Duration::from_millis(100)));