diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 02c6976c848..46cbb2cdbfc 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -42,6 +42,31 @@ select * from {{ ref('input_model') }} """ +invalid_batch_context_macro_sql = """ +{% macro check_invalid_batch_context() %} + +{% if model is not mapping %} + {{ exceptions.raise_compiler_error("`model` is invalid: expected mapping type") }} +{% elif compiled_code and compiled_code is not string %} + {{ exceptions.raise_compiler_error("`compiled_code` is invalid: expected string type") }} +{% elif sql and sql is not string %} + {{ exceptions.raise_compiler_error("`sql` is invalid: expected string type") }} +{% elif is_incremental is not callable %} + {{ exceptions.raise_compiler_error("`is_incremental()` is invalid: expected callable type") }} +{% elif should_full_refresh is not callable %} + {{ exceptions.raise_compiler_error("`should_full_refresh()` is invalid: expected callable type") }} +{% endif %} + +{% endmacro %} +""" + +microbatch_model_with_context_checks_sql = """ +{{ config(pre_hook="{{ check_invalid_batch_context() }}", materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} + +{{ check_invalid_batch_context() }} +select * from {{ ref('input_model') }} +""" + microbatch_model_downstream_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('microbatch_model') }} @@ -324,6 +349,27 @@ def test_run_with_event_time(self, project): self.assert_row_count(project, "microbatch_model", 5) +class TestMicrobatchJinjaContext(BaseMicrobatchTest): + + @pytest.fixture(scope="class") + def macros(self): + return {"check_batch_context.sql": invalid_batch_context_macro_sql} + + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_with_context_checks_sql, + } + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_run_with_event_time(self, project): + # initial run -- backfills all data + with patch_microbatch_end_time("2020-01-03 13:57:00"): + run_dbt(["run"]) + self.assert_row_count(project, "microbatch_model", 3) + + class TestMicrobatchWithInputWithoutEventTime(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self):