Skip to content

Commit

Permalink
Bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Mar 14, 2024
1 parent 0248358 commit 380ee22
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "noir-compute"
description = "Network of Operators In Rust"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
authors = [
"Luca De Martini <[email protected]>",
Expand Down
5 changes: 1 addition & 4 deletions src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ impl StreamContext {
}

let block = inner.new_block(source, Default::default(), Default::default());
Stream {
block,
ctx: self.inner.clone(),
}
Stream::new(self.inner.clone(), block)
}

/// Start the computation. Await on the returned future to actually start the computation.
Expand Down
21 changes: 4 additions & 17 deletions src/operator/iteration/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,7 @@ where
);
let output_id = output_block.id;

let iter_stream = Stream {
ctx: ctx.clone(),
block: iter_block,
};
let iter_stream = Stream::new(ctx.clone(), iter_block);
// attach the body of the loop to the Iterate operator
let body_stream = body(iter_stream, state_clone);

Expand Down Expand Up @@ -480,10 +477,7 @@ where
batch_mode,
Default::default(),
);
let state_stream = Stream {
ctx: ctx.clone(),
block: state_block,
};
let state_stream = Stream::new(ctx.clone(), state_block);
let state_stream = state_stream
.key_by(|_| ())
.fold(StateUpdate::default(), local_fold)
Expand Down Expand Up @@ -538,15 +532,8 @@ where
// the connections made by the scheduler and if accidentally set to OnlyOne will
// break the connections.
(
Stream {
ctx: ctx.clone(),
block: leader_block,
}
.split_block(End::new, NextStrategy::random()),
Stream {
ctx,
block: output_block,
},
Stream::new(ctx.clone(), leader_block).split_block(End::new, NextStrategy::random()),
Stream::new(ctx, output_block),
)
}
}
Expand Down
6 changes: 1 addition & 5 deletions src/operator/iteration/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,6 @@ where
// is not changed by the following operators. This because the next strategy affects
// the connections made by the scheduler and if accidentally set to OnlyOne will
// break the connections.
Stream {
ctx: env,
block: output_block,
}
.split_block(End::new, NextStrategy::random())
Stream::new(env, output_block).split_block(End::new, NextStrategy::random())
}
}
17 changes: 7 additions & 10 deletions src/operator/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ impl<Out: ExchangeData, OperatorChain: Operator<Out = Out> + 'static>

pub(crate) fn build_inner(self) -> Vec<Stream<Start<SimpleStartReceiver<Out>>>> {
// This is needed to maintain the same parallelism of the split block
let env_lock = self.stream.ctx.clone();
let mut env = env_lock.lock();
let ctx = self.stream.ctx.clone();
let mut ctx_lock = ctx.lock();
let scheduler_requirements = self.stream.block.scheduling.clone();
let batch_mode = self.stream.block.batch_mode;
let block_id = self.stream.block.id;
let iteration_context = self.stream.block.iteration_ctx.clone();

let mut new_blocks = (0..self.routes.len())
.map(|_| {
env.new_block(
ctx_lock.new_block(
Start::single(block_id, iteration_context.last().cloned()),
batch_mode,
iteration_context.clone(),
Expand All @@ -78,20 +78,17 @@ impl<Out: ExchangeData, OperatorChain: Operator<Out = Out> + 'static>
RoutingEnd::new(prev, routes, NextStrategy::only_one(), batch_mode)
});

env.close_block(stream.block);
ctx_lock.close_block(stream.block);

for new_block in &mut new_blocks {
env.connect_blocks::<Out>(block_id, new_block.id);
ctx_lock.connect_blocks::<Out>(block_id, new_block.id);
new_block.scheduling = scheduler_requirements.clone();
}

drop(env);
drop(ctx_lock);
new_blocks
.into_iter()
.map(|block| Stream {
block,
ctx: env_lock.clone(),
})
.map(|block| Stream::new(ctx.clone(), block))
.collect()
}
}
Expand Down
36 changes: 12 additions & 24 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ impl<Op> Stream<Op>
where
Op: Operator,
{
pub(crate) fn new(ctx: Arc<Mutex<StreamContextInner>>, block: Block<Op>) -> Self {
Self { block, ctx }
}

/// Add a new operator to the current chain inside the stream. This consumes the stream and
/// returns a new one with the operator added.
///
Expand All @@ -118,10 +122,7 @@ where
Op2: Operator,
GetOp: FnOnce(Op) -> Op2,
{
Stream {
block: self.block.add_operator(get_operator),
ctx: self.ctx,
}
Stream::new(self.ctx, self.block.add_operator(get_operator))
}

/// Add a new block to the stream, closing and registering the previous one. The new block is
Expand All @@ -143,7 +144,7 @@ where
OpEnd: Operator<Out = ()> + 'static,
GetEndOp: FnOnce(Op, NextStrategy<Op::Out, IndexFn>, BatchMode) -> OpEnd,
{
let Stream { block, ctx: env } = self;
let Stream { block, ctx } = self;
// Clone parameters for new block
let batch_mode = block.batch_mode;
let iteration_ctx = block.iteration_ctx.clone();
Expand All @@ -153,7 +154,7 @@ where
block.is_only_one_strategy = matches!(next_strategy, NextStrategy::OnlyOne);

// Close old block
let mut env_lock = env.lock();
let mut env_lock = ctx.lock();
let prev_id = env_lock.close_block(block);
// Create new block
let source = Start::single(prev_id, iteration_ctx.last().cloned());
Expand All @@ -162,10 +163,7 @@ where
env_lock.connect_blocks::<Op::Out>(prev_id, new_block.id);

drop(env_lock);
Stream {
block: new_block,
ctx: env,
}
Stream::new(ctx, new_block)
}

/// Similar to `.add_block`, but with 2 incoming blocks.
Expand Down Expand Up @@ -196,10 +194,7 @@ where
S: Operator + Source,
Fs: FnOnce(BlockId, BlockId, bool, bool, Option<Arc<IterationStateLock>>) -> S,
{
let Stream {
block: b1,
ctx: env,
} = self;
let Stream { block: b1, ctx } = self;
let Stream { block: b2, .. } = oth;

let batch_mode = b1.batch_mode;
Expand Down Expand Up @@ -239,7 +234,7 @@ where
b1.is_only_one_strategy = is_one_1;
b2.is_only_one_strategy = is_one_2;

let mut env_lock = env.lock();
let mut env_lock = ctx.lock();
let id_1 = b1.id;
let id_2 = b2.id;

Expand Down Expand Up @@ -270,21 +265,14 @@ where
_ => Scheduling::default(),
};

Stream {
block: new_block,
ctx: env,
}
Stream::new(ctx, new_block)
}

/// Clone the given block, taking care of connecting the new block to the same previous blocks
/// of the original one.
pub(crate) fn clone(&mut self) -> Self {
let new_block = self.ctx.lock().clone_block(&self.block);

Stream {
block: new_block,
ctx: self.ctx.clone(),
}
Stream::new(self.ctx.clone(), new_block)
}

/// Like `add_block` but without creating a new block. Therefore this closes the current stream
Expand Down

0 comments on commit 380ee22

Please sign in to comment.