diff --git a/Cargo.lock b/Cargo.lock index 9785c41a..811ab9d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -889,7 +889,7 @@ dependencies = [ [[package]] name = "noir-compute" -version = "0.1.0" +version = "0.2.0" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index aca18c57..6dd66fdc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "noir-compute" description = "Network of Operators In Rust" -version = "0.1.0" +version = "0.2.0" edition = "2021" authors = [ "Luca De Martini ", diff --git a/src/environment.rs b/src/environment.rs index 2f16bd86..e4061519 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -73,10 +73,7 @@ impl StreamContext { } let block = inner.new_block(source, Default::default(), Default::default()); - Stream { - block, - ctx: self.inner.clone(), - } + Stream::new(self.inner.clone(), block) } /// Start the computation. Await on the returned future to actually start the computation. diff --git a/src/operator/iteration/iterate.rs b/src/operator/iteration/iterate.rs index b2cff6b9..4f1bade2 100644 --- a/src/operator/iteration/iterate.rs +++ b/src/operator/iteration/iterate.rs @@ -447,10 +447,7 @@ where ); let output_id = output_block.id; - let iter_stream = Stream { - ctx: ctx.clone(), - block: iter_block, - }; + let iter_stream = Stream::new(ctx.clone(), iter_block); // attach the body of the loop to the Iterate operator let body_stream = body(iter_stream, state_clone); @@ -480,10 +477,7 @@ where batch_mode, Default::default(), ); - let state_stream = Stream { - ctx: ctx.clone(), - block: state_block, - }; + let state_stream = Stream::new(ctx.clone(), state_block); let state_stream = state_stream .key_by(|_| ()) .fold(StateUpdate::default(), local_fold) @@ -538,15 +532,8 @@ where // the connections made by the scheduler and if accidentally set to OnlyOne will // break the connections. ( - Stream { - ctx: ctx.clone(), - block: leader_block, - } - .split_block(End::new, NextStrategy::random()), - Stream { - ctx, - block: output_block, - }, + Stream::new(ctx.clone(), leader_block).split_block(End::new, NextStrategy::random()), + Stream::new(ctx, output_block), ) } } diff --git a/src/operator/iteration/replay.rs b/src/operator/iteration/replay.rs index a6453bc2..6d31867b 100644 --- a/src/operator/iteration/replay.rs +++ b/src/operator/iteration/replay.rs @@ -353,10 +353,6 @@ where // is not changed by the following operators. This because the next strategy affects // the connections made by the scheduler and if accidentally set to OnlyOne will // break the connections. - Stream { - ctx: env, - block: output_block, - } - .split_block(End::new, NextStrategy::random()) + Stream::new(env, output_block).split_block(End::new, NextStrategy::random()) } } diff --git a/src/operator/route.rs b/src/operator/route.rs index 0f5c6565..363abb4a 100644 --- a/src/operator/route.rs +++ b/src/operator/route.rs @@ -56,8 +56,8 @@ impl + 'static> pub(crate) fn build_inner(self) -> Vec>>> { // This is needed to maintain the same parallelism of the split block - let env_lock = self.stream.ctx.clone(); - let mut env = env_lock.lock(); + let ctx = self.stream.ctx.clone(); + let mut ctx_lock = ctx.lock(); let scheduler_requirements = self.stream.block.scheduling.clone(); let batch_mode = self.stream.block.batch_mode; let block_id = self.stream.block.id; @@ -65,7 +65,7 @@ impl + 'static> let mut new_blocks = (0..self.routes.len()) .map(|_| { - env.new_block( + ctx_lock.new_block( Start::single(block_id, iteration_context.last().cloned()), batch_mode, iteration_context.clone(), @@ -78,20 +78,17 @@ impl + 'static> RoutingEnd::new(prev, routes, NextStrategy::only_one(), batch_mode) }); - env.close_block(stream.block); + ctx_lock.close_block(stream.block); for new_block in &mut new_blocks { - env.connect_blocks::(block_id, new_block.id); + ctx_lock.connect_blocks::(block_id, new_block.id); new_block.scheduling = scheduler_requirements.clone(); } - drop(env); + drop(ctx_lock); new_blocks .into_iter() - .map(|block| Stream { - block, - ctx: env_lock.clone(), - }) + .map(|block| Stream::new(ctx.clone(), block)) .collect() } } diff --git a/src/stream.rs b/src/stream.rs index 0826561c..a852077c 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -104,6 +104,10 @@ impl Stream where Op: Operator, { + pub(crate) fn new(ctx: Arc>, block: Block) -> Self { + Self { block, ctx } + } + /// Add a new operator to the current chain inside the stream. This consumes the stream and /// returns a new one with the operator added. /// @@ -118,10 +122,7 @@ where Op2: Operator, GetOp: FnOnce(Op) -> Op2, { - Stream { - block: self.block.add_operator(get_operator), - ctx: self.ctx, - } + Stream::new(self.ctx, self.block.add_operator(get_operator)) } /// Add a new block to the stream, closing and registering the previous one. The new block is @@ -143,7 +144,7 @@ where OpEnd: Operator + 'static, GetEndOp: FnOnce(Op, NextStrategy, BatchMode) -> OpEnd, { - let Stream { block, ctx: env } = self; + let Stream { block, ctx } = self; // Clone parameters for new block let batch_mode = block.batch_mode; let iteration_ctx = block.iteration_ctx.clone(); @@ -153,7 +154,7 @@ where block.is_only_one_strategy = matches!(next_strategy, NextStrategy::OnlyOne); // Close old block - let mut env_lock = env.lock(); + let mut env_lock = ctx.lock(); let prev_id = env_lock.close_block(block); // Create new block let source = Start::single(prev_id, iteration_ctx.last().cloned()); @@ -162,10 +163,7 @@ where env_lock.connect_blocks::(prev_id, new_block.id); drop(env_lock); - Stream { - block: new_block, - ctx: env, - } + Stream::new(ctx, new_block) } /// Similar to `.add_block`, but with 2 incoming blocks. @@ -196,10 +194,7 @@ where S: Operator + Source, Fs: FnOnce(BlockId, BlockId, bool, bool, Option>) -> S, { - let Stream { - block: b1, - ctx: env, - } = self; + let Stream { block: b1, ctx } = self; let Stream { block: b2, .. } = oth; let batch_mode = b1.batch_mode; @@ -239,7 +234,7 @@ where b1.is_only_one_strategy = is_one_1; b2.is_only_one_strategy = is_one_2; - let mut env_lock = env.lock(); + let mut env_lock = ctx.lock(); let id_1 = b1.id; let id_2 = b2.id; @@ -270,21 +265,14 @@ where _ => Scheduling::default(), }; - Stream { - block: new_block, - ctx: env, - } + Stream::new(ctx, new_block) } /// Clone the given block, taking care of connecting the new block to the same previous blocks /// of the original one. pub(crate) fn clone(&mut self) -> Self { let new_block = self.ctx.lock().clone_block(&self.block); - - Stream { - block: new_block, - ctx: self.ctx.clone(), - } + Stream::new(self.ctx.clone(), new_block) } /// Like `add_block` but without creating a new block. Therefore this closes the current stream