From 3cbe12f4543d0357ec0ed4b7f6a17345c6198bae Mon Sep 17 00:00:00 2001
From: Michelle Ark
Date: Wed, 18 Sep 2024 13:29:42 -0400
Subject: [PATCH] Microbatch strategy (#1179)

* first pass: add incremental_predicates
* safely add incremental_predicates + testing
* remove requirement for unique_id

---------

Co-authored-by: Quigley Malcolm
---
 .../unreleased/Features-20240913-215416.yaml  |  6 ++++
 dbt/adapters/snowflake/impl.py                |  2 +-
 .../macros/materializations/merge.sql         | 32 +++++++++++++++++++
 .../adapter/test_incremental_microbatch.py    | 24 ++++++++++++++
 4 files changed, 63 insertions(+), 1 deletion(-)
 create mode 100644 .changes/unreleased/Features-20240913-215416.yaml
 create mode 100644 tests/functional/adapter/test_incremental_microbatch.py

diff --git a/.changes/unreleased/Features-20240913-215416.yaml b/.changes/unreleased/Features-20240913-215416.yaml
new file mode 100644
index 000000000..b2a6e556e
--- /dev/null
+++ b/.changes/unreleased/Features-20240913-215416.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Microbatch incremental strategy
+time: 2024-09-13T21:54:16.492885-04:00
+custom:
+  Author: michelleark
+  Issue: "1182"
diff --git a/dbt/adapters/snowflake/impl.py b/dbt/adapters/snowflake/impl.py
index 7e8ec9cf2..a6297887d 100644
--- a/dbt/adapters/snowflake/impl.py
+++ b/dbt/adapters/snowflake/impl.py
@@ -416,7 +416,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str):
         return response
 
     def valid_incremental_strategies(self):
-        return ["append", "merge", "delete+insert"]
+        return ["append", "merge", "delete+insert", "microbatch"]
 
     def debug_query(self):
         """Override for DebugTask method"""
diff --git a/dbt/include/snowflake/macros/materializations/merge.sql b/dbt/include/snowflake/macros/materializations/merge.sql
index e93b29155..57c58afdd 100644
--- a/dbt/include/snowflake/macros/materializations/merge.sql
+++ b/dbt/include/snowflake/macros/materializations/merge.sql
@@ -48,3 +48,35 @@
     {% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
     {% do return(snowflake_dml_explicit_transaction(dml)) %}
 {% endmacro %}
+
+
+{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
+  {%- set target = arg_dict["target_relation"] -%}
+  {%- set source = arg_dict["temp_relation"] -%}
+  {%- set dest_columns = arg_dict["dest_columns"] -%}
+  {%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
+
+  {#-- Add additional incremental_predicates to filter for batch --#}
+  {% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
+    {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
+  {% endif %}
+  {% if model.config.__dbt_internal_microbatch_event_time_end -%}
+    {% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
+  {% endif %}
+  {% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}
+
+  delete from {{ target }} DBT_INTERNAL_TARGET
+  using {{ source }}
+  where (
+  {% for predicate in incremental_predicates %}
+      {%- if not loop.first %}and {% endif -%} {{ predicate }}
+  {% endfor %}
+  );
+
+  {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
+  insert into {{ target }} ({{ dest_cols_csv }})
+  (
+      select {{ dest_cols_csv }}
+      from {{ source }}
+  )
+{% endmacro %}
diff --git a/tests/functional/adapter/test_incremental_microbatch.py b/tests/functional/adapter/test_incremental_microbatch.py
new file mode 100644
index 000000000..bbb57f96c
--- /dev/null
+++ b/tests/functional/adapter/test_incremental_microbatch.py
@@ -0,0 +1,24 @@
+import pytest
+from dbt.tests.adapter.incremental.test_incremental_microbatch import (
+    BaseMicrobatch,
+)
+
+
+# No requirement for a unique_id for snowflake microbatch!
+_microbatch_model_no_unique_id_sql = """
+{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
+select * from {{ ref('input_model') }}
+"""
+
+
+class TestSnowflakeMicrobatch(BaseMicrobatch):
+    @pytest.fixture(scope="class")
+    def microbatch_model_sql(self) -> str:
+        return _microbatch_model_no_unique_id_sql
+
+    @pytest.fixture(scope="class")
+    def insert_two_rows_sql(self, project) -> str:
+        test_schema_relation = project.adapter.Relation.create(
+            database=project.database, schema=project.test_schema
+        )
+        return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"