Skip to content

Commit

Permalink
Remove anyhow and itertools dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
imDema committed Mar 14, 2024
1 parent e8c6b0c commit 9e80db8
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 40 deletions.
7 changes: 0 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@ serde_json = "1.0.114"
bincode = "1.3.3"
toml = "0.8.11"

# handy Result type
anyhow = "1.0.81"
thiserror = "1.0.58"

# handy iterators functions
itertools = "0.12.1"

# for storing non-mutable static variables with non-trivial initialization
once_cell = "1.19.0"
Expand Down Expand Up @@ -91,6 +88,7 @@ criterion = { version = "0.5.1", features = ["html_reports"] }
fake = "2.9.2"
mimalloc = { version = "0.1.39", default-features = false }
tracing-subscriber = "0.3.18"
itertools = "0.12.1"

micrometer = { version = "0.2.7", features = ["enable"]}

Expand Down
33 changes: 23 additions & 10 deletions src/block/graph_generator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::collections::HashMap;

use itertools::Itertools;
use indexmap::IndexMap;

use crate::{
block::{BlockStructure, ConnectionStrategy, DataType, OperatorKind},
Expand All @@ -12,7 +10,7 @@ use crate::{
#[derive(Clone, Debug)]
pub struct JobGraphGenerator {
/// The list of known blocks, indexed by block id.
blocks: HashMap<BlockId, BlockStructure, crate::block::CoordHasherBuilder>,
blocks: IndexMap<BlockId, BlockStructure, crate::block::CoordHasherBuilder>,
}

impl JobGraphGenerator {
Expand All @@ -30,11 +28,16 @@ impl JobGraphGenerator {
}

/// Finalize the generator and generate a string representation of the job graph in dot format.
pub fn finalize(self) -> String {
pub fn finalize(mut self) -> String {
self.blocks.sort_keys();
let attributes = vec!["ranksep=0.1"];
format!(
"digraph noir {{\n{attributes}\n{subgraphs}\n{connections}\n}}",
attributes = attributes.into_iter().map(|s| format!(" {s};")).join("\n"),
attributes = attributes
.into_iter()
.map(|s| format!(" {s};"))
.collect::<Vec<_>>()
.join("\n"),
subgraphs = self.gen_subgraphs(),
connections = self.gen_connections()
)
Expand All @@ -44,7 +47,7 @@ impl JobGraphGenerator {
/// the blocks in the network.
fn gen_subgraphs(&self) -> String {
let mut result = String::new();
for &block_id in self.blocks.keys().sorted() {
for &block_id in self.blocks.keys() {
let block = self.blocks.get(&block_id).unwrap();
result += &self.gen_subgraph(block_id, block);
}
Expand Down Expand Up @@ -89,19 +92,25 @@ impl JobGraphGenerator {
let attributes = attributes
.into_iter()
.map(|s| format!(" {s};"))
.collect::<Vec<_>>()
.join("\n");
let nodes = nodes
.into_iter()
.map(|s| format!(" {s};"))
.collect::<Vec<_>>()
.join("\n");
let nodes = nodes.into_iter().map(|s| format!(" {s};")).join("\n");
let connections = connections
.into_iter()
.map(|s| format!(" {s};",))
.collect::<Vec<_>>()
.join("\n");

format!(" subgraph {cluster_id} {{\n{attributes}\n{nodes}\n{connections}\n }}\n",)
}

/// Generate the connections between the operators in different blocks,
fn gen_connections(&self) -> String {
let mut receivers: HashMap<
let mut receivers: IndexMap<
(BlockId, BlockId),
(usize, DataType),
crate::block::CoordHasherBuilder,
Expand Down Expand Up @@ -150,7 +159,11 @@ impl JobGraphGenerator {
}
}
}
result.into_iter().map(|s| format!(" {s};")).join("\n")
result
.into_iter()
.map(|s| format!(" {s};"))
.collect::<Vec<_>>()
.join("\n")
}

/// Return the identifier of an operator.
Expand Down
19 changes: 15 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;

use anyhow::{bail, Result};
#[cfg(feature = "clap")]
use clap::Parser;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -52,7 +51,7 @@ pub const CONFIG_ENV_VAR: &str = "NOIR_CONFIG";
/// address = "host1"
/// base_port = 9500
/// num_cores = 16
///
///
/// [[host]]
/// address = "host2"
/// base_port = 9500
Expand Down Expand Up @@ -230,7 +229,7 @@ impl RuntimeConfig {
/// If it's the runner, the configuration file is read. If it's a worker, the configuration is
/// read directly from the environment variable and not from the file (remote hosts may not have
/// the configuration file).
pub fn remote<P: AsRef<Path>>(config: P) -> Result<RuntimeConfig> {
pub fn remote<P: AsRef<Path>>(config: P) -> Result<RuntimeConfig, ConfigError> {
let mut config = if let Some(config) = RuntimeConfig::config_from_env() {
config
} else {
Expand All @@ -242,7 +241,7 @@ impl RuntimeConfig {
// validate the configuration
for (host_id, host) in config.hosts.iter().enumerate() {
if host.ssh.password.is_some() && host.ssh.key_file.is_some() {
bail!("Malformed configuration: cannot specify both password and key file on host {}: {}", host_id, host.address);
return Err(ConfigError::Invalid(format!("Malformed configuration: cannot specify both password and key file on host {}: {}", host_id, host.address)));
}
}

Expand Down Expand Up @@ -333,3 +332,15 @@ impl CommandLineOptions {
fn ssh_default_port() -> u16 {
22
}

#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
#[error("Serialization error: {0}")]
Serialization(#[from] toml::de::Error),

#[error("Input-Output error: {0}")]
IO(#[from] std::io::Error),

#[error("Invalid configuration: {0}")]
Invalid(String),
}
7 changes: 2 additions & 5 deletions src/network/sync/demultiplexer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread::JoinHandle;

use anyhow::anyhow;
use std::collections::HashMap;
use std::net::ToSocketAddrs;

Expand Down Expand Up @@ -76,11 +75,9 @@ fn bind_remotes<In: ExchangeData>(
log::debug!("{coord} binding {}", address[0]);
let listener = TcpListener::bind(&*address)
.map_err(|e| {
anyhow!(
panic!(
"Failed to bind socket for {} at {:?}: {:?}",
coord,
address,
e
coord, address, e
)
})
.unwrap();
Expand Down
16 changes: 11 additions & 5 deletions src/network/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::thread::JoinHandle;

#[cfg(feature = "tokio")]
use futures::StreamExt;
use itertools::Itertools;
use indexmap::IndexSet;
use typemap_rev::{TypeMap, TypeMapKey};

use crate::channel::Sender;
Expand Down Expand Up @@ -509,16 +509,17 @@ impl NetworkTopology {
} else {
return;
};
let mut coords = HashSet::new();
let mut coords = IndexSet::new();
for (&(from, _typ), to) in self.next.iter() {
for &(to, _fragile) in to {
let coord = DemuxCoord::new(from, to);
coords.insert(coord);
}
}
coords.sort();
let mut used_ports: HashMap<HostId, u16> = HashMap::new();
// sort the coords in order to have a deterministic assignment between all the hosts
for coord in coords.into_iter().sorted() {
for coord in coords.into_iter() {
let host_id = coord.coord.host_id;
let port_offset = used_ports.entry(host_id).or_default();
let host = &config.hosts[host_id as usize];
Expand All @@ -543,9 +544,13 @@ impl NetworkTopology {

pub fn log(&self) {
let mut topology = "execution graph:".to_owned();
for ((coord, _typ), next) in self.next.iter().sorted() {
let mut sorted = self.next.iter().collect::<Vec<_>>();
sorted.sort();
for ((coord, _typ), next) in sorted {
write!(&mut topology, "\n {coord}:",).unwrap();
for (next, fragile) in next.iter().sorted() {
let mut next_sorted = next.iter().collect::<Vec<_>>();
next_sorted.sort();
for (next, fragile) in next_sorted {
write!(
&mut topology,
" {}{}",
Expand All @@ -563,6 +568,7 @@ impl NetworkTopology {
mod tests {
use crate::network::NetworkMessage;
use crate::operator::StreamElement;
use itertools::Itertools;

use super::*;

Expand Down
2 changes: 1 addition & 1 deletion src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::time::{Duration, Instant};

use base64::{engine::general_purpose::URL_SAFE_NO_PAD as B64, Engine};

use itertools::Itertools;
use sha2::Digest;
#[cfg(feature = "ssh")]
use ssh2::Session;
Expand Down Expand Up @@ -383,6 +382,7 @@ fn build_remote_command(
let args = std::env::args()
.skip(1)
.map(|arg| shell_escape::escape(arg.into()))
.collect::<Vec<_>>()
.join(" ");
let perf_cmd = if let Some(path) = perf_path.as_ref() {
warn!("Running remote process on host {} with perf enabled. This may cause performance regressions.", host_id);
Expand Down
8 changes: 3 additions & 5 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::collections::HashMap;
use std::fmt::Write;
use std::thread::JoinHandle;

use itertools::Itertools;

use crate::block::{BatchMode, Block, BlockStructure, JobGraphGenerator, Replication};
use crate::config::{LocalConfig, RemoteConfig, RuntimeConfig};
use crate::network::{Coord, NetworkTopology};
Expand Down Expand Up @@ -316,11 +314,11 @@ impl Scheduler {
for (block_id, block) in self.block_info.iter() {
write!(&mut topology, "\n {}: {}", block_id, block.repr).unwrap();
if let Some(next) = &self.next_blocks.get(block_id) {
let sorted = next
let mut sorted = next
.iter()
.map(|(x, _, fragile)| format!("{}{}", x, if *fragile { "*" } else { "" }))
.sorted()
.collect_vec();
.collect::<Vec<_>>();
sorted.sort();
write!(&mut topology, "\n -> {sorted:?}",).unwrap();
}
}
Expand Down

0 comments on commit 9e80db8

Please sign in to comment.