Skip to content

Commit

Permalink
feat(query): adding indirect and all dependencies for tasks (#9207)
Browse files Browse the repository at this point in the history
### Description

Adds indirect and all dependencies as fields on tasks. 

You can review commit by commit.

### Testing Instructions

Adds tests along with a new fixture based off of
`task-dependencies/complex`
  • Loading branch information
NicholasLYang authored Oct 11, 2024
1 parent 8f1daba commit a99773e
Show file tree
Hide file tree
Showing 16 changed files with 570 additions and 76 deletions.
32 changes: 30 additions & 2 deletions crates/turborepo-graph-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
mod walker;

use std::fmt::Display;
use std::{collections::HashSet, fmt::Display, hash::Hash};

use itertools::Itertools;
use petgraph::prelude::*;
use petgraph::{
prelude::*,
visit::{depth_first_search, Reversed},
};
use thiserror::Error;

#[derive(Debug, Error)]
Expand All @@ -14,6 +17,31 @@ pub enum Error {
SelfDependency(String),
}

pub fn transitive_closure<N: Hash + Eq + PartialEq, I: IntoIterator<Item = NodeIndex>>(
graph: &Graph<N, ()>,
indices: I,
direction: petgraph::Direction,
) -> HashSet<&N> {
let mut visited = HashSet::new();

let visitor = |event| {
if let petgraph::visit::DfsEvent::Discover(n, _) = event {
visited.insert(
graph
.node_weight(n)
.expect("node index found during dfs doesn't exist"),
);
}
};

match direction {
petgraph::Direction::Outgoing => depth_first_search(&graph, indices, visitor),
petgraph::Direction::Incoming => depth_first_search(Reversed(&graph), indices, visitor),
};

visited
}

pub fn validate_graph<G: Display>(graph: &Graph<G, ()>) -> Result<(), Error> {
// This is equivalent to AcyclicGraph.Cycles from Go's dag library
let cycles_lines = petgraph::algo::tarjan_scc(&graph)
Expand Down
33 changes: 33 additions & 0 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,23 @@ impl From<TaskId<'static>> for TaskNode {
}
}

#[derive(Debug, Error)]
pub enum Error {
#[error("expected a task node, got root")]
Root,
}

impl TryFrom<TaskNode> for TaskId<'static> {
type Error = Error;

fn try_from(node: TaskNode) -> Result<Self, Self::Error> {
match node {
TaskNode::Root => Err(Error::Root),
TaskNode::Task(id) => Ok(id),
}
}
}

#[derive(Debug, Default)]
pub struct Building;
#[derive(Debug, Default)]
Expand Down Expand Up @@ -332,6 +349,22 @@ impl Engine<Built> {
self.neighbors(task_id, petgraph::Direction::Incoming)
}

pub fn transitive_dependents(&self, task_id: &TaskId<'static>) -> HashSet<&TaskNode> {
turborepo_graph_utils::transitive_closure(
&self.task_graph,
self.task_lookup.get(task_id).cloned(),
petgraph::Direction::Incoming,
)
}

pub fn transitive_dependencies(&self, task_id: &TaskId<'static>) -> HashSet<&TaskNode> {
turborepo_graph_utils::transitive_closure(
&self.task_graph,
self.task_lookup.get(task_id).cloned(),
petgraph::Direction::Outgoing,
)
}

fn neighbors(
&self,
task_id: &TaskId,
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-lib/src/query/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl Package {
async fn tasks(&self) -> Array<RepositoryTask> {
self.get_tasks()
.into_iter()
.sorted_by(|a, b| a.0.cmp(&b.0))
.map(|(name, script)| RepositoryTask {
name,
package: self.clone(),
Expand Down
155 changes: 138 additions & 17 deletions crates/turborepo-lib/src/query/task.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::sync::Arc;

use async_graphql::Object;
use itertools::Itertools;
use turborepo_errors::Spanned;

use crate::{
engine::TaskNode,
query::{package::Package, Array},
run::task_id::TaskId,
run::{task_id::TaskId, Run},
};

pub struct RepositoryTask {
Expand All @@ -13,6 +16,22 @@ pub struct RepositoryTask {
pub script: Option<Spanned<String>>,
}

impl RepositoryTask {
pub fn new(task_id: &TaskId, run: &Arc<Run>) -> Self {
let package = Package {
name: task_id.package().into(),
run: run.clone(),
};
let script = package.get_tasks().get(task_id.task()).cloned();

RepositoryTask {
name: task_id.task().to_string(),
package,
script,
}
}
}

#[Object]
impl RepositoryTask {
async fn name(&self) -> String {
Expand All @@ -23,6 +42,10 @@ impl RepositoryTask {
self.package.clone()
}

async fn full_name(&self) -> String {
format!("{}#{}", self.package.name, self.name)
}

async fn script(&self) -> Option<String> {
self.script.as_ref().map(|script| script.value.to_string())
}
Expand All @@ -37,14 +60,14 @@ impl RepositoryTask {
.flatten()
.filter_map(|task| match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some(RepositoryTask {
name: task.task().to_string(),
package: Package {
run: self.package.run.clone(),
name: task.package().to_string().into(),
},
script: self.package.get_tasks().get(task.task()).cloned(),
}),
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}
Expand All @@ -60,14 +83,112 @@ impl RepositoryTask {
.flatten()
.filter_map(|task| match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some(RepositoryTask {
name: task.task().to_string(),
package: Package {
run: self.package.run.clone(),
name: task.package().to_string().into(),
},
script: self.package.get_tasks().get(task.task()).cloned(),
}),
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}

async fn indirect_dependents(&self) -> Array<RepositoryTask> {
let task_id = TaskId::from_static(self.package.name.to_string(), self.name.clone());
let direct_dependents = self
.package
.run
.engine()
.dependencies(&task_id)
.unwrap_or_default();
self.package
.run
.engine()
.transitive_dependents(&task_id)
.into_iter()
.filter(|node| !direct_dependents.contains(node))
.filter_map(|node| match node {
TaskNode::Root => None,
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}

async fn indirect_dependencies(&self) -> Array<RepositoryTask> {
let task_id = TaskId::from_static(self.package.name.to_string(), self.name.clone());
let direct_dependencies = self
.package
.run
.engine()
.dependencies(&task_id)
.unwrap_or_default();
self.package
.run
.engine()
.transitive_dependencies(&task_id)
.into_iter()
.filter(|node| !direct_dependencies.contains(node))
.filter_map(|node| match node {
TaskNode::Root => None,
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}

async fn all_dependents(&self) -> Array<RepositoryTask> {
let task_id = TaskId::from_static(self.package.name.to_string(), self.name.clone());
self.package
.run
.engine()
.transitive_dependents(&task_id)
.into_iter()
.filter_map(|node| match node {
TaskNode::Root => None,
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}

async fn all_dependencies(&self) -> Array<RepositoryTask> {
let task_id = TaskId::from_static(self.package.name.to_string(), self.name.clone());
self.package
.run
.engine()
.transitive_dependencies(&task_id)
.into_iter()
.filter_map(|node| match node {
TaskNode::Root => None,
TaskNode::Task(task) if task == &task_id => None,
TaskNode::Task(task) => Some(RepositoryTask::new(task, &self.package.run)),
})
.sorted_by(|a, b| {
a.package
.name
.cmp(&b.package.name)
.then_with(|| a.name.cmp(&b.name))
})
.collect()
}
Expand Down
61 changes: 22 additions & 39 deletions crates/turborepo-repository/src/package_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{
};

use itertools::Itertools;
use petgraph::visit::{depth_first_search, Reversed};
use serde::Serialize;
use tracing::debug;
use turbopath::{
Expand Down Expand Up @@ -259,8 +258,11 @@ impl PackageGraph {
/// dependencies(a) = {b, c}
#[allow(dead_code)]
pub fn dependencies<'a>(&'a self, node: &PackageNode) -> HashSet<&'a PackageNode> {
let mut dependencies =
self.transitive_closure_inner(Some(node), petgraph::Direction::Outgoing);
let mut dependencies = turborepo_graph_utils::transitive_closure(
&self.graph,
self.node_lookup.get(node).cloned(),
petgraph::Direction::Outgoing,
);
// Add in all root dependencies as they're implied dependencies for every
// package in the graph.
dependencies.extend(self.root_internal_dependencies());
Expand All @@ -281,7 +283,11 @@ impl PackageGraph {
let mut dependents = if self.root_internal_dependencies().contains(node) {
return self.graph.node_weights().collect();
} else {
self.transitive_closure_inner(Some(node), petgraph::Direction::Incoming)
turborepo_graph_utils::transitive_closure(
&self.graph,
self.node_lookup.get(node).cloned(),
petgraph::Direction::Incoming,
)
};
dependents.remove(node);
dependents
Expand Down Expand Up @@ -358,8 +364,11 @@ impl PackageGraph {
fn root_internal_dependencies(&self) -> HashSet<&PackageNode> {
// We cannot call self.dependencies(&PackageNode::Workspace(PackageName::Root))
// as it will infinitely recurse.
let mut dependencies = self.transitive_closure_inner(
Some(&PackageNode::Workspace(PackageName::Root)),
let mut dependencies = turborepo_graph_utils::transitive_closure(
&self.graph,
self.node_lookup
.get(&PackageNode::Workspace(PackageName::Root))
.cloned(),
petgraph::Direction::Outgoing,
);
dependencies.remove(&PackageNode::Workspace(PackageName::Root));
Expand All @@ -375,39 +384,13 @@ impl PackageGraph {
&'a self,
nodes: I,
) -> HashSet<&'a PackageNode> {
self.transitive_closure_inner(nodes, petgraph::Direction::Outgoing)
}

fn transitive_closure_inner<'a, 'b, I: IntoIterator<Item = &'b PackageNode>>(
&'a self,
nodes: I,
direction: petgraph::Direction,
) -> HashSet<&'a PackageNode> {
let indices = nodes
.into_iter()
.filter_map(|node| self.node_lookup.get(node))
.copied();

let mut visited = HashSet::new();

let visitor = |event| {
if let petgraph::visit::DfsEvent::Discover(n, _) = event {
visited.insert(
self.graph
.node_weight(n)
.expect("node index found during dfs doesn't exist"),
);
}
};

match direction {
petgraph::Direction::Outgoing => depth_first_search(&self.graph, indices, visitor),
petgraph::Direction::Incoming => {
depth_first_search(Reversed(&self.graph), indices, visitor)
}
};

visited
turborepo_graph_utils::transitive_closure(
&self.graph,
nodes
.into_iter()
.flat_map(|node| self.node_lookup.get(node).cloned()),
petgraph::Direction::Outgoing,
)
}

pub fn transitive_external_dependencies<'a, I: IntoIterator<Item = &'a PackageName>>(
Expand Down
Loading

0 comments on commit a99773e

Please sign in to comment.