Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Microbatch Config Validation #10752

Merged
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20240924-154639.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: 'Parse-time validation of microbatch configs: require event_time, batch_size,
lookback and validate input event_time'
time: 2024-09-24T15:46:39.83112+01:00
custom:
Author: michelleark
Issue: "10709"
65 changes: 65 additions & 0 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
register_adapter,
)
from dbt.artifacts.resources import FileHash, NodeRelation, NodeVersion
from dbt.artifacts.resources.types import BatchSize
from dbt.artifacts.schemas.base import Writable
from dbt.clients.jinja import MacroStack, get_rendered
from dbt.clients.jinja_static import statically_extract_macro_calls
Expand Down Expand Up @@ -468,6 +469,7 @@
self.check_valid_group_config()
self.check_valid_access_property()
self.check_valid_snapshot_config()
self.check_valid_microbatch_config()

semantic_manifest = SemanticManifest(self.manifest)
if not semantic_manifest.validate():
Expand Down Expand Up @@ -1355,6 +1357,69 @@
continue
node.config.final_validate()

def check_valid_microbatch_config(self):
if os.environ.get("DBT_EXPERIMENTAL_MICROBATCH"):
for node in self.manifest.nodes.values():
if (
node.config.materialized == "incremental"
and node.config.incremental_strategy == "microbatch"
):
# Required configs: event_time, batch_size, begin
event_time = node.config.event_time
if event_time is None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide an 'event_time' (string) config that indicates the name of the event time column."
)
if not isinstance(event_time, str):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide an 'event_time' config of type string, but got: {type(event_time)}."
)

begin = node.config.begin
if begin is None:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' (datetime) config that indicates the earliest timestamp the microbatch model should be built from."
)

# Try to cast begin to a datetime using same format as mashumaro for consistency with other yaml-provided datetimes
# Mashumaro default: https://github.com/Fatal1ty/mashumaro/blob/4ac16fd060a6c651053475597b58b48f958e8c5c/README.md?plain=1#L1186
if isinstance(begin, str):
try:
begin = datetime.datetime.fromisoformat(begin)
node.config.begin = begin
except Exception:
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' config of valid datetime (ISO format), but got: {begin}."
)

if not isinstance(begin, datetime.datetime):
raise dbt.exceptions.ParsingError(
f"Microbatch model '{node.name}' must provide a 'begin' config of type datetime, but got: {type(begin)}."
)

batch_size = node.config.batch_size
valid_batch_sizes = [size.value for size in BatchSize]
if batch_size not in valid_batch_sizes:
raise dbt.exceptions.ParsingError(

Check warning on line 1403 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L1403

Added line #L1403 was not covered by tests
f"Microbatch model '{node.name}' must provide a 'batch_size' config that is one of {valid_batch_sizes}, but got: {batch_size}."
)

# Optional config: lookback (int)
lookback = node.config.lookback
if not isinstance(lookback, int) and lookback is not None:
raise dbt.exceptions.ParsingError(

Check warning on line 1410 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L1410

Added line #L1410 was not covered by tests
f"Microbatch model '{node.name}' must provide the optional 'lookback' config as type int, but got: {type(lookback)})."
)

# Validate upstream node event_time (if configured)
for input_unique_id in node.depends_on.nodes:
input_node = self.manifest.expect(unique_id=input_unique_id)
input_event_time = input_node.config.event_time
if input_event_time and not isinstance(input_event_time, str):
raise dbt.exceptions.ParsingError(

Check warning on line 1419 in core/dbt/parser/manifest.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/parser/manifest.py#L1419

Added line #L1419 was not covered by tests
f"Microbatch model '{node.name}' depends on an input node '{input_node.name}' with an 'event_time' config of invalid (non-string) type: {type(input_event_time)}."
)

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
Expand Down
8 changes: 4 additions & 4 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@


microbatch_model_ref_render_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model').render() }}
"""

Expand Down Expand Up @@ -369,7 +369,7 @@ def test_run_with_event_time(self, project):


microbatch_model_context_vars = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{{ log("start: "~ model.config.__dbt_internal_microbatch_event_time_start, info=True)}}
{{ log("end: "~ model.config.__dbt_internal_microbatch_event_time_end, info=True)}}
select * from {{ ref('input_model') }}
Expand Down Expand Up @@ -400,7 +400,7 @@ def test_run_with_event_time_logs(self, project):


microbatch_model_failing_incremental_partition_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-02' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
Expand All @@ -425,7 +425,7 @@ def test_run_with_event_time(self, project):


microbatch_model_first_partition_failing_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day') }}
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
{% if '2020-01-01' in (model.config.__dbt_internal_microbatch_event_time_start | string) %}
invalid_sql
{% endif %}
Expand Down
185 changes: 185 additions & 0 deletions tests/functional/microbatch/test_microbatch_config_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
import os
from unittest import mock

import pytest

from dbt.exceptions import ParsingError
from dbt.tests.util import run_dbt

valid_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

valid_microbatch_model_no_config_sql = """
select * from {{ ref('input_model') }}
"""

valid_microbatch_model_config_yml = """
models:
- name: microbatch
config:
materialized: incremental
incremental_strategy: microbatch
batch_size: day
event_time: event_time
begin: 2020-01-01
"""

invalid_microbatch_model_config_yml = """
models:
- name: microbatch
config:
materialized: incremental
incremental_strategy: microbatch
batch_size: day
event_time: event_time
begin: 2020-01-01 11 PM
"""

missing_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day') }}
select * from {{ ref('input_model') }}
"""

invalid_event_time_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time=2) }}
select * from {{ ref('input_model') }}
"""

missing_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_begin_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='day', event_time='event_time', begin=2) }}
select * from {{ ref('input_model') }}
"""


missing_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_batch_size_microbatch_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', batch_size='invalid', event_time='event_time') }}
select * from {{ ref('input_model') }}
"""

invalid_event_time_input_model_sql = """
{{ config(materialized='table', event_time=1) }}

select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""

valid_input_model_sql = """
{{ config(materialized='table') }}

select 1 as id, TIMESTAMP '2020-01-01 00:00:00-0' as event_time
"""


class BaseMicrobatchTestParseError:
@pytest.fixture(scope="class")
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_parsing_error_raised(self, project):
with pytest.raises(ParsingError):
run_dbt(["parse"])


class BaseMicrobatchTestNoError:
@pytest.fixture(scope="class")
def models(self):
return {}

@mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
def test_parsing_error_not_raised(self, project):
run_dbt(["parse"])


class TestMissingEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_event_time_microbatch_model_sql,
}


class TestInvalidEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_event_time_microbatch_model_sql,
}


class TestMissingBeginMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_begin_microbatch_model_sql,
}


class TestInvaliBeginTypeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_begin_microbatch_model_sql,
}


class TestInvaliBegiFormatMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": valid_microbatch_model_no_config_sql,
"microbatch.yml": invalid_microbatch_model_config_yml,
}


class TestMissingBatchSizeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": missing_batch_size_microbatch_model_sql,
}


class TestInvalidBatchSizeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": invalid_batch_size_microbatch_model_sql,
}


class TestInvalidInputEventTimeMicrobatch(BaseMicrobatchTestParseError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": invalid_event_time_input_model_sql,
"microbatch.sql": valid_microbatch_model_sql,
}


class TestValidBeginMicrobatch(BaseMicrobatchTestNoError):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": valid_input_model_sql,
"microbatch.sql": valid_microbatch_model_no_config_sql,
"schema.yml": valid_microbatch_model_config_yml,
}
Loading