diff --git a/.changes/unreleased/Features-20230929-154743.yaml b/.changes/unreleased/Features-20230929-154743.yaml new file mode 100644 index 00000000000..b3acb9a034c --- /dev/null +++ b/.changes/unreleased/Features-20230929-154743.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Added support for retrieving partial catalog information from a schema +time: 2023-09-29T15:47:43.612438-04:00 +custom: + Author: peterallenwebb + Issue: "8521" diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 37ab45f8292..f8103009d76 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -74,6 +74,7 @@ from dbt import deprecations GET_CATALOG_MACRO_NAME = "get_catalog" +GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations" FRESHNESS_MACRO_NAME = "collect_freshness" @@ -161,6 +162,14 @@ def submit(self, compiled_code: str) -> Any: raise NotImplementedError("PythonJobHelper submit function is not implemented yet") +class AdapterFeature(str, Enum): + """Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()""" + + CatalogByRelations = "CatalogByRelations" + """Flags support for retrieving catalog information using a list of relations, rather than always retrieving all + the relations in a schema """ + + class BaseAdapter(metaclass=AdapterMeta): """The BaseAdapter provides an abstract base class for adapters. @@ -415,6 +424,29 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: lowercase strings. """ info_schema_name_map = SchemaSearchMap() + relations = self._get_catalog_relations(manifest) + for relation in relations: + info_schema_name_map.add(relation) + # result is a map whose keys are information_schema Relations without + # identifiers that have appropriate database prefixes, and whose values + # are sets of lowercase schema names that are valid members of those + # databases + return info_schema_name_map + + def _get_catalog_relations_by_info_schema( + self, manifest: Manifest + ) -> Dict[InformationSchema, List[BaseRelation]]: + relations = self._get_catalog_relations(manifest) + relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict() + for relation in relations: + info_schema = relation.information_schema_only() + if info_schema not in relations_by_info_schema: + relations_by_info_schema[info_schema] = [] + relations_by_info_schema[info_schema].append(relation) + + return relations_by_info_schema + + def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]: nodes: Iterator[ResultNode] = chain( [ node @@ -423,14 +455,9 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap: ], manifest.sources.values(), ) - for node in nodes: - relation = self.Relation.create_from(self.config, node) - info_schema_name_map.add(relation) - # result is a map whose keys are information_schema Relations without - # identifiers that have appropriate database prefixes, and whose values - # are sets of lowercase schema names that are valid members of those - # databases - return info_schema_name_map + + relations = [self.Relation.create_from(self.config, n) for n in nodes] + return relations def _relations_cache_for_schemas( self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None @@ -1093,20 +1120,57 @@ def _get_one_catalog( results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type] return results + def _get_one_catalog_by_relations( + self, + information_schema: InformationSchema, + relations: List[BaseRelation], + manifest: Manifest, + ) -> agate.Table: + + kwargs = { + "information_schema": information_schema, + "relations": relations, + } + table = self.execute_macro( + GET_CATALOG_RELATIONS_MACRO_NAME, + kwargs=kwargs, + # pass in the full manifest, so we get any local project + # overrides + manifest=manifest, + ) + + results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type] + return results + def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]: - schema_map = self._get_catalog_schemas(manifest) with executor(self.config) as tpe: futures: List[Future[agate.Table]] = [] - for info, schemas in schema_map.items(): - if len(schemas) == 0: - continue - name = ".".join([str(info.database), "information_schema"]) - - fut = tpe.submit_connected( - self, name, self._get_one_catalog, info, schemas, manifest - ) - futures.append(fut) + relation_count = len(self._get_catalog_relations(manifest)) + if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations): + relations_by_schema = self._get_catalog_relations_by_info_schema(manifest) + for info_schema in relations_by_schema: + name = ".".join([str(info_schema.database), "information_schema"]) + relations = relations_by_schema[info_schema] + fut = tpe.submit_connected( + self, + name, + self._get_one_catalog_by_relations, + info_schema, + relations, + manifest, + ) + futures.append(fut) + else: + schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest) + for info, schemas in schema_map.items(): + if len(schemas) == 0: + continue + name = ".".join([str(info.database), "information_schema"]) + fut = tpe.submit_connected( + self, name, self._get_one_catalog, info, schemas, manifest + ) + futures.append(fut) catalogs, exceptions = catch_as_completed(futures) @@ -1437,6 +1501,13 @@ def render_model_constraint(cls, constraint: ModelLevelConstraint) -> Optional[s else: return None + @classmethod + def has_feature(cls, feature: AdapterFeature) -> bool: + # The base adapter implementation does not implement any optional + # features, so always return false. Adapters which wish to provide + # optional features will have to override this function. + return False + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/core/dbt/adapters/base/relation.py b/core/dbt/adapters/base/relation.py index 4e683a52afa..67a50d9061f 100644 --- a/core/dbt/adapters/base/relation.py +++ b/core/dbt/adapters/base/relation.py @@ -459,11 +459,11 @@ def add(self, relation: BaseRelation): self[key].add(schema) def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]: - for information_schema_name, schemas in self.items(): + for information_schema, schemas in self.items(): for schema in schemas: - yield information_schema_name, schema + yield information_schema, schema - def flatten(self, allow_multiple_databases: bool = False): + def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap": new = self.__class__() # make sure we don't have multiple databases if allow_multiple_databases is set to False diff --git a/core/dbt/include/global_project/macros/adapters/metadata.sql b/core/dbt/include/global_project/macros/adapters/metadata.sql index 9e45c500a3f..bdbb910ff51 100644 --- a/core/dbt/include/global_project/macros/adapters/metadata.sql +++ b/core/dbt/include/global_project/macros/adapters/metadata.sql @@ -1,3 +1,16 @@ +{% macro get_catalog_relations(information_schema, relations) -%} + {{ return(adapter.dispatch('get_catalog_relations', 'dbt')(information_schema, relations)) }} +{%- endmacro %} + +{% macro default__get_catalog_relations(information_schema, relations) -%} + {% set typename = adapter.type() %} + {% set msg -%} + get_catalog_relations not implemented for {{ typename }} + {%- endset %} + + {{ exceptions.raise_compiler_error(msg) }} +{%- endmacro %} + {% macro get_catalog(information_schema, schemas) -%} {{ return(adapter.dispatch('get_catalog', 'dbt')(information_schema, schemas)) }} {%- endmacro %} diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py index adffc4d3a62..a312a35ea7e 100644 --- a/plugins/postgres/dbt/adapters/postgres/impl.py +++ b/plugins/postgres/dbt/adapters/postgres/impl.py @@ -3,7 +3,7 @@ from typing import Optional, Set, List, Any from dbt.adapters.base.meta import available -from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport +from dbt.adapters.base.impl import AdapterConfig, AdapterFeature, ConstraintSupport from dbt.adapters.sql import SQLAdapter from dbt.adapters.postgres import PostgresConnectionManager from dbt.adapters.postgres.column import PostgresColumn @@ -73,6 +73,10 @@ class PostgresAdapter(SQLAdapter): ConstraintType.foreign_key: ConstraintSupport.ENFORCED, } + CATALOG_BY_RELATION_SUPPORT = True + + SUPPORTED_FEATURES: Set[AdapterFeature] = frozenset([AdapterFeature.CatalogByRelations]) + @classmethod def date_function(cls): return "now()" @@ -113,9 +117,9 @@ def _link_cached_database_relations(self, schemas: Set[str]): def _get_catalog_schemas(self, manifest): # postgres only allow one database (the main one) - schemas = super()._get_catalog_schemas(manifest) + schema_search_map = super()._get_catalog_schemas(manifest) try: - return schemas.flatten() + return schema_search_map.flatten() except DbtRuntimeError as exc: raise CrossDbReferenceProhibitedError(self.type(), exc.msg) @@ -143,3 +147,7 @@ def valid_incremental_strategies(self): def debug_query(self): self.execute("select 1 as id") + + @classmethod + def has_feature(cls, feature: AdapterFeature) -> bool: + return feature in cls.SUPPORTED_FEATURES diff --git a/plugins/postgres/dbt/include/postgres/macros/catalog.sql b/plugins/postgres/dbt/include/postgres/macros/catalog.sql index f0d68e1741c..f2b66c8cbf9 100644 --- a/plugins/postgres/dbt/include/postgres/macros/catalog.sql +++ b/plugins/postgres/dbt/include/postgres/macros/catalog.sql @@ -1,7 +1,7 @@ -{% macro postgres__get_catalog(information_schema, schemas) -%} - +{% macro postgres__get_catalog_relations(information_schema, relations) -%} {%- call statement('catalog', fetch_result=True) -%} + {# If the user has multiple databases set and the first one is wrong, this will fail. But we won't fail in the case where there are multiple quoting-difference-only dbs, which is better. @@ -29,12 +29,17 @@ join pg_catalog.pg_attribute col on col.attrelid = tbl.oid left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0) left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum) - where ( - {%- for schema in schemas -%} - upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%} - {%- endfor -%} - ) + {%- for relation in relations -%} + {%- if relation.identifier -%} + (upper(sch.nspname) = upper('{{ relation.schema }}') and + upper(tbl.relname) = upper('{{ relation.identifier }}')) + {%- else-%} + upper(sch.nspname) = upper('{{ relation.schema }}') + {%- endif -%} + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session and tbl.relpersistence in ('p', 'u') -- [p]ermanent table or [u]nlogged table. Exclude [t]emporary tables and tbl.relkind in ('r', 'v', 'f', 'p') -- o[r]dinary table, [v]iew, [f]oreign table, [p]artitioned table. Other values are [i]ndex, [S]equence, [c]omposite type, [t]OAST table, [m]aterialized view @@ -49,5 +54,13 @@ {%- endcall -%} {{ return(load_result('catalog').table) }} +{%- endmacro %} + +{% macro postgres__get_catalog(information_schema, schemas) -%} + {%- set relations = [] -%} + {%- for schema in schemas -%} + {%- set dummy = relations.append({'schema': schema}) -%} + {%- endfor -%} + {{ return(postgres__get_catalog_relations(information_schema, relations)) }} {%- endmacro %} diff --git a/tests/functional/artifacts/test_override.py b/tests/functional/artifacts/test_override.py index a7b689a3670..1d4f32030bd 100644 --- a/tests/functional/artifacts/test_override.py +++ b/tests/functional/artifacts/test_override.py @@ -7,7 +7,7 @@ """ fail_macros__failure_sql = """ -{% macro get_catalog(information_schema, schemas) %} +{% macro get_catalog_relations(information_schema, relations) %} {% do exceptions.raise_compiler_error('rejected: no catalogs for you') %} {% endmacro %} diff --git a/tests/unit/test_postgres_adapter.py b/tests/unit/test_postgres_adapter.py index 0127d0c6398..80b8d61b9b4 100644 --- a/tests/unit/test_postgres_adapter.py +++ b/tests/unit/test_postgres_adapter.py @@ -322,8 +322,8 @@ def test_set_zero_keepalive(self, psycopg2): ) @mock.patch.object(PostgresAdapter, "execute_macro") - @mock.patch.object(PostgresAdapter, "_get_catalog_schemas") - def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute): + @mock.patch.object(PostgresAdapter, "_get_catalog_relations_by_info_schema") + def test_get_catalog_various_schemas(self, mock_get_relations, mock_execute): column_names = ["table_database", "table_schema", "table_name"] rows = [ ("dbt", "foo", "bar"), @@ -334,9 +334,13 @@ def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute): ] mock_execute.return_value = agate.Table(rows=rows, column_names=column_names) - mock_get_schemas.return_value.items.return_value = [ - (mock.MagicMock(database="dbt"), {"foo", "FOO", "quux"}) - ] + mock_get_relations.return_value = { + mock.MagicMock(database="dbt"): [ + mock.MagicMock(schema="foo"), + mock.MagicMock(schema="FOO"), + mock.MagicMock(schema="quux"), + ] + } mock_manifest = mock.MagicMock() mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")}