Skip to content

Commit

Permalink
Switch to stream_iter in examples
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Mar 13, 2024
1 parent 9af5ce3 commit 0248358
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/operator/iteration/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s = env.stream(IteratorSource::new(0..3)).shuffle();
/// let s = env.stream_iter(0..3).shuffle();
/// let (state, items) = s.iterate(
/// 3, // at most 3 iterations
/// 0, // the initial state is zero
Expand Down
20 changes: 10 additions & 10 deletions src/operator/iteration/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s = env.stream(IteratorSource::new(0..3)).shuffle();
/// let s = env.stream_iter(0..3).shuffle();
/// let state = s.replay(
/// 3, // at most 3 iterations
/// 0, // the initial state is zero
Expand Down Expand Up @@ -300,14 +300,14 @@ where
Default::default(),
self.block.iteration_ctx.clone(),
);
let leader_block_id = output_block.id;
let output_id = output_block.id;
// the output stream is outside this loop, so it doesn't have the lock for this state

// the lock for synchronizing the access to the state of this iteration
let state_lock = Arc::new(IterationStateLock::default());

let mut iter_start =
self.add_operator(|prev| Replay::new(prev, state, leader_block_id, state_lock.clone()));
self.add_operator(|prev| Replay::new(prev, state, output_id, state_lock.clone()));
let replay_block_id = iter_start.block.id;

// save the stack of the iteration for checking the stream returned by the body
Expand All @@ -325,24 +325,24 @@ where
}
iter_end.block.iteration_ctx.pop().unwrap();

let iter_end = iter_end.add_operator(|prev| IterationEnd::new(prev, leader_block_id));
let iter_end = iter_end.add_operator(|prev| IterationEnd::new(prev, output_id));
let iteration_end_block_id = iter_end.block.id;

let mut env_lock = iter_end.ctx.lock();
let scheduler = env_lock.scheduler_mut();
scheduler.schedule_block(iter_end.block);
let mut ctx_lock = iter_end.ctx.lock();
let scheduler = ctx_lock.scheduler_mut();
// connect the IterationEnd to the IterationLeader
scheduler.connect_blocks(
iteration_end_block_id,
leader_block_id,
output_id,
TypeId::of::<DeltaUpdate>(),
);
scheduler.connect_blocks(
leader_block_id,
output_id,
replay_block_id,
TypeId::of::<StateFeedback<State>>(),
);
drop(env_lock);
scheduler.schedule_block(iter_end.block);
drop(ctx_lock);

// store the id of the block containing the IterationEnd
feedback_block_id.store(iteration_end_block_id as usize, Ordering::Release);
Expand Down
20 changes: 10 additions & 10 deletions src/operator/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new(0..5u8));
/// let s2 = env.stream(IteratorSource::new(0..5i32));
/// let s1 = env.stream_iter(0..5u8);
/// let s2 = env.stream_iter(0..5i32);
/// let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
///
/// env.execute_blocking();
Expand Down Expand Up @@ -150,8 +150,8 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new(0..5u8));
/// let s2 = env.stream(IteratorSource::new(0..5i32));
/// let s1 = env.stream_iter(0..5u8);
/// let s2 = env.stream_iter(0..5i32);
/// let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
///
/// env.execute_blocking();
Expand Down Expand Up @@ -199,8 +199,8 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new(0..5u8));
/// let s2 = env.stream(IteratorSource::new(0..5i32));
/// let s1 = env.stream_iter(0..5u8);
/// let s2 = env.stream_iter(0..5i32);
/// let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec();
///
/// env.execute_blocking();
Expand Down Expand Up @@ -251,17 +251,17 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new(0..5u8));
/// let s2 = env.stream(IteratorSource::new(0..5i32));
/// let s1 = env.stream_iter(0..5u8);
/// let s2 = env.stream_iter(0..5i32);
/// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash();
/// ```
///
/// ```
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new(0..5u8));
/// let s2 = env.stream(IteratorSource::new(0..5i32));
/// let s1 = env.stream_iter(0..5u8);
/// let s2 = env.stream_iter(0..5i32);
/// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right();
/// ```
pub fn join_with<Out2: ExchangeData, OperatorChain2, Key, Keyer1, Keyer2>(
Expand Down
4 changes: 2 additions & 2 deletions src/operator/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ where
/// # use noir_compute::{StreamContext, RuntimeConfig};
/// # use noir_compute::operator::source::IteratorSource;
/// # let mut env = StreamContext::new(RuntimeConfig::local(1));
/// let s1 = env.stream(IteratorSource::new((0..10)));
/// let s2 = env.stream(IteratorSource::new((10..20)));
/// let s1 = env.stream_iter(0..10);
/// let s2 = env.stream_iter(10..20);
/// let res = s1.merge(s2).collect_vec();
///
/// env.execute_blocking();
Expand Down
Loading

0 comments on commit 0248358

Please sign in to comment.