From 7c13c39ee807439b7576ec53b8833ce915946aad Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Tue, 3 Dec 2024 17:40:07 -0500 Subject: [PATCH 1/4] New function to add graph edges. --- core/dbt/compilation.py | 176 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 172 insertions(+), 4 deletions(-) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 0ffa73df715..b1c4c1bc9c8 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -1,8 +1,9 @@ +import dataclasses import json import os import pickle -from collections import defaultdict -from typing import Any, Dict, List, Optional, Tuple +from collections import defaultdict, deque +from typing import Any, Dict, Iterable, List, Optional, Set, Tuple import networkx as nx # type: ignore import sqlparse @@ -117,6 +118,14 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI return tests +@dataclasses.dataclass +class SeenDetails: + node_id: UniqueID + visits: int + ancestors: Set[UniqueID] + awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] + + class Linker: def __init__(self, data=None) -> None: if data is None: @@ -200,14 +209,51 @@ def add_test_edges(self, manifest: Manifest) -> None: the set of nodes the test depends on is a subset of the upstream nodes for the given node.""" - # Given a graph: + # HISTORICAL NOTE: To understand the motivation behind this function, + # consider a node A with tests and a node B which depends (either directly + # or indirectly) on A. It would be nice if B were not executed until + # all of the tests on A are finished. After all, we don't want to + # propagate bad data. We can enforce that behavior by adding new + # dependencies (edges) from tests to nodes that should wait on them. + # + # This function implements a rough approximation of the behavior just + # described. In fact, for tests that only depend on a single node, it + # always works. 
+ # + # Things get trickier for tests that depend on multiple nodes. In that + # case, if we are not careful, we will introduce cycles. That seems to + # be the reason this function adds dependencies from a downstream node to + # an upstream test if and only if the downstream node is already a + # descendant of all the nodes the upstream test depends on. By following + # that rule, it never makes the node dependent on new upstream nodes other + # than the test itself, and no cycles will be created. + # + # One drawback (Drawback 1) of the approach taken in this function is + # that it could still allow a downstream node to proceed before all + # testing is done on its ancestors, if it happens to have ancestors that + # are not also ancestors of a test with multiple dependencies. + # + # Another drawback (Drawback 2) is that the approach below adds far more + # edges than are strictly needed. After all, if we have A -> B -> C, + # there is no need to add a new edge A -> C. But this function often does. + # + # Drawback 2 is resolved in the new add_test_edges_2() implementation + # below, which is also typically much faster. Drawback 1 has been left in + # place in order to conservatively retain existing behavior, and so that + # the new implementation can be verified against this existing + # implementation by ensuring both resulting graphs have the same transitive + # reduction. + + # MOTIVATING IDEA: Given a graph... + # # model1 --> model2 --> model3 # | | # | \/ # \/ test 2 # test1 # - # Produce the following graph: + # ...produce the following... 
+ # # model1 --> model2 --> model3 # | /\ | /\ /\ # | | \/ | | @@ -247,6 +293,128 @@ def add_test_edges(self, manifest: Manifest) -> None: if test_depends_on.issubset(upstream_nodes): self.graph.add_edge(upstream_test, node_id, edge_type="parent_test") + def add_test_edges_2(self, manifest: Manifest): + graph = self.graph + new_edges = self._add_test_edges_2(graph, manifest) + for e in new_edges: + graph.add_edge(e[0], e[1], edge_type="parent_test") + + @staticmethod + def _add_test_edges_2( + graph: nx.DiGraph, manifest: Manifest + ) -> Iterable[Tuple[UniqueID, UniqueID]]: + # This function enforces the same execution behavior as add_test_eges, + # but executes more quickly in almost all cases, and adds far fewer + # edges. See the HISTORICAL NOTE above. + # + # The idea is to first scan for "single-tested" nodes (which have tests + # that depend only upon on that node) and "multi-tested" nodes (which + # have tests that also depend on other nodes). Single-tested nodes + # are handled quickly and easily. + # + # The less common but more complex case of multi-tested nodes is handled + # by sweeping through the graph in a breadth-first style, processing nodes + # from a ready queue which initially consists of nodes with no ancestors, + # and adding more nodes to the ready queue after all their ancestors + # have been processed. All the while, the relevant details of all known + # ancestors each "seen" node has, and which tests it is awaiting, are + # maintained in a SeenDetails record. The processing step adds test edges + # when every dependency of an awaited test is an ancestor of a node that + # is being processed. Downstream nodes are then exempted from awaiting + # that same test. 
+ + new_edges: List[Tuple[UniqueID, UniqueID]] = [] # The list of new edges to be returned + details: Dict[UniqueID, SeenDetails] = ( + {} + ) # Details of all the nodes found in the breadth first search + ready_set: deque = deque() # Queue of nodes ready for processing + exec_nodes = ( + set() + ) # Set of nodes that are "executable" (i.e. in the manifest and not tests) + multi_tested_nodes = ( + set() + ) # The set of nodes which have tests dependent on more than just themselves + singles: dict[UniqueID, List[UniqueID]] = ( + {} + ) # A dictionary mapping nodes to all single tests depending on them + for node_id in graph.nodes: + manifest_node = manifest.nodes.get(node_id, None) + if manifest_node is not None: + if manifest_node.resource_type != NodeType.Test: + exec_nodes.add(node_id) + else: + test_deps = manifest_node.depends_on_nodes + if len(test_deps) == 1: + if test_deps[0] not in singles: + singles[test_deps[0]] = [] + singles[test_deps[0]].append(node_id) + elif len(test_deps) > 1: + multi_tested_nodes.update(manifest_node.depends_on_nodes) + + if graph.in_degree(node_id) == 0: + details[node_id] = SeenDetails( + node_id=node_id, visits=0, ancestors=set(), awaits_tests=set() + ) + ready_set.appendleft(node_id) + + for node_id, test_ids in singles.items(): + succs = [s for s in graph.successors(node_id) if s in exec_nodes] + for succ_id in succs: + for test_id in test_ids: + new_edges.append((test_id, succ_id)) + + # If there are no multi-tested nodes, we can skip the slower processing + # below. + if len(multi_tested_nodes) == 0: + return new_edges + + # Perform the BFS-style processing of the graph. 
+ while not len(ready_set) == 0: + curr_details: SeenDetails = details[ready_set.pop()] + test_ids = _get_tests_for_node(manifest, curr_details.node_id) + succs = graph.successors(curr_details.node_id) + + new_awaits_for_succs = curr_details.awaits_tests.copy() + for test_id in test_ids: + deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes) + if len(deps) > 1: + new_awaits_for_succs.add((test_id, tuple(deps))) + + for succ_id in succs: + if succ_id not in exec_nodes: + continue + suc_details = details.get(succ_id, None) + if suc_details is None: + suc_details = SeenDetails( + node_id=succ_id, visits=0, ancestors=set(), awaits_tests=set() + ) + details[succ_id] = suc_details + suc_details.visits += 1 + suc_details.ancestors.update(curr_details.ancestors) + if curr_details.node_id in multi_tested_nodes: + suc_details.ancestors.add(curr_details.node_id) + suc_details.awaits_tests.update(new_awaits_for_succs) + + if suc_details.visits == graph.in_degree(succ_id): + removes = set() + if len(suc_details.awaits_tests) > 0: + p = set(graph.predecessors(succ_id)) + for awt in suc_details.awaits_tests: + if not any( + a for a in awt[1] if a not in p and a not in suc_details.ancestors + ): + removes.add(awt) + new_edges.append((awt[0], succ_id)) + + suc_details.awaits_tests.difference_update(removes) + ready_set.appendleft(succ_id) + + # We are now done with the current node and all of its ancestors. + # Discard its details to save memory. + del details[curr_details.node_id] + + return new_edges + def get_graph(self, manifest: Manifest) -> Graph: self.link_graph(manifest) return Graph(self.graph) From 333294fbdd15d7e188dce284fbd10c3b1b5e703b Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Wed, 4 Dec 2024 15:10:34 -0500 Subject: [PATCH 2/4] Clean up, leave out flag temporarily for testing. 
--- .../unreleased/Fixes-20241204-100429.yaml | 6 + core/dbt/cli/main.py | 1 + core/dbt/cli/params.py | 7 + core/dbt/compilation.py | 162 ++++++++++-------- 4 files changed, 105 insertions(+), 71 deletions(-) create mode 100644 .changes/unreleased/Fixes-20241204-100429.yaml diff --git a/.changes/unreleased/Fixes-20241204-100429.yaml b/.changes/unreleased/Fixes-20241204-100429.yaml new file mode 100644 index 00000000000..378444e769d --- /dev/null +++ b/.changes/unreleased/Fixes-20241204-100429.yaml @@ -0,0 +1,6 @@ +kind: Fixes +body: Improve the performance characteristics of add_test_edges() +time: 2024-12-04T10:04:29.096231-05:00 +custom: + Author: peterallenwebb + Issue: "10950" diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index a9de9441365..11cc81ef70e 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -140,6 +140,7 @@ def global_flags(func): @p.warn_error @p.warn_error_options @p.write_json + @p.use_fast_test_edges @functools.wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) diff --git a/core/dbt/cli/params.py b/core/dbt/cli/params.py index 96e9e7acd7a..612728de222 100644 --- a/core/dbt/cli/params.py +++ b/core/dbt/cli/params.py @@ -735,3 +735,10 @@ def _version_callback(ctx, _param, value): envvar="DBT_SHOW_RESOURCE_REPORT", hidden=True, ) + +use_fast_test_edges = click.option( + "--use-fast-test-edges/--no-use-fast-test-edges", + envvar="DBT_USE_FAST_TEST_EDGES", + default=False, + hidden=True, +) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index b1c4c1bc9c8..c3eb555b76e 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -121,9 +121,11 @@ def _get_tests_for_node(manifest: Manifest, unique_id: UniqueID) -> List[UniqueI @dataclasses.dataclass class SeenDetails: node_id: UniqueID - visits: int - ancestors: Set[UniqueID] - awaits_tests: Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] + visits: int = 0 + ancestors: Set[UniqueID] = dataclasses.field(default_factory=set) + awaits_tests: 
Set[Tuple[UniqueID, Tuple[UniqueID, ...]]] = dataclasses.field( + default_factory=set + ) class Linker: @@ -204,6 +206,13 @@ def link_graph(self, manifest: Manifest): raise RuntimeError("Found a cycle: {}".format(cycle)) def add_test_edges(self, manifest: Manifest) -> None: + # if not get_flags().USE_FAST_TEST_EDGES: + # self.add_test_edges_1(manifest) + # else: + # self.add_test_edges_2(manifest) + self.add_test_edges_2(manifest) + + def add_test_edges_1(self, manifest: Manifest) -> None: """This method adds additional edges to the DAG. For a given non-test executable node, add an edge from an upstream test to the given node if the set of nodes the test depends on is a subset of the upstream nodes @@ -295,15 +304,15 @@ def add_test_edges(self, manifest: Manifest) -> None: def add_test_edges_2(self, manifest: Manifest): graph = self.graph - new_edges = self._add_test_edges_2(graph, manifest) + new_edges = self._get_test_edges_2(graph, manifest) for e in new_edges: graph.add_edge(e[0], e[1], edge_type="parent_test") @staticmethod - def _add_test_edges_2( + def _get_test_edges_2( graph: nx.DiGraph, manifest: Manifest ) -> Iterable[Tuple[UniqueID, UniqueID]]: - # This function enforces the same execution behavior as add_test_eges, + # This function enforces the same execution behavior as add_test_edges, # but executes more quickly in almost all cases, and adds far fewer # edges. See the HISTORICAL NOTE above. # @@ -313,101 +322,112 @@ def _add_test_edges_2( # are handled quickly and easily. # # The less common but more complex case of multi-tested nodes is handled - # by sweeping through the graph in a breadth-first style, processing nodes - # from a ready queue which initially consists of nodes with no ancestors, - # and adding more nodes to the ready queue after all their ancestors - # have been processed. All the while, the relevant details of all known - # ancestors each "seen" node has, and which tests it is awaiting, are - # maintained in a SeenDetails record. 
The processing step adds test edges - # when every dependency of an awaited test is an ancestor of a node that - # is being processed. Downstream nodes are then exempted from awaiting - # that same test. - - new_edges: List[Tuple[UniqueID, UniqueID]] = [] # The list of new edges to be returned - details: Dict[UniqueID, SeenDetails] = ( - {} - ) # Details of all the nodes found in the breadth first search - ready_set: deque = deque() # Queue of nodes ready for processing - exec_nodes = ( - set() - ) # Set of nodes that are "executable" (i.e. in the manifest and not tests) - multi_tested_nodes = ( - set() - ) # The set of nodes which have tests dependent on more than just themselves - singles: dict[UniqueID, List[UniqueID]] = ( - {} - ) # A dictionary mapping nodes to all single tests depending on them + # by a specialized function. + + new_edges: List[Tuple[UniqueID, UniqueID]] = [] + + source_nodes: List[UniqueID] = [] + executable_nodes: Set[UniqueID] = set() + multi_tested_nodes = set() + # Dictionary mapping nodes with single-dep tests to a list of those tests. 
+ single_tested_nodes: dict[UniqueID, List[UniqueID]] = defaultdict(list) for node_id in graph.nodes: manifest_node = manifest.nodes.get(node_id, None) - if manifest_node is not None: - if manifest_node.resource_type != NodeType.Test: - exec_nodes.add(node_id) - else: - test_deps = manifest_node.depends_on_nodes - if len(test_deps) == 1: - if test_deps[0] not in singles: - singles[test_deps[0]] = [] - singles[test_deps[0]].append(node_id) - elif len(test_deps) > 1: - multi_tested_nodes.update(manifest_node.depends_on_nodes) - - if graph.in_degree(node_id) == 0: - details[node_id] = SeenDetails( - node_id=node_id, visits=0, ancestors=set(), awaits_tests=set() - ) - ready_set.appendleft(node_id) + if manifest_node is None: + continue + + if next(graph.predecessors(node_id), None) is None: + source_nodes.append(node_id) - for node_id, test_ids in singles.items(): - succs = [s for s in graph.successors(node_id) if s in exec_nodes] + if manifest_node.resource_type != NodeType.Test: + executable_nodes.add(node_id) + else: + test_deps = manifest_node.depends_on_nodes + if len(test_deps) == 1: + single_tested_nodes[test_deps[0]].append(node_id) + elif len(test_deps) > 1: + multi_tested_nodes.update(manifest_node.depends_on_nodes) + + # Now that we have all the necessary information conveniently organized, + # add new edges for single-tested nodes. + for node_id, test_ids in single_tested_nodes.items(): + succs = [s for s in graph.successors(node_id) if s in executable_nodes] for succ_id in succs: for test_id in test_ids: new_edges.append((test_id, succ_id)) - # If there are no multi-tested nodes, we can skip the slower processing - # below. - if len(multi_tested_nodes) == 0: - return new_edges + # Get the edges for multi-tested nodes separately, if needed. 
+ if len(multi_tested_nodes) > 0: + multi_test_edges = Linker._get_multi_test_edges( + graph, manifest, source_nodes, executable_nodes, multi_tested_nodes + ) + new_edges += multi_test_edges - # Perform the BFS-style processing of the graph. - while not len(ready_set) == 0: - curr_details: SeenDetails = details[ready_set.pop()] - test_ids = _get_tests_for_node(manifest, curr_details.node_id) - succs = graph.successors(curr_details.node_id) + return new_edges + @staticmethod + def _get_multi_test_edges( + graph: nx.DiGraph, + manifest: Manifest, + source_nodes: Iterable[UniqueID], + executable_nodes: Set[UniqueID], + multi_tested_nodes, + ) -> List[Tuple[UniqueID, UniqueID]]: + # Works through the graph in a breadth-first style, processing nodes from + # a ready queue which initially consists of nodes with no ancestors, + # and adding more nodes to the ready queue after all their ancestors + # have been processed. All the while, the relevant details of all known + # the "seen" (i.e. encountered) nodes are maintained in a SeenDetails + # record, including ancestor set which tests it is "awaiting" (i.e. which + # are tests of its ancestors), maintained in a SeenDetails record. The + # processing step adds test edges when every dependency of an awaited + # test is an ancestor of a node that is being processed. Downstream nodes + # are then exempted from awaiting that same test. + # + # Memory consumption is potentially O(n^2) with n the number of nodes in + # the graph, since the average number of ancestors for each of n nodes + # could be O(n) but we only track ancestors that are multi-tested, which + # should keep things closer to O(n) in real-world scenarios. 
+ + new_edges: List[Tuple[UniqueID, UniqueID]] = [] + ready: deque = deque(source_nodes) + details = {node_id: SeenDetails(node_id) for node_id in source_nodes} + + while len(ready) > 0: + curr_details: SeenDetails = details[ready.pop()] + test_ids = _get_tests_for_node(manifest, curr_details.node_id) new_awaits_for_succs = curr_details.awaits_tests.copy() for test_id in test_ids: deps: List[UniqueID] = sorted(manifest.nodes[test_id].depends_on_nodes) if len(deps) > 1: + # Tests with only one dep were already handled. new_awaits_for_succs.add((test_id, tuple(deps))) - for succ_id in succs: - if succ_id not in exec_nodes: - continue + for succ_id in [ + s for s in graph.successors(curr_details.node_id) if s in executable_nodes + ]: suc_details = details.get(succ_id, None) if suc_details is None: - suc_details = SeenDetails( - node_id=succ_id, visits=0, ancestors=set(), awaits_tests=set() - ) + suc_details = SeenDetails(succ_id) details[succ_id] = suc_details suc_details.visits += 1 + suc_details.awaits_tests.update(new_awaits_for_succs) suc_details.ancestors.update(curr_details.ancestors) if curr_details.node_id in multi_tested_nodes: + # Only track ancestry information for the set of nodes + # we will actually check against later. suc_details.ancestors.add(curr_details.node_id) - suc_details.awaits_tests.update(new_awaits_for_succs) if suc_details.visits == graph.in_degree(succ_id): - removes = set() if len(suc_details.awaits_tests) > 0: - p = set(graph.predecessors(succ_id)) + removes = set() for awt in suc_details.awaits_tests: - if not any( - a for a in awt[1] if a not in p and a not in suc_details.ancestors - ): + if not any(True for a in awt[1] if a not in suc_details.ancestors): removes.add(awt) new_edges.append((awt[0], succ_id)) suc_details.awaits_tests.difference_update(removes) - ready_set.appendleft(succ_id) + ready.appendleft(succ_id) # We are now done with the current node and all of its ancestors. # Discard its details to save memory. 
From 755771205d702dd9e43b7aa69dd0b69c9a61cb33 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Wed, 4 Dec 2024 15:34:12 -0500 Subject: [PATCH 3/4] Put new test edge behavior behind flag. --- core/dbt/compilation.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index c3eb555b76e..7c488935f0a 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -206,11 +206,10 @@ def link_graph(self, manifest: Manifest): raise RuntimeError("Found a cycle: {}".format(cycle)) def add_test_edges(self, manifest: Manifest) -> None: - # if not get_flags().USE_FAST_TEST_EDGES: - # self.add_test_edges_1(manifest) - # else: - # self.add_test_edges_2(manifest) - self.add_test_edges_2(manifest) + if not get_flags().USE_FAST_TEST_EDGES: + self.add_test_edges_1(manifest) + else: + self.add_test_edges_2(manifest) def add_test_edges_1(self, manifest: Manifest) -> None: """This method adds additional edges to the DAG. For a given non-test From 234956d72d30f2026ad43b796ca6bb004f6daca7 Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Wed, 4 Dec 2024 17:18:33 -0500 Subject: [PATCH 4/4] Final draft of documentation. --- core/dbt/compilation.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/core/dbt/compilation.py b/core/dbt/compilation.py index 7c488935f0a..81ab849c8d1 100644 --- a/core/dbt/compilation.py +++ b/core/dbt/compilation.py @@ -234,7 +234,7 @@ def add_test_edges_1(self, manifest: Manifest) -> None: # an upstream test if and only if the downstream node is already a # descendant of all the nodes the upstream test depends on. By following # that rule, it never makes the node dependent on new upstream nodes other - # than the test itself, and no cycles will be created. + # than the tests themselves, and no cycles will be created.
# # One drawback (Drawback 1) of the approach taken in this function is # that it could still allow a downstream node to proceed before all @@ -312,13 +312,13 @@ def _get_test_edges_2( graph: nx.DiGraph, manifest: Manifest ) -> Iterable[Tuple[UniqueID, UniqueID]]: # This function enforces the same execution behavior as add_test_edges, - # but executes more quickly in almost all cases, and adds far fewer - # edges. See the HISTORICAL NOTE above. + # but executes far more quickly and adds far fewer edges. See the + # HISTORICAL NOTE above. # # The idea is to first scan for "single-tested" nodes (which have tests # that depend only upon on that node) and "multi-tested" nodes (which - # have tests that also depend on other nodes). Single-tested nodes - # are handled quickly and easily. + # have tests that depend on multiple nodes). Single-tested nodes are + # handled quickly and easily. # # The less common but more complex case of multi-tested nodes is handled # by a specialized function. @@ -375,18 +375,18 @@ def _get_multi_test_edges( # Works through the graph in a breadth-first style, processing nodes from # a ready queue which initially consists of nodes with no ancestors, # and adding more nodes to the ready queue after all their ancestors - # have been processed. All the while, the relevant details of all known - # the "seen" (i.e. encountered) nodes are maintained in a SeenDetails - # record, including ancestor set which tests it is "awaiting" (i.e. which - # are tests of its ancestors), maintained in a SeenDetails record. The - # processing step adds test edges when every dependency of an awaited - # test is an ancestor of a node that is being processed. Downstream nodes - # are then exempted from awaiting that same test. + # have been processed. All the while, the relevant details of all nodes + # "seen" by the search so far are maintained in a SeenDetails record, + # including the ancestor set and which tests it is "awaiting" (i.e. tests of + # its ancestors).
The processing step adds test edges when every dependency + # of an awaited test is an ancestor of a node that is being processed. + # Downstream nodes are then exempted from awaiting the test. # # Memory consumption is potentially O(n^2) with n the number of nodes in - # the graph, since the average number of ancestors for each of n nodes - # could be O(n) but we only track ancestors that are multi-tested, which - # should keep things closer to O(n) in real-world scenarios. + # the graph, since the average number of ancestors and tests being awaited + # for each of the n nodes could itself be O(n), but we only track ancestors + # that are multi-tested, which should keep things closer to O(n) in real- + # world scenarios. new_edges: List[Tuple[UniqueID, UniqueID]] = [] ready: deque = deque(source_nodes)