From 666cc3bb72a04a0c6a010b703e6e6086077100ba Mon Sep 17 00:00:00 2001 From: Colin Date: Tue, 1 Oct 2024 13:58:54 -0700 Subject: [PATCH 1/6] add py.typed --- core/dbt/config/external_config.py | 26 ++++++++++++++++++++++++++ core/dbt/config/runtime.py | 18 +++++++++++++++--- core/dbt/constants.py | 1 + dev-requirements.txt | 3 ++- 4 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 core/dbt/config/external_config.py diff --git a/core/dbt/config/external_config.py b/core/dbt/config/external_config.py new file mode 100644 index 00000000000..4bf581cc528 --- /dev/null +++ b/core/dbt/config/external_config.py @@ -0,0 +1,26 @@ +from typing import Optional + +from dbt_config.external_config import ExternalCatalogConfig + +from dbt.clients.yaml_helper import load_yaml_text +from dbt.constants import EXTERNAL_CATALOG_FILE_NAME +from dbt_common.clients.system import load_file_contents, path_exists + + +def _load_yaml(path): + contents = load_file_contents(path) + return load_yaml_text(contents) + + +def _load_yml_dict(file_path): + if path_exists(file_path): + ret = _load_yaml(file_path) or {} + return ret + return None + + +def load_external_catalog_config(project_root) -> Optional[ExternalCatalogConfig]: + unparsed_config = _load_yml_dict(f"{project_root}/{EXTERNAL_CATALOG_FILE_NAME}") + if unparsed_config is not None: + return ExternalCatalogConfig.model_validate(unparsed_config) + return None diff --git a/core/dbt/config/runtime.py b/core/dbt/config/runtime.py index e1c24cf5f0c..463ecacd24d 100644 --- a/core/dbt/config/runtime.py +++ b/core/dbt/config/runtime.py @@ -15,6 +15,8 @@ Type, ) +from dbt_config.external_config import ExternalCatalogConfig + from dbt import tracking from dbt.adapters.contracts.connection import ( AdapterRequiredConfig, @@ -39,6 +41,7 @@ from dbt_common.events.functions import warn_or_error from dbt_common.helper_types import DictDefaultEmptyStr, FQNPath, PathSet +from .external_config import load_external_catalog_config from .profile import Profile from .project import Project from .renderer import DbtProjectYamlRenderer, ProfileRenderer @@ -98,6 +101,7 @@ class RuntimeConfig(Project, Profile, AdapterRequiredConfig): profile_name: str cli_vars: Dict[str, Any] dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None + catalogs: Optional[ExternalCatalogConfig] = None def __post_init__(self): self.validate() @@ -125,12 +129,15 @@ def from_parts( profile: Profile, args: Any, dependencies: Optional[Mapping[str, "RuntimeConfig"]] = None, + catalogs: Optional[ExternalCatalogConfig] = None, ) -> "RuntimeConfig": """Instantiate a RuntimeConfig from its components. :param profile: A parsed dbt Profile. :param project: A parsed dbt Project. :param args: The parsed command-line arguments. + :param dependencies: A mapping of project names to RuntimeConfigs. + :param catalogs: A parsed dbt ExternalCatalogConfig. :returns RuntimeConfig: The new configuration. """ quoting: Dict[str, Any] = ( @@ -194,6 +201,7 @@ def from_parts( dependencies=dependencies, dbt_cloud=project.dbt_cloud, flags=project.flags, + catalogs=catalogs, ) # Called by 'load_projects' in this class @@ -253,7 +261,9 @@ def validate(self): # Called by RuntimeConfig.from_args @classmethod - def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profile]: + def collect_parts( + cls: Type["RuntimeConfig"], args: Any + ) -> Tuple[Project, Profile, Optional[ExternalCatalogConfig]]: # profile_name from the project project_root = args.project_dir if args.project_dir else os.getcwd() cli_vars: Dict[str, Any] = getattr(args, "vars", {}) @@ -264,7 +274,8 @@ def collect_parts(cls: Type["RuntimeConfig"], args: Any) -> Tuple[Project, Profi ) flags = get_flags() project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars) - return project, profile + catalogs = load_external_catalog_config(project) + return project, profile, catalogs # Called in task/base.py, in BaseTask.from_args @classmethod @@ -278,12 +289,13 @@ def from_args(cls, args: Any) -> "RuntimeConfig": :raises DbtProfileError: If the profile is invalid or missing. :raises DbtValidationError: If the cli variables are invalid. """ - project, profile = cls.collect_parts(args) + project, profile, catalogs = cls.collect_parts(args) return cls.from_parts( project=project, profile=profile, args=args, + catalogs=catalogs, ) def get_metadata(self) -> ManifestMetadata: diff --git a/core/dbt/constants.py b/core/dbt/constants.py index 0ff538910d5..a74110116af 100644 --- a/core/dbt/constants.py +++ b/core/dbt/constants.py @@ -15,6 +15,7 @@ PACKAGES_FILE_NAME = "packages.yml" DEPENDENCIES_FILE_NAME = "dependencies.yml" PACKAGE_LOCK_FILE_NAME = "package-lock.yml" +EXTERNAL_CATALOG_FILE_NAME = "catalog.yml" MANIFEST_FILE_NAME = "manifest.json" SEMANTIC_MANIFEST_FILE_NAME = "semantic_manifest.json" LEGACY_TIME_SPINE_MODEL_NAME = "metricflow_time_spine" diff --git a/dev-requirements.txt b/dev-requirements.txt index 20605e632b8..55962bb5136 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,6 +1,7 @@ git+https://github.com/dbt-labs/dbt-adapters.git@main git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter -git+https://github.com/dbt-labs/dbt-common.git@main +git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig +git+https://github.com/dbt-labs/dbt-common.git@feature/externalCatalogConfig#egg=dbt-config&subdirectory=config git+https://github.com/dbt-labs/dbt-postgres.git@main # black must match what's in .pre-commit-config.yaml to be sure local env matches CI black==24.3.0 From b5bf57b910982eac6a2a1b75b6b63f7c5b252d22 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 2 Oct 2024 08:57:51 -0700 Subject: [PATCH 2/6] add catalog config to manifest.py --- core/dbt/config/external_config.py | 10 ++++------ core/dbt/config/renderer.py | 6 ++++++ core/dbt/contracts/graph/manifest.py | 2 ++ tests/functional/configs/fixtures.py | 2 +- tests/unit/utils/__init__.py | 20 ++++++++++++++++++-- 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/core/dbt/config/external_config.py b/core/dbt/config/external_config.py index 4bf581cc528..7533a8becbf 100644 --- a/core/dbt/config/external_config.py +++ b/core/dbt/config/external_config.py @@ -1,6 +1,4 @@ -from typing import Optional - -from dbt_config.external_config import ExternalCatalogConfig +from typing import Dict, Optional from dbt.clients.yaml_helper import load_yaml_text from dbt.constants import EXTERNAL_CATALOG_FILE_NAME @@ -19,8 +17,8 @@ def _load_yml_dict(file_path): return None -def load_external_catalog_config(project_root) -> Optional[ExternalCatalogConfig]: - unparsed_config = _load_yml_dict(f"{project_root}/{EXTERNAL_CATALOG_FILE_NAME}") +def load_external_catalog_config(project) -> Optional[Dict]: + unparsed_config = _load_yml_dict(f"{project.project_root}/{EXTERNAL_CATALOG_FILE_NAME}") if unparsed_config is not None: - return ExternalCatalogConfig.model_validate(unparsed_config) + return unparsed_config return None diff --git a/core/dbt/config/renderer.py b/core/dbt/config/renderer.py index 4f605979e62..d4b5ad75c3c 100644 --- a/core/dbt/config/renderer.py +++ b/core/dbt/config/renderer.py @@ -229,3 +229,9 @@ class PackageRenderer(SecretRenderer): @property def name(self): return "Packages config" + + +class CatalogRenderer(SecretRenderer): + @property + def name(self): + return "Catalog config" diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index f4cdafea737..f9a2de34fdf 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -20,6 +20,7 @@ Union, ) +from dbt_config.external_config import ExternalCatalog from typing_extensions import Protocol import dbt_common.exceptions @@ -844,6 +845,7 @@ class Manifest(MacroMethods, dbtClassMixin): unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict) saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict) fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict) + catalogs: MutableMapping[str, ExternalCatalog] = field(default_factory=dict) _doc_lookup: Optional[DocLookup] = field( default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} diff --git a/tests/functional/configs/fixtures.py b/tests/functional/configs/fixtures.py index 63490289528..63f58b89aef 100644 --- a/tests/functional/configs/fixtures.py +++ b/tests/functional/configs/fixtures.py @@ -32,7 +32,7 @@ models__untagged_sql = """ {{ - config(materialized='table') + config(materialized=table) }} select id, value from {{ source('raw', 'seed') }} diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py index ec9cb57595d..32e72cc9bbd 100644 --- a/tests/unit/utils/__init__.py +++ b/tests/unit/utils/__init__.py @@ -6,12 +6,15 @@ import os import string +from typing import Dict from unittest import TestCase, mock import agate import pytest +from dbt_config.external_config import ExternalCatalogConfig from dbt.config.project import PartialProject +from dbt.config.renderer import CatalogRenderer from dbt.contracts.graph.manifest import Manifest from dbt_common.dataclass_schema import ValidationError @@ -57,6 +60,14 @@ def profile_from_dict(profile, profile_name, cli_vars="{}"): ) +def catalog_from_dict(catalog, cli_vars=None): + if cli_vars is None: + cli_vars = {} + renderer = CatalogRenderer(cli_vars) + rendered = renderer.render_value(catalog) + return ExternalCatalogConfig.model_validate(rendered) + + def project_from_dict(project, profile, packages=None, selectors=None, cli_vars="{}"): from dbt.config.renderer import DbtProjectYamlRenderer from dbt.config.utils import parse_cli_vars @@ -77,7 +88,9 @@ def project_from_dict(project, profile, packages=None, selectors=None, cli_vars= return partial.render(renderer) -def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars={}): +def config_from_parts_or_dicts( + project, profile, packages=None, selectors=None, cli_vars={}, catalogs=None +): from copy import deepcopy from dbt.config import Profile, Project, RuntimeConfig @@ -103,10 +116,13 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, cli_vars, ) + if isinstance(catalogs, Dict): + catalogs = catalog_from_dict(catalogs, cli_vars) + args = Obj() args.vars = cli_vars args.profile_dir = "/dev/null" - return RuntimeConfig.from_parts(project=project, profile=profile, args=args) + return RuntimeConfig.from_parts(project=project, profile=profile, args=args, catalogs=catalogs) def inject_plugin(plugin): From 706ff326e94a232096ebec03da5669584a41ae2c Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 2 Oct 2024 08:58:51 -0700 Subject: [PATCH 3/6] add catalog config to manifest.py --- tests/functional/test_external_catalog.py | 37 +++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 tests/functional/test_external_catalog.py diff --git a/tests/functional/test_external_catalog.py b/tests/functional/test_external_catalog.py new file mode 100644 index 00000000000..8e8f89617da --- /dev/null +++ b/tests/functional/test_external_catalog.py @@ -0,0 +1,37 @@ +import pytest +import yaml + +from dbt.tests.util import run_dbt, write_file +from tests.fixtures.jaffle_shop import JaffleShopProject + + +@pytest.fixture(scope="class", autouse=True) +def dbt_catalog_config(project_root): + config = { + "name": "my_project", + "version": "0.1", + "config-version": 2, + "external-catalog": { + "name": "my_external_catalog", + "type": "iceberg", + "configuration": { + "table_format": "parquet", + "namespace": "dbt", + "external_location": "s3://my-bucket/my-path", + }, + "management": { + "enabled": True, + "create_if_not_exists": False, + "alter_if_different": False, + "read_only": True, + "refresh": "on_change", + }, + }, + } + write_file(yaml.safe_dump(config), project_root, "catalog.yml") + + +class TestCatalogConfig(JaffleShopProject): + + def test_supplying_external_catalog(self, project): + run_dbt(["build"]) From 288be9675486d0d3e98a4851162057a6d4902751 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 3 Oct 2024 15:52:06 -0700 Subject: [PATCH 4/6] explore where/how to ingest the catalog config --- core/dbt/config/runtime.py | 5 +-- core/dbt/context/providers.py | 1 + core/dbt/contracts/graph/manifest.py | 3 +- core/dbt/parser/manifest.py | 15 +++++++- tests/functional/test_external_catalog.py | 42 ++++++++++++----------- tests/unit/utils/__init__.py | 2 +- 6 files changed, 42 insertions(+), 26 deletions(-) diff --git a/core/dbt/config/runtime.py b/core/dbt/config/runtime.py index 463ecacd24d..0727053def0 100644 --- a/core/dbt/config/runtime.py +++ b/core/dbt/config/runtime.py @@ -15,7 +15,7 @@ Type, ) -from dbt_config.external_config import ExternalCatalogConfig +from dbt_config.catalog_config import ExternalCatalogConfig from dbt import tracking from dbt.adapters.contracts.connection import ( @@ -274,7 +274,8 @@ def collect_parts( ) flags = get_flags() project = load_project(project_root, bool(flags.VERSION_CHECK), profile, cli_vars) - catalogs = load_external_catalog_config(project) + catalog_yml = load_external_catalog_config(project) + catalogs = ExternalCatalogConfig.model_validate(catalog_yml) if catalog_yml else None return project, profile, catalogs # Called in task/base.py, in BaseTask.from_args diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index a0e3751587a..6cd0c6d8b19 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -1295,6 +1295,7 @@ def api(self) -> Dict[str, Any]: return { "Relation": self.db_wrapper.Relation, "Column": self.adapter.Column, + # "Catalog": self.adapter.Catalog, } @contextproperty() diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index f9a2de34fdf..9a76c255d4a 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -20,7 +20,6 @@ Union, ) -from dbt_config.external_config import ExternalCatalog from typing_extensions import Protocol import dbt_common.exceptions @@ -845,7 +844,7 @@ class Manifest(MacroMethods, dbtClassMixin): unit_tests: MutableMapping[str, UnitTestDefinition] = field(default_factory=dict) saved_queries: MutableMapping[str, SavedQuery] = field(default_factory=dict) fixtures: MutableMapping[str, UnitTestFileFixture] = field(default_factory=dict) - catalogs: MutableMapping[str, ExternalCatalog] = field(default_factory=dict) + catalogs: MutableMapping[str, str] = field(default_factory=dict) _doc_lookup: Optional[DocLookup] = field( default=None, metadata={"serialize": lambda x: None, "deserialize": lambda x: None} diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7ffd00febc5..151386c9337 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Type, Union import msgpack +from dbt_config.catalog_config import ExternalCatalogConfig import dbt.deprecations import dbt.exceptions @@ -29,6 +30,7 @@ from dbt.clients.jinja import MacroStack, get_rendered from dbt.clients.jinja_static import statically_extract_macro_calls from dbt.config import Project, RuntimeConfig +from dbt.config.external_config import load_external_catalog_config from dbt.constants import ( MANIFEST_FILE_NAME, PARTIAL_PARSE_FILE_NAME, @@ -443,7 +445,12 @@ def load(self) -> Manifest: patcher.construct_sources() self.manifest.sources = patcher.sources self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch - + raw_catalog = load_external_catalog_config(self.root_project) + if raw_catalog: + catalog_config = ExternalCatalogConfig.model_validate(raw_catalog) + self.manifest.catalogs = { + c.name: c.model_dump_json() for c in catalog_config.catalogs + } # We need to rebuild disabled in order to include disabled sources self.manifest.rebuild_disabled_lookup() @@ -466,6 +473,7 @@ def load(self) -> Manifest: self.process_docs(self.root_project) self.process_metrics(self.root_project) self.process_saved_queries(self.root_project) + self.process_catalog(self.root_project) self.process_model_inferred_primary_keys() self.check_valid_group_config() self.check_valid_access_property() @@ -1140,6 +1148,11 @@ def process_metrics(self, config: RuntimeConfig): continue _process_metrics_for_node(self.manifest, current_project, exposure) + def process_catalog(self, config: RuntimeConfig): + if config.catalogs: + for catalog in config.catalogs.catalogs: + self.manifest.catalogs[catalog.name] = catalog.model_dump_json() + def process_saved_queries(self, config: RuntimeConfig): """Processes SavedQuery nodes to populate their `depends_on`.""" # Note: This will also capture various nodes which have been re-parsed diff --git a/tests/functional/test_external_catalog.py b/tests/functional/test_external_catalog.py index 8e8f89617da..3c5e81b2f94 100644 --- a/tests/functional/test_external_catalog.py +++ b/tests/functional/test_external_catalog.py @@ -1,5 +1,6 @@ import pytest import yaml +from dbt_config.catalog_config import ExternalCatalog from dbt.tests.util import run_dbt, write_file from tests.fixtures.jaffle_shop import JaffleShopProject @@ -8,25 +9,24 @@ @pytest.fixture(scope="class", autouse=True) def dbt_catalog_config(project_root): config = { - "name": "my_project", - "version": "0.1", - "config-version": 2, - "external-catalog": { - "name": "my_external_catalog", - "type": "iceberg", - "configuration": { - "table_format": "parquet", - "namespace": "dbt", - "external_location": "s3://my-bucket/my-path", - }, - "management": { - "enabled": True, - "create_if_not_exists": False, - "alter_if_different": False, - "read_only": True, - "refresh": "on_change", - }, - }, + "catalogs": [ + { + "name": "my_external_catalog", + "type": "iceberg", + "configuration": { + "table_format": "parquet", + "namespace": "dbt", + "external_location": "s3://my-bucket/my-path", + }, + "management": { + "enabled": True, + "create_if_not_exists": False, + "alter_if_different": False, + "read_only": True, + "refresh": "on_change", + }, + } + ], } write_file(yaml.safe_dump(config), project_root, "catalog.yml") @@ -34,4 +34,6 @@ def dbt_catalog_config(project_root): class TestCatalogConfig(JaffleShopProject): def test_supplying_external_catalog(self, project): - run_dbt(["build"]) + manifest = run_dbt(["parse"]) + assert manifest.catalogs != {} + ExternalCatalog.model_validate_json(manifest.catalogs["my_external_catalog"]) diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py index 32e72cc9bbd..9c106338e9d 100644 --- a/tests/unit/utils/__init__.py +++ b/tests/unit/utils/__init__.py @@ -11,7 +11,7 @@ import agate import pytest -from dbt_config.external_config import ExternalCatalogConfig +from dbt_config.catalog_config import ExternalCatalogConfig from dbt.config.project import PartialProject from dbt.config.renderer import CatalogRenderer From 418ce4d62f84d2fa6ea9ba40d56406286d238172 Mon Sep 17 00:00:00 2001 From: Colin Date: Wed, 9 Oct 2024 12:55:45 -0700 Subject: [PATCH 5/6] add functional test and add catalogs to source resolution --- core/dbt/contracts/graph/manifest.py | 21 +++++++++++++++++++++ core/dbt/parser/manifest.py | 6 ++++-- tests/functional/test_external_catalog.py | 19 ++++++++++++++----- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/core/dbt/contracts/graph/manifest.py b/core/dbt/contracts/graph/manifest.py index 9a76c255d4a..9b4544318cd 100644 --- a/core/dbt/contracts/graph/manifest.py +++ b/core/dbt/contracts/graph/manifest.py @@ -20,6 +20,7 @@ Union, ) +from dbt_config.catalog_config import ExternalCatalog from typing_extensions import Protocol import dbt_common.exceptions @@ -1333,6 +1334,26 @@ def resolve_source( current_project: str, node_package: str, ) -> MaybeParsedSource: + if target_source_name in self.catalogs: + catalog = ExternalCatalog.model_validate_json(self.catalogs[target_source_name]) + identifier = f"{target_source_name}.{target_table_name}" + catalog_database = catalog.configuration.internal_namespace.database + catalog_schema = catalog.configuration.internal_namespace.schema_ + return SourceDefinition( + database=catalog_database, + schema=catalog_schema, + fqn=[catalog_database, catalog_schema, catalog.name, target_table_name], + name=target_table_name, + source_description=f"External Catalog source for {target_source_name}.{target_table_name}", + source_name=target_source_name, + unique_id=identifier, + identifier=identifier, + package_name="dbt", + path="/root/catalogs.yml", + loader=catalog.type.value, + resource_type=NodeType.Source, + original_file_path="/root/catalogs.yml", + ) search_name = f"{target_source_name}.{target_table_name}" candidates = _packages_to_search(current_project, node_package) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 151386c9337..8dc743b965d 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -445,11 +445,13 @@ def load(self) -> Manifest: patcher.construct_sources() self.manifest.sources = patcher.sources self._perf_info.patch_sources_elapsed = time.perf_counter() - start_patch + + # Get catalog.yml and update the manifest raw_catalog = load_external_catalog_config(self.root_project) if raw_catalog: catalog_config = ExternalCatalogConfig.model_validate(raw_catalog) self.manifest.catalogs = { - c.name: c.model_dump_json() for c in catalog_config.catalogs + c.name: c.model_dump_json(by_alias=True) for c in catalog_config.catalogs } # We need to rebuild disabled in order to include disabled sources self.manifest.rebuild_disabled_lookup() @@ -1151,7 +1153,7 @@ def process_metrics(self, config: RuntimeConfig): def process_catalog(self, config: RuntimeConfig): if config.catalogs: for catalog in config.catalogs.catalogs: - self.manifest.catalogs[catalog.name] = catalog.model_dump_json() + self.manifest.catalogs[catalog.name] = catalog.model_dump_json(by_alias=True) def process_saved_queries(self, config: RuntimeConfig): """Processes SavedQuery nodes to populate their `depends_on`.""" diff --git a/tests/functional/test_external_catalog.py b/tests/functional/test_external_catalog.py index 3c5e81b2f94..6c97b64387e 100644 --- a/tests/functional/test_external_catalog.py +++ b/tests/functional/test_external_catalog.py @@ -3,7 +3,6 @@ from dbt_config.catalog_config import ExternalCatalog from dbt.tests.util import run_dbt, write_file -from tests.fixtures.jaffle_shop import JaffleShopProject @pytest.fixture(scope="class", autouse=True) @@ -14,8 +13,12 @@ def dbt_catalog_config(project_root): "name": "my_external_catalog", "type": "iceberg", "configuration": { - "table_format": "parquet", - "namespace": "dbt", + "table_format": "iceberg", + "catalog_namespace": "dbt", + "internal_namespace": { + "database": "my_db", + "schema": "my_schema", + }, "external_location": "s3://my-bucket/my-path", }, "management": { @@ -23,7 +26,7 @@ def dbt_catalog_config(project_root): "create_if_not_exists": False, "alter_if_different": False, "read_only": True, - "refresh": "on_change", + "refresh": "on-start", }, } ], @@ -31,9 +34,15 @@ def dbt_catalog_config(project_root): write_file(yaml.safe_dump(config), project_root, "catalog.yml") -class TestCatalogConfig(JaffleShopProject): +class TestCatalogConfig: + @pytest.fixture(scope="class") + def models(self): + return { + "model.sql": "select 1 as id from {{ source('my_external_catalog', 'my_table') }}", + } def test_supplying_external_catalog(self, project): manifest = run_dbt(["parse"]) assert manifest.catalogs != {} + assert manifest.nodes["model.test.model"].sources == [["my_external_catalog", "my_table"]] ExternalCatalog.model_validate_json(manifest.catalogs["my_external_catalog"]) From d76fc5751aa67aac71653dc6a019423e4b21a7e6 Mon Sep 17 00:00:00 2001 From: Colin Date: Thu, 10 Oct 2024 12:51:30 -0700 Subject: [PATCH 6/6] add catalog to provider and components --- core/dbt/artifacts/resources/v1/components.py | 3 +++ core/dbt/context/providers.py | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/core/dbt/artifacts/resources/v1/components.py b/core/dbt/artifacts/resources/v1/components.py index 02bfa5d875d..836b62a95c0 100644 --- a/core/dbt/artifacts/resources/v1/components.py +++ b/core/dbt/artifacts/resources/v1/components.py @@ -3,6 +3,8 @@ from datetime import timedelta from typing import Any, Dict, List, Optional, Union +from dbt_config.catalog_config import ExternalCatalog + from dbt.artifacts.resources.base import Docs, FileHash, GraphResource from dbt.artifacts.resources.types import NodeType, TimePeriod from dbt.artifacts.resources.v1.config import NodeConfig @@ -164,6 +166,7 @@ class DeferRelation(HasRelationMetadata): meta: Dict[str, Any] tags: List[str] config: Optional[NodeConfig] + external_catalog: Optional[ExternalCatalog] @property def identifier(self): diff --git a/core/dbt/context/providers.py b/core/dbt/context/providers.py index 6cd0c6d8b19..cf4dede8c1a 100644 --- a/core/dbt/context/providers.py +++ b/core/dbt/context/providers.py @@ -19,6 +19,7 @@ from typing_extensions import Protocol from dbt import selected_resources +from dbt.adapters.base.catalog import ExternalCatalogIntegrations from dbt.adapters.base.column import Column from dbt.adapters.base.relation import EventTimeFilter from dbt.adapters.contracts.connection import AdapterResponse @@ -898,6 +899,9 @@ def __init__( self.context_config: Optional[ContextConfig] = context_config self.provider: Provider = provider self.adapter = get_adapter(self.config) + self.catalog_integrations = ExternalCatalogIntegrations.from_json_strings( + self.manifest.catalogs.values(), self.adapter.ExternalCatalogIntegration + ) # The macro namespace is used in creating the DatabaseWrapper self.db_wrapper = self.provider.DatabaseWrapper(self.adapter, self.namespace) @@ -1295,7 +1299,7 @@ def api(self) -> Dict[str, Any]: return { "Relation": self.db_wrapper.Relation, "Column": self.adapter.Column, - # "Catalog": self.adapter.Catalog, + "catalogs": self.catalog_integrations, } @contextproperty()