Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new get_catalog_relations macro, Supporting Changes #8648

Merged
merged 12 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 76 additions & 18 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from dbt import deprecations

GET_CATALOG_MACRO_NAME = "get_catalog"
GET_CATALOG_RELATIONS_MACRO_NAME = "get_catalog_relations"
peterallenwebb marked this conversation as resolved.
Show resolved Hide resolved
FRESHNESS_MACRO_NAME = "collect_freshness"


Expand Down Expand Up @@ -222,6 +223,8 @@
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

CATALOG_BY_RELATION_SUPPORT = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Putting a placeholder here to discuss "feature flag-like things".


def __init__(self, config) -> None:
self.config = config
self.cache = RelationsCache()
Expand Down Expand Up @@ -415,6 +418,29 @@
lowercase strings.
"""
info_schema_name_map = SchemaSearchMap()
relations = self._get_catalog_relations(manifest)
for relation in relations:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't put my finger on why, but I feel like this piece should be a method on SchemaSearchMap, something like SchemaSearchMap.extend(relations). Then this becomes:

info_schema_name_map = SchemaSearchMap()
info_schema_name_map.extend(self._get_catalog_relations(manifest))
return info_schema_name_map

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, @mikealfare, following this up, since I didn't incorporate every one of your requested changes.

My thinking at the time was that the code I had was tested and working, so I didn't want to continue tweaking relatively small details after we had already done a lot of collaboration and gone through multiple rounds of review. I should have at least left a note to that effect.

However, if you think this or any of the other remaining issues ought to be addressed before 1.7.0, let me know and I am happy to open a follow-up issue.

info_schema_name_map.add(relation)

Check warning on line 423 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L421-L423

Added lines #L421 - L423 were not covered by tests
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

Check warning on line 428 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L428

Added line #L428 was not covered by tests

def _get_catalog_relations_by_info_schema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be more readable with a defaultdict. Untested pseudo code:

from collections import defaultdict

def _get_catalog_relations_by_info_schema(self, manifest):
    relations_by_info_schema = defaultdict(list)
    for relation in self._get_catalog_relations(manifest):
        relations_by_info_schema[relation.information_schema_only()].append(relation)
    return dict(relations_by_info_schema)

self, manifest: Manifest
) -> Dict[InformationSchema, List[BaseRelation]]:
relations = self._get_catalog_relations(manifest)
relations_by_info_schema: Dict[InformationSchema, List[BaseRelation]] = dict()
for relation in relations:
info_schema = relation.information_schema_only()
if info_schema not in relations_by_info_schema:
relations_by_info_schema[info_schema] = []
relations_by_info_schema[info_schema].append(relation)

return relations_by_info_schema

def _get_catalog_relations(self, manifest: Manifest) -> List[BaseRelation]:
nodes: Iterator[ResultNode] = chain(
[
node
Expand All @@ -423,14 +449,9 @@
],
manifest.sources.values(),
)
for node in nodes:
relation = self.Relation.create_from(self.config, node)
info_schema_name_map.add(relation)
# result is a map whose keys are information_schema Relations without
# identifiers that have appropriate database prefixes, and whose values
# are sets of lowercase schema names that are valid members of those
# databases
return info_schema_name_map

relations = [self.Relation.create_from(self.config, n) for n in nodes]
return relations

def _relations_cache_for_schemas(
self, manifest: Manifest, cache_schemas: Optional[Set[BaseRelation]] = None
Expand Down Expand Up @@ -1093,20 +1114,57 @@
results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def _get_one_catalog_by_relations(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the name of this macro mean? Put another way, how do I know what this should return? It looks like this runs the new macro, but also limits it to objects in the manifest; is that right? This only gets called once, and in the branch of code where we're passing in an explicit list of relations that we got from the manifest. Is the filter still necessary?

I think there's value in wrapping the macro itself in an adapter method. But if we do need to filter it for some reason, then we should do that after this is called.

self,
information_schema: InformationSchema,
relations: List[BaseRelation],
manifest: Manifest,
) -> agate.Table:

kwargs = {
"information_schema": information_schema,
"relations": relations,
}
table = self.execute_macro(
GET_CATALOG_RELATIONS_MACRO_NAME,
kwargs=kwargs,
# pass in the full manifest, so we get any local project
# overrides
manifest=manifest,
)

results = self._catalog_filter_table(table, manifest) # type: ignore[arg-type]
return results

def get_catalog(self, manifest: Manifest) -> Tuple[agate.Table, List[Exception]]:
schema_map = self._get_catalog_schemas(manifest)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])

fut = tpe.submit_connected(
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)
relation_count = len(self._get_catalog_relations(manifest))
if relation_count <= 100 and self.CATALOG_BY_RELATION_SUPPORT:
relations_by_schema = self._get_catalog_relations_by_info_schema(manifest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few observations:

  • We're calling self._get_catalog_relations() twice, once for the length and again inside of self._get_catalog_relations_by_info_schema
  • self._get_catalog_relations_by_info_schema is doing two things, getting the relations and reformatting the resulting list into a dictionary
  • The new method self._get_catalog_relations_by_info_schema is only used once, here
  • relations_by_schema is grouping by the information_schema, not relation.schema, which is misleading
  • relations_by_schema is created to filter a list of relations

Perhaps we could simplify this by reusing the results of self._get_catalog_relations(manifest) (let's refer to this as relations) and filtering relations by info_schema while looping through the info_schema values?

Some pseudo code to add context (and to organize my own thoughts):

def get_catalog(self, manifest):
    with executor(self.config) as tpe:
        futures = []
        relations = self._get_catalog_relations(manifest)
        if self.CATALOG_BY_RELATION_SUPPORT and len(relations) <= 100:
            info_schemas = {relation.information_schema_only() for relation in relations}
            for info_schema in info_schemas:
                name = ".".join([str(info_schema.database), "information_schema"])
                relations_in_this_info_schema = [
                    r for r in relations if r.information_schema_only() == info_schema
                ]
                fut = tpe.submit_connected(
                    self,
                    name,
                    self._get_one_catalog_by_relations,
                    info_schema,
                    relations_in_this_info_schema,
                    manifest,
                )
                futures.append(fut)
        else:
            # old way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this function, it may be worth reviewing the state of the code today if you still have concerns. Gerda refactored it a bit, and made changes along the lines you are suggesting.

for info_schema in relations_by_schema:
name = ".".join([str(info_schema.database), "information_schema"])
relations = relations_by_schema[info_schema]
fut = tpe.submit_connected(
self,
name,
self._get_one_catalog_by_relations,
info_schema,
relations,
manifest,
)
futures.append(fut)
else:
schema_map: SchemaSearchMap = self._get_catalog_schemas(manifest)
for info, schemas in schema_map.items():
if len(schemas) == 0:
continue
name = ".".join([str(info.database), "information_schema"])
fut = tpe.submit_connected(

Check warning on line 1164 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1159-L1164

Added lines #L1159 - L1164 were not covered by tests
self, name, self._get_one_catalog, info, schemas, manifest
)
futures.append(fut)

Check warning on line 1167 in core/dbt/adapters/base/impl.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/impl.py#L1167

Added line #L1167 was not covered by tests

catalogs, exceptions = catch_as_completed(futures)

Expand Down
6 changes: 3 additions & 3 deletions core/dbt/adapters/base/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,11 @@
self[key].add(schema)

def search(self) -> Iterator[Tuple[InformationSchema, Optional[str]]]:
for information_schema_name, schemas in self.items():
mikealfare marked this conversation as resolved.
Show resolved Hide resolved
for information_schema, schemas in self.items():

Check warning on line 462 in core/dbt/adapters/base/relation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/relation.py#L462

Added line #L462 was not covered by tests
for schema in schemas:
yield information_schema_name, schema
yield information_schema, schema

Check warning on line 464 in core/dbt/adapters/base/relation.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/adapters/base/relation.py#L464

Added line #L464 was not covered by tests

def flatten(self, allow_multiple_databases: bool = False):
def flatten(self, allow_multiple_databases: bool = False) -> "SchemaSearchMap":
new = self.__class__()

# make sure we don't have multiple databases if allow_multiple_databases is set to False
Expand Down
13 changes: 13 additions & 0 deletions core/dbt/include/global_project/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
{#
  Public entry point for fetching catalog metadata for an explicit list of relations.
  Forwards `information_schema` and `relations` unchanged to whichever
  `get_catalog_relations` implementation `adapter.dispatch` resolves in the
  'dbt' namespace, and returns that implementation's result.
#}
{% macro get_catalog_relations(information_schema, relations) -%}
{{ return(adapter.dispatch('get_catalog_relations', 'dbt')(information_schema, relations)) }}
{%- endmacro %}

{#
  Default implementation of `get_catalog_relations`.
  It performs no query: it builds a message naming the current adapter type
  and raises a compiler error saying the macro is not implemented for it.
  Both arguments are accepted only to match the dispatching signature and are unused.
#}
{% macro default__get_catalog_relations(information_schema, relations) -%}
{% set typename = adapter.type() %}
{% set msg -%}
get_catalog_relations not implemented for {{ typename }}
{%- endset %}

{{ exceptions.raise_compiler_error(msg) }}
{%- endmacro %}

{#
  Public entry point for fetching catalog metadata by schema.
  Forwards `information_schema` and `schemas` unchanged to whichever
  `get_catalog` implementation `adapter.dispatch` resolves in the 'dbt'
  namespace, and returns that implementation's result.
#}
{% macro get_catalog(information_schema, schemas) -%}
{{ return(adapter.dispatch('get_catalog', 'dbt')(information_schema, schemas)) }}
{%- endmacro %}
Expand Down
6 changes: 4 additions & 2 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class PostgresAdapter(SQLAdapter):
ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
}

CATALOG_BY_RELATION_SUPPORT = True

@classmethod
def date_function(cls):
return "now()"
Expand Down Expand Up @@ -113,9 +115,9 @@ def _link_cached_database_relations(self, schemas: Set[str]):

def _get_catalog_schemas(self, manifest):
# postgres only allow one database (the main one)
schemas = super()._get_catalog_schemas(manifest)
schema_search_map = super()._get_catalog_schemas(manifest)
try:
return schemas.flatten()
return schema_search_map.flatten()
except DbtRuntimeError as exc:
raise CrossDbReferenceProhibitedError(self.type(), exc.msg)

Expand Down
34 changes: 26 additions & 8 deletions plugins/postgres/dbt/include/postgres/macros/catalog.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

{% macro postgres__get_catalog(information_schema, schemas) -%}

{% macro postgres__get_catalog_relations(information_schema, relations) -%}
{%- call statement('catalog', fetch_result=True) -%}

{#
If the user has multiple databases set and the first one is wrong, this will fail.
But we won't fail in the case where there are multiple quoting-difference-only dbs, which is better.
Expand Down Expand Up @@ -29,12 +29,7 @@
join pg_catalog.pg_attribute col on col.attrelid = tbl.oid
left outer join pg_catalog.pg_description tbl_desc on (tbl_desc.objoid = tbl.oid and tbl_desc.objsubid = 0)
left outer join pg_catalog.pg_description col_desc on (col_desc.objoid = tbl.oid and col_desc.objsubid = col.attnum)

where (
{%- for schema in schemas -%}
upper(sch.nspname) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{{ postgres__get_catalog_where_clause(relations) }}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the where clause is not reused, I would keep it in line here.

I made it a separate macro because I needed it twice (old implementation and new implementation). However, I think I like your approach at the adapter level of simply rerouting the existing macro to the new macro, avoiding the need for maintaining two versions of the query.

and not pg_is_other_temp_schema(sch.oid) -- not a temporary schema belonging to another session
and tbl.relpersistence in ('p', 'u') -- [p]ermanent table or [u]nlogged table. Exclude [t]emporary tables
and tbl.relkind in ('r', 'v', 'f', 'p') -- o[r]dinary table, [v]iew, [f]oreign table, [p]artitioned table. Other values are [i]ndex, [S]equence, [c]omposite type, [t]OAST table, [m]aterialized view
Expand All @@ -49,5 +44,28 @@
{%- endcall -%}

{{ return(load_result('catalog').table) }}
{%- endmacro %}


{% macro postgres__get_catalog(information_schema, schemas) -%}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason I thought schemas here was a list of strings (the schema names), and not a list of BaseRelation objects that act as schemas (by not defining identifier). If that's the case, the where clause produced below should return nothing (relation.identifier is "falsy" because of jinja things, upper(sch.nspname) = upper('{{ relation.schema }}') fails, I think because it becomes upper(sch.nspname) = upper('')).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The schemas parameter to this macro is indeed a list of strings. The loop below converts it to relations: List[obj] where the obj is acting like a BaseRelation (specifically a "schema relation") for the limited purpose of calling get_catalog_where_clause.

So when this macro is called with schemas = [ "schema_a", "schema_b", "schema_c" ], it will in turn call get_catalog_where_clause with relations = [ { "schema": "schema_a" }, { "schema": "schema_b" }, { "schema": "schema_c" } ].

This is just to make it easier to reuse get_catalog_where_clause.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, does jinja treat relation.schema the same as relation.get("schema") or relation["schema"]?

{%- set relations = [] -%}
{%- for schema in schemas -%}
{%- set dummy = relations.append({'schema': schema}) -%}
{%- endfor -%}
{{ return(postgres__get_catalog_relations(information_schema, relations)) }}
{%- endmacro %}


{#
  Renders the leading `where ( ... )` predicate of the catalog query.
  Emits one condition per entry in `relations`, joined with `or`:
    - entry has a truthy `identifier`: match on both schema (sch.nspname)
      and table name (tbl.relname);
    - otherwise: match on schema only.
  Both sides of each comparison are wrapped in upper(), so matching is
  case-insensitive. The rendered SQL refers to the `sch` and `tbl` aliases,
  so the enclosing query must define them.
  Entries only need `schema` / `identifier` attribute-style access.
  NOTE(review): an empty `relations` list renders `where ()` - confirm callers never pass one.
  NOTE(review): schema and identifier values are interpolated directly into
  SQL string literals without escaping - confirm they are trusted.
#}
{% macro postgres__get_catalog_where_clause(relations) %}
where (
{%- for relation in relations -%}
{%- if relation.identifier -%}
(upper(sch.nspname) = upper('{{ relation.schema }}') and
upper(tbl.relname) = upper('{{ relation.identifier }}'))
{%- else-%}
upper(sch.nspname) = upper('{{ relation.schema }}')
{%- endif -%}
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
)
{%- endmacro %}
2 changes: 1 addition & 1 deletion tests/functional/artifacts/test_override.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

fail_macros__failure_sql = """
{% macro get_catalog(information_schema, schemas) %}
{% macro get_catalog_relations(information_schema, relations) %}
{% do exceptions.raise_compiler_error('rejected: no catalogs for you') %}
{% endmacro %}

Expand Down
14 changes: 9 additions & 5 deletions tests/unit/test_postgres_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ def test_set_zero_keepalive(self, psycopg2):
)

@mock.patch.object(PostgresAdapter, "execute_macro")
@mock.patch.object(PostgresAdapter, "_get_catalog_schemas")
def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
@mock.patch.object(PostgresAdapter, "_get_catalog_relations_by_info_schema")
def test_get_catalog_various_schemas(self, mock_get_relations, mock_execute):
column_names = ["table_database", "table_schema", "table_name"]
rows = [
("dbt", "foo", "bar"),
Expand All @@ -334,9 +334,13 @@ def test_get_catalog_various_schemas(self, mock_get_schemas, mock_execute):
]
mock_execute.return_value = agate.Table(rows=rows, column_names=column_names)

mock_get_schemas.return_value.items.return_value = [
(mock.MagicMock(database="dbt"), {"foo", "FOO", "quux"})
]
mock_get_relations.return_value = {
mock.MagicMock(database="dbt"): [
mock.MagicMock(schema="foo"),
mock.MagicMock(schema="FOO"),
mock.MagicMock(schema="quux"),
]
}

mock_manifest = mock.MagicMock()
mock_manifest.get_used_schemas.return_value = {("dbt", "foo"), ("dbt", "quux")}
Expand Down
Loading