From cff0b65b0152f04244007fd80063046e4fc8ed21 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 11 Jan 2024 14:58:34 -0500 Subject: [PATCH] Move the contents of dbt.contracts.results to a new dbt.artifacts directory (#9350) --- .../Under the Hood-20240108-160140.yaml | 6 + core/dbt/artifacts/base.py | 180 ++++++ core/dbt/artifacts/catalog.py | 109 ++++ core/dbt/artifacts/freshness.py | 145 +++++ core/dbt/artifacts/results.py | 148 +++++ core/dbt/artifacts/run.py | 148 +++++ core/dbt/cli/main.py | 6 +- core/dbt/contracts/graph/manifest.py | 4 +- core/dbt/contracts/graph/unparsed.py | 4 +- core/dbt/contracts/results.py | 572 ++---------------- core/dbt/contracts/sql.py | 11 +- core/dbt/contracts/state.py | 4 +- core/dbt/contracts/util.py | 183 +----- core/dbt/parser/manifest.py | 2 +- core/dbt/plugins/contracts.py | 4 +- core/dbt/task/base.py | 11 +- core/dbt/task/build.py | 4 +- core/dbt/task/clone.py | 2 +- core/dbt/task/compile.py | 2 +- core/dbt/task/debug.py | 2 +- core/dbt/task/docs/generate.py | 4 +- core/dbt/task/freshness.py | 2 +- core/dbt/task/printer.py | 2 +- core/dbt/task/retry.py | 2 +- core/dbt/task/run.py | 3 +- core/dbt/task/run_operation.py | 3 +- core/dbt/task/runnable.py | 10 +- core/dbt/task/seed.py | 3 +- core/dbt/task/show.py | 2 +- core/dbt/task/snapshot.py | 2 +- core/dbt/task/test.py | 4 +- scripts/collect-artifact-schema.py | 10 +- .../tests/adapter/basic/test_incremental.py | 2 +- .../tests/adapter/catalog/relation_types.py | 2 +- .../incremental/test_incremental_unique_id.py | 2 +- .../store_test_failures_tests/basic.py | 2 +- tests/functional/artifacts/test_artifacts.py | 3 +- .../artifacts/test_previous_version_state.py | 4 +- tests/unit/test_contracts_graph_unparsed.py | 2 +- tests/unit/test_events.py | 3 +- 40 files changed, 839 insertions(+), 775 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20240108-160140.yaml create mode 100644 core/dbt/artifacts/base.py create mode 100644 core/dbt/artifacts/catalog.py create mode 100644 core/dbt/artifacts/freshness.py create mode 100644 core/dbt/artifacts/results.py create mode 100644 core/dbt/artifacts/run.py diff --git a/.changes/unreleased/Under the Hood-20240108-160140.yaml b/.changes/unreleased/Under the Hood-20240108-160140.yaml new file mode 100644 index 00000000000..2c4f9f8eaa8 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240108-160140.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Move result objects to dbt.artifacts +time: 2024-01-08T16:01:40.20348-05:00 +custom: + Author: gshank + Issue: "9193" diff --git a/core/dbt/artifacts/base.py b/core/dbt/artifacts/base.py new file mode 100644 index 00000000000..69806915c43 --- /dev/null +++ b/core/dbt/artifacts/base.py @@ -0,0 +1,180 @@ +import dataclasses +from datetime import datetime +from typing import ClassVar, Type, TypeVar, Dict, Any, Optional + +from dbt.common.clients.system import write_json, read_json +from dbt.exceptions import ( + DbtInternalError, + DbtRuntimeError, + IncompatibleSchemaError, +) +from dbt.version import __version__ + +from dbt.common.events.functions import get_metadata_vars +from dbt.common.invocation import get_invocation_id +from dbt.common.dataclass_schema import dbtClassMixin + +from mashumaro.jsonschema import build_json_schema +from mashumaro.jsonschema.dialects import DRAFT_2020_12 +import functools + + +BASE_SCHEMAS_URL = "https://schemas.getdbt.com/" +SCHEMA_PATH = "dbt/{name}/v{version}.json" + + +@dataclasses.dataclass +class SchemaVersion: + name: str + version: int + + @property + def path(self) -> str: + return SCHEMA_PATH.format(name=self.name, version=self.version) + + def __str__(self) -> str: + return BASE_SCHEMAS_URL + self.path + + +class Writable: + def write(self, path: str): + write_json(path, self.to_dict(omit_none=False)) # type: ignore + + +class Readable: + @classmethod + def read(cls, path: str): + try: + data = read_json(path) + except (EnvironmentError, ValueError) as exc: + raise DbtRuntimeError( + f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' + ) from exc + + return cls.from_dict(data) # type: ignore + + +# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata, +# FreshnessMetadata, and CatalogMetadata classes +@dataclasses.dataclass +class BaseArtifactMetadata(dbtClassMixin): + dbt_schema_version: str + dbt_version: str = __version__ + generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow) + invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id) + env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars) + + def __post_serialize__(self, dct): + dct = super().__post_serialize__(dct) + if dct["generated_at"] and dct["generated_at"].endswith("+00:00"): + dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z" + return dct + + +# This is used as a class decorator to set the schema_version in the +# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.) +# Name attributes of SchemaVersion in classes with the 'schema_version' decorator: +# manifest +# run-results +# run-operation-result +# sources +# catalog +# remote-compile-result +# remote-execution-result +# remote-run-result +def schema_version(name: str, version: int): + def inner(cls: Type[VersionedSchema]): + cls.dbt_schema_version = SchemaVersion( + name=name, + version=version, + ) + return cls + + return inner + + +# This is used in the ArtifactMixin and RemoteResult classes +@dataclasses.dataclass +class VersionedSchema(dbtClassMixin): + dbt_schema_version: ClassVar[SchemaVersion] + + @classmethod + @functools.lru_cache + def json_schema(cls) -> Dict[str, Any]: + json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True) + json_schema = json_schema_obj.to_dict() + json_schema["$id"] = str(cls.dbt_schema_version) + return json_schema + + @classmethod + def is_compatible_version(cls, schema_version): + compatible_versions = [str(cls.dbt_schema_version)] + if hasattr(cls, "compatible_previous_versions"): + for name, version in cls.compatible_previous_versions(): + compatible_versions.append(str(SchemaVersion(name, version))) + return str(schema_version) in compatible_versions + + @classmethod + def read_and_check_versions(cls, path: str): + try: + data = read_json(path) + except (EnvironmentError, ValueError) as exc: + raise DbtRuntimeError( + f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' + ) from exc + + # Check metadata version. There is a class variable 'dbt_schema_version', but + # that doesn't show up in artifacts, where it only exists in the 'metadata' + # dictionary. + if hasattr(cls, "dbt_schema_version"): + if "metadata" in data and "dbt_schema_version" in data["metadata"]: + previous_schema_version = data["metadata"]["dbt_schema_version"] + # cls.dbt_schema_version is a SchemaVersion object + if not cls.is_compatible_version(previous_schema_version): + raise IncompatibleSchemaError( + expected=str(cls.dbt_schema_version), + found=previous_schema_version, + ) + + return cls.upgrade_schema_version(data) + + @classmethod + def upgrade_schema_version(cls, data): + """This will modify the data (dictionary) passed in to match the current + artifact schema code, if necessary. This is the default method, which + just returns the instantiated object via from_dict.""" + return cls.from_dict(data) + + +T = TypeVar("T", bound="ArtifactMixin") + + +# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to +# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue: +# https://github.com/python/mypy/issues/7520 +# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact, +# and CatalogArtifact +@dataclasses.dataclass(init=False) +class ArtifactMixin(VersionedSchema, Writable, Readable): + metadata: BaseArtifactMetadata + + @classmethod + def validate(cls, data): + super().validate(data) + if cls.dbt_schema_version is None: + raise DbtInternalError("Cannot call from_dict with no schema version!") + + +def get_artifact_schema_version(dct: dict) -> int: + schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) + if not schema_version: + raise ValueError("Artifact is missing schema version") + + # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json + # What the code below is doing: + # 1. Split on "/" – v10.json + # 2. Split on "." – v10 + # 3. Skip first character – 10 + # 4. Convert to int + # TODO: If this gets more complicated, turn into a regex + return int(schema_version.split("/")[-1].split(".")[0][1:]) diff --git a/core/dbt/artifacts/catalog.py b/core/dbt/artifacts/catalog.py new file mode 100644 index 00000000000..768992ddd1b --- /dev/null +++ b/core/dbt/artifacts/catalog.py @@ -0,0 +1,109 @@ +from typing import Dict, Union, Optional, NamedTuple, Any, List +from dataclasses import dataclass, field +from datetime import datetime + +from dbt.common.dataclass_schema import dbtClassMixin +from dbt.common.utils.formatting import lowercase +from dbt.common.contracts.util import Replaceable +from dbt.artifacts.base import ArtifactMixin, BaseArtifactMetadata, schema_version + +Primitive = Union[bool, str, float, None] +PrimitiveDict = Dict[str, Primitive] + +CatalogKey = NamedTuple( + "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)] +) + + +@dataclass +class StatsItem(dbtClassMixin): + id: str + label: str + value: Primitive + include: bool + description: Optional[str] = None + + +StatsDict = Dict[str, StatsItem] + + +@dataclass +class ColumnMetadata(dbtClassMixin): + type: str + index: int + name: str + comment: Optional[str] = None + + +ColumnMap = Dict[str, ColumnMetadata] + + +@dataclass +class TableMetadata(dbtClassMixin): + type: str + schema: str + name: str + database: Optional[str] = None + comment: Optional[str] = None + owner: Optional[str] = None + + +@dataclass +class CatalogTable(dbtClassMixin, Replaceable): + metadata: TableMetadata + columns: ColumnMap + stats: StatsDict + # the same table with two unique IDs will just be listed two times + unique_id: Optional[str] = None + + def key(self) -> CatalogKey: + return CatalogKey( + lowercase(self.metadata.database), + self.metadata.schema.lower(), + self.metadata.name.lower(), + ) + + +@dataclass +class CatalogMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(CatalogArtifact.dbt_schema_version) + ) + + +@dataclass +class CatalogResults(dbtClassMixin): + nodes: Dict[str, CatalogTable] + sources: Dict[str, CatalogTable] + errors: Optional[List[str]] = None + _compile_results: Optional[Any] = None + + def __post_serialize__(self, dct): + dct = super().__post_serialize__(dct) + if "_compile_results" in dct: + del dct["_compile_results"] + return dct + + +@dataclass +@schema_version("catalog", 1) +class CatalogArtifact(CatalogResults, ArtifactMixin): + metadata: CatalogMetadata + + @classmethod + def from_results( + cls, + generated_at: datetime, + nodes: Dict[str, CatalogTable], + sources: Dict[str, CatalogTable], + compile_results: Optional[Any], + errors: Optional[List[str]], + ) -> "CatalogArtifact": + meta = CatalogMetadata(generated_at=generated_at) + return cls( + metadata=meta, + nodes=nodes, + sources=sources, + errors=errors, + _compile_results=compile_results, + ) diff --git a/core/dbt/artifacts/freshness.py b/core/dbt/artifacts/freshness.py new file mode 100644 index 00000000000..ccef153c9ac --- /dev/null +++ b/core/dbt/artifacts/freshness.py @@ -0,0 +1,145 @@ +from dataclasses import dataclass, field +from typing import Dict, Any, Sequence, List, Union, Optional +from datetime import datetime + +from dbt.artifacts.results import ExecutionResult, FreshnessStatus, NodeResult, TimingInfo +from dbt.artifacts.base import ArtifactMixin, VersionedSchema, schema_version, BaseArtifactMetadata +from dbt.common.dataclass_schema import dbtClassMixin, StrEnum +from dbt.common.exceptions import DbtInternalError + +from dbt.contracts.graph.unparsed import FreshnessThreshold +from dbt.contracts.graph.nodes import SourceDefinition + + +@dataclass +class SourceFreshnessResult(NodeResult): + node: SourceDefinition + status: FreshnessStatus + max_loaded_at: datetime + snapshotted_at: datetime + age: float + + @property + def skipped(self): + return False + + +@dataclass +class PartialSourceFreshnessResult(NodeResult): + status: FreshnessStatus + + @property + def skipped(self): + return False + + +FreshnessNodeResult = Union[PartialSourceFreshnessResult, SourceFreshnessResult] + + +@dataclass +class FreshnessMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(FreshnessExecutionResultArtifact.dbt_schema_version) + ) + + +@dataclass +class FreshnessResult(ExecutionResult): + metadata: FreshnessMetadata + results: Sequence[FreshnessNodeResult] + + @classmethod + def from_node_results( + cls, + results: List[FreshnessNodeResult], + elapsed_time: float, + generated_at: datetime, + ): + meta = FreshnessMetadata(generated_at=generated_at) + return cls(metadata=meta, results=results, elapsed_time=elapsed_time) + + def write(self, path): + FreshnessExecutionResultArtifact.from_result(self).write(path) + + +@dataclass +class SourceFreshnessOutput(dbtClassMixin): + unique_id: str + max_loaded_at: datetime + snapshotted_at: datetime + max_loaded_at_time_ago_in_s: float + status: FreshnessStatus + criteria: FreshnessThreshold + adapter_response: Dict[str, Any] + timing: List[TimingInfo] + thread_id: str + execution_time: float + + +class FreshnessErrorEnum(StrEnum): + runtime_error = "runtime error" + + +@dataclass +class SourceFreshnessRuntimeError(dbtClassMixin): + unique_id: str + error: Optional[Union[str, int]] + status: FreshnessErrorEnum + + +FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput] + + +@dataclass +@schema_version("sources", 3) +class FreshnessExecutionResultArtifact( + ArtifactMixin, + VersionedSchema, +): + metadata: FreshnessMetadata + results: Sequence[FreshnessNodeOutput] + elapsed_time: float + + @classmethod + def from_result(cls, base: FreshnessResult): + processed = [process_freshness_result(r) for r in base.results] + return cls( + metadata=base.metadata, + results=processed, + elapsed_time=base.elapsed_time, + ) + + +def process_freshness_result(result: FreshnessNodeResult) -> FreshnessNodeOutput: + unique_id = result.node.unique_id + if result.status == FreshnessStatus.RuntimeErr: + return SourceFreshnessRuntimeError( + unique_id=unique_id, + error=result.message, + status=FreshnessErrorEnum.runtime_error, + ) + + # we know that this must be a SourceFreshnessResult + if not isinstance(result, SourceFreshnessResult): + raise DbtInternalError( + "Got {} instead of a SourceFreshnessResult for a " + "non-error result in freshness execution!".format(type(result)) + ) + # if we're here, we must have a non-None freshness threshold + criteria = result.node.freshness + if criteria is None: + raise DbtInternalError( + "Somehow evaluated a freshness result for a source that has no freshness criteria!" + ) + return SourceFreshnessOutput( + unique_id=unique_id, + max_loaded_at=result.max_loaded_at, + snapshotted_at=result.snapshotted_at, + max_loaded_at_time_ago_in_s=result.age, + status=result.status, + criteria=criteria, + adapter_response=result.adapter_response, + timing=result.timing, + thread_id=result.thread_id, + execution_time=result.execution_time, + ) diff --git a/core/dbt/artifacts/results.py b/core/dbt/artifacts/results.py new file mode 100644 index 00000000000..8bd5bf1fe54 --- /dev/null +++ b/core/dbt/artifacts/results.py @@ -0,0 +1,148 @@ +from dbt.contracts.graph.nodes import ResultNode +from dbt.common.events.functions import fire_event +from dbt.events.types import TimingInfoCollected +from dbt.common.events.contextvars import get_node_info +from dbt.common.events.helpers import datetime_to_json_string +from dbt.logger import TimingProcessor +from dbt.common.utils import cast_to_str, cast_to_int +from dbt.common.dataclass_schema import dbtClassMixin, StrEnum + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Callable, Dict, List, Optional, Sequence, Union + + +@dataclass +class TimingInfo(dbtClassMixin): + name: str + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + def begin(self): + self.started_at = datetime.utcnow() + + def end(self): + self.completed_at = datetime.utcnow() + + def to_msg_dict(self): + msg_dict = {"name": self.name} + if self.started_at: + msg_dict["started_at"] = datetime_to_json_string(self.started_at) + if self.completed_at: + msg_dict["completed_at"] = datetime_to_json_string(self.completed_at) + return msg_dict + + +# This is a context manager +class collect_timing_info: + def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None: + self.timing_info = TimingInfo(name=name) + self.callback = callback + + def __enter__(self): + self.timing_info.begin() + + def __exit__(self, exc_type, exc_value, traceback): + self.timing_info.end() + self.callback(self.timing_info) + # Note: when legacy logger is removed, we can remove the following line + with TimingProcessor(self.timing_info): + fire_event( + TimingInfoCollected( + timing_info=self.timing_info.to_msg_dict(), node_info=get_node_info() + ) + ) + + +class RunningStatus(StrEnum): + Started = "started" + Compiling = "compiling" + Executing = "executing" + + +class NodeStatus(StrEnum): + Success = "success" + Error = "error" + Fail = "fail" + Warn = "warn" + Skipped = "skipped" + Pass = "pass" + RuntimeErr = "runtime error" + + +class RunStatus(StrEnum): + Success = NodeStatus.Success + Error = NodeStatus.Error + Skipped = NodeStatus.Skipped + + +class TestStatus(StrEnum): + __test__ = False + Pass = NodeStatus.Pass + Error = NodeStatus.Error + Fail = NodeStatus.Fail + Warn = NodeStatus.Warn + Skipped = NodeStatus.Skipped + + +class FreshnessStatus(StrEnum): + Pass = NodeStatus.Pass + Warn = NodeStatus.Warn + Error = NodeStatus.Error + RuntimeErr = NodeStatus.RuntimeErr + + +@dataclass +class BaseResult(dbtClassMixin): + status: Union[RunStatus, TestStatus, FreshnessStatus] + timing: List[TimingInfo] + thread_id: str + execution_time: float + adapter_response: Dict[str, Any] + message: Optional[str] + failures: Optional[int] + + @classmethod + def __pre_deserialize__(cls, data): + data = super().__pre_deserialize__(data) + if "message" not in data: + data["message"] = None + if "failures" not in data: + data["failures"] = None + return data + + def to_msg_dict(self): + msg_dict = { + "status": str(self.status), + "message": cast_to_str(self.message), + "thread": self.thread_id, + "execution_time": self.execution_time, + "num_failures": cast_to_int(self.failures), + "timing_info": [ti.to_msg_dict() for ti in self.timing], + "adapter_response": self.adapter_response, + } + return msg_dict + + +@dataclass +class NodeResult(BaseResult): + node: ResultNode + + +@dataclass +class ExecutionResult(dbtClassMixin): + results: Sequence[BaseResult] + elapsed_time: float + + def __len__(self): + return len(self.results) + + def __iter__(self): + return iter(self.results) + + def __getitem__(self, idx): + return self.results[idx] + + +# due to issues with typing.Union collapsing subclasses, this can't subclass +# PartialResult diff --git a/core/dbt/artifacts/run.py b/core/dbt/artifacts/run.py new file mode 100644 index 00000000000..c3e55248230 --- /dev/null +++ b/core/dbt/artifacts/run.py @@ -0,0 +1,148 @@ +import threading +from typing import Any, Optional, Iterable, Tuple, Sequence, Dict +import agate +from dataclasses import dataclass, field +from datetime import datetime + + +from dbt.contracts.graph.nodes import CompiledNode +from dbt.artifacts.base import ( + BaseArtifactMetadata, + ArtifactMixin, + schema_version, + get_artifact_schema_version, +) +from dbt.artifacts.results import ( + BaseResult, + NodeResult, + RunStatus, + ResultNode, + ExecutionResult, +) +from dbt.common.clients.system import write_json + + +@dataclass +class RunResult(NodeResult): + agate_table: Optional[agate.Table] = field( + default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} + ) + + @property + def skipped(self): + return self.status == RunStatus.Skipped + + @classmethod + def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]): + thread_id = threading.current_thread().name + return RunResult( + status=status, + thread_id=thread_id, + execution_time=0, + timing=[], + message=message, + node=node, + adapter_response={}, + failures=None, + ) + + +@dataclass +class RunResultsMetadata(BaseArtifactMetadata): + dbt_schema_version: str = field( + default_factory=lambda: str(RunResultsArtifact.dbt_schema_version) + ) + + +@dataclass +class RunResultOutput(BaseResult): + unique_id: str + compiled: Optional[bool] + compiled_code: Optional[str] + relation_name: Optional[str] + + +def process_run_result(result: RunResult) -> RunResultOutput: + + compiled = isinstance(result.node, CompiledNode) + + return RunResultOutput( + unique_id=result.node.unique_id, + status=result.status, + timing=result.timing, + thread_id=result.thread_id, + execution_time=result.execution_time, + message=result.message, + adapter_response=result.adapter_response, + failures=result.failures, + compiled=result.node.compiled if compiled else None, # type:ignore + compiled_code=result.node.compiled_code if compiled else None, # type:ignore + relation_name=result.node.relation_name if compiled else None, # type:ignore + ) + + +@dataclass +class RunExecutionResult( + ExecutionResult, +): + results: Sequence[RunResult] + args: Dict[str, Any] = field(default_factory=dict) + generated_at: datetime = field(default_factory=datetime.utcnow) + + def write(self, path: str): + writable = RunResultsArtifact.from_execution_results( + results=self.results, + elapsed_time=self.elapsed_time, + generated_at=self.generated_at, + args=self.args, + ) + writable.write(path) + + +@dataclass +@schema_version("run-results", 6) +class RunResultsArtifact(ExecutionResult, ArtifactMixin): + results: Sequence[RunResultOutput] + args: Dict[str, Any] = field(default_factory=dict) + + @classmethod + def from_execution_results( + cls, + results: Sequence[RunResult], + elapsed_time: float, + generated_at: datetime, + args: Dict, + ): + processed_results = [ + process_run_result(result) for result in results if isinstance(result, RunResult) + ] + meta = RunResultsMetadata( + dbt_schema_version=str(cls.dbt_schema_version), + generated_at=generated_at, + ) + return cls(metadata=meta, results=processed_results, elapsed_time=elapsed_time, args=args) + + @classmethod + def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]: + return [ + ("run-results", 4), + ("run-results", 5), + ] + + @classmethod + def upgrade_schema_version(cls, data): + """This overrides the "upgrade_schema_version" call in VersionedSchema (via + ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results.""" + run_results_schema_version = get_artifact_schema_version(data) + # If less than the current version (v5), preprocess contents to match latest schema version + if run_results_schema_version <= 5: + # In v5, we added 'compiled' attributes to each result entry + # Going forward, dbt expects these to be populated + for result in data["results"]: + result["compiled"] = False + result["compiled_code"] = "" + result["relation_name"] = "" + return cls.from_dict(data) + + def write(self, path: str): + write_json(path, self.to_dict(omit_none=False)) diff --git a/core/dbt/cli/main.py b/core/dbt/cli/main.py index 43d0aa3501f..3525de95f96 100644 --- a/core/dbt/cli/main.py +++ b/core/dbt/cli/main.py @@ -17,10 +17,8 @@ DbtUsageException, ) from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.results import ( - CatalogArtifact, - RunExecutionResult, -) +from dbt.artifacts.catalog import CatalogArtifact +from dbt.artifacts.run import RunExecutionResult from dbt.common.events.base_types import EventMsg from dbt.task.build import BuildTask from dbt.task.clean import CleanTask diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index a094dcfec15..426b99d39b6 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -46,13 +46,13 @@ from dbt.contracts.graph.unparsed import SourcePatch, NodeVersion, UnparsedVersion from dbt.contracts.graph.manifest_upgrade import upgrade_manifest_json from dbt.contracts.files import SourceFile, SchemaSourceFile, FileHash, AnySourceFile -from dbt.contracts.util import ( +from dbt.artifacts.base import ( BaseArtifactMetadata, - SourceKey, ArtifactMixin, schema_version, get_artifact_schema_version, ) +from dbt.contracts.util import SourceKey from dbt.common.dataclass_schema import dbtClassMixin from dbt.exceptions import ( diff --git a/core/dbt/contracts/graph/unparsed.py b/core/dbt/contracts/graph/unparsed.py index 75c3304c5d1..17d889c4c32 100644 --- a/core/dbt/contracts/graph/unparsed.py +++ b/core/dbt/contracts/graph/unparsed.py @@ -301,8 +301,8 @@ class FreshnessThreshold(dbtClassMixin, Mergeable): error_after: Optional[Time] = field(default_factory=Time) filter: Optional[str] = None - def status(self, age: float) -> "dbt.contracts.results.FreshnessStatus": - from dbt.contracts.results import FreshnessStatus + def status(self, age: float) -> "dbt.artifacts.results.FreshnessStatus": + from dbt.artifacts.results import FreshnessStatus if self.error_after and self.error_after.exceeded(age): return FreshnessStatus.Error diff --git a/core/dbt/contracts/results.py b/core/dbt/contracts/results.py index 173cd396c90..ea69ee9b86d 100644 --- a/core/dbt/contracts/results.py +++ b/core/dbt/contracts/results.py @@ -1,536 +1,58 @@ -import threading +# flake8: noqa -from dbt.contracts.graph.unparsed import FreshnessThreshold -from dbt.contracts.graph.nodes import CompiledNode, SourceDefinition, ResultNode -from dbt.contracts.util import ( - BaseArtifactMetadata, +# This file is temporary, in order to not break various adapter tests, etc, until +# they are updated to use the new locations. + +from dbt.artifacts.base import ( ArtifactMixin, + BaseArtifactMetadata, VersionedSchema, - Replaceable, schema_version, - get_artifact_schema_version, ) -from dbt.common.exceptions import DbtInternalError -from dbt.common.events.functions import fire_event -from dbt.events.types import TimingInfoCollected -from dbt.common.events.contextvars import get_node_info -from dbt.common.events.helpers import datetime_to_json_string -from dbt.logger import TimingProcessor -from dbt.common.utils.formatting import lowercase -from dbt.common.utils import cast_to_str, cast_to_int -from dbt.common.dataclass_schema import dbtClassMixin, StrEnum - -import agate - -from dataclasses import dataclass, field -from datetime import datetime -from typing import ( - Any, - Callable, - Dict, - List, - NamedTuple, - Optional, - Sequence, - Union, - Iterable, - Tuple, -) - -from dbt.common.clients.system import write_json - - -@dataclass -class TimingInfo(dbtClassMixin): - name: str - started_at: Optional[datetime] = None - completed_at: Optional[datetime] = None - - def begin(self): - self.started_at = datetime.utcnow() - - def end(self): - self.completed_at = datetime.utcnow() - - def to_msg_dict(self): - msg_dict = {"name": self.name} - if self.started_at: - msg_dict["started_at"] = datetime_to_json_string(self.started_at) - if self.completed_at: - msg_dict["completed_at"] = datetime_to_json_string(self.completed_at) - return msg_dict - - -# This is a context manager -class collect_timing_info: - def __init__(self, name: str, callback: Callable[[TimingInfo], None]) -> None: - self.timing_info = TimingInfo(name=name) - self.callback = callback - - def __enter__(self): - self.timing_info.begin() - - def __exit__(self, exc_type, exc_value, traceback): - self.timing_info.end() - self.callback(self.timing_info) - # Note: when legacy logger is removed, we can remove the following line - with TimingProcessor(self.timing_info): - fire_event( - TimingInfoCollected( - timing_info=self.timing_info.to_msg_dict(), node_info=get_node_info() - ) - ) - - -class RunningStatus(StrEnum): - Started = "started" - Compiling = "compiling" - Executing = "executing" - - -class NodeStatus(StrEnum): - Success = "success" - Error = "error" - Fail = "fail" - Warn = "warn" - Skipped = "skipped" - Pass = "pass" - RuntimeErr = "runtime error" - - -class RunStatus(StrEnum): - Success = NodeStatus.Success - Error = NodeStatus.Error - Skipped = NodeStatus.Skipped - - -class TestStatus(StrEnum): - __test__ = False - Pass = NodeStatus.Pass - Error = NodeStatus.Error - Fail = NodeStatus.Fail - Warn = NodeStatus.Warn - Skipped = NodeStatus.Skipped - - -class FreshnessStatus(StrEnum): - Pass = NodeStatus.Pass - Warn = NodeStatus.Warn - Error = NodeStatus.Error - RuntimeErr = NodeStatus.RuntimeErr - - -@dataclass -class BaseResult(dbtClassMixin): - status: Union[RunStatus, TestStatus, FreshnessStatus] - timing: List[TimingInfo] - thread_id: str - execution_time: float - adapter_response: Dict[str, Any] - message: Optional[str] - failures: Optional[int] - @classmethod - def __pre_deserialize__(cls, data): - data = super().__pre_deserialize__(data) - if "message" not in data: - data["message"] = None - if "failures" not in data: - data["failures"] = None - return data - - def to_msg_dict(self): - msg_dict = { - "status": str(self.status), - "message": cast_to_str(self.message), - "thread": self.thread_id, - "execution_time": self.execution_time, - "num_failures": cast_to_int(self.failures), - "timing_info": [ti.to_msg_dict() for ti in self.timing], - "adapter_response": self.adapter_response, - } - return msg_dict - - -@dataclass -class NodeResult(BaseResult): - node: ResultNode - - -@dataclass -class RunResult(NodeResult): - agate_table: Optional[agate.Table] = field( - default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} - ) - - @property - def skipped(self): - return self.status == RunStatus.Skipped - - @classmethod - def from_node(cls, node: ResultNode, status: RunStatus, message: Optional[str]): - thread_id = threading.current_thread().name - return RunResult( - status=status, - thread_id=thread_id, - execution_time=0, - timing=[], - message=message, - node=node, - adapter_response={}, - failures=None, - ) - - -@dataclass -class ExecutionResult(dbtClassMixin): - results: Sequence[BaseResult] - elapsed_time: float - - def __len__(self): - return len(self.results) - - def __iter__(self): - return iter(self.results) - - def __getitem__(self, idx): - return self.results[idx] - - -@dataclass -class RunResultsMetadata(BaseArtifactMetadata): - dbt_schema_version: str = field( - default_factory=lambda: str(RunResultsArtifact.dbt_schema_version) - ) - - -@dataclass -class RunResultOutput(BaseResult): - unique_id: str - compiled: Optional[bool] - compiled_code: Optional[str] - relation_name: Optional[str] - - -def process_run_result(result: RunResult) -> RunResultOutput: - - compiled = isinstance(result.node, CompiledNode) - - return RunResultOutput( - unique_id=result.node.unique_id, - status=result.status, - timing=result.timing, - thread_id=result.thread_id, - execution_time=result.execution_time, - message=result.message, - adapter_response=result.adapter_response, - failures=result.failures, - compiled=result.node.compiled if compiled else None, # type:ignore - compiled_code=result.node.compiled_code if compiled else None, # type:ignore - relation_name=result.node.relation_name if compiled else None, # type:ignore - ) - - -@dataclass -class RunExecutionResult( +from dbt.artifacts.results import ( + NodeStatus, + RunStatus, + TestStatus, + FreshnessStatus, + RunningStatus, + TimingInfo, + collect_timing_info, + BaseResult, + NodeResult, ExecutionResult, -): - results: Sequence[RunResult] - args: Dict[str, Any] = field(default_factory=dict) - generated_at: datetime = field(default_factory=datetime.utcnow) - - def write(self, path: str): - writable = RunResultsArtifact.from_execution_results( - results=self.results, - elapsed_time=self.elapsed_time, - generated_at=self.generated_at, - args=self.args, - ) - writable.write(path) - - -@dataclass -@schema_version("run-results", 6) -class RunResultsArtifact(ExecutionResult, ArtifactMixin): - results: Sequence[RunResultOutput] - args: Dict[str, Any] = field(default_factory=dict) - - @classmethod - def from_execution_results( - cls, - results: Sequence[RunResult], - elapsed_time: float, - generated_at: datetime, - args: Dict, - ): - processed_results = [ - process_run_result(result) for result in results if isinstance(result, RunResult) - ] - meta = RunResultsMetadata( - dbt_schema_version=str(cls.dbt_schema_version), - generated_at=generated_at, - ) - return cls(metadata=meta, results=processed_results, elapsed_time=elapsed_time, args=args) - - @classmethod - def compatible_previous_versions(cls) -> Iterable[Tuple[str, int]]: - return [ - ("run-results", 4), - ("run-results", 5), - ] - - @classmethod - def upgrade_schema_version(cls, data): - """This overrides the "upgrade_schema_version" call in VersionedSchema (via - ArtifactMixin) to modify the dictionary passed in from earlier versions of the run_results.""" - run_results_schema_version = get_artifact_schema_version(data) - # If less than the current version (v5), preprocess contents to match latest schema version - if run_results_schema_version <= 5: - # In v5, we added 'compiled' attributes to each result entry - # Going forward, dbt expects these to be populated - for result in data["results"]: - result["compiled"] = False - result["compiled_code"] = "" - result["relation_name"] = "" - return cls.from_dict(data) - - def write(self, path: str): - write_json(path, self.to_dict(omit_none=False)) - - -# due to issues with typing.Union collapsing subclasses, this can't subclass -# PartialResult - - -@dataclass -class SourceFreshnessResult(NodeResult): - node: SourceDefinition - status: FreshnessStatus - max_loaded_at: datetime - snapshotted_at: datetime - age: float - - @property - def skipped(self): - return False - - -class FreshnessErrorEnum(StrEnum): - runtime_error = "runtime error" - - -@dataclass -class SourceFreshnessRuntimeError(dbtClassMixin): - unique_id: str - error: Optional[Union[str, int]] - status: FreshnessErrorEnum - - -@dataclass -class SourceFreshnessOutput(dbtClassMixin): - unique_id: str - max_loaded_at: datetime - snapshotted_at: datetime - max_loaded_at_time_ago_in_s: float - status: FreshnessStatus - criteria: FreshnessThreshold - adapter_response: Dict[str, Any] - timing: List[TimingInfo] - thread_id: str - execution_time: float - - -@dataclass -class PartialSourceFreshnessResult(NodeResult): - status: FreshnessStatus - - @property - def skipped(self): - return False - - -FreshnessNodeResult = Union[PartialSourceFreshnessResult, SourceFreshnessResult] -FreshnessNodeOutput = Union[SourceFreshnessRuntimeError, SourceFreshnessOutput] - - -def process_freshness_result(result: FreshnessNodeResult) -> FreshnessNodeOutput: - unique_id = result.node.unique_id - if result.status == FreshnessStatus.RuntimeErr: - return SourceFreshnessRuntimeError( - unique_id=unique_id, - error=result.message, - status=FreshnessErrorEnum.runtime_error, - ) - - # we know that this must be a SourceFreshnessResult - if not isinstance(result, SourceFreshnessResult): - raise DbtInternalError( - "Got {} instead of a SourceFreshnessResult for a " - "non-error result in freshness execution!".format(type(result)) - ) - # if we're here, we must have a non-None freshness threshold - criteria = result.node.freshness - if criteria is None: - raise DbtInternalError( - "Somehow evaluated a freshness result for a source that has no freshness criteria!" - ) - return SourceFreshnessOutput( - unique_id=unique_id, - max_loaded_at=result.max_loaded_at, - snapshotted_at=result.snapshotted_at, - max_loaded_at_time_ago_in_s=result.age, - status=result.status, - criteria=criteria, - adapter_response=result.adapter_response, - timing=result.timing, - thread_id=result.thread_id, - execution_time=result.execution_time, - ) - - -@dataclass -class FreshnessMetadata(BaseArtifactMetadata): - dbt_schema_version: str = field( - default_factory=lambda: str(FreshnessExecutionResultArtifact.dbt_schema_version) - ) - - -@dataclass -class FreshnessResult(ExecutionResult): - metadata: FreshnessMetadata - results: Sequence[FreshnessNodeResult] - - @classmethod - def from_node_results( - cls, - results: List[FreshnessNodeResult], - elapsed_time: float, - generated_at: datetime, - ): - meta = FreshnessMetadata(generated_at=generated_at) - return cls(metadata=meta, results=results, elapsed_time=elapsed_time) - - def write(self, path): - FreshnessExecutionResultArtifact.from_result(self).write(path) - - -@dataclass -@schema_version("sources", 3) -class FreshnessExecutionResultArtifact( - ArtifactMixin, - VersionedSchema, -): - metadata: FreshnessMetadata - results: Sequence[FreshnessNodeOutput] - elapsed_time: float - - @classmethod - def from_result(cls, base: FreshnessResult): - processed = [process_freshness_result(r) for r in base.results] - return cls( - metadata=base.metadata, - results=processed, - elapsed_time=base.elapsed_time, - ) - - -Primitive = Union[bool, str, float, None] -PrimitiveDict = Dict[str, Primitive] - -CatalogKey = NamedTuple( - "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)] ) +from dbt.artifacts.run import ( + RunResult, + RunResultsMetadata, + RunExecutionResult, + RunResultsArtifact, + process_run_result, +) -@dataclass -class StatsItem(dbtClassMixin): - id: str - label: str - value: Primitive - include: bool - description: Optional[str] = None - - -StatsDict = Dict[str, StatsItem] - - -@dataclass -class ColumnMetadata(dbtClassMixin): - type: str - index: int - name: str - comment: Optional[str] = None - - -ColumnMap = Dict[str, ColumnMetadata] - - -@dataclass -class TableMetadata(dbtClassMixin): - type: str - schema: str - name: str - database: Optional[str] = None - comment: Optional[str] = None - owner: Optional[str] = None - - -@dataclass -class CatalogTable(dbtClassMixin, Replaceable): - metadata: TableMetadata - columns: ColumnMap - stats: StatsDict - # the same table with two unique IDs will just be listed two times - unique_id: Optional[str] = None - - def key(self) -> CatalogKey: - return CatalogKey( - lowercase(self.metadata.database), - self.metadata.schema.lower(), - self.metadata.name.lower(), - ) - - -@dataclass -class CatalogMetadata(BaseArtifactMetadata): - dbt_schema_version: str = field( - default_factory=lambda: str(CatalogArtifact.dbt_schema_version) - ) - - -@dataclass -class CatalogResults(dbtClassMixin): - nodes: Dict[str, CatalogTable] - sources: Dict[str, CatalogTable] - errors: Optional[List[str]] = None - _compile_results: Optional[Any] = None - - def __post_serialize__(self, dct): - dct = super().__post_serialize__(dct) - if "_compile_results" in dct: - del dct["_compile_results"] - return dct - - -@dataclass -@schema_version("catalog", 1) -class CatalogArtifact(CatalogResults, ArtifactMixin): - metadata: CatalogMetadata +from dbt.artifacts.freshness import ( + FreshnessErrorEnum, + FreshnessMetadata, + FreshnessResult, + FreshnessExecutionResultArtifact, + FreshnessNodeResult, + FreshnessNodeOutput, + process_freshness_result, + SourceFreshnessResult, + SourceFreshnessRuntimeError, + SourceFreshnessOutput, + PartialSourceFreshnessResult, +) - @classmethod - def from_results( - cls, - generated_at: datetime, - nodes: Dict[str, CatalogTable], - sources: Dict[str, CatalogTable], - compile_results: Optional[Any], - errors: Optional[List[str]], - ) -> "CatalogArtifact": - meta = CatalogMetadata(generated_at=generated_at) - return cls( - metadata=meta, - nodes=nodes, - sources=sources, - errors=errors, - _compile_results=compile_results, - ) +from dbt.artifacts.catalog import ( + CatalogResults, + CatalogKey, + StatsItem, + ColumnMetadata, + TableMetadata, + CatalogTable, + CatalogMetadata, + CatalogResults, + CatalogArtifact, +) diff --git a/core/dbt/contracts/sql.py b/core/dbt/contracts/sql.py index 4488e563e7c..2bcab9fd1ed 100644 --- a/core/dbt/contracts/sql.py +++ b/core/dbt/contracts/sql.py @@ -6,14 +6,9 @@ from dbt.common.dataclass_schema import dbtClassMixin from dbt.contracts.graph.nodes import ResultNode -from dbt.contracts.results import ( - RunResult, - RunResultsArtifact, - TimingInfo, - ExecutionResult, - RunExecutionResult, -) -from dbt.contracts.util import VersionedSchema, schema_version +from dbt.artifacts.results import TimingInfo, ExecutionResult +from dbt.artifacts.run import RunResult, RunResultsArtifact, RunExecutionResult +from dbt.artifacts.base import VersionedSchema, schema_version from dbt.logger import LogMessage diff --git a/core/dbt/contracts/state.py b/core/dbt/contracts/state.py index 022f4833c35..b117ff9fe79 100644 --- a/core/dbt/contracts/state.py +++ b/core/dbt/contracts/state.py @@ -2,8 +2,8 @@ from typing import Optional from dbt.contracts.graph.manifest import WritableManifest -from dbt.contracts.results import FreshnessExecutionResultArtifact -from dbt.contracts.results import RunResultsArtifact +from dbt.artifacts.freshness import FreshnessExecutionResultArtifact +from dbt.artifacts.run import RunResultsArtifact from dbt.common.events.functions import fire_event from dbt.events.types import WarnStateTargetEqual from dbt.exceptions import IncompatibleSchemaError diff --git a/core/dbt/contracts/util.py b/core/dbt/contracts/util.py index ac544802f2a..012e68a247e 100644 --- a/core/dbt/contracts/util.py +++ b/core/dbt/contracts/util.py @@ -1,27 +1,9 @@ import dataclasses -from datetime import datetime -from typing import List, Tuple, ClassVar, Type, TypeVar, Dict, Any, Optional +from typing import List, Any, Tuple -from dbt.common.clients.system import write_json, read_json -from dbt.exceptions import ( - DbtInternalError, - DbtRuntimeError, - IncompatibleSchemaError, -) -from dbt.version import __version__ +from dbt.common.dataclass_schema import ValidatedStringMixin, ValidationError from dbt.common.contracts.util import Replaceable -from dbt.common.events.functions import get_metadata_vars -from dbt.common.invocation import get_invocation_id -from dbt.common.dataclass_schema import dbtClassMixin - -from dbt.common.dataclass_schema import ( - ValidatedStringMixin, - ValidationError, -) -from mashumaro.jsonschema import build_json_schema -from mashumaro.jsonschema.dialects import DRAFT_2020_12 -import functools SourceKey = Tuple[str, str] @@ -59,167 +41,6 @@ def merged(self, *args): return self.replace(**replacements) -class Writable: - def write(self, path: str): - write_json(path, self.to_dict(omit_none=False)) # type: ignore - - -class Readable: - @classmethod - def read(cls, path: str): - try: - data = read_json(path) - except (EnvironmentError, ValueError) as exc: - raise DbtRuntimeError( - f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' - ) from exc - - return cls.from_dict(data) # type: ignore - - -BASE_SCHEMAS_URL = "https://schemas.getdbt.com/" -SCHEMA_PATH = "dbt/{name}/v{version}.json" - - -@dataclasses.dataclass -class SchemaVersion: - name: str - version: int - - @property - def path(self) -> str: - return SCHEMA_PATH.format(name=self.name, version=self.version) - - def __str__(self) -> str: - return BASE_SCHEMAS_URL + self.path - - -# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata, -# FreshnessMetadata, and CatalogMetadata classes -@dataclasses.dataclass -class BaseArtifactMetadata(dbtClassMixin): - dbt_schema_version: str - dbt_version: str = __version__ - generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow) - invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id) - env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars) - - def __post_serialize__(self, dct): - dct = super().__post_serialize__(dct) - if dct["generated_at"] and dct["generated_at"].endswith("+00:00"): - dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z" - return dct - - -# This is used as a class decorator to set the schema_version in the -# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.) -# Name attributes of SchemaVersion in classes with the 'schema_version' decorator: -# manifest -# run-results -# run-operation-result -# sources -# catalog -# remote-compile-result -# remote-execution-result -# remote-run-result -def schema_version(name: str, version: int): - def inner(cls: Type[VersionedSchema]): - cls.dbt_schema_version = SchemaVersion( - name=name, - version=version, - ) - return cls - - return inner - - -# This is used in the ArtifactMixin and RemoteResult classes -@dataclasses.dataclass -class VersionedSchema(dbtClassMixin): - dbt_schema_version: ClassVar[SchemaVersion] - - @classmethod - @functools.lru_cache - def json_schema(cls) -> Dict[str, Any]: - json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True) - json_schema = json_schema_obj.to_dict() - json_schema["$id"] = str(cls.dbt_schema_version) - return json_schema - - @classmethod - def is_compatible_version(cls, schema_version): - compatible_versions = [str(cls.dbt_schema_version)] - if hasattr(cls, "compatible_previous_versions"): - for name, version in cls.compatible_previous_versions(): - compatible_versions.append(str(SchemaVersion(name, version))) - return str(schema_version) in compatible_versions - - @classmethod - def read_and_check_versions(cls, path: str): - try: - data = read_json(path) - except (EnvironmentError, ValueError) as exc: - raise DbtRuntimeError( - f'Could not read {cls.__name__} at "{path}" as JSON: {exc}' - ) from exc - - # Check metadata version. There is a class variable 'dbt_schema_version', but - # that doesn't show up in artifacts, where it only exists in the 'metadata' - # dictionary. - if hasattr(cls, "dbt_schema_version"): - if "metadata" in data and "dbt_schema_version" in data["metadata"]: - previous_schema_version = data["metadata"]["dbt_schema_version"] - # cls.dbt_schema_version is a SchemaVersion object - if not cls.is_compatible_version(previous_schema_version): - raise IncompatibleSchemaError( - expected=str(cls.dbt_schema_version), - found=previous_schema_version, - ) - - return cls.upgrade_schema_version(data) - - @classmethod - def upgrade_schema_version(cls, data): - """This will modify the data (dictionary) passed in to match the current - artifact schema code, if necessary. This is the default method, which - just returns the instantiated object via from_dict.""" - return cls.from_dict(data) - - -T = TypeVar("T", bound="ArtifactMixin") - - -# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to -# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue: -# https://github.com/python/mypy/issues/7520 -# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact, -# and CatalogArtifact -@dataclasses.dataclass(init=False) -class ArtifactMixin(VersionedSchema, Writable, Readable): - metadata: BaseArtifactMetadata - - @classmethod - def validate(cls, data): - super().validate(data) - if cls.dbt_schema_version is None: - raise DbtInternalError("Cannot call from_dict with no schema version!") - - -def get_artifact_schema_version(dct: dict) -> int: - schema_version = dct.get("metadata", {}).get("dbt_schema_version", None) - if not schema_version: - raise ValueError("Artifact is missing schema version") - - # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json - # What the code below is doing: - # 1. Split on "/" – v10.json - # 2. Split on "." – v10 - # 3. Skip first character – 10 - # 4. Convert to int - # TODO: If this gets more complicated, turn into a regex - return int(schema_version.split("/")[-1].split(".")[0][1:]) - - class Identifier(ValidatedStringMixin): """Our definition of a valid Identifier is the same as what's valid for an unquoted database table name. diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 6c2041be084..9edee2c984b 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -108,7 +108,7 @@ NodeRelation, ) from dbt.contracts.graph.unparsed import NodeVersion -from dbt.contracts.util import Writable +from dbt.artifacts.base import Writable from dbt.exceptions import ( TargetNotFoundError, AmbiguousAliasError, diff --git a/core/dbt/plugins/contracts.py b/core/dbt/plugins/contracts.py index 5b95ae7fcab..6d36b341a9a 100644 --- a/core/dbt/plugins/contracts.py +++ b/core/dbt/plugins/contracts.py @@ -1,8 +1,8 @@ from typing import Dict # just exports, they need "noqa" so flake8 will not complain. -from dbt.contracts.util import ArtifactMixin as PluginArtifact, schema_version # noqa -from dbt.contracts.util import BaseArtifactMetadata # noqa +from dbt.artifacts.base import ArtifactMixin as PluginArtifact, schema_version # noqa +from dbt.artifacts.base import BaseArtifactMetadata # noqa from dbt.common.dataclass_schema import dbtClassMixin, ExtensibleDbtClassMixin # noqa diff --git a/core/dbt/task/base.py b/core/dbt/task/base.py index 8b8018cda86..3243272d8b7 100644 --- a/core/dbt/task/base.py +++ b/core/dbt/task/base.py @@ -16,14 +16,9 @@ from dbt.config.profile import read_profile from dbt.constants import DBT_PROJECT_FILE_NAME from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.results import ( - NodeStatus, - RunResult, - collect_timing_info, - RunStatus, - RunningStatus, - TimingInfo, -) +from dbt.artifacts.results import TimingInfo, collect_timing_info +from dbt.artifacts.results import NodeStatus, RunningStatus, RunStatus +from dbt.artifacts.run import RunResult from dbt.common.events.contextvars import get_node_info from dbt.common.events.functions import fire_event from dbt.events.types import ( diff --git a/core/dbt/task/build.py b/core/dbt/task/build.py index a6c5071101b..a3fc08a1bce 100644 --- a/core/dbt/task/build.py +++ b/core/dbt/task/build.py @@ -5,13 +5,13 @@ from .seed import SeedRunner as seed_runner from .test import TestRunner as test_runner -from dbt.contracts.results import NodeStatus +from dbt.artifacts.results import NodeStatus, RunStatus +from dbt.artifacts.run import RunResult from dbt.common.exceptions import DbtInternalError from dbt.graph import ResourceTypeSelector from dbt.node_types import NodeType from dbt.task.test import TestSelector from dbt.task.base import BaseRunner -from dbt.contracts.results import RunResult, RunStatus from dbt.common.events.functions import fire_event from dbt.events.types import LogStartLine, LogModelResult from dbt.common.events.base_types import EventLevel diff --git a/core/dbt/task/clone.py b/core/dbt/task/clone.py index 7a782682f65..849eec1c8b1 100644 --- a/core/dbt/task/clone.py +++ b/core/dbt/task/clone.py @@ -4,7 +4,7 @@ from dbt.adapters.base import BaseRelation from dbt.clients.jinja import MacroGenerator from dbt.context.providers import generate_runtime_model_context -from dbt.contracts.results import RunStatus, RunResult +from dbt.artifacts.run import RunStatus, RunResult from dbt.common.dataclass_schema import dbtClassMixin from dbt.common.exceptions import DbtInternalError, CompilationError from dbt.graph import ResourceTypeSelector diff --git a/core/dbt/task/compile.py b/core/dbt/task/compile.py index a0e9e7d53f7..4d6e049b359 100644 --- a/core/dbt/task/compile.py +++ b/core/dbt/task/compile.py @@ -2,7 +2,7 @@ from typing import AbstractSet, Optional from dbt.contracts.graph.manifest import WritableManifest -from dbt.contracts.results import RunStatus, RunResult +from dbt.artifacts.run import RunStatus, RunResult from dbt.common.events.base_types import EventLevel from dbt.common.events.functions import fire_event from dbt.common.events.types import Note diff --git a/core/dbt/task/debug.py b/core/dbt/task/debug.py index 1405b1261a1..522f8a0189f 100644 --- a/core/dbt/task/debug.py +++ b/core/dbt/task/debug.py @@ -21,7 +21,7 @@ from dbt.adapters.factory import get_adapter, register_adapter from dbt.config import PartialProject, Project, Profile from dbt.config.renderer import DbtProjectYamlRenderer, ProfileRenderer -from dbt.contracts.results import RunStatus +from dbt.artifacts.results import RunStatus from dbt.clients.yaml_helper import load_yaml_text from dbt.links import ProfileConfigDocs from dbt.common.ui import green, red diff --git a/core/dbt/task/docs/generate.py b/core/dbt/task/docs/generate.py index a99a6b8f2d6..4b791f56c6a 100644 --- a/core/dbt/task/docs/generate.py +++ b/core/dbt/task/docs/generate.py @@ -15,8 +15,8 @@ from dbt.adapters.factory import get_adapter from dbt.contracts.graph.nodes import ResultNode from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.results import ( - NodeStatus, +from dbt.artifacts.results import NodeStatus +from dbt.artifacts.catalog import ( TableMetadata, CatalogTable, CatalogResults, diff --git a/core/dbt/task/freshness.py b/core/dbt/task/freshness.py index 8ab704ff996..8301893add9 100644 --- a/core/dbt/task/freshness.py +++ b/core/dbt/task/freshness.py @@ -9,7 +9,7 @@ ) from .runnable import GraphRunnableTask -from dbt.contracts.results import ( +from dbt.artifacts.freshness import ( FreshnessResult, PartialSourceFreshnessResult, SourceFreshnessResult, diff --git a/core/dbt/task/printer.py b/core/dbt/task/printer.py index 5ee53b16490..a05e4089676 100644 --- a/core/dbt/task/printer.py +++ b/core/dbt/task/printer.py @@ -20,7 +20,7 @@ from dbt.tracking import InvocationProcessor from dbt.common.events.format import pluralize -from dbt.contracts.results import NodeStatus +from dbt.artifacts.results import NodeStatus from dbt.node_types import NodeType diff --git a/core/dbt/task/retry.py b/core/dbt/task/retry.py index 764b2dbf19e..72a8c6feedd 100644 --- a/core/dbt/task/retry.py +++ b/core/dbt/task/retry.py @@ -6,7 +6,7 @@ from dbt.flags import set_flags, get_flags from dbt.cli.types import Command as CliCommand from dbt.config import RuntimeConfig -from dbt.contracts.results import NodeStatus +from dbt.artifacts.results import NodeStatus from dbt.contracts.state import load_result_state from dbt.common.exceptions import DbtRuntimeError from dbt.graph import GraphQueue diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d3955e5c7ed..ab730973af9 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -19,7 +19,8 @@ from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.model_config import Hook from dbt.contracts.graph.nodes import HookNode, ResultNode -from dbt.contracts.results import NodeStatus, RunResult, RunStatus, RunningStatus, BaseResult +from dbt.artifacts.results import NodeStatus, RunStatus, RunningStatus, BaseResult +from dbt.artifacts.run import RunResult from dbt.exceptions import ( CompilationError, DbtInternalError, diff --git a/core/dbt/task/run_operation.py b/core/dbt/task/run_operation.py index 6ee4c08392b..e77d5840766 100644 --- a/core/dbt/task/run_operation.py +++ b/core/dbt/task/run_operation.py @@ -9,7 +9,8 @@ from dbt.adapters.factory import get_adapter from dbt.contracts.files import FileHash from dbt.contracts.graph.nodes import HookNode -from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo +from dbt.artifacts.results import RunStatus, TimingInfo +from dbt.artifacts.run import RunResultsArtifact, RunResult from dbt.common.events.functions import fire_event from dbt.events.types import ( LogDebugStackTrace, diff --git a/core/dbt/task/runnable.py b/core/dbt/task/runnable.py index e95b496691e..81f1b4922bd 100644 --- a/core/dbt/task/runnable.py +++ b/core/dbt/task/runnable.py @@ -15,14 +15,8 @@ from dbt.adapters.factory import get_adapter from dbt.contracts.graph.manifest import WritableManifest from dbt.contracts.graph.nodes import ResultNode -from dbt.contracts.results import ( - NodeStatus, - RunExecutionResult, - RunningStatus, - RunResult, - RunStatus, - BaseResult, -) +from dbt.artifacts.results import NodeStatus, RunningStatus, RunStatus, BaseResult +from dbt.artifacts.run import RunExecutionResult, RunResult from dbt.contracts.state import PreviousState from dbt.common.events.contextvars import log_contextvars, task_contextvars from dbt.common.events.functions import fire_event, warn_or_error diff --git a/core/dbt/task/seed.py b/core/dbt/task/seed.py index 0ea6862c1f5..31dbd241e56 100644 --- a/core/dbt/task/seed.py +++ b/core/dbt/task/seed.py @@ -5,7 +5,7 @@ print_run_end_messages, ) -from dbt.contracts.results import RunStatus +from dbt.artifacts.results import RunStatus, NodeStatus from dbt.common.exceptions import DbtInternalError from dbt.graph import ResourceTypeSelector from dbt.logger import TextOnly @@ -18,7 +18,6 @@ LogStartLine, ) from dbt.node_types import NodeType -from dbt.contracts.results import NodeStatus class SeedRunner(ModelRunner): diff --git a/core/dbt/task/show.py b/core/dbt/task/show.py index b1778813f2f..9f8c132a0ea 100644 --- a/core/dbt/task/show.py +++ b/core/dbt/task/show.py @@ -4,7 +4,7 @@ from dbt.context.providers import generate_runtime_model_context from dbt.contracts.graph.nodes import SeedNode -from dbt.contracts.results import RunResult, RunStatus +from dbt.artifacts.run import RunResult, RunStatus from dbt.common.events.base_types import EventLevel from dbt.common.events.functions import fire_event from dbt.common.events.types import Note diff --git a/core/dbt/task/snapshot.py b/core/dbt/task/snapshot.py index fe04e0761f9..a3469248f1b 100644 --- a/core/dbt/task/snapshot.py +++ b/core/dbt/task/snapshot.py @@ -6,7 +6,7 @@ from dbt.events.types import LogSnapshotResult from dbt.graph import ResourceTypeSelector from dbt.node_types import NodeType -from dbt.contracts.results import NodeStatus +from dbt.artifacts.results import NodeStatus from dbt.common.utils import cast_dict_to_dict_of_strings diff --git a/core/dbt/task/test.py b/core/dbt/task/test.py index 07dc04d4552..344e712aa72 100644 --- a/core/dbt/task/test.py +++ b/core/dbt/task/test.py @@ -14,7 +14,9 @@ TestNode, ) from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.results import TestStatus, PrimitiveDict, RunResult +from dbt.artifacts.results import TestStatus +from dbt.artifacts.run import RunResult +from dbt.artifacts.catalog import PrimitiveDict from dbt.context.providers import generate_runtime_model_context from dbt.clients.jinja import MacroGenerator from dbt.common.events.functions import fire_event diff --git a/scripts/collect-artifact-schema.py b/scripts/collect-artifact-schema.py index 88897925ebf..96069e613a9 100755 --- a/scripts/collect-artifact-schema.py +++ b/scripts/collect-artifact-schema.py @@ -6,12 +6,10 @@ from typing import Type, Dict, Any from dbt.contracts.graph.manifest import WritableManifest -from dbt.contracts.results import ( - CatalogArtifact, - RunResultsArtifact, - FreshnessExecutionResultArtifact, -) -from dbt.contracts.util import VersionedSchema +from dbt.artifacts.catalog import CatalogArtifact +from dbt.artifacts.run import RunResultsArtifact +from dbt.artifacts.freshness import FreshnessExecutionResultArtifact +from dbt.artifacts.base import VersionedSchema from dbt.common.clients.system import write_file diff --git a/tests/adapter/dbt/tests/adapter/basic/test_incremental.py b/tests/adapter/dbt/tests/adapter/basic/test_incremental.py index 3c13e510611..fbba4bf7c31 100644 --- a/tests/adapter/dbt/tests/adapter/basic/test_incremental.py +++ b/tests/adapter/dbt/tests/adapter/basic/test_incremental.py @@ -1,6 +1,6 @@ import pytest from dbt.tests.util import run_dbt, check_relations_equal, relation_from_name -from dbt.contracts.results import RunStatus +from dbt.artifacts.results import RunStatus from dbt.tests.adapter.basic.files import ( seeds_base_csv, seeds_added_csv, diff --git a/tests/adapter/dbt/tests/adapter/catalog/relation_types.py b/tests/adapter/dbt/tests/adapter/catalog/relation_types.py index d09fb3d1fc3..f67f9c4718a 100644 --- a/tests/adapter/dbt/tests/adapter/catalog/relation_types.py +++ b/tests/adapter/dbt/tests/adapter/catalog/relation_types.py @@ -1,4 +1,4 @@ -from dbt.contracts.results import CatalogArtifact +from dbt.artifacts.catalog import CatalogArtifact from dbt.tests.util import run_dbt import pytest diff --git a/tests/adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py b/tests/adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py index 328b0d7e4da..01f9c2da235 100644 --- a/tests/adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py +++ b/tests/adapter/dbt/tests/adapter/incremental/test_incremental_unique_id.py @@ -1,6 +1,6 @@ import pytest from dbt.tests.util import run_dbt, check_relations_equal -from dbt.contracts.results import RunStatus +from dbt.artifacts.results import RunStatus from collections import namedtuple from pathlib import Path diff --git a/tests/adapter/dbt/tests/adapter/store_test_failures_tests/basic.py b/tests/adapter/dbt/tests/adapter/store_test_failures_tests/basic.py index e8beb0f1fde..d4d1bf338c3 100644 --- a/tests/adapter/dbt/tests/adapter/store_test_failures_tests/basic.py +++ b/tests/adapter/dbt/tests/adapter/store_test_failures_tests/basic.py @@ -3,7 +3,7 @@ import pytest -from dbt.contracts.results import TestStatus +from dbt.artifacts.results import TestStatus from dbt.tests.util import run_dbt, check_relation_types from dbt.tests.adapter.store_test_failures_tests import _files diff --git a/tests/functional/artifacts/test_artifacts.py b/tests/functional/artifacts/test_artifacts.py index cf467d85833..ff854cab040 100644 --- a/tests/functional/artifacts/test_artifacts.py +++ b/tests/functional/artifacts/test_artifacts.py @@ -17,7 +17,8 @@ ) from dbt.contracts.graph.manifest import WritableManifest -from dbt.contracts.results import RunResultsArtifact, RunStatus +from dbt.artifacts.results import RunStatus +from dbt.artifacts.run import RunResultsArtifact models__schema_yml = """ version: 2 diff --git a/tests/functional/artifacts/test_previous_version_state.py b/tests/functional/artifacts/test_previous_version_state.py index 33d5749f9a3..38366a899ca 100644 --- a/tests/functional/artifacts/test_previous_version_state.py +++ b/tests/functional/artifacts/test_previous_version_state.py @@ -4,9 +4,9 @@ import pytest -from dbt.contracts.util import get_artifact_schema_version +from dbt.artifacts.base import get_artifact_schema_version from dbt.contracts.graph.manifest import WritableManifest -from dbt.contracts.results import RunResultsArtifact +from dbt.artifacts.run import RunResultsArtifact from dbt.exceptions import IncompatibleSchemaError from dbt.tests.util import run_dbt, get_manifest diff --git a/tests/unit/test_contracts_graph_unparsed.py b/tests/unit/test_contracts_graph_unparsed.py index fbf23a082c7..94578fdb303 100644 --- a/tests/unit/test_contracts_graph_unparsed.py +++ b/tests/unit/test_contracts_graph_unparsed.py @@ -26,7 +26,7 @@ UnparsedMetricInputMeasure, UnparsedVersion, ) -from dbt.contracts.results import FreshnessStatus +from dbt.artifacts.results import FreshnessStatus from dbt.node_types import NodeType from .utils import ContractTestCase diff --git a/tests/unit/test_events.py b/tests/unit/test_events.py index 467babe91b0..0cb68e19b14 100644 --- a/tests/unit/test_events.py +++ b/tests/unit/test_events.py @@ -7,7 +7,8 @@ from dbt.adapters.events import types as adapter_types from dbt.common.events.event_manager_client import ctx_set_event_manager -from dbt.contracts.results import TimingInfo, RunResult, RunStatus +from dbt.artifacts.results import TimingInfo, RunStatus +from dbt.artifacts.run import RunResult from dbt.common.events import types from dbt.adapters.events.logging import AdapterLogger from dbt.common.events.base_types import msg_from_base_event