diff --git a/.changes/unreleased/Features-20231219-201203.yaml b/.changes/unreleased/Features-20231219-201203.yaml new file mode 100644 index 000000000..eee3f1026 --- /dev/null +++ b/.changes/unreleased/Features-20231219-201203.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Support limiting get_catalog by object name +time: 2023-12-19T20:12:03.990725-05:00 +custom: + Author: mikealfare + Issue: "950" diff --git a/dbt/adapters/bigquery/impl.py b/dbt/adapters/bigquery/impl.py index eb2aec175..2df35bc65 100644 --- a/dbt/adapters/bigquery/impl.py +++ b/dbt/adapters/bigquery/impl.py @@ -23,7 +23,7 @@ ) from dbt.adapters.base.impl import FreshnessResponse from dbt.adapters.cache import _make_ref_key_dict # type: ignore -from dbt.adapters.capability import CapabilityDict, CapabilitySupport, Support, Capability +from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support import dbt_common.clients.agate_helper from dbt.adapters.contracts.connection import AdapterResponse from dbt.adapters.contracts.macros import MacroResolverProtocol @@ -126,6 +126,7 @@ class BigQueryAdapter(BaseAdapter): _capabilities: CapabilityDict = CapabilityDict( { Capability.TableLastModifiedMetadata: CapabilitySupport(support=Support.Full), + Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full), } ) diff --git a/dbt/adapters/bigquery/relation_configs/_partition.py b/dbt/adapters/bigquery/relation_configs/_partition.py index 97e695fc0..555aa3664 100644 --- a/dbt/adapters/bigquery/relation_configs/_partition.py +++ b/dbt/adapters/bigquery/relation_configs/_partition.py @@ -111,7 +111,7 @@ def parse_model_node(cls, relation_config: RelationConfig) -> Dict[str, Any]: This doesn't currently collect `time_ingestion_partitioning` and `copy_partitions` because this was built for materialized views, which do not support those settings. """ - config_dict: Dict[str, str] = relation_config.config.extra.get("partition_by") # type: ignore + config_dict: Dict[str, Any] = relation_config.config.extra.get("partition_by") # type: ignore if "time_ingestion_partitioning" in config_dict: del config_dict["time_ingestion_partitioning"] if "copy_partitions" in config_dict: diff --git a/dbt/include/bigquery/macros/catalog.sql b/dbt/include/bigquery/macros/catalog.sql deleted file mode 100644 index 25166c7b4..000000000 --- a/dbt/include/bigquery/macros/catalog.sql +++ /dev/null @@ -1,231 +0,0 @@ - -{% macro bigquery__get_catalog(information_schema, schemas) -%} - - {%- if (schemas | length) == 0 -%} - {# Hopefully nothing cares about the columns we return when there are no rows #} - {%- set query = "select 1 as id limit 0" -%} - {%- else -%} - - {%- set query -%} - with materialized_views as ( - select - table_catalog as project_id, - table_schema as dataset_id, - table_name as table_id - from {{ information_schema.replace(information_schema_view='MATERIALIZED_VIEWS') }} - ), - tables as ( - select - tables.project_id as table_database, - tables.dataset_id as table_schema, - tables.table_id as original_table_name, - - concat(tables.project_id, '.', tables.dataset_id, '.', tables.table_id) as relation_id, - - tables.row_count, - tables.size_bytes as size_bytes, - case - when materialized_views.table_id is not null then 'materialized view' - when tables.type = 1 then 'table' - when tables.type = 2 then 'view' - else 'external' - end as table_type, - - REGEXP_CONTAINS(tables.table_id, '^.+[0-9]{8}$') and coalesce(type, 0) = 1 as is_date_shard, - REGEXP_EXTRACT(tables.table_id, '^(.+)[0-9]{8}$') as shard_base_name, - REGEXP_EXTRACT(tables.table_id, '^.+([0-9]{8})$') as shard_name - - from {{ information_schema.replace(information_schema_view='__TABLES__') }} tables - left join materialized_views - on materialized_views.project_id = tables.project_id - and materialized_views.dataset_id = tables.dataset_id - and materialized_views.table_id = tables.table_id - where ( - {%- for schema in schemas -%} - upper(tables.dataset_id) = upper('{{ schema }}'){%- if not loop.last %} or {% endif -%} - {%- endfor -%} - ) - ), - - table_options as ( - select - concat(table_catalog, '.', table_schema, '.', table_name) as relation_id, - JSON_VALUE(option_value) as table_comment - - from {{ information_schema.replace(information_schema_view='TABLE_OPTIONS') }} - where option_name = 'description' - ), - extracted as ( - - select *, - case - when is_date_shard then shard_base_name - else original_table_name - end as table_name - - from tables - - ), - - unsharded_tables as ( - - select - table_database, - table_schema, - table_name, - coalesce(table_type, 'external') as table_type, - is_date_shard, - - struct( - min(shard_name) as shard_min, - max(shard_name) as shard_max, - count(*) as shard_count - ) as table_shards, - - sum(size_bytes) as size_bytes, - sum(row_count) as row_count, - - max(relation_id) as relation_id - - from extracted - group by 1,2,3,4,5 - - ), - - info_schema_columns as ( - - select - concat(table_catalog, '.', table_schema, '.', table_name) as relation_id, - table_catalog as table_database, - table_schema, - table_name, - - -- use the "real" column name from the paths query below - column_name as base_column_name, - ordinal_position as column_index, - - is_partitioning_column, - clustering_ordinal_position - - from {{ information_schema.replace(information_schema_view='COLUMNS') }} - where ordinal_position is not null - - ), - - info_schema_column_paths as ( - - select - concat(table_catalog, '.', table_schema, '.', table_name) as relation_id, - field_path as column_name, - data_type as column_type, - column_name as base_column_name, - description as column_comment - - from {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }} - - ), - - columns as ( - - select * except (base_column_name) - from info_schema_columns - join info_schema_column_paths using (relation_id, base_column_name) - - ), - - column_stats as ( - - select - table_database, - table_schema, - table_name, - max(relation_id) as relation_id, - max(case when is_partitioning_column = 'YES' then 1 else 0 end) = 1 as is_partitioned, - max(case when is_partitioning_column = 'YES' then column_name else null end) as partition_column, - max(case when clustering_ordinal_position is not null then 1 else 0 end) = 1 as is_clustered, - array_to_string( - array_agg( - case - when clustering_ordinal_position is not null then column_name - else null - end ignore nulls - order by clustering_ordinal_position - ), ', ' - ) as clustering_columns - - from columns - group by 1,2,3 - - ) - - select - unsharded_tables.table_database, - unsharded_tables.table_schema, - case - when is_date_shard then concat(unsharded_tables.table_name, '*') - else unsharded_tables.table_name - end as table_name, - unsharded_tables.table_type, - table_options.table_comment, - - -- coalesce name and type for External tables - these columns are not - -- present in the COLUMN_FIELD_PATHS resultset - coalesce(columns.column_name, '') as column_name, - -- invent a row number to account for nested fields -- BQ does - -- not treat these nested properties as independent fields - row_number() over ( - partition by relation_id - order by columns.column_index, columns.column_name - ) as column_index, - coalesce(columns.column_type, '') as column_type, - columns.column_comment, - - 'Shard count' as `stats__date_shards__label`, - table_shards.shard_count as `stats__date_shards__value`, - 'The number of date shards in this table' as `stats__date_shards__description`, - is_date_shard as `stats__date_shards__include`, - - 'Shard (min)' as `stats__date_shard_min__label`, - table_shards.shard_min as `stats__date_shard_min__value`, - 'The first date shard in this table' as `stats__date_shard_min__description`, - is_date_shard as `stats__date_shard_min__include`, - - 'Shard (max)' as `stats__date_shard_max__label`, - table_shards.shard_max as `stats__date_shard_max__value`, - 'The last date shard in this table' as `stats__date_shard_max__description`, - is_date_shard as `stats__date_shard_max__include`, - - '# Rows' as `stats__num_rows__label`, - row_count as `stats__num_rows__value`, - 'Approximate count of rows in this table' as `stats__num_rows__description`, - (unsharded_tables.table_type = 'table') as `stats__num_rows__include`, - - 'Approximate Size' as `stats__num_bytes__label`, - size_bytes as `stats__num_bytes__value`, - 'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`, - (unsharded_tables.table_type = 'table') as `stats__num_bytes__include`, - - 'Partitioned By' as `stats__partitioning_type__label`, - partition_column as `stats__partitioning_type__value`, - 'The partitioning column for this table' as `stats__partitioning_type__description`, - is_partitioned as `stats__partitioning_type__include`, - - 'Clustered By' as `stats__clustering_fields__label`, - clustering_columns as `stats__clustering_fields__value`, - 'The clustering columns for this table' as `stats__clustering_fields__description`, - is_clustered as `stats__clustering_fields__include` - - -- join using relation_id (an actual relation, not a shard prefix) to make - -- sure that column metadata is picked up through the join. This will only - -- return the column information for the "max" table in a date-sharded table set - from unsharded_tables - left join table_options using (relation_id) - left join columns using (relation_id) - left join column_stats using (relation_id) - {%- endset -%} - - {%- endif -%} - - {{ return(run_query(query)) }} - -{%- endmacro %} diff --git a/dbt/include/bigquery/macros/catalog/by_relation.sql b/dbt/include/bigquery/macros/catalog/by_relation.sql new file mode 100644 index 000000000..adaa740f6 --- /dev/null +++ b/dbt/include/bigquery/macros/catalog/by_relation.sql @@ -0,0 +1,36 @@ +{% macro bigquery__get_catalog_relations(information_schema, relations) -%} + + {%- if (relations | length) == 0 -%} + {# Hopefully nothing cares about the columns we return when there are no rows #} + {%- set query = "select 1 as id limit 0" -%} + + {%- else -%} + {%- set query -%} + with + table_shards_stage as ({{ _bigquery__get_table_shards_sql(information_schema) }}), + table_shards as ( + select * from table_shards_stage + where ( + {%- for relation in relations -%} + ( + upper(table_schema) = upper('{{ relation.schema }}') + and upper(table_name) = upper('{{ relation.identifier }}') + ) + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + ), + tables as ({{ _bigquery__get_tables_sql() }}), + table_stats as ({{ _bigquery__get_table_stats_sql() }}), + + columns as ({{ _bigquery__get_columns_sql(information_schema) }}), + column_stats as ({{ _bigquery__get_column_stats_sql() }}) + + {{ _bigquery__get_extended_catalog_sql() }} + {%- endset -%} + + {%- endif -%} + + {{ return(run_query(query)) }} + +{%- endmacro %} diff --git a/dbt/include/bigquery/macros/catalog/by_schema.sql b/dbt/include/bigquery/macros/catalog/by_schema.sql new file mode 100644 index 000000000..0d36f2b84 --- /dev/null +++ b/dbt/include/bigquery/macros/catalog/by_schema.sql @@ -0,0 +1,32 @@ +{% macro bigquery__get_catalog(information_schema, schemas) -%} + + {%- if (schemas | length) == 0 -%} + {# Hopefully nothing cares about the columns we return when there are no rows #} + {%- set query = "select 1 as id limit 0" -%} + + {%- else -%} + {%- set query -%} + with + table_shards as ( + {{ _bigquery__get_table_shards_sql(information_schema) }} + where ( + {%- for schema in schemas -%} + upper(tables.dataset_id) = upper('{{ schema }}') + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + ) + ), + tables as ({{ _bigquery__get_tables_sql() }}), + table_stats as ({{ _bigquery__get_table_stats_sql() }}), + + columns as ({{ _bigquery__get_columns_sql(information_schema) }}), + column_stats as ({{ _bigquery__get_column_stats_sql() }}) + + {{ _bigquery__get_extended_catalog_sql() }} + {%- endset -%} + + {%- endif -%} + + {{ return(run_query(query)) }} + +{%- endmacro %} diff --git a/dbt/include/bigquery/macros/catalog/catalog.sql b/dbt/include/bigquery/macros/catalog/catalog.sql new file mode 100644 index 000000000..de16f82bf --- /dev/null +++ b/dbt/include/bigquery/macros/catalog/catalog.sql @@ -0,0 +1,177 @@ +{% macro _bigquery__get_table_shards_sql(information_schema) %} + select + tables.project_id as table_catalog, + tables.dataset_id as table_schema, + coalesce(REGEXP_EXTRACT(tables.table_id, '^(.+)[0-9]{8}$'), tables.table_id) as table_name, + tables.table_id as shard_name, + REGEXP_EXTRACT(tables.table_id, '^.+([0-9]{8})$') as shard_index, + REGEXP_CONTAINS(tables.table_id, '^.+[0-9]{8}$') and tables.type = 1 as is_date_shard, + case + when materialized_views.table_name is not null then 'materialized view' + when tables.type = 1 then 'table' + when tables.type = 2 then 'view' + else 'external' + end as table_type, + tables.type = 1 as is_table, + JSON_VALUE(table_description.option_value) as table_comment, + tables.size_bytes, + tables.row_count + from {{ information_schema.replace(information_schema_view='__TABLES__') }} tables + left join {{ information_schema.replace(information_schema_view='MATERIALIZED_VIEWS') }} materialized_views + on materialized_views.table_catalog = tables.project_id + and materialized_views.table_schema = tables.dataset_id + and materialized_views.table_name = tables.table_id + left join {{ information_schema.replace(information_schema_view='TABLE_OPTIONS') }} table_description + on table_description.table_catalog = tables.project_id + and table_description.table_schema = tables.dataset_id + and table_description.table_name = tables.table_id + and table_description.option_name = 'description' +{% endmacro %} + + +{% macro _bigquery__get_tables_sql() %} + select distinct + table_catalog, + table_schema, + table_name, + is_date_shard, + table_type, + is_table, + table_comment + from table_shards +{% endmacro %} + + +{% macro _bigquery__get_table_stats_sql() %} + select + table_catalog, + table_schema, + table_name, + max(shard_name) as latest_shard_name, + min(shard_index) as shard_min, + max(shard_index) as shard_max, + count(shard_index) as shard_count, + sum(size_bytes) as size_bytes, + sum(row_count) as row_count + from table_shards + group by 1, 2, 3 +{% endmacro %} + + +{% macro _bigquery__get_columns_sql(information_schema) %} + select + columns.table_catalog, + columns.table_schema, + columns.table_name as shard_name, + coalesce(paths.field_path, '') as column_name, + -- invent a row number to account for nested fields + -- BQ does not treat these nested properties as independent fields + row_number() over ( + partition by + columns.table_catalog, + columns.table_schema, + columns.table_name + order by + columns.ordinal_position, + paths.field_path + ) as column_index, + coalesce(paths.data_type, '') as column_type, + paths.description as column_comment, + case when columns.is_partitioning_column = 'YES' then 1 else 0 end as is_partitioning_column, + case when columns.is_partitioning_column = 'YES' then paths.field_path end as partition_column, + case when columns.clustering_ordinal_position is not null then 1 else 0 end as is_clustering_column, + case when columns.clustering_ordinal_position is not null then paths.field_path end as cluster_column, + columns.clustering_ordinal_position + from {{ information_schema.replace(information_schema_view='COLUMNS') }} columns + join {{ information_schema.replace(information_schema_view='COLUMN_FIELD_PATHS') }} paths + on paths.table_catalog = columns.table_catalog + and paths.table_schema = columns.table_schema + and paths.table_name = columns.table_name + and paths.column_name = columns.column_name + where columns.ordinal_position is not null +{% endmacro %} + + +{% macro _bigquery__get_column_stats_sql() %} + select + table_catalog, + table_schema, + shard_name, + max(is_partitioning_column) = 1 as is_partitioned, + max(partition_column) as partition_column, + max(is_clustering_column) = 1 as is_clustered, + array_to_string( + array_agg( + cluster_column ignore nulls + order by clustering_ordinal_position + ), ', ' + ) as clustering_columns + from columns + group by 1, 2, 3 +{% endmacro %} + + +{% macro _bigquery__get_extended_catalog_sql() %} + select + tables.table_catalog as table_database, + tables.table_schema, + case + when tables.is_date_shard then concat(tables.table_name, '*') + else tables.table_name + end as table_name, + tables.table_type, + tables.table_comment, + columns.column_name, + columns.column_index, + columns.column_type, + columns.column_comment, + + 'Shard count' as `stats__date_shards__label`, + table_stats.shard_count as `stats__date_shards__value`, + 'The number of date shards in this table' as `stats__date_shards__description`, + tables.is_date_shard as `stats__date_shards__include`, + + 'Shard (min)' as `stats__date_shard_min__label`, + table_stats.shard_min as `stats__date_shard_min__value`, + 'The first date shard in this table' as `stats__date_shard_min__description`, + tables.is_date_shard as `stats__date_shard_min__include`, + + 'Shard (max)' as `stats__date_shard_max__label`, + table_stats.shard_max as `stats__date_shard_max__value`, + 'The last date shard in this table' as `stats__date_shard_max__description`, + tables.is_date_shard as `stats__date_shard_max__include`, + + '# Rows' as `stats__num_rows__label`, + table_stats.row_count as `stats__num_rows__value`, + 'Approximate count of rows in this table' as `stats__num_rows__description`, + tables.is_table as `stats__num_rows__include`, + + 'Approximate Size' as `stats__num_bytes__label`, + table_stats.size_bytes as `stats__num_bytes__value`, + 'Approximate size of table as reported by BigQuery' as `stats__num_bytes__description`, + tables.is_table as `stats__num_bytes__include`, + + 'Partitioned By' as `stats__partitioning_type__label`, + column_stats.partition_column as `stats__partitioning_type__value`, + 'The partitioning column for this table' as `stats__partitioning_type__description`, + column_stats.is_partitioned as `stats__partitioning_type__include`, + + 'Clustered By' as `stats__clustering_fields__label`, + column_stats.clustering_columns as `stats__clustering_fields__value`, + 'The clustering columns for this table' as `stats__clustering_fields__description`, + column_stats.is_clustered as `stats__clustering_fields__include` + + from tables + join table_stats + on table_stats.table_catalog = tables.table_catalog + and table_stats.table_schema = tables.table_schema + and table_stats.table_name = tables.table_name + left join column_stats + on column_stats.table_catalog = tables.table_catalog + and column_stats.table_schema = tables.table_schema + and column_stats.shard_name = table_stats.latest_shard_name + left join columns + on columns.table_catalog = tables.table_catalog + and columns.table_schema = tables.table_schema + and columns.shard_name = table_stats.latest_shard_name +{% endmacro %}