Skip to content

Commit

Permalink
Add: dep-graph library for scheduler
Browse files Browse the repository at this point in the history
This library originates from https://github.com/nmoutschen/dep-graph
It is used to create a dependency graph for plugins and iterate over its notes either sequentiel or in parallel
  • Loading branch information
Kraemii committed Oct 25, 2023
1 parent c4c8e4f commit d9529cf
Show file tree
Hide file tree
Showing 8 changed files with 1,203 additions and 0 deletions.
28 changes: 28 additions & 0 deletions rust/dep-graph/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[package]
name = "dep-graph"
version = "0.2.0"
authors = ["Nicolas Moutschen <[email protected]>"]
edition = "2018"
license = "MIT"
readme = "README.md"
repository = "https://github.com/nmoutschen/dep-graph"
description = "Dependency graph resolver library"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]
default = ["parallel"]

parallel = ["rayon", "crossbeam-channel"]

[dev-dependencies]
criterion = "0.5.1"

[[bench]]
name = "dep_graph"
harness = false

[dependencies]
crossbeam-channel = { version = "0.5.8", optional = true }
rayon = { version = "1.8.0", optional = true }
num_cpus = "1.16.0"
61 changes: 61 additions & 0 deletions rust/dep-graph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Dependency Graph

Author: Nicolas Moutschen <[email protected]>

Origin: https://github.com/nmoutschen/dep-graph

This is a rust library to perform iterative operations over dependency graphs.

## Usage

This library supports both sequential and parallel (multi-threaded) operations out of the box. By default, multi-threaded operations will run a number of threads equal to the number of cores.

### Parallel operations

Here is a simple example on how to use this library:

```rust
use dep_graph::{Node, DepGraph};
#[cfg(feature = "parallel")]
use rayon::prelude::*;

// Create a list of nodes
let mut root = Node::new("root");
let mut dep1 = Node::new("dep1");
let mut dep2 = Node::new("dep2");
let leaf = Node::new("leaf");

// Map their connections
root.add_dep(dep1.id());
root.add_dep(dep2.id());
dep1.add_dep(leaf.id());
dep2.add_dep(leaf.id());

// Create a graph
let nodes = vec![root, dep1, dep2, leaf];

// Print the name of all nodes in the dependency graph.
// This will parse the dependency graph sequentially
{
let graph = DepGraph::new(&nodes);
graph
.into_iter()
.for_each(|node| {
println!("{:?}", node)
});
}

// This is the same as the previous command, excepts it leverages rayon
// to process them in parallel as much as possible.
#[cfg(feature = "parallel")]
{
let graph = DepGraph::new(&nodes);
graph
.into_par_iter()
.for_each(|node| {
// The node is a dep_graph::Wrapper object, not a String.
// We need to use `*node` to get its value.
println!("{:?}", *node)
});
}
```
128 changes: 128 additions & 0 deletions rust/dep-graph/benches/dep_graph.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// SPDX-FileCopyrightText: 2023 Greenbone AG
// SPDX-FileCopyrightText: 2018 Nicolas Moutschen
//
// SPDX-License-Identifier: GPL-2.0-or-later

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use dep_graph::{DepGraph, Node};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use std::thread;
use std::time::Duration;

/// Create a layer of nodes that don't have any dependencies
fn root_layer(count: usize) -> Vec<Node<String>> {
(0..count)
.map(|i| Node::new(format!("node0_{}", i)))
.collect()
}

/// Utility function that adds a layer of nodes depending on the provided
/// layer and returns them.
fn add_layer(index: usize, count: usize) -> Vec<Node<String>> {
(0..count)
.map(|i| {
let mut node = Node::new(format!("node{}_{}", index, i));
// Mark the entire previous layer as a dependency of this node
for j in 0..count {
node.add_dep(format!("node{}_{}", index - 1, j));
}
node
})
.collect()
}

pub fn parallel_benchmark(c: &mut Criterion) {
const NUM_LAYERS: usize = 20;
#[cfg(feature = "parallel")]
fn par_no_op(nodes: &Vec<Node<String>>) {
DepGraph::new(nodes)
.into_par_iter()
.for_each(|_node| thread::sleep(Duration::from_nanos(100)))
}
fn seq_no_op(nodes: &Vec<Node<String>>) {
DepGraph::new(nodes)
.into_iter()
.for_each(|_node| thread::sleep(Duration::from_nanos(100)))
}

{
// Create a graph with the same number of nodes per layer as the number
// of cores.
let count = num_cpus::get();
let mut nodes = root_layer(count);
(1..NUM_LAYERS).for_each(|i| {
add_layer(i, count)
.iter()
.for_each(|node| nodes.push(node.clone()))
});

// Run the resolver
#[cfg(feature = "parallel")]
c.bench_function("par_same_nodes", |b| {
b.iter(|| par_no_op(black_box(&nodes)))
});
c.bench_function("seq_same_nodes", |b| {
b.iter(|| seq_no_op(black_box(&nodes)))
});
}

{
// Create a graph with a graph twice as broad but half as deep
let count = num_cpus::get();
let mut nodes = root_layer(count * 2);
(1..NUM_LAYERS / 2).for_each(|i| {
add_layer(i, count * 2)
.iter()
.for_each(|node| nodes.push(node.clone()))
});

// Run the resolver
#[cfg(feature = "parallel")]
c.bench_function("par_double_nodes", |b| {
b.iter(|| par_no_op(black_box(&nodes)))
});
c.bench_function("seq_double_nodes", |b| {
b.iter(|| seq_no_op(black_box(&nodes)))
});
}

{
// Create a graph with a graph half as broad but twice as deep
let count = num_cpus::get();
let mut nodes = root_layer(count / 2);
(1..NUM_LAYERS * 2).for_each(|i| {
add_layer(i, count / 2)
.iter()
.for_each(|node| nodes.push(node.clone()))
});

// Run the resolver
#[cfg(feature = "parallel")]
c.bench_function("par_half_nodes", |b| {
b.iter(|| par_no_op(black_box(&nodes)))
});
c.bench_function("seq_half_nodes", |b| {
b.iter(|| seq_no_op(black_box(&nodes)))
});
}

{
// Create a graph with 100 nodes per layer
let count = 100;
let mut nodes = root_layer(count);
(1..NUM_LAYERS).for_each(|i| {
add_layer(i, count)
.iter()
.for_each(|node| nodes.push(node.clone()))
});

// Run the resolver
#[cfg(feature = "parallel")]
c.bench_function("par_100_nodes", |b| b.iter(|| par_no_op(black_box(&nodes))));
c.bench_function("seq_100_nodes", |b| b.iter(|| seq_no_op(black_box(&nodes))));
}
}

criterion_group!(benches, parallel_benchmark);
criterion_main!(benches);
36 changes: 36 additions & 0 deletions rust/dep-graph/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// SPDX-FileCopyrightText: 2023 Greenbone AG
// SPDX-FileCopyrightText: 2018 Nicolas Moutschen
//
// SPDX-License-Identifier: GPL-2.0-or-later

use std::error;
use std::fmt;

#[derive(Debug)]
pub enum Error {
CloseNodeError(String, &'static str),
/// The list of dependencies is empty
EmptyListError,
IteratorDropped,
NoAvailableNodeError,
ResolveGraphError(&'static str),
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::CloseNodeError(name, reason) => {
write!(f, "Failed to close node {}: {}", name, reason)
}
Self::EmptyListError => write!(f, "The dependency list is empty"),
Self::IteratorDropped => write!(
f,
"The iterator attached to the coordination thread dropped"
),
Self::NoAvailableNodeError => write!(f, "No node are currently available"),
Self::ResolveGraphError(reason) => write!(f, "Failed to resolve the graph: {}", reason), // _ => write!(f, "{:?}", self),
}
}
}

impl error::Error for Error {}
Loading

0 comments on commit d9529cf

Please sign in to comment.