Skip to content

Commit

Permalink
Microbatch strategy (#1179)
Browse files Browse the repository at this point in the history
* first pass: add incremental_predicates
* safely add incremental_predicates + testing
* remove requirement for unique_id

---------

Co-authored-by: Quigley Malcolm <[email protected]>
  • Loading branch information
MichelleArk and QMalcolm authored Sep 18, 2024
1 parent 49623d7 commit 3cbe12f
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20240913-215416.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Microbatch incremental strategy
time: 2024-09-13T21:54:16.492885-04:00
custom:
Author: michelleark
Issue: "1182"
2 changes: 1 addition & 1 deletion dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str):
return response

def valid_incremental_strategies(self):
return ["append", "merge", "delete+insert"]
return ["append", "merge", "delete+insert", "microbatch"]

def debug_query(self):
"""Override for DebugTask method"""
Expand Down
32 changes: 32 additions & 0 deletions dbt/include/snowflake/macros/materializations/merge.sql
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,35 @@
{% set dml = default__get_incremental_append_sql(get_incremental_append_sql) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}


{% macro snowflake__get_incremental_microbatch_sql(arg_dict) %}
{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}

{#-- Add additional incremental_predicates to filter for batch --#}
{% if model.config.get("__dbt_internal_microbatch_event_time_start") -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% endif %}
{% if model.config.__dbt_internal_microbatch_event_time_end -%}
{% do incremental_predicates.append("DBT_INTERNAL_TARGET." ~ model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% endif %}
{% do arg_dict.update({'incremental_predicates': incremental_predicates}) %}

delete from {{ target }} DBT_INTERNAL_TARGET
using {{ source }}
where (
{% for predicate in incremental_predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
24 changes: 24 additions & 0 deletions tests/functional/adapter/test_incremental_microbatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)


# No requirement for a unique_id for snowflake microbatch!
_microbatch_model_no_unique_id_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day') }}
select * from {{ ref('input_model') }}
"""


class TestSnowflakeMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
return _microbatch_model_no_unique_id_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"

0 comments on commit 3cbe12f

Please sign in to comment.