Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the Performance Characteristics of add_test_edges() #11092

Merged
merged 4 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241204-100429.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Improve the performance characteristics of add_test_edges()
time: 2024-12-04T10:04:29.096231-05:00
custom:
Author: peterallenwebb
Issue: "10950"
1 change: 1 addition & 0 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def global_flags(func):
@p.warn_error
@p.warn_error_options
@p.write_json
@p.use_fast_test_edges
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/cli/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,3 +735,10 @@ def _version_callback(ctx, _param, value):
envvar="DBT_SHOW_RESOURCE_REPORT",
hidden=True,
)

use_fast_test_edges = click.option(
"--use-fast-test-edges/--no-use-fast-test-edges",
envvar="DBT_USE_FAST_TEST_EDGES",
default=False,
hidden=True,
)
195 changes: 191 additions & 4 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import dataclasses
import json
import os
import pickle
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple
from collections import defaultdict, deque
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple

import networkx as nx # type: ignore
import sqlparse
Expand Down Expand Up @@ -117,6 +118,16 @@
return tests


@dataclasses.dataclass
class SeenDetails:
node_id: UniqueID
visits: int = 0
ancestors: Set[UniqueID] = dataclasses.field(default_factory=set)
awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field(
default_factory=set
)


class Linker:
def __init__(self, data=None) -> None:
if data is None:
Expand Down Expand Up @@ -195,19 +206,62 @@
raise RuntimeError("Found a cycle: {}".format(cycle))

def add_test_edges(self, manifest: Manifest) -> None:
if not get_flags().USE_FAST_TEST_EDGES:
self.add_test_edges_1(manifest)
else:
self.add_test_edges_2(manifest)

Check warning on line 212 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L212

Added line #L212 was not covered by tests

def add_test_edges_1(self, manifest: Manifest) -> None:
"""This method adds additional edges to the DAG. For a given non-test
executable node, add an edge from an upstream test to the given node if
the set of nodes the test depends on is a subset of the upstream nodes
for the given node."""

# Given a graph:
# HISTORICAL NOTE: To understand the motivation behind this function,
# consider a node A with tests and a node B which depends (either directly
# or indirectly) on A. It would be nice if B were not executed until
# all of the tests on A are finished. After all, we don't want to
# propagate bad data. We can enforce that behavior by adding new
# dependencies (edges) from tests to nodes that should wait on them.
#
# This function implements a rough approximation of the behavior just
# described. In fact, for tests that only depend on a single node, it
# always works.
#
# Things get trickier for tests that depend on multiple nodes. In that
# case, if we are not careful, we will introduce cycles. That seems to
# be the reason this function adds dependencies from a downstream node to
# an upstream test if and only if the downstream node is already a
# descendant of all the nodes the upstream test depends on. By following
# that rule, it never makes the node dependent on new upstream nodes other
# than the tests themselves, and no cycles will be created.
#
# One drawback (Drawback 1) of the approach taken in this function is
# that it could still allow a downstream node to proceed before all
# testing is done on its ancestors, if it happens to have ancestors that
# are not also ancestors of a test with multiple dependencies.
#
# Another drawback (Drawback 2) is that the approach below adds far more
# edges than are strictly needed. After all, if we have A -> B -> C,
# there is no need to add a new edge A -> C. But this function often does.
#
# Drawback 2 is resolved in the new add_test_edges_2() implementation
# below, which is also typically much faster. Drawback 1 has been left in
# place in order to conservatively retain existing behavior, and so that
# the new implementation can be verified against this existing
# implementation by ensuring both resulting graphs have the same transitive
# reduction.

# MOTIVATING IDEA: Given a graph...
#
# model1 --> model2 --> model3
# | |
# | \/
# \/ test 2
# test1
#
# Produce the following graph:
# ...produce the following...
#
# model1 --> model2 --> model3
# | /\ | /\ /\
# | | \/ | |
Expand Down Expand Up @@ -247,6 +301,139 @@
if test_depends_on.issubset(upstream_nodes):
self.graph.add_edge(upstream_test, node_id, edge_type="parent_test")

def add_test_edges_2(self, manifest: Manifest):
graph = self.graph
new_edges = self._get_test_edges_2(graph, manifest)
for e in new_edges:
graph.add_edge(e[0], e[1], edge_type="parent_test")

Check warning on line 308 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L305-L308

Added lines #L305 - L308 were not covered by tests

@staticmethod
def _get_test_edges_2(
graph: nx.DiGraph, manifest: Manifest
) -> Iterable[Tuple[UniqueID, UniqueID]]:
# This function enforces the same execution behavior as add_test_edges,
# but executes far more quickly and adds far fewer edges. See the
# HISTORICAL NOTE above.
#
# The idea is to first scan for "single-tested" nodes (which have tests
# that depend only upon on that node) and "multi-tested" nodes (which
# have tests that depend on multiple nodes). Single-tested nodes are
# handled quickly and easily.
#
# The less common but more complex case of multi-tested nodes is handled
# by a specialized function.

new_edges: List[Tuple[UniqueID, UniqueID]] = []

Check warning on line 326 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L326

Added line #L326 was not covered by tests

source_nodes: List[UniqueID] = []
executable_nodes: Set[UniqueID] = set()
multi_tested_nodes = set()

Check warning on line 330 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L328-L330

Added lines #L328 - L330 were not covered by tests
# Dictionary mapping nodes with single-dep tests to a list of those tests.
single_tested_nodes: dict[UniqueID, List[UniqueID]] = defaultdict(list)
for node_id in graph.nodes:
manifest_node = manifest.nodes.get(node_id, None)
if manifest_node is None:
continue

Check warning on line 336 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L332-L336

Added lines #L332 - L336 were not covered by tests

if next(graph.predecessors(node_id), None) is None:
source_nodes.append(node_id)

Check warning on line 339 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L338-L339

Added lines #L338 - L339 were not covered by tests

if manifest_node.resource_type != NodeType.Test:
executable_nodes.add(node_id)

Check warning on line 342 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L341-L342

Added lines #L341 - L342 were not covered by tests
else:
test_deps = manifest_node.depends_on_nodes
if len(test_deps) == 1:
single_tested_nodes[test_deps[0]].append(node_id)
elif len(test_deps) > 1:
multi_tested_nodes.update(manifest_node.depends_on_nodes)

Check warning on line 348 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L344-L348

Added lines #L344 - L348 were not covered by tests

# Now that we have all the necessary information conveniently organized,
# add new edges for single-tested nodes.
for node_id, test_ids in single_tested_nodes.items():
succs = [s for s in graph.successors(node_id) if s in executable_nodes]
for succ_id in succs:
for test_id in test_ids:
new_edges.append((test_id, succ_id))

Check warning on line 356 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L352-L356

Added lines #L352 - L356 were not covered by tests

# Get the edges for multi-tested nodes separately, if needed.
if len(multi_tested_nodes) > 0:
multi_test_edges = Linker._get_multi_test_edges(

Check warning on line 360 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L359-L360

Added lines #L359 - L360 were not covered by tests
graph, manifest, source_nodes, executable_nodes, multi_tested_nodes
)
new_edges += multi_test_edges

Check warning on line 363 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L363

Added line #L363 was not covered by tests

return new_edges

Check warning on line 365 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L365

Added line #L365 was not covered by tests

@staticmethod
def _get_multi_test_edges(
graph: nx.DiGraph,
manifest: Manifest,
source_nodes: Iterable[UniqueID],
executable_nodes: Set[UniqueID],
multi_tested_nodes,
) -> List[Tuple[UniqueID, UniqueID]]:
# Works through the graph in a breadth-first style, processing nodes from
# a ready queue which initially consists of nodes with no ancestors,
# and adding more nodes to the ready queue after all their ancestors
# have been processed. All the while, the relevant details of all nodes
# "seen" by the search so far are maintained in a SeenDetails record,
# including the ancestor set which tests it is "awaiting" (i.e. tests of
# its ancestors). The processing step adds test edges when every dependency
# of an awaited test is an ancestor of a node that is being processed.
# Downstream nodes are then exempted from awaiting the test.
#
# Memory consumption is potentially O(n^2) with n the number of nodes in
# the graph, since the average number of ancestors and tests being awaited
# for each of the n nodes could itself be O(n) but we only track ancestors
# that are multi-tested, which should keep things closer to O(n) in real-
# world scenarios.

new_edges: List[Tuple[UniqueID, UniqueID]] = []
ready: deque = deque(source_nodes)
details = {node_id: SeenDetails(node_id) for node_id in source_nodes}

Check warning on line 393 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L391-L393

Added lines #L391 - L393 were not covered by tests

while len(ready) > 0:
curr_details: SeenDetails = details[ready.pop()]
test_ids = _get_tests_for_node(manifest, curr_details.node_id)
new_awaits_for_succs = curr_details.awaits_tests.copy()
for test_id in test_ids:
deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes)
if len(deps) > 1:

Check warning on line 401 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L395-L401

Added lines #L395 - L401 were not covered by tests
# Tests with only one dep were already handled.
new_awaits_for_succs.add((test_id, tuple(deps)))

Check warning on line 403 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L403

Added line #L403 was not covered by tests

for succ_id in [

Check warning on line 405 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L405

Added line #L405 was not covered by tests
s for s in graph.successors(curr_details.node_id) if s in executable_nodes
]:
suc_details = details.get(succ_id, None)
if suc_details is None:
suc_details = SeenDetails(succ_id)
details[succ_id] = suc_details
suc_details.visits += 1
suc_details.awaits_tests.update(new_awaits_for_succs)
suc_details.ancestors.update(curr_details.ancestors)
if curr_details.node_id in multi_tested_nodes:

Check warning on line 415 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L408-L415

Added lines #L408 - L415 were not covered by tests
# Only track ancestry information for the set of nodes
# we will actually check against later.
suc_details.ancestors.add(curr_details.node_id)

Check warning on line 418 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L418

Added line #L418 was not covered by tests

if suc_details.visits == graph.in_degree(succ_id):
if len(suc_details.awaits_tests) > 0:
removes = set()
for awt in suc_details.awaits_tests:
if not any(True for a in awt[1] if a not in suc_details.ancestors):
removes.add(awt)
new_edges.append((awt[0], succ_id))

Check warning on line 426 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L420-L426

Added lines #L420 - L426 were not covered by tests

suc_details.awaits_tests.difference_update(removes)
ready.appendleft(succ_id)

Check warning on line 429 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L428-L429

Added lines #L428 - L429 were not covered by tests

# We are now done with the current node and all of its ancestors.
# Discard its details to save memory.
del details[curr_details.node_id]

Check warning on line 433 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L433

Added line #L433 was not covered by tests

return new_edges

Check warning on line 435 in core/dbt/compilation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/compilation.py#L435

Added line #L435 was not covered by tests

def get_graph(self, manifest: Manifest) -> Graph:
self.link_graph(manifest)
return Graph(self.graph)
Expand Down
Loading