Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new get_catalog_relations macro, Supporting Changes #8648

Merged
merged 12 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230929-154743.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added support for retrieving partial catalog information from a schema
time: 2023-09-29T15:47:43.612438-04:00
custom:
Author: peterallenwebb
Issue: "8521"
107 changes: 89 additions & 18 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
FRESHNESS_MACRO_NAME = "collect_freshness"


Expand Down Expand Up @@ -161,6 +162,14 @@
raise NotImplementedError("PythonJobHelper submit function is not implemented yet")


class AdapterFeature(str, Enum):
"""Enumeration of optional adapter features which can be probed using BaseAdapter.has_feature()"""

CatalogByRelations = "CatalogByRelations"
"""Flags support for retrieving catalog information using a list of relations, rather than always retrieving all
the relations in a schema """


class BaseAdapter(metaclass=AdapterMeta):
"""The BaseAdapter provides an abstract base class for adapters.

Expand Down Expand Up @@ -415,6 +424,29 @@
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
relations = self._get_catalog_relations(manifest)
for relation in relations:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't put my finger on why, but I feel like this piece should be a method on SchemaSearchMap, something like SchemaSearchMap.extend(relations). Then this becomes:

info_schema_name_map = SchemaSearchMap()
info_schema_name_map.extend(self._get_catalog_relations(manifest))
return info_schema_name_map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, @mikealfare, following this up, since I didn't incorporate every one of your requested changes.

My thinking at the time was that the code I had was tested and working, so I didn't want continue tweaking relatively small details after we had already done a lot of collaboration and gone multiple rounds of review. I should have at least left a note to that effect.

However, if you think this or any of the other remaining issues ought to be addressed before 1.7.0, let me know and I am happy to open a follow up issue.

info_schema_name_map.add(relation)

Check warning on line 429 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L427-L429

Added lines #L427 - L429 were not covered by tests
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

Check warning on line 434 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L434

Added line #L434 was not covered by tests

def _get_catalog_relations_by_info_schema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be more readable with a defaultdict. Untested pseudo code:

from collections import defaultdict

def _get_catalog_relations_by_info_schema(self, manifest):
    relations_by_info_schema = defaultdict(list)
    for relation in self._get_catalog_relations(manifext):
        relations_by_info_schema[relation.information_schema_only()].append(relation)
    return dict(relations_by_info_schema)

self, manifest: Manifest
) -> Dict[InformationSchema, List[BaseRelation]]:
relations = self._get_catalog_relations(manifest)
relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict()
for relation in relations:
info_schema = relation.information_schema_only()
if info_schema not in relations_by_info_schema:
relations_by_info_schema[info_schema] = []
relations_by_info_schema[info_schema].append(relation)

return relations_by_info_schema

def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]:
nodes: Iterator[ResultNode] = chain(
[
node
Expand All @@ -423,14 +455,9 @@
],
manifest.sources.values(),
)
for node in nodes:
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

relations = [self.Relation.create_from(self.config, n) for n in nodes]
return relations

def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None
Expand Down Expand Up @@ -1093,20 +1120,57 @@
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def _get_one_catalog_by_relations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the name of this macro mean? Put another way, how do I know what this should return? It looks like this runs the new macro, but also limits it to objects in the manifest; is that right? This only gets called once, and in the branch of code where we're passing in an explicit list of relations that we got from the manifest. Is the filter still necessary?

I think there's value in wrapping the macro itself in an adapter method. But if we do need to filter it for some reason, then we should do that after this is called.

self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
) -> agate.Table:

kwargs = {
"information_schema": information_schema,
"relations": relations,
}
table = self.execute_macro(
GET_CATALOG_RELATIONS_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])

fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)
relation_count = len(self._get_catalog_relations(manifest))
if relation_count <= 100 and self.has_feature(AdapterFeature.CatalogByRelations):
relations_by_schema = self._get_catalog_relations_by_info_schema(manifest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few observations:

  • We're calling self._get_catalog_relations() twice, once for the length and again inside of self._get_catalog_relations_by_info_schema
  • self._get_catalog_relations_by_info_schema is doing two things, getting the relations and reformatting the resulting list into a dictionary
  • The new method self._get_catalog_relations_by_info_schema is only used once, here
  • relations_by_schema is grouping by the information_schema, not relation.schema, which is misleading
  • relations_by_schema is created to filter a list of relations

Perhaps we could simplify this by reusing the results of self._get_catalog_relations(manifest) (let's refer to this as relations) and filtering relations by info_schema while looping through the info_schema values?

Some pseudo code to add context (and to organize my own thoughts):

def get_catalog(self, manifest):
    with executor(self.config) as tpe:
        futures = []
        relations = self._get_catalog_relations(manifest)
        if self.CATALOG_BY_RELATION_SUPPORT and len(relations) <= 100:
            info_schemas = {relation.information_schema_only() for relation in relations}
            for info_schema in info_schemas:
                name = ".".join([str(info_schema.database), "information_schema"])
                relations_in_this_info_schema = [
                    r for r in relations if r.information_schema_only() == info_schema
                ],
                fut = tpe.submit_connected(
                    self,
                    name,
                    self._get_one_catalog_by_relations,
                    info_schema,
                    relations_in_this_info_schema,
                    manifest,
                )
                futures.append(fut)
        else:
            # old way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this function, it may be worth reviewing the state of the code today if you still have concerns. Gerda refactored it a bit, and made changes along the lines you are suggesting.

for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
relations = relations_by_schema[info_schema]
fut = tpe.submit_connected(
self,
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
)
futures.append(fut)
else:
schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])
fut = tpe.submit_connected(

Check warning on line 1170 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1165-L1170

Added lines #L1165 - L1170 were not covered by tests
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)

Check warning on line 1173 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1173

Added line #L1173 was not covered by tests

catalogs, exceptions = catch_as_completed(futures)

Expand Down Expand Up @@ -1437,6 +1501,13 @@
else:
return None

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
# The base adapter implementation does not implement any optional
# features, so always return false. Adapters which wish to provide
# optional features will have to override this function.
return False

Check warning on line 1509 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1509

Added line #L1509 was not covered by tests


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,11 @@
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
for information_schema, schemas in self.items():

Check warning on line 462 in core/dbt/adapters/base/relation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/relation.py#L462

Added line #L462 was not covered by tests
for schema in schemas:
yield information_schema_name, schema
yield information_schema, schema

Check warning on line 464 in core/dbt/adapters/base/relation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/relation.py#L464

Added line #L464 was not covered by tests

def flatten(self, allow_multiple_databases: bool = False):
def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap":
new = self.__class__()

# make sure we don't have multiple databases if allow_multiple_databases is set to False
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/include/global_project/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
{% macro get_catalog_relations(information_schema, relations) -%}
{{ return(adapter.dispatch('get_catalog_relations', 'dbt')(information_schema, relations)) }}
{%- endmacro %}

{% macro default__get_catalog_relations(information_schema, relations) -%}
{% set typename = adapter.type() %}
{% set msg -%}
get_catalog_relations not implemented for {{ typename }}
{%- endset %}

{{ exceptions.raise_compiler_error(msg) }}
{%- endmacro %}

{% macro get_catalog(information_schema, schemas) -%}
{{ return(adapter.dispatch('get_catalog', 'dbt')(information_schema, schemas)) }}
{%- endmacro %}
Expand Down
14 changes: 11 additions & 3 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Optional, Set, List, Any

from dbt.adapters.base.meta import available
from dbt.adapters.base.impl import AdapterConfig, ConstraintSupport
from dbt.adapters.base.impl import AdapterConfig, AdapterFeature, ConstraintSupport
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.postgres import PostgresConnectionManager
from dbt.adapters.postgres.column import PostgresColumn
Expand Down Expand Up @@ -73,6 +73,10 @@ class PostgresAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

CATALOG_BY_RELATION_SUPPORT = True

SUPPORTED_FEATURES: Set[AdapterFeature] = frozenset([AdapterFeature.CatalogByRelations])

@classmethod
def date_function(cls):
return "now()"
Expand Down Expand Up @@ -113,9 +117,9 @@ def _link_cached_database_relations(self, schemas: Set[str]):

def _get_catalog_schemas(self, manifest):
# postgres only allow one database (the main one)
schemas = super()._get_catalog_schemas(manifest)
schema_search_map = super()._get_catalog_schemas(manifest)
try:
return schemas.flatten()
return schema_search_map.flatten()
except DbtRuntimeError as exc:
raise CrossDbReferenceProhibitedError(self.type(), exc.msg)

Expand Down Expand Up @@ -143,3 +147,7 @@ def valid_incremental_strategies(self):

def debug_query(self):
self.execute("select 1 as id")

@classmethod
def has_feature(cls, feature: AdapterFeature) -> bool:
return feature in cls.SUPPORTED_FEATURES
27 changes: 20 additions & 7 deletions plugins/postgres/dbt/include/postgres/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

{% macro postgres__get_catalog(information_schema, schemas) -%}

{% macro postgres__get_catalog_relations(information_schema, relations) -%}
{%- call statement('catalog', fetch_result=True) -%}

{#
If the user has multiple databases set and the first one is wrong, this will fail.
But we won't fail in the case where there are multiple quoting-difference-only dbs, which is better.
Expand Down Expand Up @@ -29,12 +29,17 @@
join pg_catalog.pg_attribute col on col.attrelid = tbl.oid
left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0)
left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum)

where (
{%- for schema in schemas -%}
upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- for relation in relations -%}
{%- if relation.identifier -%}
(upper(sch.nspname) = upper('{{ relation.schema }}') and
upper(tbl.relname) = upper('{{ relation.identifier }}'))
{%- else-%}
upper(sch.nspname) = upper('{{ relation.schema }}')
{%- endif -%}
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session
and tbl.relpersistence in ('p', 'u') -- [p]ermanent table or [u]nlogged table. Exclude [t]emporary tables
and tbl.relkind in ('r', 'v', 'f', 'p') -- o[r]dinary table, [v]iew, [f]oreign table, [p]artitioned table. Other values are [i]ndex, [S]equence, [c]omposite type, [t]OAST table, [m]aterialized view
Expand All @@ -49,5 +54,13 @@
{%- endcall -%}

{{ return(load_result('catalog').table) }}
{%- endmacro %}


{% macro postgres__get_catalog(information_schema, schemas) -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason I thought schemas here was a list of strings (the schema names), and not a list of BaseRelation objects that act as schemas (by not defining identifier). If that's the case, the where clause produced below should return nothing (relation.identifier is "falsy" because of jinja things, upper(sch.nspname) = upper('{{ relation.schema }}') fails, I think because it becomes upper(sch.nspname) = upper('')).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schemas parameter to this macro is indeed a list of strings. The loop below converts it to relations: List[obj] where the obj is acting like a BaseRelation (specifically a "schema relation") for the limited purpose of calling get_catalog_where_clause.

So when this macro is called with schemas = [ "schema_a", "schema_b", "schema_c" ], it will in turn call get_catalog_where_clause with relations = [ { "schema": "schema_a" }, { "schema": "schema_b" }, { "schema": "schema_c" } ].

This is just to make it easier to reuse get_catalog_where_clause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, does jinja treat relation.schema the same as relation.get("schema") or relation["schema"]?

{%- set relations = [] -%}
{%- for schema in schemas -%}
{%- set dummy = relations.append({'schema': schema}) -%}
{%- endfor -%}
{{ return(postgres__get_catalog_relations(information_schema, relations)) }}
{%- endmacro %}
2 changes: 1 addition & 1 deletion tests/functional/artifacts/test_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

fail_macros__failure_sql = """
{% macro get_catalog(information_schema, schemas) %}
{% macro get_catalog_relations(information_schema, relations) %}
{% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}

Expand Down
14 changes: 9 additions & 5 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ def test_set_zero_keepalive(self, psycopg2):
)

@mock.patch.object(PostgresAdapter, "execute_macro")
@mock.patch.object(PostgresAdapter, "_get_catalog_schemas")
def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
@mock.patch.object(PostgresAdapter, "_get_catalog_relations_by_info_schema")
def test_get_catalog_various_schemas(self, mock_get_relations, mock_execute):
column_names = ["table_database", "table_schema", "table_name"]
rows = [
("dbt", "foo", "bar"),
Expand All @@ -334,9 +334,13 @@ def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
]
mock_execute.return_value = agate.Table(rows=rows, column_names=column_names)

mock_get_schemas.return_value.items.return_value = [
(mock.MagicMock(database="dbt"), {"foo", "FOO", "quux"})
]
mock_get_relations.return_value = {
mock.MagicMock(database="dbt"): [
mock.MagicMock(schema="foo"),
mock.MagicMock(schema="FOO"),
mock.MagicMock(schema="quux"),
]
}

mock_manifest = mock.MagicMock()
mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")}
Expand Down