Skip to content

Commit

Permalink
Merge branch 'ADAP-9656_raise_an_error_when_check_cols_has_duplicated…
Browse files Browse the repository at this point in the history
…_values' of https://github.com/ariosramirez/dbt-core into ADAP-9656_raise_an_error_when_check_cols_has_duplicated_values
  • Loading branch information
ariosramirez committed Feb 28, 2024
2 parents 67d5cb4 + bad0a96 commit 7bd6cb5
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 50 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Dependencies-20240227-151115.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Dependencies
body: bump dbt-common to accept major version 1
time: 2024-02-27T15:11:15.583604-05:00
custom:
Author: michelleark
PR: "9690"
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240226-184258.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Use Manifest instead of WritableManifest in PreviousState and _get_deferred_manifest
time: 2024-02-26T18:42:58.740808-05:00
custom:
Author: michelleark
Issue: "9567"
14 changes: 14 additions & 0 deletions core/dbt/artifacts/resources/v1/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,17 @@ class CompiledResource(ParsedResource):
extra_ctes: List[InjectedCTE] = field(default_factory=list)
_pre_injected_sql: Optional[str] = None
contract: Contract = field(default_factory=Contract)

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if "_pre_injected_sql" in dct:
del dct["_pre_injected_sql"]
# Remove compiled attributes
if "compiled" in dct and dct["compiled"] is False:
del dct["compiled"]
del dct["extra_ctes_injected"]
del dct["extra_ctes"]
# "omit_none" means these might not be in the dictionary
if "compiled_code" in dct:
del dct["compiled_code"]
return dct
39 changes: 30 additions & 9 deletions core/dbt/artifacts/schemas/manifest/v12/manifest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Mapping, Iterable, Tuple, Optional, Dict, List, Any
from typing import Mapping, Iterable, Tuple, Optional, Dict, List, Any, Union
from uuid import UUID

from dbt.artifacts.schemas.base import (
Expand All @@ -19,17 +19,38 @@
SemanticModel,
SourceDefinition,
UnitTestDefinition,
)

# TODO: remove usage of dbt modules other than dbt.artifacts
from dbt.contracts.graph.nodes import (
GraphMemberNode,
ManifestNode,
Seed,
Analysis,
SingularTest,
HookNode,
Model,
SqlOperation,
GenericTest,
Snapshot,
)


NodeEdgeMap = Dict[str, List[str]]
UniqueID = str
ManifestResource = Union[
Seed,
Analysis,
SingularTest,
HookNode,
Model,
SqlOperation,
GenericTest,
Snapshot,
]
DisabledManifestResource = Union[
ManifestResource,
SourceDefinition,
Exposure,
Metric,
SavedQuery,
SemanticModel,
UnitTestDefinition,
]


@dataclass
Expand Down Expand Up @@ -78,7 +99,7 @@ def default(cls):
@dataclass
@schema_version("manifest", 12)
class WritableManifest(ArtifactMixin):
nodes: Mapping[UniqueID, ManifestNode] = field(
nodes: Mapping[UniqueID, ManifestResource] = field(
metadata=dict(description=("The nodes defined in the dbt project and its dependencies"))
)
sources: Mapping[UniqueID, SourceDefinition] = field(
Expand All @@ -104,7 +125,7 @@ class WritableManifest(ArtifactMixin):
selectors: Mapping[UniqueID, Any] = field(
metadata=dict(description=("The selectors defined in selectors.yml"))
)
disabled: Optional[Mapping[UniqueID, List[GraphMemberNode]]] = field(
disabled: Optional[Mapping[UniqueID, List[DisabledManifestResource]]] = field(
metadata=dict(description="A mapping of the disabled nodes in the target")
)
parent_map: Optional[NodeEdgeMap] = field(
Expand Down
64 changes: 60 additions & 4 deletions core/dbt/contracts/graph/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,17 @@
UnpatchedSourceDefinition,
UnitTestDefinition,
UnitTestFileFixture,
RESOURCE_CLASS_TO_NODE_CLASS,
)
from dbt.contracts.graph.unparsed import SourcePatch, UnparsedVersion
from dbt.flags import get_flags

# to preserve import paths
from dbt.artifacts.resources import NodeVersion, DeferRelation
from dbt.artifacts.resources import (
NodeVersion,
DeferRelation,
BaseResource,
)
from dbt.artifacts.schemas.manifest import WritableManifest, ManifestMetadata, UniqueID
from dbt.contracts.files import (
SourceFile,
Expand Down Expand Up @@ -774,6 +779,7 @@ class ManifestStateCheck(dbtClassMixin):


NodeClassT = TypeVar("NodeClassT", bound="BaseNode")
ResourceClassT = TypeVar("ResourceClassT", bound="BaseResource")


@dataclass
Expand Down Expand Up @@ -1029,16 +1035,66 @@ def fill_tracking_metadata(self):
self.metadata.send_anonymous_usage_stats = get_flags().SEND_ANONYMOUS_USAGE_STATS

@classmethod
def from_writable_manifest(cls, writable_manifest: WritableManifest) -> "Manifest":
manifest = Manifest(
nodes=cls._map_resources_to_map_nodes(writable_manifest.nodes),
disabled=cls._map_list_resources_to_map_list_nodes(writable_manifest.disabled),
unit_tests=cls._map_resources_to_map_nodes(writable_manifest.unit_tests),
sources=cls._map_resources_to_map_nodes(writable_manifest.sources),
macros=cls._map_resources_to_map_nodes(writable_manifest.macros),
docs=cls._map_resources_to_map_nodes(writable_manifest.docs),
exposures=cls._map_resources_to_map_nodes(writable_manifest.exposures),
metrics=cls._map_resources_to_map_nodes(writable_manifest.metrics),
groups=cls._map_resources_to_map_nodes(writable_manifest.groups),
semantic_models=cls._map_resources_to_map_nodes(writable_manifest.semantic_models),
selectors={
selector_id: selector
for selector_id, selector in writable_manifest.selectors.items()
},
)

return manifest

def _map_nodes_to_map_resources(cls, nodes_map: MutableMapping[str, NodeClassT]):
return {node_id: node.to_resource() for node_id, node in nodes_map.items()}

def _map_list_nodes_to_map_list_resources(
cls, nodes_map: MutableMapping[str, List[NodeClassT]]
):
return {
node_id: [node.to_resource() for node in node_list]
for node_id, node_list in nodes_map.items()
}

@classmethod
def _map_resources_to_map_nodes(cls, resources_map: Mapping[str, ResourceClassT]):
return {
node_id: RESOURCE_CLASS_TO_NODE_CLASS[type(resource)].from_resource(resource)
for node_id, resource in resources_map.items()
}

@classmethod
def _map_list_resources_to_map_list_nodes(
cls, resources_map: Optional[Mapping[str, List[ResourceClassT]]]
):
if resources_map is None:
return {}

return {
node_id: [
RESOURCE_CLASS_TO_NODE_CLASS[type(resource)].from_resource(resource)
for resource in resource_list
]
for node_id, resource_list in resources_map.items()
}

def writable_manifest(self) -> "WritableManifest":
self.build_parent_and_child_maps()
self.build_group_map()
self.fill_tracking_metadata()

return WritableManifest(
nodes=self.nodes,
nodes=self._map_nodes_to_map_resources(self.nodes),
sources=self._map_nodes_to_map_resources(self.sources),
macros=self._map_nodes_to_map_resources(self.macros),
docs=self._map_nodes_to_map_resources(self.docs),
Expand All @@ -1047,7 +1103,7 @@ def writable_manifest(self) -> "WritableManifest":
groups=self._map_nodes_to_map_resources(self.groups),
selectors=self.selectors,
metadata=self.metadata,
disabled=self.disabled,
disabled=self._map_list_nodes_to_map_list_resources(self.disabled),
child_map=self.child_map,
parent_map=self.parent_map,
group_map=self.group_map,
Expand Down Expand Up @@ -1369,7 +1425,7 @@ def is_invalid_protected_ref(
def merge_from_artifact(
self,
adapter,
other: "WritableManifest",
other: "Manifest",
selected: AbstractSet[UniqueID],
favor_state: bool = False,
) -> None:
Expand Down
54 changes: 36 additions & 18 deletions core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Type,
Iterator,
Literal,
get_args,
)

from dbt import deprecations
Expand Down Expand Up @@ -396,20 +397,6 @@ def set_cte(self, cte_id: str, sql: str):
else:
self.extra_ctes.append(InjectedCTE(id=cte_id, sql=sql))

def __post_serialize__(self, dct):
dct = super().__post_serialize__(dct)
if "_pre_injected_sql" in dct:
del dct["_pre_injected_sql"]
# Remove compiled attributes
if "compiled" in dct and dct["compiled"] is False:
del dct["compiled"]
del dct["extra_ctes_injected"]
del dct["extra_ctes"]
# "omit_none" means these might not be in the dictionary
if "compiled_code" in dct:
del dct["compiled_code"]
return dct

@property
def depends_on_nodes(self):
return self.depends_on.nodes
Expand All @@ -426,16 +413,24 @@ def depends_on_macros(self):

@dataclass
class AnalysisNode(AnalysisResource, CompiledNode):
pass
@classmethod
def resource_class(cls) -> Type[AnalysisResource]:
return AnalysisResource


@dataclass
class HookNode(HookNodeResource, CompiledNode):
pass
@classmethod
def resource_class(cls) -> Type[HookNodeResource]:
return HookNodeResource


@dataclass
class ModelNode(ModelResource, CompiledNode):
@classmethod
def resource_class(cls) -> Type[ModelResource]:
return ModelResource

@classmethod
def from_args(cls, args: ModelNodeArgs) -> "ModelNode":
unique_id = args.unique_id
Expand Down Expand Up @@ -768,7 +763,9 @@ def same_contract(self, old, adapter_type=None) -> bool:

@dataclass
class SqlNode(SqlOperationResource, CompiledNode):
pass
@classmethod
def resource_class(cls) -> Type[SqlOperationResource]:
return SqlOperationResource


# ====================================
Expand All @@ -778,6 +775,10 @@ class SqlNode(SqlOperationResource, CompiledNode):

@dataclass
class SeedNode(SeedResource, ParsedNode): # No SQLDefaults!
@classmethod
def resource_class(cls) -> Type[SeedResource]:
return SeedResource

def same_seeds(self, other: "SeedNode") -> bool:
# for seeds, we check the hashes. If the hashes are different types,
# no match. If the hashes are both the same 'path', log a warning and
Expand Down Expand Up @@ -896,6 +897,10 @@ def is_relational(self):

@dataclass
class SingularTestNode(SingularTestResource, TestShouldStoreFailures, CompiledNode):
@classmethod
def resource_class(cls) -> Type[SingularTestResource]:
return SingularTestResource

@property
def test_node_type(self):
return "singular"
Expand All @@ -908,6 +913,10 @@ def test_node_type(self):

@dataclass
class GenericTestNode(GenericTestResource, TestShouldStoreFailures, CompiledNode):
@classmethod
def resource_class(cls) -> Type[GenericTestResource]:
return GenericTestResource

def same_contents(self, other, adapter_type: Optional[str]) -> bool:
if other is None:
return False
Expand Down Expand Up @@ -1014,7 +1023,9 @@ class IntermediateSnapshotNode(CompiledNode):

@dataclass
class SnapshotNode(SnapshotResource, CompiledNode):
pass
@classmethod
def resource_class(cls) -> Type[SnapshotResource]:
return SnapshotResource


# ====================================
Expand Down Expand Up @@ -1626,3 +1637,10 @@ class ParsedMacroPatch(ParsedPatch):
]

TestNode = Union[SingularTestNode, GenericTestNode]


RESOURCE_CLASS_TO_NODE_CLASS: Dict[Type[BaseResource], Type[BaseNode]] = {
node_class.resource_class(): node_class
for node_class in get_args(Resource)
if node_class is not UnitTestNode
}
8 changes: 5 additions & 3 deletions core/dbt/contracts/state.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from pathlib import Path
from typing import Optional

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.graph.manifest import Manifest
from dbt.artifacts.schemas.manifest import WritableManifest
from dbt.artifacts.schemas.freshness import FreshnessExecutionResultArtifact
from dbt.artifacts.schemas.run import RunResultsArtifact
from dbt_common.events.functions import fire_event
Expand All @@ -24,7 +25,7 @@ def __init__(self, state_path: Path, target_path: Path, project_root: Path) -> N
self.state_path: Path = state_path
self.target_path: Path = target_path
self.project_root: Path = project_root
self.manifest: Optional[WritableManifest] = None
self.manifest: Optional[Manifest] = None
self.results: Optional[RunResultsArtifact] = None
self.sources: Optional[FreshnessExecutionResultArtifact] = None
self.sources_current: Optional[FreshnessExecutionResultArtifact] = None
Expand All @@ -36,7 +37,8 @@ def __init__(self, state_path: Path, target_path: Path, project_root: Path) -> N
manifest_path = self.project_root / self.state_path / "manifest.json"
if manifest_path.exists() and manifest_path.is_file():
try:
self.manifest = WritableManifest.read_and_check_versions(str(manifest_path))
writable_manifest = WritableManifest.read_and_check_versions(str(manifest_path))
self.manifest = Manifest.from_writable_manifest(writable_manifest)
except IncompatibleSchemaError as exc:
exc.add_filename(str(manifest_path))
raise
Expand Down
4 changes: 2 additions & 2 deletions core/dbt/graph/selector_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from .graph import UniqueId

from dbt.contracts.graph.manifest import Manifest, WritableManifest
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
SingularTestNode,
Exposure,
Expand Down Expand Up @@ -725,7 +725,7 @@ def search(self, included_nodes: Set[UniqueId], selector: str) -> Iterator[Uniqu
f'Got an invalid selector "{selector}", expected one of ' f'"{list(state_checks)}"'
)

manifest: WritableManifest = self.previous_state.manifest
manifest: Manifest = self.previous_state.manifest

for unique_id, node in self.all_nodes(included_nodes):
previous_node: Optional[SelectorTarget] = None
Expand Down
Loading

0 comments on commit 7bd6cb5

Please sign in to comment.