Skip to content

Commit

Permalink
Clean up, leave out flag temporarily for testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
peterallenwebb committed Dec 4, 2024
1 parent 7c13c39 commit 333294f
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 71 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241204-100429.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Improve the performance characteristics of add_test_edges()
time: 2024-12-04T10:04:29.096231-05:00
custom:
Author: peterallenwebb
Issue: "10950"
1 change: 1 addition & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def global_flags(func):
@p.warn_error
@p.warn_error_options
@p.write_json
@p.use_fast_test_edges
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,10 @@ def _version_callback(ctx, _param, value):
envvar="DBT_SHOW_RESOURCE_REPORT",
hidden=True,
)

use_fast_test_edges = click.option(
"--use-fast-test-edges/--no-use-fast-test-edges",
envvar="DBT_USE_FAST_TEST_EDGES",
default=False,
hidden=True,
)
162 changes: 91 additions & 71 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,11 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI
@dataclasses.dataclass
class SeenDetails:
node_id: UniqueID
visits: int
ancestors: Set[UniqueID]
awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]]
visits: int = 0
ancestors: Set[UniqueID] = dataclasses.field(default_factory=set)
awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field(
default_factory=set
)


class Linker:
Expand Down Expand Up @@ -204,6 +206,13 @@ def link_graph(self, manifest: Manifest):
raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, manifest: Manifest) -> None:
# if not get_flags().USE_FAST_TEST_EDGES:
# self.add_test_edges_1(manifest)
# else:
# self.add_test_edges_2(manifest)
self.add_test_edges_2(manifest)

Check warning on line 213 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L213

Added line #L213 was not covered by tests

def add_test_edges_1(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
Expand Down Expand Up @@ -295,15 +304,15 @@ def add_test_edges(self, manifest: Manifest) -> None:

def add_test_edges_2(self, manifest: Manifest):
graph = self.graph
new_edges = self._add_test_edges_2(graph, manifest)
new_edges = self._get_test_edges_2(graph, manifest)
for e in new_edges:
graph.add_edge(e[0], e[1], edge_type="parent_test")

Check warning on line 309 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L306-L309

Added lines #L306 - L309 were not covered by tests

@staticmethod
def _add_test_edges_2(
def _get_test_edges_2(
graph: nx.DiGraph, manifest: Manifest
) -> Iterable[Tuple[UniqueID, UniqueID]]:
# This function enforces the same execution behavior as add_test_eges,
# This function enforces the same execution behavior as add_test_edges,
# but executes more quickly in almost all cases, and adds far fewer
# edges. See the HISTORICAL NOTE above.
#
Expand All @@ -313,101 +322,112 @@ def _add_test_edges_2(
# are handled quickly and easily.
#
# The less common but more complex case of multi-tested nodes is handled
# by sweeping through the graph in a breadth-first style, processing nodes
# from a ready queue which initially consists of nodes with no ancestors,
# and adding more nodes to the ready queue after all their ancestors
# have been processed. All the while, the relevant details of all known
# ancestors each "seen" node has, and which tests it is awaiting, are
# maintained in a SeenDetails record. The processing step adds test edges
# when every dependency of an awaited test is an ancestor of a node that
# is being processed. Downstream nodes are then exempted from awaiting
# that same test.

new_edges: List[Tuple[UniqueID, UniqueID]] = [] # The list of new edges to be returned
details: Dict[UniqueID, SeenDetails] = (
{}
) # Details of all the nodes found in the breadth first search
ready_set: deque = deque() # Queue of nodes ready for processing
exec_nodes = (
set()
) # Set of nodes that are "executable" (i.e. in the manifest and not tests)
multi_tested_nodes = (
set()
) # The set of nodes which have tests dependent on more than just themselves
singles: dict[UniqueID, List[UniqueID]] = (
{}
) # A dictionary mapping nodes to all single tests depending on them
# by a specialized function.

new_edges: List[Tuple[UniqueID, UniqueID]] = []

Check warning on line 327 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L327

Added line #L327 was not covered by tests

source_nodes: List[UniqueID] = []
executable_nodes: Set[UniqueID] = set()
multi_tested_nodes = set()

Check warning on line 331 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L329-L331

Added lines #L329 - L331 were not covered by tests
# Dictionary mapping nodes with single-dep tests to a list of those tests.
single_tested_nodes: dict[UniqueID, List[UniqueID]] = defaultdict(list)
for node_id in graph.nodes:
manifest_node = manifest.nodes.get(node_id, None)
if manifest_node is not None:
if manifest_node.resource_type != NodeType.Test:
exec_nodes.add(node_id)
else:
test_deps = manifest_node.depends_on_nodes
if len(test_deps) == 1:
if test_deps[0] not in singles:
singles[test_deps[0]] = []
singles[test_deps[0]].append(node_id)
elif len(test_deps) > 1:
multi_tested_nodes.update(manifest_node.depends_on_nodes)

if graph.in_degree(node_id) == 0:
details[node_id] = SeenDetails(
node_id=node_id, visits=0, ancestors=set(), awaits_tests=set()
)
ready_set.appendleft(node_id)
if manifest_node is None:
continue

Check warning on line 337 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L333-L337

Added lines #L333 - L337 were not covered by tests

if next(graph.predecessors(node_id), None) is None:
source_nodes.append(node_id)

Check warning on line 340 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L339-L340

Added lines #L339 - L340 were not covered by tests

for node_id, test_ids in singles.items():
succs = [s for s in graph.successors(node_id) if s in exec_nodes]
if manifest_node.resource_type != NodeType.Test:
executable_nodes.add(node_id)

Check warning on line 343 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L342-L343

Added lines #L342 - L343 were not covered by tests
else:
test_deps = manifest_node.depends_on_nodes
if len(test_deps) == 1:
single_tested_nodes[test_deps[0]].append(node_id)
elif len(test_deps) > 1:
multi_tested_nodes.update(manifest_node.depends_on_nodes)

Check warning on line 349 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L345-L349

Added lines #L345 - L349 were not covered by tests

# Now that we have all the necessary information conveniently organized,
# add new edges for single-tested nodes.
for node_id, test_ids in single_tested_nodes.items():
succs = [s for s in graph.successors(node_id) if s in executable_nodes]
for succ_id in succs:
for test_id in test_ids:
new_edges.append((test_id, succ_id))

Check warning on line 357 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L353-L357

Added lines #L353 - L357 were not covered by tests

# If there are no multi-tested nodes, we can skip the slower processing
# below.
if len(multi_tested_nodes) == 0:
return new_edges
# Get the edges for multi-tested nodes separately, if needed.
if len(multi_tested_nodes) > 0:
multi_test_edges = Linker._get_multi_test_edges(

Check warning on line 361 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L360-L361

Added lines #L360 - L361 were not covered by tests
graph, manifest, source_nodes, executable_nodes, multi_tested_nodes
)
new_edges += multi_test_edges

Check warning on line 364 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L364

Added line #L364 was not covered by tests

# Perform the BFS-style processing of the graph.
while not len(ready_set) == 0:
curr_details: SeenDetails = details[ready_set.pop()]
test_ids = _get_tests_for_node(manifest, curr_details.node_id)
succs = graph.successors(curr_details.node_id)
return new_edges

Check warning on line 366 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L366

Added line #L366 was not covered by tests

@staticmethod
def _get_multi_test_edges(
graph: nx.DiGraph,
manifest: Manifest,
source_nodes: Iterable[UniqueID],
executable_nodes: Set[UniqueID],
multi_tested_nodes,
) -> List[Tuple[UniqueID, UniqueID]]:
# Works through the graph in a breadth-first style, processing nodes from
# a ready queue which initially consists of nodes with no ancestors,
# and adding more nodes to the ready queue after all their ancestors
# have been processed. All the while, the relevant details of all known
# the "seen" (i.e. encountered) nodes are maintained in a SeenDetails
# record, including ancestor set which tests it is "awaiting" (i.e. which
# are tests of its ancestors), maintained in a SeenDetails record. The
# processing step adds test edges when every dependency of an awaited
# test is an ancestor of a node that is being processed. Downstream nodes
# are then exempted from awaiting that same test.
#
# Memory consumption is potentially O(n^2) with n the number of nodes in
# the graph, since the average number of ancestors for each of n nodes
# could be O(n) but we only track ancestors that are multi-tested, which
# should keep things closer to O(n) in real-world scenarios.

new_edges: List[Tuple[UniqueID, UniqueID]] = []
ready: deque = deque(source_nodes)
details = {node_id: SeenDetails(node_id) for node_id in source_nodes}

Check warning on line 394 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L392-L394

Added lines #L392 - L394 were not covered by tests

while len(ready) > 0:
curr_details: SeenDetails = details[ready.pop()]
test_ids = _get_tests_for_node(manifest, curr_details.node_id)
new_awaits_for_succs = curr_details.awaits_tests.copy()
for test_id in test_ids:
deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes)
if len(deps) > 1:

Check warning on line 402 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L396-L402

Added lines #L396 - L402 were not covered by tests
# Tests with only one dep were already handled.
new_awaits_for_succs.add((test_id, tuple(deps)))

Check warning on line 404 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L404

Added line #L404 was not covered by tests

for succ_id in succs:
if succ_id not in exec_nodes:
continue
for succ_id in [

Check warning on line 406 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L406

Added line #L406 was not covered by tests
s for s in graph.successors(curr_details.node_id) if s in executable_nodes
]:
suc_details = details.get(succ_id, None)
if suc_details is None:
suc_details = SeenDetails(
node_id=succ_id, visits=0, ancestors=set(), awaits_tests=set()
)
suc_details = SeenDetails(succ_id)
details[succ_id] = suc_details
suc_details.visits += 1
suc_details.awaits_tests.update(new_awaits_for_succs)
suc_details.ancestors.update(curr_details.ancestors)
if curr_details.node_id in multi_tested_nodes:

Check warning on line 416 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L409-L416

Added lines #L409 - L416 were not covered by tests
# Only track ancestry information for the set of nodes
# we will actually check against later.
suc_details.ancestors.add(curr_details.node_id)

Check warning on line 419 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L419

Added line #L419 was not covered by tests
suc_details.awaits_tests.update(new_awaits_for_succs)

if suc_details.visits == graph.in_degree(succ_id):
removes = set()
if len(suc_details.awaits_tests) > 0:
p = set(graph.predecessors(succ_id))
removes = set()
for awt in suc_details.awaits_tests:
if not any(
a for a in awt[1] if a not in p and a not in suc_details.ancestors
):
if not any(True for a in awt[1] if a not in suc_details.ancestors):
removes.add(awt)
new_edges.append((awt[0], succ_id))

Check warning on line 427 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L421-L427

Added lines #L421 - L427 were not covered by tests

suc_details.awaits_tests.difference_update(removes)
ready_set.appendleft(succ_id)
ready.appendleft(succ_id)

Check warning on line 430 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L429-L430

Added lines #L429 - L430 were not covered by tests

# We are now done with the current node and all of its ancestors.
# Discard its details to save memory.
Expand Down

0 comments on commit 333294f

Please sign in to comment.