Skip to content

Commit

Permalink
Implement changes in dbt-labs#236
Browse files — browse the repository at this point in the history
  • Loading branch information
nwatson committed Dec 30, 2024
1 parent 0e778b3 commit 72e882a
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 10 deletions.
12 changes: 12 additions & 0 deletions macros/common/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{# Dispatch entry point: routes recover_partitions to the adapter-specific
   implementation (e.g. databricks__recover_partitions) registered in the
   dbt_external_tables namespace. Raises via the default__ fallback when no
   adapter-specific macro exists. #}
{% macro recover_partitions(source_node) %}
    {{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__recover_partitions(source_node) %}
/*{#
    Fallback implementation: there is no generic way to recover partitions,
    so any adapter without its own override gets an explicit
    not-implemented error instead of silently doing nothing. Users may
    override this macro in their own project for other adapters.
#}*/

    {{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' ~ adapter.type()) }}
{% endmacro %}
41 changes: 41 additions & 0 deletions macros/plugins/databricks/create_external_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{# Renders a CREATE TABLE statement for an external table on Databricks,
   merging declared columns with any partition columns not already present
   in the column list. Clause-by-clause: using / options / partitioned by /
   row format / stored as / location / tblproperties, each emitted only when
   configured on the source's `external` block. #}
{% macro databricks__create_external_table(source_node) %}

    {%- set columns = source_node.columns.values() -%}
    {%- set external = source_node.external -%}
    {%- set partitions = external.partitions -%}
    {%- set options = external.options -%}

    {# Partition columns must also appear in the table's column list;
       append any partition not declared as a regular column. #}
    {%- set columns_and_partitions = columns | list -%}
    {%- if partitions -%}
        {%- for i in partitions -%}
            {%- if i.name not in columns_and_partitions | list | map(attribute='name') -%}
                {%- do columns_and_partitions.append(i) -%}
            {%- endif -%}
        {%- endfor -%}
    {%- endif -%}

    {# https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html #}
    create table {{source(source_node.source_name, source_node.name)}}
    {# NOTE(review): the guard tests `columns` but the loop renders
       `columns_and_partitions` — a source declaring only partitions (no
       columns) emits no column list at all; confirm that is intended. #}
    {%- if columns | length > 0 %} (
        {% for column in columns_and_partitions %}
            {{column.name}} {{column.data_type}}
            {{- ',' if not loop.last -}}
        {% endfor %}
    ) {% endif -%}
    {% if external.using %} using {{external.using}} {%- endif %}
    {# NOTE(review): option values are wrapped in single quotes verbatim —
       a value containing a quote would break the rendered DDL. #}
    {% if options -%} options (
        {%- for key, value in options.items() -%}
            '{{ key }}' = '{{value}}' {{- ', \n' if not loop.last -}}
        {%- endfor -%}
    ) {%- endif %}
    {% if partitions -%} partitioned by (
        {%- for partition in partitions -%}
            {{partition.name}}{{', ' if not loop.last}}
        {%- endfor -%}
    ) {%- endif %}
    {% if external.row_format -%} row format {{external.row_format}} {%- endif %}
    {% if external.file_format -%} stored as {{external.file_format}} {%- endif %}
    {% if external.location -%} location '{{external.location}}' {%- endif %}
    {% if external.table_properties -%} tblproperties {{ external.table_properties }} {%- endif -%}

{% endmacro %}
25 changes: 25 additions & 0 deletions macros/plugins/databricks/get_external_build_plan.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{% macro databricks__get_external_build_plan(source_node) %}
    {# Decides between a full (re)create of the external table and a simple
       metadata refresh, returning the chosen list of DDL statements. #}

    {%- set existing = adapter.get_relation(
        database = source_node.database,
        schema = source_node.schema,
        identifier = source_node.identifier
    ) -%}

    {# Recreate when the relation is absent or a full refresh is forced
       via `--vars 'ext_full_refresh: true'`. #}
    {%- if existing is none or var('ext_full_refresh', false) -%}
        {%- set build_plan = [
            dbt_external_tables.create_external_schema(source_node),
            dbt_external_tables.dropif(source_node),
            dbt_external_tables.create_external_table(source_node)
        ] -%}
    {%- else -%}
        {%- set build_plan = dbt_external_tables.refresh_external_table(source_node) -%}
    {%- endif -%}

    {% do return(build_plan) %}

{% endmacro %}
9 changes: 9 additions & 0 deletions macros/plugins/databricks/helpers/dropif.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro databricks__dropif(node) %}
    {# Returns DDL that drops the source's table when it already exists,
       so a subsequent create starts from a clean slate. #}

    {% set drop_ddl %}
        drop table if exists {{source(node.source_name, node.name)}}
    {% endset %}

    {{ return(drop_ddl) }}

{% endmacro %}
14 changes: 14 additions & 0 deletions macros/plugins/databricks/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{% macro databricks__recover_partitions(source_node) %}
    {# https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-alter-table.html #}
    {# Emits ALTER TABLE ... RECOVER PARTITIONS only for partitioned,
       non-delta sources with an explicit `using` format; returns `none`
       (no statement) otherwise. #}

    {%- set ext = source_node.external -%}
    {%- set ddl = none -%}

    {%- if ext.partitions and ext.using and ext.using | lower != 'delta' -%}
        {% set ddl %}
        ALTER TABLE {{ source(source_node.source_name, source_node.name) }} RECOVER PARTITIONS
        {% endset %}
    {%- endif -%}

    {{ return(ddl) }}

{% endmacro %}
9 changes: 9 additions & 0 deletions macros/plugins/databricks/refresh_external_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro databricks__refresh_external_table(source_node) %}
    {# Returns a one-element build plan that refreshes the external table's
       cached metadata in place of a drop-and-recreate. #}

    {% set refresh_ddl %}
        refresh table {{source(source_node.source_name, source_node.name)}}
    {% endset %}

    {% do return([refresh_ddl]) %}

{% endmacro %}
11 changes: 1 addition & 10 deletions macros/plugins/spark/helpers/recover_partitions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,4 @@

{# Dispatch entry point: routes recover_partitions to the adapter-specific
   implementation registered in the dbt_external_tables namespace.
   NOTE(review): this appears in a diff view — the same pair of macros also
   exists in macros/common/helpers/recover_partitions.sql; confirm which
   copy survives in the final tree. #}
{% macro recover_partitions(source_node) %}
    {{ return(adapter.dispatch('recover_partitions', 'dbt_external_tables')(source_node)) }}
{% endmacro %}

{% macro default__recover_partitions(source_node) %}
/*{#
    Fallback for adapters without a specific implementation: raises an
    explicit not-implemented compilation error. Users may override this
    macro for other adapters.
#}*/

    {{ exceptions.raise_not_implemented('recover_partitions macro not implemented for adapter ' + adapter.type()) }}
{% endmacro %}
{% endmacro %}

0 comments on commit 72e882a

Please sign in to comment.