From ec3c6e7b5d00a9f359bdb8e09ccfa9dd43018879 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 20 Sep 2024 14:20:25 -0500 Subject: [PATCH 1/8] Initial pass at microbatch config validation --- core/dbt/parser/manifest.py | 40 +++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index d54aa898713..7f151c52a09 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -24,6 +24,7 @@ register_adapter, ) from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion +from dbt.artifacts.resources.types import BatchSize from dbt.artifacts.schemas.base import Writable from dbt.clients.jinja import MacroStack, get_rendered from dbt.clients.jinja_static import statically_extract_macro_calls @@ -468,6 +469,7 @@ def load(self) -> Manifest: self.check_valid_group_config() self.check_valid_access_property() self.check_valid_snapshot_config() + self.check_valid_microbatch_config() semantic_manifest = SemanticManifest(self.manifest) if not semantic_manifest.validate(): @@ -1355,6 +1357,44 @@ def check_valid_snapshot_config(self): continue node.config.final_validate() + def check_valid_microbatch_config(self): + if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"): + for node in self.manifest.nodes.values(): + if ( + node.config.materialized == "incremental" + and node.config.incremental_strategy == "microbatch" + ): + event_time = node.config.event_time + if not isinstance(event_time, str): + # TODO be more specific/verbose + raise DbtValidationError( + f"When used with microbatch, `event_time` must be of type str, but got {type(event_time)}" + ) + + lookback = node.config.lookback + if not isinstance(lookback, int) and lookback is not None: + # TODO be more specific/verbose + raise DbtValidationError( + f"When used with microbatch, `lookback` must be of type int or None, but got {type(lookback)}" + ) + + batch_size = node.config.batch_size + valid_batch_sizes = [size.value for size in BatchSize] + if batch_size not in valid_batch_sizes: + # TODO be more specific/verbose + raise DbtValidationError( + f"When used with microbatch, `batch_size` must be one of {valid_batch_sizes}, but got {batch_size}" + ) + + for input_unique_id in node.depends_on.nodes: + input_node = self.manifest.expect(unique_id=input_unique_id) + input_event_time = input_node.config.event_time + if input_event_time and not isinstance(input_event_time, str): + # TODO be more specific/verbose + raise DbtValidationError( + f"When used as an input to a microbatch model, `event_time` must be a str or None, but got {type(input_event_time)}" + ) + def write_perf_info(self, target_path: str): path = os.path.join(target_path, PERF_INFO_FILE_NAME) write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) From 1f75a3b070f782a7f97d55330dbbb72a706807cf Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Mon, 23 Sep 2024 15:51:52 +0100 Subject: [PATCH 2/8] polish up validation + start testing --- core/dbt/parser/manifest.py | 45 +++++++++----- .../test_microbatch_config_validation.py | 60 +++++++++++++++++++ 2 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 tests/functional/microbatch/test_microbatch_config_validation.py diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7f151c52a09..66582fb4547 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1364,35 +1364,50 @@ def check_valid_microbatch_config(self): node.config.materialized == "incremental" and node.config.incremental_strategy == "microbatch" ): + # Required configs: event_time, batch_size, begin event_time = node.config.event_time + if event_time is None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates " + "the name of the column" + ) if not isinstance(event_time, str): - # TODO be more specific/verbose - raise DbtValidationError( - f"When used with microbatch, `event_time` must be of type str, but got {type(event_time)}" + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}." ) - lookback = node.config.lookback - if not isinstance(lookback, int) and lookback is not None: - # TODO be more specific/verbose - raise DbtValidationError( - f"When used with microbatch, `lookback` must be of type int or None, but got {type(lookback)}" - ) + # begin = node.config.begin + # if begin is None: + # raise dbt.exceptions.ParsingError( + # f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch" + # "model should be built from." + # ) + # if not isinstance(begin, datetime): + # raise dbt.exceptions.ParsingError( + # f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}." + # ) batch_size = node.config.batch_size valid_batch_sizes = [size.value for size in BatchSize] if batch_size not in valid_batch_sizes: - # TODO be more specific/verbose - raise DbtValidationError( - f"When used with microbatch, `batch_size` must be one of {valid_batch_sizes}, but got {batch_size}" + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'batch_size' config that is one of {valid_batch_sizes}, but got: {batch_size}." + ) + + # Optional config: lookback (int) + lookback = node.config.lookback + if not isinstance(lookback, int) and lookback is not None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)})." ) + # Validate upstream node event_time (if configured) for input_unique_id in node.depends_on.nodes: input_node = self.manifest.expect(unique_id=input_unique_id) input_event_time = input_node.config.event_time if input_event_time and not isinstance(input_event_time, str): - # TODO be more specific/verbose - raise DbtValidationError( - f"When used as an input to a microbatch model, `event_time` must be a str or None, but got {type(input_event_time)}" + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}." ) def write_perf_info(self, target_path: str): diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py new file mode 100644 index 00000000000..e1be003bed1 --- /dev/null +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -0,0 +1,60 @@ +import pytest +from dbt.tests.util import ( + patch_microbatch_end_time, + relation_from_name, + run_dbt, + run_dbt_and_capture, + write_file, +) +from dbt.exceptions import ParsingError + +missing_event_time_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }} +select * from {{ ref('input_model') }} +""" + +invalid_event_time_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time=2) }} +select * from {{ ref('input_model') }} +""" + +missing_batch_size_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_batch_size_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='invalid', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_event_time_input_model_sql = """ +{{ config(materialized='table', event_time=1) }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +""" + +valid_input_model_sql = """ +{{ config(materialized='table') }} + +select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time +""" + + +class BaseMicrobatchTest: + @pytest.fixture(scope="class") + def models(self): + return {} + + def test_parsing_error_raised(self, project): + with pytest.raises(ParsingError): + run_dbt(["parse"]) + + +class TestMissingEventTimeMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_event_time_microbatch_model_sql + } From 7115034dacf2ad4b56b93928aa310db72a004af7 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 14:51:43 +0100 Subject: [PATCH 3/8] add functional tests --- core/dbt/parser/manifest.py | 3 +- .../test_microbatch_config_validation.py | 50 +++++++++++++++++-- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 66582fb4547..7470b825d07 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1368,8 +1368,7 @@ def check_valid_microbatch_config(self): event_time = node.config.event_time if event_time is None: raise dbt.exceptions.ParsingError( - f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates " - "the name of the column" + f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates the name of the event time column." ) if not isinstance(event_time, str): raise dbt.exceptions.ParsingError( diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index e1be003bed1..fbc30715790 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -1,13 +1,18 @@ +import os import pytest +from unittest import mock + from dbt.tests.util import ( - patch_microbatch_end_time, - relation_from_name, run_dbt, - run_dbt_and_capture, - write_file, ) from dbt.exceptions import ParsingError + +valid_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + missing_event_time_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }} select * from {{ ref('input_model') }} @@ -46,6 +51,7 @@ class BaseMicrobatchTest: def models(self): return {} + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) def test_parsing_error_raised(self, project): with pytest.raises(ParsingError): run_dbt(["parse"]) @@ -58,3 +64,39 @@ def models(self): "input_model.sql": valid_input_model_sql, "microbatch.sql": missing_event_time_microbatch_model_sql } + + +class TestInvalidEventTimeMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_event_time_microbatch_model_sql + } + + +class TestMissingBatchSizeMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_batch_size_microbatch_model_sql + } + + +class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_batch_size_microbatch_model_sql + } + + +class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": invalid_event_time_input_model_sql, + "microbatch.sql": valid_microbatch_model_sql + } From 9c8a3db3f18aad89965a72c16153647eafd2431a Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 14:55:28 +0100 Subject: [PATCH 4/8] add begin parsing validation --- core/dbt/parser/manifest.py | 19 ++++++------ .../test_microbatch_config_validation.py | 29 +++++++++++++++++++ 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 7470b825d07..752955c9a4e 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1375,16 +1375,15 @@ def check_valid_microbatch_config(self): f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}." ) - # begin = node.config.begin - # if begin is None: - # raise dbt.exceptions.ParsingError( - # f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch" - # "model should be built from." - # ) - # if not isinstance(begin, datetime): - # raise dbt.exceptions.ParsingError( - # f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}." - # ) + begin = node.config.begin + if begin is None: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from." + ) + if not isinstance(begin, datetime.datetime): + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}." + ) batch_size = node.config.batch_size valid_batch_sizes = [size.value for size in BatchSize] diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index fbc30715790..ae8788623cb 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -23,6 +23,17 @@ select * from {{ ref('input_model') }} """ +missing_begin_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }} +select * from {{ ref('input_model') }} +""" + +invalid_begin_microbatch_model_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=2) }} +select * from {{ ref('input_model') }} +""" + + missing_batch_size_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time') }} select * from {{ ref('input_model') }} @@ -75,6 +86,24 @@ def models(self): } +class TestMissingBeginMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": missing_begin_microbatch_model_sql + } + + +class TestInvaliBeginMicrobatch(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": invalid_begin_microbatch_model_sql + } + + class TestMissingBatchSizeMicrobatch(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self): From 5a2655c92c23288fa9afbb92f650bba0ec74e610 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 15:38:51 +0100 Subject: [PATCH 5/8] add begin config to existing functional test microbatch models --- .../functional/microbatch/test_microbatch.py | 8 +++--- .../test_microbatch_config_validation.py | 26 +++++++++---------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index fc27482a2df..7b6699334b2 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -39,7 +39,7 @@ microbatch_model_ref_render_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model').render() }} """ @@ -369,7 +369,7 @@ def test_run_with_event_time(self, project): microbatch_model_context_vars = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}} {{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}} select * from {{ ref('input_model') }} @@ -400,7 +400,7 @@ def test_run_with_event_time_logs(self, project): microbatch_model_failing_incremental_partition_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {% if '2020-01-02' in (model.config.__dbt_internal_microbatch_event_time_start | string) %} invalid_sql {% endif %} @@ -425,7 +425,7 @@ def test_run_with_event_time(self, project): microbatch_model_first_partition_failing_sql = """ -{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }} +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} {% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %} invalid_sql {% endif %} diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index ae8788623cb..86114d529b6 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -1,12 +1,10 @@ import os -import pytest from unittest import mock -from dbt.tests.util import ( - run_dbt, -) -from dbt.exceptions import ParsingError +import pytest +from dbt.exceptions import ParsingError +from dbt.tests.util import run_dbt valid_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }} @@ -73,16 +71,16 @@ class TestMissingEventTimeMicrobatch(BaseMicrobatchTest): def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": missing_event_time_microbatch_model_sql + "microbatch.sql": missing_event_time_microbatch_model_sql, } - + class TestInvalidEventTimeMicrobatch(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": invalid_event_time_microbatch_model_sql + "microbatch.sql": invalid_event_time_microbatch_model_sql, } @@ -91,16 +89,16 @@ class TestMissingBeginMicrobatch(BaseMicrobatchTest): def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": missing_begin_microbatch_model_sql + "microbatch.sql": missing_begin_microbatch_model_sql, } - + class TestInvaliBeginMicrobatch(BaseMicrobatchTest): @pytest.fixture(scope="class") def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": invalid_begin_microbatch_model_sql + "microbatch.sql": invalid_begin_microbatch_model_sql, } @@ -109,7 +107,7 @@ class TestMissingBatchSizeMicrobatch(BaseMicrobatchTest): def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": missing_batch_size_microbatch_model_sql + "microbatch.sql": missing_batch_size_microbatch_model_sql, } @@ -118,7 +116,7 @@ class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTest): def models(self): return { "input_model.sql": valid_input_model_sql, - "microbatch.sql": invalid_batch_size_microbatch_model_sql + "microbatch.sql": invalid_batch_size_microbatch_model_sql, } @@ -127,5 +125,5 @@ class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTest): def models(self): return { "input_model.sql": invalid_event_time_input_model_sql, - "microbatch.sql": valid_microbatch_model_sql + "microbatch.sql": valid_microbatch_model_sql, } From 0a48bb924d6cfd540cfae32b2520295b9df98da6 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 15:46:44 +0100 Subject: [PATCH 6/8] changelog entry --- .changes/unreleased/Features-20240924-154639.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changes/unreleased/Features-20240924-154639.yaml diff --git a/.changes/unreleased/Features-20240924-154639.yaml b/.changes/unreleased/Features-20240924-154639.yaml new file mode 100644 index 00000000000..41fbdeaaa6f --- /dev/null +++ b/.changes/unreleased/Features-20240924-154639.yaml @@ -0,0 +1,7 @@ +kind: Features +body: 'Parse-time validation of microbatch configs: require event_time, batch_size, + lookback and validate input event_time' +time: 2024-09-24T15:46:39.83112+01:00 +custom: + Author: michelleark + Issue: "10709" From 165507a4e6a413ca4949bf7221c88418f2a0cb31 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 17:10:05 +0100 Subject: [PATCH 7/8] fix schema.yml config validation for datetime --- core/dbt/artifacts/resources/v1/config.py | 3 +- .../test_microbatch_config_validation.py | 51 ++++++++++++++++--- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 5a7977c0958..79529aa9e99 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -1,5 +1,6 @@ import re from dataclasses import dataclass, field +from datetime import datetime from typing import Any, Dict, List, Optional, Union from mashumaro.jsonschema.annotations import Pattern @@ -82,7 +83,7 @@ class NodeConfig(NodeAndTestConfig): incremental_strategy: Optional[str] = None batch_size: Any = None lookback: Any = 0 - begin: Any = None + begin: Union[datetime, Any] = None persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index 86114d529b6..6a27eb97fad 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -11,6 +11,21 @@ select * from {{ ref('input_model') }} """ +valid_microbatch_model_no_config_sql = """ +select * from {{ ref('input_model') }} +""" + +valid_microbatch_model_config_yml = """ +models: + - name: microbatch + config: + materialized: incremental + incremental_strategy: microbatch + batch_size: day + event_time: event_time + begin: 2020-01-01 +""" + missing_event_time_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }} select * from {{ ref('input_model') }} @@ -55,7 +70,7 @@ """ -class BaseMicrobatchTest: +class BaseMicrobatchTestParseError: @pytest.fixture(scope="class") def models(self): return {} @@ -66,7 +81,17 @@ def test_parsing_error_raised(self, project): run_dbt(["parse"]) -class TestMissingEventTimeMicrobatch(BaseMicrobatchTest): +class BaseMicrobatchTestNoError: + @pytest.fixture(scope="class") + def models(self): + return {} + + @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"}) + def test_parsing_error_not_raised(self, project): + run_dbt(["parse"]) + + +class TestMissingEventTimeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -75,7 +100,7 @@ def models(self): } -class TestInvalidEventTimeMicrobatch(BaseMicrobatchTest): +class TestInvalidEventTimeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -84,7 +109,7 @@ def models(self): } -class TestMissingBeginMicrobatch(BaseMicrobatchTest): +class TestMissingBeginMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -93,7 +118,7 @@ def models(self): } -class TestInvaliBeginMicrobatch(BaseMicrobatchTest): +class TestInvaliBeginMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -102,7 +127,7 @@ def models(self): } -class TestMissingBatchSizeMicrobatch(BaseMicrobatchTest): +class TestMissingBatchSizeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -111,7 +136,7 @@ def models(self): } -class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTest): +class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -120,10 +145,20 @@ def models(self): } -class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTest): +class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { "input_model.sql": invalid_event_time_input_model_sql, "microbatch.sql": valid_microbatch_model_sql, } + + +class TestValidBeginMicrobatch(BaseMicrobatchTestNoError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": valid_microbatch_model_no_config_sql, + "schema.yml": valid_microbatch_model_config_yml, + } From c1449bc5983e9c5d77c651e34ce039117bc29fab Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Tue, 24 Sep 2024 19:04:03 +0100 Subject: [PATCH 8/8] use isoformat to parse str begin config --- core/dbt/artifacts/resources/v1/config.py | 3 +-- core/dbt/parser/manifest.py | 12 ++++++++++ .../test_microbatch_config_validation.py | 23 ++++++++++++++++++- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/core/dbt/artifacts/resources/v1/config.py b/core/dbt/artifacts/resources/v1/config.py index 79529aa9e99..5a7977c0958 100644 --- a/core/dbt/artifacts/resources/v1/config.py +++ b/core/dbt/artifacts/resources/v1/config.py @@ -1,6 +1,5 @@ import re from dataclasses import dataclass, field -from datetime import datetime from typing import Any, Dict, List, Optional, Union from mashumaro.jsonschema.annotations import Pattern @@ -83,7 +82,7 @@ class NodeConfig(NodeAndTestConfig): incremental_strategy: Optional[str] = None batch_size: Any = None lookback: Any = 0 - begin: Union[datetime, Any] = None + begin: Any = None persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 752955c9a4e..e265408602b 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1380,6 +1380,18 @@ def check_valid_microbatch_config(self): raise dbt.exceptions.ParsingError( f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from." ) + + # Try to cast begin to a datetime using same format as mashumaro for consistency with other yaml-provided datetimes + # Mashumaro default: https://github.com/Fatal1ty/mashumaro/blob/4ac16fd060a6c651053475597b58b48f958e8c5c/README.md?plain=1#L1186 + if isinstance(begin, str): + try: + begin = datetime.datetime.fromisoformat(begin) + node.config.begin = begin + except Exception: + raise dbt.exceptions.ParsingError( + f"Microbatch model '{node.name}' must provide a 'begin' config of valid datetime (ISO format), but got: {begin}." + ) + if not isinstance(begin, datetime.datetime): raise dbt.exceptions.ParsingError( f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}." diff --git a/tests/functional/microbatch/test_microbatch_config_validation.py b/tests/functional/microbatch/test_microbatch_config_validation.py index 6a27eb97fad..cdebd3a791b 100644 --- a/tests/functional/microbatch/test_microbatch_config_validation.py +++ b/tests/functional/microbatch/test_microbatch_config_validation.py @@ -26,6 +26,17 @@ begin: 2020-01-01 """ +invalid_microbatch_model_config_yml = """ +models: + - name: microbatch + config: + materialized: incremental + incremental_strategy: microbatch + batch_size: day + event_time: event_time + begin: 2020-01-01 11 PM +""" + missing_event_time_microbatch_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }} select * from {{ ref('input_model') }} @@ -118,7 +129,7 @@ def models(self): } -class TestInvaliBeginMicrobatch(BaseMicrobatchTestParseError): +class TestInvaliBeginTypeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self): return { @@ -127,6 +138,16 @@ def models(self): } +class TestInvaliBegiFormatMicrobatch(BaseMicrobatchTestParseError): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": valid_input_model_sql, + "microbatch.sql": valid_microbatch_model_no_config_sql, + "microbatch.yml": invalid_microbatch_model_config_yml, + } + + class TestMissingBatchSizeMicrobatch(BaseMicrobatchTestParseError): @pytest.fixture(scope="class") def models(self):