diff --git a/benches/batch_mode.rs b/benches/batch_mode.rs index edcca6b0..935009c6 100644 --- a/benches/batch_mode.rs +++ b/benches/batch_mode.rs @@ -10,7 +10,7 @@ use noir_compute::RuntimeConfig; use noir_compute::StreamContext; fn batch_mode(batch_mode: BatchMode, dataset: &'static [u32]) { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::local(4).unwrap(); let env = StreamContext::new(config); let source = IteratorSource::new(dataset.iter().cloned()); diff --git a/benches/collatz.rs b/benches/collatz.rs index 192659ae..4e1e1852 100644 --- a/benches/collatz.rs +++ b/benches/collatz.rs @@ -16,7 +16,7 @@ fn bench_main(c: &mut Criterion) { g.throughput(Throughput::Elements(size)); g.bench_with_input(BenchmarkId::new("collatz", size), &size, |b, n| { b.iter(|| { - let env = StreamContext::default(); + let env = StreamContext::new_local(); env.stream_par_iter(0..*n) .batch_mode(BatchMode::fixed(1024)) .map(move |n| { diff --git a/benches/common.rs b/benches/common.rs index d4a22dcf..8418425c 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,7 +1,7 @@ #![allow(unused)] use criterion::{black_box, Bencher}; -use noir_compute::config::{HostConfig, RemoteConfig, RuntimeConfig}; +use noir_compute::config::{ConfigBuilder, HostConfig, RemoteConfig, RuntimeConfig}; use std::marker::PhantomData; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; @@ -36,12 +36,11 @@ pub fn remote_loopback_deploy( let mut join_handles = vec![]; let body = Arc::new(body); for host_id in 0..num_hosts { - let config = RuntimeConfig::Remote(RemoteConfig { - host_id: Some(host_id), - hosts: hosts.clone(), - tracing_dir: None, - cleanup_executable: false, - }); + let config = ConfigBuilder::new_remote() + .add_hosts(&hosts) + .host_id(host_id) + .build() + .unwrap(); let body = body.clone(); join_handles.push( @@ -100,6 +99,6 @@ where } pub fn noir_bench_default(b: &mut Bencher, logic: impl Fn(&StreamContext)) { - let builder = NoirBenchBuilder::new(StreamContext::default, logic); + let builder = NoirBenchBuilder::new(StreamContext::new_local, logic); b.iter_custom(|n| builder.bench(n)); } diff --git a/benches/connected.rs b/benches/connected.rs index 07305c12..581b0316 100644 --- a/benches/connected.rs +++ b/benches/connected.rs @@ -2,7 +2,6 @@ use criterion::BenchmarkId; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use fxhash::FxHashMap; use noir_compute::operator::Operator; -use noir_compute::RuntimeConfig; use noir_compute::Stream; use noir_compute::StreamContext; use rand::prelude::*; @@ -109,7 +108,7 @@ fn bench_main(c: &mut Criterion) { g.throughput(Throughput::Elements(size)); g.bench_with_input(BenchmarkId::new("connected", size), &size, |b, size| { b.iter(|| { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new_local(); let edges = *size; let nodes = ((edges as f32).sqrt() * 25.) as u64 + 1; diff --git a/benches/fold_vs_reduce.rs b/benches/fold_vs_reduce.rs index e90d6e88..4048d582 100644 --- a/benches/fold_vs_reduce.rs +++ b/benches/fold_vs_reduce.rs @@ -8,7 +8,7 @@ use noir_compute::RuntimeConfig; use noir_compute::StreamContext; fn fold(dataset: &'static [u32]) { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::default(); let env = StreamContext::new(config); let source = IteratorSource::new(dataset.iter().cloned()); @@ -21,7 +21,7 @@ fn fold(dataset: &'static [u32]) { } fn reduce(dataset: &'static [u32]) { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::default(); let env = StreamContext::new(config); let source = IteratorSource::new(dataset.iter().cloned()); @@ -34,7 +34,7 @@ fn reduce(dataset: &'static [u32]) { } fn fold_assoc(dataset: &'static [u32]) { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::default(); let env = StreamContext::new(config); let source = IteratorSource::new(dataset.iter().cloned()); @@ -51,7 +51,7 @@ fn fold_assoc(dataset: &'static [u32]) { } fn reduce_assoc(dataset: &'static [u32]) { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::default(); let env = StreamContext::new(config); let source = IteratorSource::new(dataset.iter().cloned()); diff --git a/benches/nexmark.rs b/benches/nexmark.rs index 9a4ee8f4..a7719f85 100644 --- a/benches/nexmark.rs +++ b/benches/nexmark.rs @@ -365,7 +365,7 @@ fn bench_main(c: &mut Criterion) { ($q:expr, $n:expr) => {{ g.bench_with_input(BenchmarkId::new($q, $n), &$n, |b, size| { b.iter(|| { - let env = StreamContext::default(); + let env = StreamContext::new_local(); run_query(&env, $q, *size); env.execute_blocking(); }) diff --git a/benches/shuffle.rs b/benches/shuffle.rs index 748cf22f..cc3544a8 100644 --- a/benches/shuffle.rs +++ b/benches/shuffle.rs @@ -8,7 +8,7 @@ use noir_compute::BatchMode; use noir_compute::StreamContext; fn shuffle(dataset: &'static [u32]) { - let env = StreamContext::default(); + let env = StreamContext::new_local(); let source = IteratorSource::new(dataset.iter().cloned()); let stream = env diff --git a/benches/wordcount.rs b/benches/wordcount.rs index a4e9282c..74c130c4 100644 --- a/benches/wordcount.rs +++ b/benches/wordcount.rs @@ -58,7 +58,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_fold(&env, path); env.execute_blocking(); }) @@ -70,7 +70,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_fold_assoc(&env, path); env.execute_blocking(); }) @@ -82,7 +82,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_count_assoc(&env, path); env.execute_blocking(); }) @@ -94,7 +94,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_reduce(&env, path); env.execute_blocking(); }) @@ -106,7 +106,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_reduce_assoc(&env, path); env.execute_blocking(); }) @@ -118,7 +118,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_fast(&env, path); env.execute_blocking(); }) @@ -130,7 +130,7 @@ fn wordcount_bench(c: &mut Criterion) { file.path(), |b, path| { b.iter(move || { - let env = StreamContext::default(); + let env = StreamContext::new_local(); wc_fast_kstring(&env, path); env.execute_blocking(); }) diff --git a/config.toml b/config.toml new file mode 100644 index 00000000..e8bc3478 --- /dev/null +++ b/config.toml @@ -0,0 +1,10 @@ + +[[host]] +address = "host1" +base_port = 9500 +num_cores = 16 + +[[host]] +address = "host2" +base_port = 9500 +num_cores = 24 diff --git a/src/config.rs b/src/config.rs index c285040c..156f642a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ //! //! See the documentation of [`RuntimeConfig`] for more details. +use std::env; use std::fmt::{Display, Formatter}; use std::path::Path; use std::path::PathBuf; @@ -36,7 +37,7 @@ pub const CONFIG_ENV_VAR: &str = "NOIR_CONFIG"; /// /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; -/// let config = RuntimeConfig::local(1); +/// let config = RuntimeConfig::local(2).unwrap(); /// let env = StreamContext::new(config); /// ``` /// @@ -82,6 +83,15 @@ pub enum RuntimeConfig { Remote(RemoteConfig), } +impl Default for RuntimeConfig { + fn default() -> Self { + let parallelism = std::thread::available_parallelism() + .map(|q| q.get()) + .unwrap_or(4); + RuntimeConfig::local(parallelism as u64).unwrap() + } +} + // #[derive(Debug, Clone, Eq, PartialEq)] // pub struct RuntimeConfig { // /// Which runtime to use for the environment. @@ -97,7 +107,7 @@ pub struct LocalConfig { /// The number of CPU cores of this host. /// /// A thread will be spawned for each core, for each block in the job graph. - pub num_cores: CoordUInt, + pub parallelism: CoordUInt, } /// This environment uses local threads and remote hosts. @@ -105,7 +115,7 @@ pub struct LocalConfig { pub struct RemoteConfig { /// The identifier for this host. #[serde(skip)] - pub host_id: Option, + host_id: Option, // TODO: remove option /// The set of remote hosts to use. #[serde(rename = "host")] pub hosts: Vec, @@ -161,16 +171,24 @@ pub struct SSHConfig { impl std::fmt::Debug for SSHConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RemoteHostSSHConfig") - .field("ssh_port", &self.ssh_port) - .field("username", &self.username) - .field("password", &self.password.as_ref().map(|_| "REDACTED")) - .field("key_file", &self.key_file) - .field( - "key_passphrase", - &self.key_passphrase.as_ref().map(|_| "REDACTED"), - ) - .finish() + let mut d = f.debug_struct("Ssh"); + if self.ssh_port != 22 { + d.field("port", &self.ssh_port); + } + if let Some(username) = &self.username { + d.field("username", &username); + } + if let Some(key_file) = &self.key_file { + d.field("key_file", &key_file); + } + if self.password.is_some() { + d.field("password", &"REDACTED"); + } + if self.key_passphrase.is_some() { + d.field("key_passphrase", &"REDACTED"); + } + + d.finish() } } @@ -206,20 +224,20 @@ impl RuntimeConfig { opt.validate(); let mut args = opt.args; - args.insert(0, std::env::args().next().unwrap()); + args.insert(0, env::args().next().unwrap()); - if let Some(num_cores) = opt.local { - (Self::local(num_cores), args) + if let Some(parallelism) = opt.local { + (Self::local(parallelism).expect("Configuration error"), args) } else if let Some(remote) = opt.remote { - (Self::remote(remote).unwrap(), args) + (Self::remote(remote).expect("Configuration error"), args) } else { unreachable!("Invalid configuration") } } /// Local environment that avoid using the network and runs concurrently using only threads. - pub fn local(num_cores: CoordUInt) -> RuntimeConfig { - RuntimeConfig::Local(LocalConfig { num_cores }) + pub fn local(parallelism: CoordUInt) -> Result { + ConfigBuilder::new_local(parallelism) } /// Remote environment based on the provided configuration file. @@ -229,58 +247,17 @@ impl RuntimeConfig { /// If it's the runner, the configuration file is read. If it's a worker, the configuration is /// read directly from the environment variable and not from the file (remote hosts may not have /// the configuration file). - pub fn remote>(config: P) -> Result { - let mut config = if let Some(config) = RuntimeConfig::config_from_env() { - config - } else { - log::info!("reading config from: {}", config.as_ref().display()); - let content = std::fs::read_to_string(config)?; - toml::from_str(&content)? - }; - - // validate the configuration - for (host_id, host) in config.hosts.iter().enumerate() { - if host.ssh.password.is_some() && host.ssh.key_file.is_some() { - return Err(ConfigError::Invalid(format!("Malformed configuration: cannot specify both password and key file on host {}: {}", host_id, host.address))); - } - } + pub fn remote>(toml_path: P) -> Result { + let mut builder = ConfigBuilder::new_remote(); - config.host_id = RuntimeConfig::host_id_from_env(config.hosts.len().try_into().unwrap()); - log::debug!("runtime configuration: {config:#?}"); - Ok(RuntimeConfig::Remote(config)) - } - - /// Extract the host id from the environment variable, if present. - fn host_id_from_env(num_hosts: CoordUInt) -> Option { - let host_id = match std::env::var(HOST_ID_ENV_VAR) { - Ok(host_id) => host_id, - Err(_) => return None, - }; - let host_id = match HostId::from_str(&host_id) { - Ok(host_id) => host_id, - Err(e) => panic!("Invalid value for environment {HOST_ID_ENV_VAR}: {e:?}"), - }; - if host_id >= num_hosts { - panic!( - "Invalid value for environment {}: value too large, max possible is {}", - HOST_ID_ENV_VAR, - num_hosts - 1 - ); + if env::var(CONFIG_ENV_VAR).is_ok() { + builder.parse_env()?; + builder.host_id_from_env()?; + } else { + builder.parse_file(toml_path)?; } - Some(host_id) - } - /// Extract the configuration from the environment, if it's present. - fn config_from_env() -> Option { - match std::env::var(CONFIG_ENV_VAR) { - Ok(config) => { - info!("reading remote config from env {}", CONFIG_ENV_VAR); - let config: RemoteConfig = - toml::from_str(&config).expect("Invalid configuration from environment"); - Some(config) - } - Err(_) => None, - } + builder.build() } /// Spawn the remote workers via SSH and exit if this is the process that should spawn. If this @@ -320,11 +297,114 @@ impl CommandLineOptions { if !(self.remote.is_some() ^ self.local.is_some()) { panic!("Use one of --remote or --local"); } - if let Some(threads) = self.local { - if threads == 0 { - panic!("The number of cores should be positive"); + } +} +#[derive(Debug, Clone)] +pub struct ConfigBuilder { + host_id: Option, + hosts: Vec, + tracing_dir: Option, + cleanup_executable: bool, +} + +impl ConfigBuilder { + pub fn new_local(parallelism: CoordUInt) -> Result { + if parallelism == 0 { + Err(ConfigError::Invalid( + "The number of cores should be positive".into(), + )) + } else { + Ok(RuntimeConfig::Local(LocalConfig { parallelism })) + } + } + + pub fn new_remote() -> Self { + Self { + host_id: None, + hosts: Vec::new(), + tracing_dir: None, + cleanup_executable: false, + } + } + /// Parse toml and integrate it in the builder. + /// Hosts are appended to the list, the rest of the parameters set only if they were not present. + /// host_id is ignored. Configure it directly + pub fn parse_toml_str(&mut self, config_str: &str) -> Result<&mut Self, ConfigError> { + let RemoteConfig { + host_id: _, // Ignore serialized host_id + hosts, + tracing_dir, + cleanup_executable, + } = toml::from_str(config_str)?; + + // validate the configuration + for host in hosts.into_iter() { + if host.ssh.password.is_some() && host.ssh.key_file.is_some() { + return Err(ConfigError::Invalid(format!( + "Malformed configuration: cannot specify both password and key file on host {}", + host.address + ))); } + self.hosts.push(host); } + self.tracing_dir = self.tracing_dir.take().or(tracing_dir); + self.cleanup_executable |= cleanup_executable; + + Ok(self) + } + + /// Read toml file and integrate it in the builder. + /// Hosts are appended to the list, the rest of the parameters set only if they were not present. + pub fn parse_file(&mut self, toml_path: impl AsRef) -> Result<&mut Self, ConfigError> { + let content = std::fs::read_to_string(toml_path)?; + self.parse_toml_str(&content) + } + + pub fn add_hosts(&mut self, hosts: &[HostConfig]) -> &mut Self { + self.hosts.extend_from_slice(hosts); + self + } + + /// Read toml from env variable [CONFIG_ENV_VAR] and integrate it in the builder. + /// Hosts are appended to the list, the rest of the parameters set only if they were not present. + pub fn parse_env(&mut self) -> Result<&mut Self, ConfigError> { + let config_str = env::var(CONFIG_ENV_VAR) + .map_err(|e| ConfigError::Environment(CONFIG_ENV_VAR.to_string(), e))?; + self.parse_toml_str(&config_str) + } + + pub fn host_id(&mut self, host_id: HostId) -> &mut Self { + self.host_id = Some(host_id); + self + } + + /// Extract the host id from the environment variable [HOST_ID_ENV_VAR]. + pub fn host_id_from_env(&mut self) -> Result<&mut Self, ConfigError> { + let host_id = env::var(HOST_ID_ENV_VAR) + .map_err(|e| ConfigError::Environment(HOST_ID_ENV_VAR.to_string(), e))?; + let host_id = HostId::from_str(&host_id) + .map_err(|_| ConfigError::Invalid("host_id must be an integer".into()))?; + self.host_id = Some(host_id); + Ok(self) + } + + pub fn build(&mut self) -> Result { + if let Some(host_id) = self.host_id { + let num_hosts = self.hosts.len() as u64; + if host_id >= num_hosts { + return Err(ConfigError::Invalid(format!( + "invalid host_id, must be between 0 and the number of hosts - 1: (0..{num_hosts})", + ))); + } + }; + + let conf = RuntimeConfig::Remote(RemoteConfig { + host_id: self.host_id, + hosts: self.hosts.clone(), + tracing_dir: self.tracing_dir.clone(), + cleanup_executable: self.cleanup_executable, + }); + Ok(conf) } } @@ -343,4 +423,7 @@ pub enum ConfigError { #[error("Invalid configuration: {0}")] Invalid(String), + + #[error("Missing environment variable {0}: {1}")] + Environment(String, env::VarError), } diff --git a/src/environment.rs b/src/environment.rs index 691cc8c0..496add7f 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -1,7 +1,6 @@ use parking_lot::Mutex; use std::any::TypeId; use std::sync::Arc; -use std::thread::available_parallelism; use crate::block::{Block, Scheduling}; use crate::config::RuntimeConfig; @@ -45,14 +44,6 @@ pub struct StreamContext { inner: Arc>, } -impl Default for StreamContext { - fn default() -> Self { - Self::new(RuntimeConfig::local( - available_parallelism().map(|q| q.get()).unwrap_or(1) as u64, - )) - } -} - impl StreamContext { /// Construct a new environment from the config. pub fn new(config: RuntimeConfig) -> Self { @@ -62,15 +53,21 @@ impl StreamContext { } } + pub fn new_local() -> Self { + let parallelism = std::thread::available_parallelism() + .map(|q| q.get()) + .unwrap_or(4); + let conf = RuntimeConfig::local(parallelism as u64).unwrap(); + Self::new(conf) + } + /// Construct a new stream bound to this environment starting with the specified source. pub fn stream(&self, source: S) -> Stream where S: Source + Send + 'static, { let mut inner = self.inner.lock(); - if let RuntimeConfig::Remote(remote) = &inner.config { - assert!(remote.host_id.is_some(), "remote config must be started using RuntimeConfig::spawn_remote_workers(). (Or initialize `host_id` correctly)"); - } + assert!(inner.config.host_id().is_some(), "remote config must be started using RuntimeConfig::spawn_remote_workers(). (Or initialize `host_id` correctly)"); let block = inner.new_block(source, Default::default(), Default::default()); Stream::new(self.inner.clone(), block) @@ -103,7 +100,7 @@ impl StreamContext { /// Get the total number of processing cores in the cluster. pub fn parallelism(&self) -> CoordUInt { match &self.inner.lock().config { - RuntimeConfig::Local(local) => local.num_cores, + RuntimeConfig::Local(local) => local.parallelism, RuntimeConfig::Remote(remote) => remote.hosts.iter().map(|h| h.num_cores).sum(), } } diff --git a/src/network/topology.rs b/src/network/topology.rs index 73bd8066..46b2ddef 100644 --- a/src/network/topology.rs +++ b/src/network/topology.rs @@ -578,7 +578,7 @@ mod tests { #[test] fn test_local_topology() { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::local(4).unwrap(); let mut topology = NetworkTopology::new(config); // s1 [b0, h0] -> r1 [b2, h0] (endpoint 1) type=i32 @@ -636,7 +636,9 @@ mod tests { #[cfg(not(feature = "tokio"))] #[test] fn test_remote_topology() { - let mut config = tempfile::NamedTempFile::new().unwrap(); + use crate::config::ConfigBuilder; + + let mut toml_path = tempfile::NamedTempFile::new().unwrap(); let config_toml = r#"[[host]] address = "127.0.0.1" base_port = 21841 @@ -646,8 +648,7 @@ address = "127.0.0.1" base_port = 31258 num_cores = 1 "#; - std::io::Write::write_all(&mut config, config_toml.as_bytes()).unwrap(); - let config = RuntimeConfig::remote(config.path()).unwrap(); + std::io::Write::write_all(&mut toml_path, config_toml.as_bytes()).unwrap(); // s1 [b0, h0, r0] -> r1 [b2, h1, r0] (endpoint 1) type=i32 // s2 [b0, h1, r0] -> r1 [b2, h1, r0] (endpoint 1) type=i32 @@ -656,11 +657,8 @@ num_cores = 1 // s3 [b1, h0, r0] -> r2 [b2, h1, r1] (endpoint 3) type=u64 // s4 [b1, h0, r1] -> r2 [b2, h1, r1] (endpoint 3) type=u64 - let run = |mut config: RuntimeConfig, host: HostId| { - if let RuntimeConfig::Remote(remote) = &mut config { - remote.host_id = Some(host); - } - + let run = |config: RuntimeConfig| { + let host = config.host_id().unwrap(); let mut topology = NetworkTopology::new(config); let s1 = Coord::new(0, 0, 0); @@ -760,14 +758,27 @@ num_cores = 1 topology.stop_and_wait(); }; - let config0 = config.clone(); + let config0 = ConfigBuilder::new_remote() + .parse_file(toml_path.path()) + .unwrap() + .host_id(0) + .build() + .unwrap(); + + let config1 = ConfigBuilder::new_remote() + .parse_file(toml_path.path()) + .unwrap() + .host_id(1) + .build() + .unwrap(); + let join0 = std::thread::Builder::new() .name("host0".into()) - .spawn(move || run(config0, 0)) + .spawn(move || run(config0)) .unwrap(); let join1 = std::thread::Builder::new() .name("host1".into()) - .spawn(move || run(config, 1)) + .spawn(move || run(config1)) .unwrap(); join0.join().unwrap(); @@ -791,7 +802,7 @@ num_cores = 1 #[test] fn test_multiple_output_types() { - let config = RuntimeConfig::local(4); + let config = RuntimeConfig::local(4).unwrap(); let mut topology = NetworkTopology::new(config); // s1 [b0, h0] -> r1 [b1, h0] (endpoint 1) type=i32 diff --git a/src/operator/batch_mode.rs b/src/operator/batch_mode.rs index 6aeaaa06..23ade492 100644 --- a/src/operator/batch_mode.rs +++ b/src/operator/batch_mode.rs @@ -9,7 +9,7 @@ mod tests { #[test] fn batch_mode_fixed() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = FakeOperator::::empty(); let batch_mode = BatchMode::fixed(42); let stream = env.stream(source).batch_mode(batch_mode); @@ -18,7 +18,7 @@ mod tests { #[test] fn batch_mode_adaptive() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = FakeOperator::::empty(); let batch_mode = BatchMode::adaptive(42, Duration::from_secs(42)); let stream = env.stream(source).batch_mode(batch_mode); @@ -27,7 +27,7 @@ mod tests { #[test] fn batch_inherit_from_previous() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = FakeOperator::::empty(); let batch_mode = BatchMode::adaptive(42, Duration::from_secs(42)); let stream = env.stream(source).batch_mode(batch_mode).group_by(|_| 0); diff --git a/src/operator/iteration/iterate.rs b/src/operator/iteration/iterate.rs index 4f1bade2..7ebe11dc 100644 --- a/src/operator/iteration/iterate.rs +++ b/src/operator/iteration/iterate.rs @@ -341,7 +341,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).shuffle(); /// let (state, items) = s.iterate( /// 3, // at most 3 iterations @@ -356,7 +356,9 @@ where /// env.execute_blocking(); /// /// assert_eq!(state.get().unwrap(), vec![10 + 11 + 12 + 20 + 21 + 22 + 30 + 31 + 32]); - /// assert_eq!(items.get().unwrap(), vec![30, 31, 32]); + /// let mut sorted = items.get().unwrap(); + /// sorted.sort(); + /// assert_eq!(sorted, vec![30, 31, 32]); /// ``` pub fn iterate< Body, diff --git a/src/operator/iteration/replay.rs b/src/operator/iteration/replay.rs index 6d31867b..ddb7fcda 100644 --- a/src/operator/iteration/replay.rs +++ b/src/operator/iteration/replay.rs @@ -238,7 +238,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).shuffle(); /// let state = s.replay( /// 3, // at most 3 iterations diff --git a/src/operator/join/mod.rs b/src/operator/join/mod.rs index d3d3585e..22991b42 100644 --- a/src/operator/join/mod.rs +++ b/src/operator/join/mod.rs @@ -101,7 +101,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..5u8); /// let s2 = env.stream_iter(0..5i32); /// let res = s1.join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); @@ -149,7 +149,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..5u8); /// let s2 = env.stream_iter(0..5i32); /// let res = s1.left_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); @@ -198,7 +198,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..5u8); /// let s2 = env.stream_iter(0..5i32); /// let res = s1.outer_join(s2, |n| (n % 5) as i32, |n| n % 2).drop_key().collect_vec(); @@ -250,7 +250,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..5u8); /// let s2 = env.stream_iter(0..5i32); /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_hash(); @@ -259,7 +259,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..5u8); /// let s2 = env.stream_iter(0..5i32); /// let j = s1.join_with(s2, |n| (n % 5) as i32, |n| n % 2).ship_broadcast_right(); diff --git a/src/operator/merge.rs b/src/operator/merge.rs index 880d77fb..e8ca64a5 100644 --- a/src/operator/merge.rs +++ b/src/operator/merge.rs @@ -27,7 +27,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..10); /// let s2 = env.stream_iter(10..20); /// let res = s1.merge(s2).collect_vec(); diff --git a/src/operator/mod.rs b/src/operator/mod.rs index 7b3a22f8..f2fff973 100644 --- a/src/operator/mod.rs +++ b/src/operator/mod.rs @@ -309,7 +309,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// use noir_compute::operator::Timestamp; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// /// let s = env.stream_iter(0..10); /// s.add_timestamps( @@ -345,7 +345,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// use noir_compute::BatchMode; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// /// let s = env.stream_iter(0..10); /// s.batch_mode(BatchMode::fixed(1024)); @@ -365,7 +365,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.filter_map(|n| if n % 2 == 0 { Some(n * 3) } else { None }).collect_vec(); /// @@ -390,7 +390,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.filter(|&n| n % 2 == 0).collect_vec(); /// @@ -432,7 +432,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter((std::array::IntoIter::new([1, 2, -5, 3, 1]))); /// let res = s.rich_filter_map({ /// let mut sum = 0; @@ -478,7 +478,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(1..=5); /// let res = s.rich_map({ /// let mut sum = 0; @@ -499,7 +499,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(1..=5); /// let res = s.rich_map({ /// let mut id = 0; @@ -532,7 +532,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.map(|n| n * 10).collect_vec(); /// @@ -564,7 +564,7 @@ where /// # .unwrap() /// # .block_on(base()); /// # async fn base() { - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(5..15); /// let res = s.map_async_memo_by( /// |n| async move {(n * n) % 7}, |n| n % 7, 5 @@ -629,7 +629,7 @@ where /// # .unwrap() /// # .block_on(base()); /// # async fn base() { - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(5..15); /// let res = s.map_async(|n| async move {(n * n) % 7}).collect_vec(); /// env.execute().await; @@ -658,7 +658,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(5..15); /// let res = s.map_memo_by(|n| (n * n) % 7, |n| n % 7, 5).collect_vec(); /// @@ -706,7 +706,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.fold(0, |acc, value| *acc += value).collect_vec(); /// @@ -752,7 +752,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.fold_assoc(0, |acc, value| *acc += value, |acc, value| *acc += value).collect_vec(); /// @@ -799,7 +799,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_fold(|&n| n % 2, 0, |acc, value| *acc += value, |acc, value| *acc += value) @@ -890,7 +890,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.key_by(|&n| n % 2).collect_vec(); /// @@ -915,7 +915,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// s.inspect(|n| println!("Item: {}", n)).for_each(std::mem::drop); /// @@ -947,7 +947,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..=3); /// let res = s.rich_flat_map({ /// let mut elements = Vec::new(); @@ -1011,7 +1011,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3); /// let res = s.flat_map(|n| vec![n, n]).collect_vec(); /// @@ -1036,7 +1036,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// s.for_each(|n| println!("Item: {}", n)); /// @@ -1059,7 +1059,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter((vec![ /// vec![1, 2, 3], /// vec![], @@ -1098,7 +1098,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// s.broadcast(); /// ``` @@ -1124,7 +1124,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let keyed = s.group_by(|&n| n % 2); // partition even and odd elements /// ``` @@ -1157,7 +1157,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_max_element(|&n| n % 2, |&n| n) @@ -1206,7 +1206,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_sum(|&n| n % 2, |n| n) @@ -1270,7 +1270,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_avg(|&n| n % 2, |&n| n as f64) @@ -1333,7 +1333,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_count(|&n| n % 2) @@ -1375,7 +1375,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_min_element(|&n| n % 2, |&n| n) @@ -1432,7 +1432,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by_reduce(|&n| n % 2, |acc, value| *acc += value) @@ -1573,7 +1573,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.reduce(|a, b| a + b).collect::>(); /// @@ -1618,7 +1618,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.reduce_assoc(|a, b| a + b).collect_vec(); /// @@ -1658,7 +1658,7 @@ where /// /// ``` /// # use noir_compute::prelude::*; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// # let s = env.stream_iter(0..10); /// let mut routes = s.route() /// .add_route(|&i| i < 5) @@ -1689,7 +1689,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.shuffle(); /// ``` @@ -1708,7 +1708,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let mut splits = s.split(3); /// let a = splits.pop().unwrap(); @@ -1744,7 +1744,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter((vec!['A', 'B', 'C', 'D'].into_iter())); /// let s2 = env.stream_iter((vec![1, 2, 3].into_iter())); /// let res = s1.zip(s2).collect_vec(); @@ -1783,7 +1783,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// @@ -1814,7 +1814,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// @@ -1846,7 +1846,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// @@ -1877,7 +1877,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// @@ -1907,7 +1907,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// @@ -1937,7 +1937,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// @@ -1966,7 +1966,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10); /// let res = s.collect_vec(); /// @@ -2003,7 +2003,7 @@ where /// # .unwrap() /// # .block_on(base()); /// # async fn base() { - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter((0..4).cycle().take(10)); /// let res = s.map_async_memo(|n| async move {n * n}, 100).collect_vec(); /// env.execute().await; @@ -2040,7 +2040,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter((0..4).cycle().take(10)); /// let res = s.map_memo(|n| n * n, 5).collect_vec(); /// @@ -2087,7 +2087,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// use noir_compute::operator::Timestamp; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// /// let s = env.stream_iter(0..10); /// s @@ -2126,7 +2126,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// use noir_compute::BatchMode; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// s.batch_mode(BatchMode::fixed(1024)); @@ -2146,7 +2146,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// let res = s.filter_map(|(_key, n)| if n % 3 == 0 { Some(n * 4) } else { None }).collect_vec(); /// @@ -2175,7 +2175,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10).group_by(|&n| n % 2); /// let res = s.filter(|&(_key, n)| n % 3 == 0).collect_vec(); /// @@ -2202,7 +2202,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.flat_map(|(_key, n)| vec![n, n]).collect_vec(); /// @@ -2230,7 +2230,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// s.inspect(|(key, n)| println!("Item: {} has key {}", n, key)).for_each(std::mem::drop); /// @@ -2268,7 +2268,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s /// .fold(0, |acc, value| *acc += value) @@ -2313,7 +2313,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s /// .reduce(|acc, value| *acc += value) @@ -2346,7 +2346,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// let res = s.map(|(_key, n)| 10 * n).collect_vec(); /// @@ -2437,7 +2437,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2); /// let res = stream.unkey().collect_vec(); /// @@ -2459,7 +2459,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let stream = env.stream_iter(0..4).group_by(|&n| n % 2); /// let res = stream.drop_key().collect_vec(); /// @@ -2480,7 +2480,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5).group_by(|&n| n % 2); /// s.for_each(|(key, n)| println!("Item: {} has key {}", n, key)); /// @@ -2540,7 +2540,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s1 = env.stream_iter(0..3).group_by(|&n| n % 2); /// let s2 = env.stream_iter(3..5).group_by(|&n| n % 2); /// let res = s1.merge(s2).collect_vec(); @@ -2585,7 +2585,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s.shuffle(); /// ``` @@ -2608,7 +2608,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// @@ -2635,7 +2635,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..10u32); /// let rx = s.collect_channel(); /// @@ -2666,7 +2666,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// @@ -2696,7 +2696,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec_all(); /// @@ -2725,7 +2725,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// @@ -2753,7 +2753,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..3).group_by(|&n| n % 2); /// let res = s.collect_vec(); /// @@ -2786,7 +2786,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env /// .stream_iter((vec![ /// vec![0, 1, 2], diff --git a/src/operator/replication.rs b/src/operator/replication.rs index f126a9e0..d9c04b19 100644 --- a/src/operator/replication.rs +++ b/src/operator/replication.rs @@ -7,7 +7,7 @@ mod tests { #[test] fn test_replication() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let operator = FakeOperator::::empty(); let stream = env.stream(operator); let old_block_id = stream.block.id; @@ -20,3 +20,4 @@ mod tests { assert_ne!(old_block_id, new_block_id); } } +// TODO: Actual meaningful tests diff --git a/src/operator/route.rs b/src/operator/route.rs index 363abb4a..70e0ae3a 100644 --- a/src/operator/route.rs +++ b/src/operator/route.rs @@ -305,7 +305,7 @@ mod tests { #[test] #[allow(clippy::identity_op)] fn test_route() { - let env = StreamContext::new(RuntimeConfig::local(1)); + let env = StreamContext::new(RuntimeConfig::local(2).unwrap()); let s = env.stream_iter(0..10); let mut routes = s @@ -317,17 +317,14 @@ mod tests { assert_eq!(routes.len(), 2); // 0 1 2 3 4 - routes - .next() - .unwrap() - .for_each(|i| eprintln!("route1: {i}")); + let r1 = routes.next().unwrap().collect_vec(); // 6 8 - routes - .next() - .unwrap() - .for_each(|i| eprintln!("route2: {i}")); + let r2 = routes.next().unwrap().collect_vec(); // 5 7 9 ignored env.execute_blocking(); + + assert_eq!(&[0, 1, 2, 3, 4], r1.get().unwrap().as_slice()); + assert_eq!(&[6, 8], r2.get().unwrap().as_slice()); } } diff --git a/src/operator/sink/collect.rs b/src/operator/sink/collect.rs index 95da701c..cd1f3631 100644 --- a/src/operator/sink/collect.rs +++ b/src/operator/sink/collect.rs @@ -104,7 +104,7 @@ mod qtests { #[test] fn collect_vec() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let res = env.stream(source).collect::>(); env.execute_blocking(); @@ -113,7 +113,7 @@ mod qtests { #[test] fn collect_set() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let res = env.stream(source).collect::>(); env.execute_blocking(); diff --git a/src/operator/sink/collect_channel.rs b/src/operator/sink/collect_channel.rs index 765e5dc4..5975cd52 100644 --- a/src/operator/sink/collect_channel.rs +++ b/src/operator/sink/collect_channel.rs @@ -82,7 +82,7 @@ mod tests { #[test] fn collect_channel() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let rx = env.stream(source).collect_channel(); env.execute_blocking(); diff --git a/src/operator/sink/collect_count.rs b/src/operator/sink/collect_count.rs index 38367cc8..d102ec79 100644 --- a/src/operator/sink/collect_count.rs +++ b/src/operator/sink/collect_count.rs @@ -95,7 +95,7 @@ mod tests { #[test] fn collect_vec() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let res = env.stream(source).collect_vec(); env.execute_blocking(); diff --git a/src/operator/sink/collect_vec.rs b/src/operator/sink/collect_vec.rs index 5b0bb12b..57249969 100644 --- a/src/operator/sink/collect_vec.rs +++ b/src/operator/sink/collect_vec.rs @@ -99,7 +99,7 @@ mod tests { #[test] fn collect_vec() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let res = env.stream(source).collect_vec(); env.execute_blocking(); diff --git a/src/operator/sink/for_each.rs b/src/operator/sink/for_each.rs index b713cf70..de9623ca 100644 --- a/src/operator/sink/for_each.rs +++ b/src/operator/sink/for_each.rs @@ -80,7 +80,7 @@ mod tests { #[test] fn for_each() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let sum = Arc::new(AtomicU8::new(0)); let sum2 = sum.clone(); @@ -93,7 +93,7 @@ mod tests { #[test] fn for_each_keyed() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = source::IteratorSource::new(0..10u8); let sum = Arc::new(AtomicU8::new(0)); let sum2 = sum.clone(); diff --git a/src/operator/source/async_stream.rs b/src/operator/source/async_stream.rs index 2836d9fe..6d75e26d 100644 --- a/src/operator/source/async_stream.rs +++ b/src/operator/source/async_stream.rs @@ -50,7 +50,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::AsyncStreamSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let stream = futures::stream::iter(0..10u32); /// let source = AsyncStreamSource::new(stream); /// let s = env.stream(source); diff --git a/src/operator/source/channel.rs b/src/operator/source/channel.rs index 008773c3..65fd918b 100644 --- a/src/operator/source/channel.rs +++ b/src/operator/source/channel.rs @@ -41,7 +41,7 @@ impl ChannelSource { /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::ChannelSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let (tx_channel, source) = ChannelSource::new(4); /// let R = env.stream(source); /// tx_channel.send(1); diff --git a/src/operator/source/csv.rs b/src/operator/source/csv.rs index e590e703..4eee40a6 100644 --- a/src/operator/source/csv.rs +++ b/src/operator/source/csv.rs @@ -129,7 +129,7 @@ impl Deserialize<'a>> CsvSource { /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::CsvSource; /// # use serde::{Deserialize, Serialize}; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// #[derive(Clone, Deserialize, Serialize)] /// struct Thing { /// what: String, @@ -456,7 +456,7 @@ mod tests { write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); } - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = CsvSource::<(i32, i32)>::new(file.path()).has_headers(false); let res = env.stream(source).shuffle().collect_vec(); env.execute_blocking(); @@ -484,7 +484,7 @@ mod tests { write!(file.as_file(), "{},{}{}", i, i + 1, terminator).unwrap(); } - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = CsvSource::::new(file.path()); let res = env.stream(source).shuffle().collect_vec(); env.execute_blocking(); diff --git a/src/operator/source/file.rs b/src/operator/source/file.rs index 6fa84ffa..9dbda445 100644 --- a/src/operator/source/file.rs +++ b/src/operator/source/file.rs @@ -48,7 +48,7 @@ impl FileSource { /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::FileSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let source = FileSource::new("/datasets/huge.txt"); /// let s = env.stream(source); /// ``` diff --git a/src/operator/source/iterator.rs b/src/operator/source/iterator.rs index 098836a7..68d8c7f3 100644 --- a/src/operator/source/iterator.rs +++ b/src/operator/source/iterator.rs @@ -48,7 +48,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let source = IteratorSource::new((0..5)); /// let s = env.stream(source); /// ``` diff --git a/src/operator/source/parallel_iterator.rs b/src/operator/source/parallel_iterator.rs index 898a0ba9..d3cd1183 100644 --- a/src/operator/source/parallel_iterator.rs +++ b/src/operator/source/parallel_iterator.rs @@ -207,7 +207,7 @@ impl crate::StreamContext { /// ``` /// use noir_compute::prelude::*; /// - /// let env = StreamContext::default(); + /// let env = StreamContext::new_local(); /// /// env.stream_par_iter(0..10) /// .for_each(|q| println!("a: {q}")); @@ -257,7 +257,7 @@ where /// ``` /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::ParallelIteratorSource; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// // generate the numbers from 0 to 99 using multiple replicas /// let n = 100; /// let source = ParallelIteratorSource::new(move |id, instances| { diff --git a/src/operator/window/aggr/fold.rs b/src/operator/window/aggr/fold.rs index 8b3cdc58..8d244edc 100644 --- a/src/operator/window/aggr/fold.rs +++ b/src/operator/window/aggr/fold.rs @@ -109,7 +109,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5); /// let res = s /// .group_by(|&n| n % 2) diff --git a/src/operator/window/mod.rs b/src/operator/window/mod.rs index 29d163a8..3299537c 100644 --- a/src/operator/window/mod.rs +++ b/src/operator/window/mod.rs @@ -294,7 +294,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..9); /// let res = s /// .group_by(|&n| n % 2) @@ -337,7 +337,7 @@ where /// # use noir_compute::{StreamContext, RuntimeConfig}; /// # use noir_compute::operator::source::IteratorSource; /// # use noir_compute::operator::window::CountWindow; - /// # let mut env = StreamContext::new(RuntimeConfig::local(1)); + /// # let mut env = StreamContext::new_local(); /// let s = env.stream_iter(0..5usize); /// let res = s /// .window_all(CountWindow::tumbling(2)) diff --git a/src/runner.rs b/src/runner.rs index 1032e2a0..1ea95fb7 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -147,7 +147,7 @@ fn remote_worker( if host.ssh.username.is_none() { host.ssh.username = Some(whoami::username()); } - info!("starting remote worker for host {}: {:#?}", host_id, host); + info!("starting remote worker for host {}: {:?}", host_id, host); // connect to the ssh server let address = (host.address.as_str(), host.ssh.ssh_port); diff --git a/src/scheduler.rs b/src/scheduler.rs index 8ed0b5b7..09a5d2fc 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -351,7 +351,7 @@ impl Scheduler { OperatorChain: Operator, { let replication = block.scheduling.replication; - let instances = replication.clamp(local.num_cores); + let instances = replication.clamp(local.parallelism); log::debug!( "local (b{:02}): {{ replicas: {:2}, replication: {:?}, only_one: {} }}", block.id, @@ -459,7 +459,7 @@ mod tests { #[test] #[should_panic(expected = "Some streams do not have a sink attached")] fn test_scheduler_panic_on_missing_sink() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = IteratorSource::new(vec![1, 2, 3].into_iter()); let _stream = env.stream(source); env.execute_blocking(); @@ -468,7 +468,7 @@ mod tests { #[test] #[should_panic(expected = "Some streams do not have a sink attached")] fn test_scheduler_panic_on_missing_sink_shuffle() { - let env = StreamContext::new(RuntimeConfig::local(4)); + let env = StreamContext::new(RuntimeConfig::local(4).unwrap()); let source = IteratorSource::new(vec![1, 2, 3].into_iter()); let _stream = env.stream(source).shuffle(); env.execute_blocking(); diff --git a/src/test.rs b/src/test.rs index d002de0a..97682f8b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -79,7 +79,7 @@ impl FakeNetworkTopology { /// Build a fake network topology for a single replica (with coord b0 h0 r0), that receives data /// of type `T` from `num_prev_blocks`, each with `instances_per_block` replicas. pub fn new(num_prev_blocks: CoordUInt, instances_per_block: CoordUInt) -> Self { - let config = RuntimeConfig::local(1); + let config = RuntimeConfig::local(1).unwrap(); let mut topology = NetworkTopology::new(config); let dest = Coord::new(0, 0, 0); diff --git a/tests/utils.rs b/tests/utils.rs index a85b4210..f4acf09a 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -10,7 +10,7 @@ use std::time::Duration; use itertools::{process_results, Itertools}; -use noir_compute::config::{HostConfig, RemoteConfig, RuntimeConfig}; +use noir_compute::config::{ConfigBuilder, HostConfig, RuntimeConfig}; use noir_compute::operator::{Data, Operator, StreamElement, Timestamp}; use noir_compute::structure::BlockStructure; use noir_compute::CoordUInt; @@ -136,7 +136,7 @@ impl TestHelper { /// Run the test body under a local environment. pub fn local_env(body: Arc, num_cores: CoordUInt) { Self::setup(); - let config = RuntimeConfig::local(num_cores); + let config = RuntimeConfig::local(num_cores).unwrap(); log::debug!("Running test with env: {:?}", config); Self::env_with_config(config, body) } @@ -165,12 +165,11 @@ impl TestHelper { let mut join_handles = vec![]; for host_id in 0..num_hosts { - let config = RuntimeConfig::Remote(RemoteConfig { - host_id: Some(host_id), - hosts: hosts.clone(), - tracing_dir: None, - cleanup_executable: true, - }); + let config = ConfigBuilder::new_remote() + .add_hosts(&hosts) + .host_id(host_id) + .build() + .unwrap(); let body = body.clone(); join_handles.push( std::thread::Builder::new()