diff --git a/core/dbt/artifacts/schemas/results.py b/core/dbt/artifacts/schemas/results.py index dd455f309b8..9211d713310 100644 --- a/core/dbt/artifacts/schemas/results.py +++ b/core/dbt/artifacts/schemas/results.py @@ -64,6 +64,7 @@ class NodeStatus(StrEnum): PartialSuccess = "partial success" Pass = "pass" RuntimeErr = "runtime error" + NoOp = "no-op" class RunStatus(StrEnum): @@ -71,6 +72,7 @@ class RunStatus(StrEnum): Error = NodeStatus.Error Skipped = NodeStatus.Skipped PartialSuccess = NodeStatus.PartialSuccess + NoOp = NodeStatus.NoOp class TestStatus(StrEnum): diff --git a/core/dbt/contracts/graph/nodes.py b/core/dbt/contracts/graph/nodes.py index 8fc39c7621b..9e72c107846 100644 --- a/core/dbt/contracts/graph/nodes.py +++ b/core/dbt/contracts/graph/nodes.py @@ -1378,7 +1378,7 @@ def group(self): @dataclass -class Exposure(GraphNode, ExposureResource): +class Exposure(NodeInfoMixin, GraphNode, ExposureResource): @property def depends_on_nodes(self): return self.depends_on.nodes diff --git a/core/dbt/events/types.py b/core/dbt/events/types.py index 85d2df355ae..c3bf53ff977 100644 --- a/core/dbt/events/types.py +++ b/core/dbt/events/types.py @@ -1892,7 +1892,9 @@ def code(self) -> str: return "Z023" def message(self) -> str: - stats_line = "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} TOTAL={total}" + stats_line = ( + "Done. PASS={pass} WARN={warn} ERROR={error} SKIP={skip} NO-OP={noop} TOTAL={total}" + ) return stats_line.format(**self.stats) diff --git a/core/dbt/graph/selector.py b/core/dbt/graph/selector.py index bf586851fb8..b9786b4ddeb 100644 --- a/core/dbt/graph/selector.py +++ b/core/dbt/graph/selector.py @@ -178,10 +178,12 @@ def _is_graph_member(self, unique_id: UniqueId) -> bool: elif unique_id in self.manifest.saved_queries: saved_query = self.manifest.saved_queries[unique_id] return saved_query.config.enabled - - node = self.manifest.nodes[unique_id] - - return node.config.enabled + elif unique_id in self.manifest.exposures: + exposure = self.manifest.exposures[unique_id] + return exposure.config.enabled + else: + node = self.manifest.nodes[unique_id] + return node.config.enabled def _is_empty_node(self, unique_id: UniqueId) -> bool: if unique_id in self.manifest.nodes: diff --git a/core/dbt/runner/__init__.py b/core/dbt/runner/__init__.py new file mode 100644 index 00000000000..78b38ac55ac --- /dev/null +++ b/core/dbt/runner/__init__.py @@ -0,0 +1,2 @@ +from .exposure_runner import ExposureRunner +from .saved_query_runner import SavedQueryRunner diff --git a/core/dbt/runner/exposure_runner.py b/core/dbt/runner/exposure_runner.py new file mode 100644 index 00000000000..8afa53f659b --- /dev/null +++ b/core/dbt/runner/exposure_runner.py @@ -0,0 +1,7 @@ +from dbt.runner.no_op_runner import NoOpRunner + + +class ExposureRunner(NoOpRunner): + @property + def description(self) -> str: + return f"exposure {self.node.name}" diff --git a/core/dbt/runner/no_op_runner.py b/core/dbt/runner/no_op_runner.py new file mode 100644 index 00000000000..2789c1fa9a6 --- /dev/null +++ b/core/dbt/runner/no_op_runner.py @@ -0,0 +1,45 @@ +import threading + +from dbt.artifacts.schemas.results import RunStatus +from dbt.artifacts.schemas.run import RunResult +from dbt.contracts.graph.manifest import Manifest +from dbt.events.types import LogNodeNoOpResult +from dbt.task.base import BaseRunner +from dbt_common.events.functions import fire_event + + +class NoOpRunner(BaseRunner): + @property + def description(self) -> str: + raise NotImplementedError("description not implemented") + + def before_execute(self) -> None: + pass + + def compile(self, manifest: Manifest): + return self.node + + def after_execute(self, result) -> None: + fire_event( + LogNodeNoOpResult( + description=self.description, + index=self.node_index, + total=self.num_nodes, + node_info=self.node.node_info, + ) + ) + + def execute(self, compiled_node, manifest): + # no-op + return RunResult( + node=compiled_node, + status=RunStatus.NoOp, + timing=[], + thread_id=threading.current_thread().name, + execution_time=0, + message="NO-OP", + adapter_response={}, + failures=0, + batch_results=None, + agate_table=None, + ) diff --git a/core/dbt/runner/saved_query_runner.py b/core/dbt/runner/saved_query_runner.py new file mode 100644 index 00000000000..76cfc774f19 --- /dev/null +++ b/core/dbt/runner/saved_query_runner.py @@ -0,0 +1,7 @@ +from dbt.runner.no_op_runner import NoOpRunner + + +class SavedQueryRunner(NoOpRunner): + @property + def description(self) -> str: + return f"saved query {self.node.name}" diff --git a/core/dbt/task/build.py b/core/dbt/task/build.py index ff68d976744..2f00716cae9 100644 --- a/core/dbt/task/build.py +++ b/core/dbt/task/build.py @@ -1,18 +1,17 @@ -import threading from typing import Dict, List, Optional, Set, Type -from dbt.artifacts.schemas.results import NodeStatus, RunStatus +from dbt.artifacts.schemas.results import NodeStatus from dbt.artifacts.schemas.run import RunResult from dbt.cli.flags import Flags from dbt.config.runtime import RuntimeConfig from dbt.contracts.graph.manifest import Manifest -from dbt.events.types import LogNodeNoOpResult from dbt.exceptions import DbtInternalError from dbt.graph import Graph, GraphQueue, ResourceTypeSelector from dbt.node_types import NodeType +from dbt.runner import ExposureRunner as exposure_runner +from dbt.runner import SavedQueryRunner as saved_query_runner from dbt.task.base import BaseRunner, resource_types_from_args from dbt.task.run import MicrobatchModelRunner -from dbt_common.events.functions import fire_event from .run import ModelRunner as run_model_runner from .run import RunTask @@ -21,44 +20,6 @@ from .test import TestRunner as test_runner -class SavedQueryRunner(BaseRunner): - # Stub. No-op Runner for Saved Queries, which require MetricFlow for execution. - @property - def description(self) -> str: - return f"saved query {self.node.name}" - - def before_execute(self) -> None: - pass - - def compile(self, manifest: Manifest): - return self.node - - def after_execute(self, result) -> None: - fire_event( - LogNodeNoOpResult( - description=self.description, - index=self.node_index, - total=self.num_nodes, - node_info=self.node.node_info, - ) - ) - - def execute(self, compiled_node, manifest): - # no-op - return RunResult( - node=compiled_node, - status=RunStatus.Success, - timing=[], - thread_id=threading.current_thread().name, - execution_time=0, - message="NO-OP", - adapter_response={}, - failures=0, - batch_results=None, - agate_table=None, - ) - - class BuildTask(RunTask): """The Build task processes all assets of a given process and attempts to 'build' them in an opinionated fashion. Every resource type outlined in @@ -80,7 +41,8 @@ class BuildTask(RunTask): NodeType.Seed: seed_runner, NodeType.Test: test_runner, NodeType.Unit: test_runner, - NodeType.SavedQuery: SavedQueryRunner, + NodeType.SavedQuery: saved_query_runner, + NodeType.Exposure: exposure_runner, } ALL_RESOURCE_VALUES = frozenset({x for x in RUNNER_MAP.keys()}) diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 58a39552450..01ccd75a586 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -48,6 +48,8 @@ def interpret_run_result(result) -> str: return "warn" elif result.status in (NodeStatus.Pass, NodeStatus.Success): return "pass" + elif result.status == NodeStatus.NoOp: + return "noop" else: raise RuntimeError(f"unhandled result {result}") @@ -58,6 +60,7 @@ def print_run_status_line(results) -> None: "skip": 0, "pass": 0, "warn": 0, + "noop": 0, "total": 0, } diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index b889558d048..2cf8a9d0740 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -184,6 +184,8 @@ def _runtime_initialize(self): self._flattened_nodes.append(self.manifest.saved_queries[uid]) elif uid in self.manifest.unit_tests: self._flattened_nodes.append(self.manifest.unit_tests[uid]) + elif uid in self.manifest.exposures: + self._flattened_nodes.append(self.manifest.exposures[uid]) else: raise DbtInternalError( f"Node selection returned {uid}, expected a node, a source, or a unit test" diff --git a/tests/unit/task/test_build.py b/tests/unit/task/test_build.py index 87d7b081ae8..db2bb73c0fe 100644 --- a/tests/unit/task/test_build.py +++ b/tests/unit/task/test_build.py @@ -1,5 +1,5 @@ from dbt.contracts.graph.nodes import SavedQuery -from dbt.task.build import SavedQueryRunner +from dbt.runner import SavedQueryRunner def test_saved_query_runner_on_skip(saved_query: SavedQuery):