Skip to content

Commit

Permalink
New function to add graph edges.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb committed Dec 3, 2024
1 parent 1b7d9b5 commit 7c13c39
Showing 1 changed file with 172 additions and 4 deletions.
176 changes: 172 additions & 4 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
import json
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import networkx as nx # type: ignore
import sqlparse
Expand Down Expand Up @@ -117,6 +118,14 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI
return tests


@dataclasses.dataclass
class SeenDetails:
node_id: UniqueID
visits: int
ancestors: Set[UniqueID]
awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]]


class Linker:
def __init__(self, data=None) -> None:
if data is None:
Expand Down Expand Up @@ -200,14 +209,51 @@ def add_test_edges(self, manifest: Manifest) -> None:
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""

# Given a graph:
# HISTORICAL NOTE: To understand the motivation behind this function,
# consider a node A with tests and a node B which depends (either directly
# or indirectly) on A. It would be nice if B were not executed until
# all of the tests on A are finished. After all, we don't want to
# propagate bad data. We can enforce that behavior by adding new
# dependencies (edges) from tests to nodes that should wait on them.
#
# This function implements a rough approximation of the behavior just
# described. In fact, for tests that only depend on a single node, it
# always works.
#
# Things get trickier for tests that depend on multiple nodes. In that
# case, if we are not careful, we will introduce cycles. That seems to
# be the reason this function adds dependencies from a downstream node to
# an upstream test if and only if the downstream node is already a
# descendant of all the nodes the upstream test depends on. By following
# that rule, it never makes the node dependent on new upstream nodes other
# than the test itself, and no cycles will be created.
#
# One drawback (Drawback 1) of the approach taken in this function is
# that it could still allow a downstream node to proceed before all
# testing is done on its ancestors, if it happens to have ancestors that
# are not also ancestors of a test with multiple dependencies.
#
# Another drawback (Drawback 2) is that the approach below adds far more
# edges than are strictly needed. After all, if we have A -> B -> C,
# there is no need to add a new edge A -> C. But this function often does.
#
# Drawback 2 is resolved in the new add_test_edges_2() implementation
# below, which is also typically much faster. Drawback 1 has been left in
# place in order to conservatively retain existing behavior, and so that
# the new implementation can be verified against this existing
# implementation by ensuring both resulting graphs have the same transitive
# reduction.

# MOTIVATING IDEA: Given a graph...
#
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# ...produce the following...
#
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
Expand Down Expand Up @@ -247,6 +293,128 @@ def add_test_edges(self, manifest: Manifest) -> None:
if test_depends_on.issubset(upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def add_test_edges_2(self, manifest: Manifest):
graph = self.graph
new_edges = self._add_test_edges_2(graph, manifest)
for e in new_edges:
graph.add_edge(e[0], e[1], edge_type="parent_test")

Check warning on line 300 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L297-L300

Added lines #L297 - L300 were not covered by tests

@staticmethod
def _add_test_edges_2(
graph: nx.DiGraph, manifest: Manifest
) -> Iterable[Tuple[UniqueID, UniqueID]]:
# This function enforces the same execution behavior as add_test_eges,
# but executes more quickly in almost all cases, and adds far fewer
# edges. See the HISTORICAL NOTE above.
#
# The idea is to first scan for "single-tested" nodes (which have tests
# that depend only upon on that node) and "multi-tested" nodes (which
# have tests that also depend on other nodes). Single-tested nodes
# are handled quickly and easily.
#
# The less common but more complex case of multi-tested nodes is handled
# by sweeping through the graph in a breadth-first style, processing nodes
# from a ready queue which initially consists of nodes with no ancestors,
# and adding more nodes to the ready queue after all their ancestors
# have been processed. All the while, the relevant details of all known
# ancestors each "seen" node has, and which tests it is awaiting, are
# maintained in a SeenDetails record. The processing step adds test edges
# when every dependency of an awaited test is an ancestor of a node that
# is being processed. Downstream nodes are then exempted from awaiting
# that same test.

new_edges: List[Tuple[UniqueID, UniqueID]] = [] # The list of new edges to be returned
details: Dict[UniqueID, SeenDetails] = (

Check warning on line 327 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L326-L327

Added lines #L326 - L327 were not covered by tests
{}
) # Details of all the nodes found in the breadth first search
ready_set: deque = deque() # Queue of nodes ready for processing
exec_nodes = (

Check warning on line 331 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L330-L331

Added lines #L330 - L331 were not covered by tests
set()
) # Set of nodes that are "executable" (i.e. in the manifest and not tests)
multi_tested_nodes = (

Check warning on line 334 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L334

Added line #L334 was not covered by tests
set()
) # The set of nodes which have tests dependent on more than just themselves
singles: dict[UniqueID, List[UniqueID]] = (

Check warning on line 337 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L337

Added line #L337 was not covered by tests
{}
) # A dictionary mapping nodes to all single tests depending on them
for node_id in graph.nodes:
manifest_node = manifest.nodes.get(node_id, None)
if manifest_node is not None:
if manifest_node.resource_type != NodeType.Test:
exec_nodes.add(node_id)

Check warning on line 344 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L340-L344

Added lines #L340 - L344 were not covered by tests
else:
test_deps = manifest_node.depends_on_nodes
if len(test_deps) == 1:
if test_deps[0] not in singles:
singles[test_deps[0]] = []
singles[test_deps[0]].append(node_id)
elif len(test_deps) > 1:
multi_tested_nodes.update(manifest_node.depends_on_nodes)

Check warning on line 352 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L346-L352

Added lines #L346 - L352 were not covered by tests

if graph.in_degree(node_id) == 0:
details[node_id] = SeenDetails(

Check warning on line 355 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L354-L355

Added lines #L354 - L355 were not covered by tests
node_id=node_id, visits=0, ancestors=set(), awaits_tests=set()
)
ready_set.appendleft(node_id)

Check warning on line 358 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L358

Added line #L358 was not covered by tests

for node_id, test_ids in singles.items():
succs = [s for s in graph.successors(node_id) if s in exec_nodes]
for succ_id in succs:
for test_id in test_ids:
new_edges.append((test_id, succ_id))

Check warning on line 364 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L360-L364

Added lines #L360 - L364 were not covered by tests

# If there are no multi-tested nodes, we can skip the slower processing
# below.
if len(multi_tested_nodes) == 0:
return new_edges

Check warning on line 369 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L368-L369

Added lines #L368 - L369 were not covered by tests

# Perform the BFS-style processing of the graph.
while not len(ready_set) == 0:
curr_details: SeenDetails = details[ready_set.pop()]
test_ids = _get_tests_for_node(manifest, curr_details.node_id)
succs = graph.successors(curr_details.node_id)

Check warning on line 375 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L372-L375

Added lines #L372 - L375 were not covered by tests

new_awaits_for_succs = curr_details.awaits_tests.copy()
for test_id in test_ids:
deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes)
if len(deps) > 1:
new_awaits_for_succs.add((test_id, tuple(deps)))

Check warning on line 381 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L377-L381

Added lines #L377 - L381 were not covered by tests

for succ_id in succs:
if succ_id not in exec_nodes:
continue
suc_details = details.get(succ_id, None)
if suc_details is None:
suc_details = SeenDetails(

Check warning on line 388 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L383-L388

Added lines #L383 - L388 were not covered by tests
node_id=succ_id, visits=0, ancestors=set(), awaits_tests=set()
)
details[succ_id] = suc_details
suc_details.visits += 1
suc_details.ancestors.update(curr_details.ancestors)
if curr_details.node_id in multi_tested_nodes:
suc_details.ancestors.add(curr_details.node_id)
suc_details.awaits_tests.update(new_awaits_for_succs)

Check warning on line 396 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L391-L396

Added lines #L391 - L396 were not covered by tests

if suc_details.visits == graph.in_degree(succ_id):
removes = set()
if len(suc_details.awaits_tests) > 0:
p = set(graph.predecessors(succ_id))
for awt in suc_details.awaits_tests:
if not any(

Check warning on line 403 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L398-L403

Added lines #L398 - L403 were not covered by tests
a for a in awt[1] if a not in p and a not in suc_details.ancestors
):
removes.add(awt)
new_edges.append((awt[0], succ_id))

Check warning on line 407 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L406-L407

Added lines #L406 - L407 were not covered by tests

suc_details.awaits_tests.difference_update(removes)
ready_set.appendleft(succ_id)

Check warning on line 410 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L409-L410

Added lines #L409 - L410 were not covered by tests

# We are now done with the current node and all of its ancestors.
# Discard its details to save memory.
del details[curr_details.node_id]

Check warning on line 414 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L414

Added line #L414 was not covered by tests

return new_edges

Check warning on line 416 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L416

Added line #L416 was not covered by tests

def get_graph(self, manifest: Manifest) -> Graph:
self.link_graph(manifest)
return Graph(self.graph)
Expand Down

0 comments on commit 7c13c39

Please sign in to comment.