diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py
index 67e2e71e..17e65ad5 100644
--- a/dbt/adapters/athena/connections.py
+++ b/dbt/adapters/athena/connections.py
@@ -132,6 +132,7 @@ def execute(  # type: ignore
         endpoint_url: Optional[str] = None,
         cache_size: int = 0,
         cache_expiration_time: int = 0,
+        catch_partitions_limit: bool = False,
         **kwargs,
     ):
         def inner() -> AthenaCursor:
@@ -158,7 +159,12 @@ def inner() -> AthenaCursor:
             return self

         retry = tenacity.Retrying(
-            retry=retry_if_exception(lambda _: True),
+            # No need to retry if TOO_MANY_OPEN_PARTITIONS occurs.
+            # Otherwise, Athena throws ICEBERG_FILESYSTEM_ERROR after the retry,
+            # because not all files are removed immediately after the first attempt to create the table.
+            retry=retry_if_exception(
+                lambda e: False if catch_partitions_limit and "TOO_MANY_OPEN_PARTITIONS" in str(e) else True
+            ),
             stop=stop_after_attempt(self._retry_config.attempt),
             wait=wait_exponential(
                 multiplier=self._retry_config.attempt,
diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py
index 07a82769..4d6dccc2 100755
--- a/dbt/adapters/athena/impl.py
+++ b/dbt/adapters/athena/impl.py
@@ -1,6 +1,7 @@
 import csv
 import os
 import posixpath as path
+import re
 import tempfile
 from itertools import chain
 from textwrap import dedent
@@ -19,6 +20,7 @@
     TableTypeDef,
     TableVersionTypeDef,
 )
+from pyathena.error import OperationalError

 from dbt.adapters.athena import AthenaConnectionManager
 from dbt.adapters.athena.column import AthenaColumn
@@ -875,3 +877,26 @@ def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
         returned by get_table() method.
         """
         return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__}
+
+    @available
+    def run_query_with_partitions_limit_catching(self, sql: str) -> str:
+        conn = self.connections.get_thread_connection()
+        cursor = conn.handle.cursor()
+        try:
+            cursor.execute(sql, catch_partitions_limit=True)
+        except OperationalError as e:
+            LOGGER.debug(f"CAUGHT EXCEPTION: {e}")
+            if "TOO_MANY_OPEN_PARTITIONS" in str(e):
+                return "TOO_MANY_OPEN_PARTITIONS"
+            raise e
+        return "SUCCESS"
+
+    @available
+    def format_partition_keys(self, partition_keys: List[str]) -> str:
+        return ", ".join([self.format_one_partition_key(k) for k in partition_keys])
+
+    @available
+    def format_one_partition_key(self, partition_key: str) -> str:
+        """Check if partition key uses Iceberg hidden partitioning"""
+        hidden = re.search(r"^(hour|day|month|year)\((.+)\)", partition_key.lower())
+        return f"date_trunc('{hidden.group(1)}', {hidden.group(2)})" if hidden else partition_key.lower()
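For context on what the two formatting helpers render: assuming a hypothetical model with `partitioned_by = ['day(created_at)', 'country']` and a `date`-typed `created_at`, `format_one_partition_key` translates the Iceberg hidden-partitioning transform into a `date_trunc` expression, so the distinct-partition scan issued by `get_partition_batches` (new file below) would come out roughly as:

    -- Sketch only; model and column names are invented.
    -- format_one_partition_key('day(created_at)') -> date_trunc('day', created_at)
    -- format_one_partition_key('country')         -> country
    select distinct date_trunc('day', created_at), country
    from (select event_id, created_at, country from raw.events)  -- the model's compiled SQL
    order by date_trunc('day', created_at), country;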
diff --git a/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql
new file mode 100644
index 00000000..20773841
--- /dev/null
+++ b/dbt/include/athena/macros/materializations/models/helpers/get_partition_batches.sql
@@ -0,0 +1,52 @@
+{% macro get_partition_batches(sql) -%}
+  {%- set partitioned_by = config.get('partitioned_by') -%}
+  {%- set athena_partitions_limit = config.get('partitions_limit', 100) | int -%}
+  {%- set partitioned_keys = adapter.format_partition_keys(partitioned_by) -%}
+  {% do log('PARTITIONED KEYS: ' ~ partitioned_keys) %}
+
+  {% call statement('get_partitions', fetch_result=True) %}
+    select distinct {{ partitioned_keys }} from ({{ sql }}) order by {{ partitioned_keys }};
+  {% endcall %}
+
+  {%- set table = load_result('get_partitions').table -%}
+  {%- set rows = table.rows -%}
+  {%- set partitions = {} -%}
+  {% do log('TOTAL PARTITIONS TO PROCESS: ' ~ rows | length) %}
+  {%- set partitions_batches = [] -%}
+
+  {%- for row in rows -%}
+    {%- set single_partition = [] -%}
+    {%- for col in row -%}
+
+      {%- set column_type = adapter.convert_type(table, loop.index0) -%}
+      {%- if column_type == 'integer' -%}
+        {%- set value = col | string -%}
+      {%- elif column_type == 'string' -%}
+        {%- set value = "'" + col + "'" -%}
+      {%- elif column_type == 'date' -%}
+        {%- set value = "DATE'" + col | string + "'" -%}
+      {%- elif column_type == 'timestamp' -%}
+        {%- set value = "TIMESTAMP'" + col | string + "'" -%}
+      {%- else -%}
+        {%- do exceptions.raise_compiler_error('Need to add support for column type ' + column_type) -%}
+      {%- endif -%}
+      {%- set partition_key = adapter.format_one_partition_key(partitioned_by[loop.index0]) -%}
+      {%- do single_partition.append(partition_key + '=' + value) -%}
+    {%- endfor -%}
+
+    {%- set single_partition_expression = single_partition | join(' and ') -%}
+
+    {%- set batch_number = (loop.index0 / athena_partitions_limit) | int -%}
+    {% if not batch_number in partitions %}
+      {% do partitions.update({batch_number: []}) %}
+    {% endif %}
+
+    {%- do partitions[batch_number].append('(' + single_partition_expression + ')') -%}
+    {%- if partitions[batch_number] | length == athena_partitions_limit or loop.last -%}
+      {%- do partitions_batches.append(partitions[batch_number] | join(' or ')) -%}
+    {%- endif -%}
+  {%- endfor -%}

+  {{ return(partitions_batches) }}
+
+{%- endmacro %}
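To make the batching concrete: with the hypothetical partitioning above and `partitions_limit=2`, each entry of `partitions_batches` is an `or`-joined group of at most two partition predicates, with typed literals and `and`-joined keys exactly as the macro assembles them:

    -- Batch 1 (first two distinct partitions):
    (date_trunc('day', created_at)=DATE'2023-01-01' and country='de') or (date_trunc('day', created_at)=DATE'2023-01-01' and country='us')
    -- Batch 2 (remainder):
    (date_trunc('day', created_at)=DATE'2023-01-02' and country='de')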
diff --git a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
index a965e85c..8c253dcf 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/helpers.sql
@@ -22,19 +22,42 @@
 {% endmacro %}

 {% macro incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation, statement_name="main") %}
-  {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
-  {% if not dest_columns %}
+  {%- set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
+  {%- if not dest_columns -%}
     {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
-  {% endif %}
+  {%- endif -%}
   {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
-  insert into {{ target_relation }} ({{ dest_cols_csv }})
-  (
-    select {{ dest_cols_csv }}
-    from {{ tmp_relation }}
-  );
+  {%- set insert_full -%}
+    insert into {{ target_relation }} ({{ dest_cols_csv }})
+    (
+      select {{ dest_cols_csv }}
+      from {{ tmp_relation }}
+    );
+  {%- endset -%}
+
+  {%- set query_result = adapter.run_query_with_partitions_limit_catching(insert_full) -%}
+  {%- do log('QUERY RESULT: ' ~ query_result) -%}
+  {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
+    {% set partitions_batches = get_partition_batches(tmp_relation) %}
+    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
+    {%- for batch in partitions_batches -%}
+      {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
+      {%- set insert_batch_partitions -%}
+        insert into {{ target_relation }} ({{ dest_cols_csv }})
+        (
+          select {{ dest_cols_csv }}
+          from {{ tmp_relation }}
+          where {{ batch }}
+        );
+      {%- endset -%}
+      {%- do run_query(insert_batch_partitions) -%}
+    {%- endfor -%}
+  {%- endif -%}
+
+  SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
 {%- endmacro %}
+
 {% macro delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
   {%- set partitioned_keys = partitioned_by | tojson | replace('\"', '') | replace('[', '') | replace(']', '') -%}
   {% call statement('get_partitions', fetch_result=True) %}
diff --git a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
index 879aa335..90e51d4a 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/incremental.sql
@@ -7,7 +7,7 @@
   {% set lf_tags_config = config.get('lf_tags_config') %}
   {% set lf_grants = config.get('lf_grants') %}

-  {% set partitioned_by = config.get('partitioned_by', default=none) %}
+  {% set partitioned_by = config.get('partitioned_by') %}
   {% set target_relation = this.incorporate(type='table') %}
   {% set existing_relation = load_relation(this) %}
   {% set tmp_relation = make_temp_relation(this) %}
@@ -24,16 +24,18 @@
   {% set to_drop = [] %}

   {% if existing_relation is none %}
-    {% set build_sql = create_table_as(False, target_relation, sql) -%}
+    {%- do safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select 'SUCCESSFULLY CREATED TABLE " ~ target_relation ~ " from scratch'" -%}
   {% elif existing_relation.is_view or should_full_refresh() %}
     {% do drop_relation(existing_relation) %}
-    {% set build_sql = create_table_as(False, target_relation, sql) -%}
+    {%- do safe_create_table_as(False, target_relation, sql) -%}
+    {% set build_sql = "select 'SUCCESSFULLY RECREATED TABLE " ~ target_relation ~ "'" -%}
   {% elif partitioned_by is not none and strategy == 'insert_overwrite' %}
     {% set tmp_relation = make_temp_relation(target_relation) %}
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
     {% do delete_overlapping_partitions(target_relation, tmp_relation, partitioned_by) %}
     {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
     {% do to_drop.append(tmp_relation) %}
@@ -42,7 +44,7 @@
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
     {% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
     {% do to_drop.append(tmp_relation) %}
   {% elif strategy == 'merge' and table_type == 'iceberg' %}
@@ -67,7 +69,7 @@
     {% if tmp_relation is not none %}
       {% do drop_relation(tmp_relation) %}
     {% endif %}
-    {% do run_query(create_table_as(True, tmp_relation, sql)) %}
+    {%- do safe_create_table_as(True, tmp_relation, sql) -%}
     {% set build_sql = iceberg_merge(on_schema_change, tmp_relation, target_relation, unique_key, incremental_predicates, existing_relation, delete_condition) %}
     {% do to_drop.append(tmp_relation) %}
   {% endif %}
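Usage note: no opt-in is required for the fallback itself; the batch size comes from `config.get('partitions_limit', 100)`, where the default of 100 matches Athena's cap on open partitions per CTAS/INSERT query. A hypothetical incremental model that lowers the batch size:

    -- All identifiers here are invented for illustration.
    {{ config(
        materialized='incremental',
        table_type='iceberg',
        incremental_strategy='merge',
        unique_key='event_id',
        partitioned_by=['day(created_at)', 'country'],
        partitions_limit=50
    ) }}
    select event_id, created_at, country, payload
    from {{ source('raw', 'events') }}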
diff --git a/dbt/include/athena/macros/materializations/models/incremental/merge.sql b/dbt/include/athena/macros/materializations/models/incremental/merge.sql
index cd06cb03..0bffc5c6 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/merge.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/merge.sql
@@ -73,33 +73,67 @@
   {%- endfor -%}
   {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns_wo_keys) -%}
   {%- set src_cols_csv = src_columns_quoted | join(', ') -%}
-  merge into {{ target_relation }} as target using {{ tmp_relation }} as src
-  on (
-    {%- for key in unique_key_cols %}
-      target.{{ key }} = src.{{ key }} {{ "and " if not loop.last }}
-    {%- endfor %}
-  )
-  {% if incremental_predicates is not none -%}
-    and (
-      {%- for inc_predicate in incremental_predicates %}
-        {{ inc_predicate }} {{ "and " if not loop.last }}
-      {%- endfor %}
-    )
-  {%- endif %}
-  {% if delete_condition is not none -%}
-    when matched and ({{ delete_condition }})
-    then delete
-  {%- endif %}
-  when matched
-  then update set
-  {%- for col in update_columns %}
-    {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
-      {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
-    {%- else -%}
-      {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
-    {%- endif -%}
-  {%- endfor %}
-  when not matched
-  then insert ({{ dest_cols_csv }})
-  values ({{ src_cols_csv }});
+
+  {%- set src_part -%}
+    merge into {{ target_relation }} as target using {{ tmp_relation }} as src
+  {%- endset -%}
+
+  {%- set merge_part -%}
+    on (
+      {%- for key in unique_key_cols -%}
+        target.{{ key }} = src.{{ key }}
+        {{ " and " if not loop.last }}
+      {%- endfor -%}
+      {% if incremental_predicates is not none -%}
+        and (
+          {%- for inc_predicate in incremental_predicates %}
+            {{ inc_predicate }} {{ "and " if not loop.last }}
+          {%- endfor %}
+        )
+      {%- endif %}
+    )
+    {% if delete_condition is not none -%}
+      when matched and ({{ delete_condition }})
+      then delete
+    {%- endif %}
+    when matched
+    then update set
+    {%- for col in update_columns %}
+      {%- if merge_update_columns_rules and col.name in merge_update_columns_rules %}
+        {{ get_update_statement(col, merge_update_columns_rules[col.name], loop.last) }}
+      {%- else -%}
+        {{ get_update_statement(col, merge_update_columns_default_rule, loop.last) }}
+      {%- endif -%}
+    {%- endfor %}
+    when not matched
+    then insert ({{ dest_cols_csv }})
+    values ({{ src_cols_csv }})
+  {%- endset -%}
+
+  {%- set merge_full -%}
+    {{ src_part }}
+    {{ merge_part }}
+  {%- endset -%}
+
+  {%- set query_result = adapter.run_query_with_partitions_limit_catching(merge_full) -%}
+  {%- do log('QUERY RESULT: ' ~ query_result) -%}
+  {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
+    {% set partitions_batches = get_partition_batches(tmp_relation) %}
+    {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
+    {%- for batch in partitions_batches -%}
+      {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
+      {%- set src_batch_part -%}
+        merge into {{ target_relation }} as target
+        using (select * from {{ tmp_relation }} where {{ batch }}) as src
+      {%- endset -%}
+      {%- set merge_batch -%}
+        {{ src_batch_part }}
+        {{ merge_part }}
+      {%- endset -%}
+      {%- do run_query(merge_batch) -%}
+    {%- endfor -%}
+
+  {%- endif -%}
+
+  SELECT 'SUCCESSFULLY INSERTED DATA IN {{ target_relation }}'
 {%- endmacro %}
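Rendered, each per-batch statement reuses the shared `merge_part` unchanged and only swaps the source relation for a filtered subquery; with the hypothetical names used above, one batch would look roughly like:

    -- Sketch of a single batched merge; relation and column names are invented.
    merge into analytics.events as target
    using (
        select * from analytics.events__dbt_tmp
        where (date_trunc('day', created_at)=DATE'2023-01-01' and country='de')
    ) as src
    on (target.event_id = src.event_id)
    when matched
        then update set payload = src.payload
    when not matched
        then insert (event_id, created_at, country, payload)
        values (src.event_id, src.created_at, src.country, src.payload)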
diff --git a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
index af730a30..f64e7543 100644
--- a/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
+++ b/dbt/include/athena/macros/materializations/models/table/create_table_as.sql
@@ -87,3 +87,42 @@
     as
       {{ sql }}
 {% endmacro %}
+
+{% macro create_table_as_with_partitions(temporary, relation, sql) -%}
+
+  {% set partitions_batches = get_partition_batches(sql) %}
+  {% do log('BATCHES TO PROCESS: ' ~ partitions_batches | length) %}
+
+  {%- do log('CREATE EMPTY TABLE: ' ~ relation) -%}
+  {%- set create_empty_table_query -%}
+    {{ create_table_as(temporary, relation, sql) }}
+    limit 0
+  {%- endset -%}
+  {%- do run_query(create_empty_table_query) -%}
+  {%- set dest_columns = adapter.get_columns_in_relation(relation) -%}
+  {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+
+  {%- for batch in partitions_batches -%}
+    {%- do log('BATCH PROCESSING: ' ~ loop.index ~ ' OF ' ~ partitions_batches | length) -%}
+
+    {%- set insert_batch_partitions -%}
+      insert into {{ relation }} ({{ dest_cols_csv }})
+      select {{ dest_cols_csv }}
+      from ({{ sql }})
+      where {{ batch }}
+    {%- endset -%}
+
+    {%- do run_query(insert_batch_partitions) -%}
+  {%- endfor -%}
+
+  select 'SUCCESSFULLY CREATED TABLE {{ relation }}'
+
+{%- endmacro %}
+
+{% macro safe_create_table_as(temporary, relation, sql) -%}
+  {%- set query_result = adapter.run_query_with_partitions_limit_catching(create_table_as(temporary, relation, sql)) -%}
+  {%- do log('QUERY RESULT: ' ~ query_result) -%}
+  {%- if query_result == 'TOO_MANY_OPEN_PARTITIONS' -%}
+    {%- do create_table_as_with_partitions(temporary, relation, sql) -%}
+  {%- endif -%}
+{%- endmacro %}
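The `limit 0` trick is what makes the fallback safe: the CTAS executes once with an empty result set, creating the table and its partition layout without opening a single partition writer, and the data is then backfilled batch by batch. A simplified rendering of `create_empty_table_query`, with invented names and only a subset of the table properties `create_table_as` actually emits:

    create table "analytics"."events__dbt_tmp"
    with (
        external_location='s3://my-bucket/analytics/events__dbt_tmp/',
        format='parquet',
        partitioned_by=ARRAY['country']
    )
    as
    select event_id, created_at, country, payload
    from raw.events
    limit 0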
diff --git a/dbt/include/athena/macros/materializations/models/table/table.sql b/dbt/include/athena/macros/materializations/models/table/table.sql
index 7f9856e4..ee82a633 100644
--- a/dbt/include/athena/macros/materializations/models/table/table.sql
+++ b/dbt/include/athena/macros/materializations/models/table/table.sql
@@ -49,28 +49,22 @@
       {%- endif -%}

       -- create tmp table
-      {% call statement('main') -%}
-        {{ create_table_as(False, tmp_relation, sql) }}
-      {%- endcall %}
+      {%- do safe_create_table_as(False, tmp_relation, sql) -%}

       -- swap table
-      {%- set swap_table = adapter.swap_table(tmp_relation,
-                                              target_relation) -%}
+      {%- set swap_table = adapter.swap_table(tmp_relation, target_relation) -%}

       -- delete glue tmp table, do not use drop_relation, as it will remove data of the target table
       {%- do adapter.delete_from_glue_catalog(tmp_relation) -%}

-      {% do adapter.expire_glue_table_versions(target_relation,
-                                               versions_to_keep,
-                                               True) %}
+      {% do adapter.expire_glue_table_versions(target_relation, versions_to_keep, True) %}
+
    {%- else -%}
      -- Here we are in the case of non-ha tables or ha tables but in case of full refresh.
      {%- if old_relation is not none -%}
        {{ drop_relation(old_relation) }}
      {%- endif -%}
-     {%- call statement('main') -%}
-       {{ create_table_as(False, target_relation, sql) }}
-     {%- endcall %}
+     {%- do safe_create_table_as(False, target_relation, sql) -%}
    {%- endif -%}

    {{ set_table_classification(target_relation) }}
@@ -78,14 +72,10 @@
  {%- else -%}

    {%- if old_relation is none -%}
-     {%- call statement('main') -%}
-       {{ create_table_as(False, target_relation, sql) }}
-     {%- endcall %}
+     {%- do safe_create_table_as(False, target_relation, sql) -%}
    {%- else -%}
      {%- if old_relation.is_view -%}
-       {%- call statement('main') -%}
-         {{ create_table_as(False, tmp_relation, sql) }}
-       {%- endcall -%}
+       {%- do safe_create_table_as(False, tmp_relation, sql) -%}
        {%- do drop_relation(old_relation) -%}
        {%- do rename_relation(tmp_relation, target_relation) -%}
      {%- else -%}
@@ -103,9 +93,7 @@
          {%- do drop_relation(old_relation_bkp) -%}
        {%- endif -%}

-       {%- call statement('main') -%}
-         {{ create_table_as(False, tmp_relation, sql) }}
-       {%- endcall -%}
+       {%- do safe_create_table_as(False, tmp_relation, sql) -%}

        {{ rename_relation(old_relation, old_relation_bkp) }}
        {{ rename_relation(tmp_relation, target_relation) }}
@@ -116,6 +104,10 @@

  {%- endif -%}

+ {% call statement("main") %}
+   SELECT 'SUCCESSFULLY CREATED TABLE {{ target_relation }}';
+ {% endcall %}
+
  {{ run_hooks(post_hooks) }}

  {% if lf_tags_config is not none %}
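Taken together, table models need no configuration changes either: if the initial CTAS fails with TOO_MANY_OPEN_PARTITIONS, `safe_create_table_as` falls back to `create_table_as_with_partitions` transparently, and `statement("main")` now only records the success marker. A hypothetical table model that would exercise this path:

    -- All identifiers are invented for illustration.
    {{ config(
        materialized='table',
        table_type='iceberg',
        partitioned_by=['month(order_date)'],
        partitions_limit=100
    ) }}
    select order_id, order_date, region, amount
    from {{ ref('stg_orders') }}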