From 263fd3ee14b1616b58630bd635609ce85eeb3424 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Thu, 12 Dec 2024 15:33:55 -0600 Subject: [PATCH] Fire `InvalidConcurrentBatchesConfig` warning via `warn_or_error` so it can be silenced --- core/dbt/parser/manifest.py | 2 +- .../functional/microbatch/test_microbatch.py | 45 +++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 75d9e398ae6..ba42c6637d3 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -1501,7 +1501,7 @@ def check_forcing_batch_concurrency(self) -> None: models_forcing_concurrent_batches += 1 if models_forcing_concurrent_batches > 0: - fire_event( + warn_or_error( InvalidConcurrentBatchesConfig( num_models=models_forcing_concurrent_batches, adapter_type=adapter.type(), diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c31657d94fc..953b372b226 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -7,6 +7,7 @@ ArtifactWritten, EndOfRunSummary, GenericExceptionOnRun, + InvalidConcurrentBatchesConfig, JinjaLogDebug, LogBatchResult, LogModelResult, @@ -71,6 +72,11 @@ select * from {{ ref('input_model') }} """ +microbatch_model_force_concurrent_batches_sql = """ +{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), concurrent_batches=true) }} +select * from {{ ref('input_model') }} +""" + microbatch_yearly_model_sql = """ {{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }} select * from {{ ref('input_model') }} @@ -1083,3 +1089,42 @@ def test_microbatch( # we had a bug where having only one batch caused a generic exception assert len(generic_exception_catcher.caught_events) == 0 + + +class TestCanSilenceInvalidConcurrentBatchesConfigWarning(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_sql, + "microbatch_model.sql": microbatch_model_force_concurrent_batches_sql, + } + + @pytest.fixture + def event_catcher(self) -> EventCatcher: + return EventCatcher(event_to_catch=InvalidConcurrentBatchesConfig) # type: ignore + + def test_microbatch( + self, + project, + event_catcher: EventCatcher, + ) -> None: + # This test works because postgres doesn't support concurrent batch execution + # If the postgres adapter starts supporting concurrent batch execution we'll + # need to start mocking the return value of `adapter.supports()` + + with patch_microbatch_end_time("2020-01-01 13:57:00"): + _ = run_dbt(["run"], callbacks=[event_catcher.catch]) + # We didn't silence the warning, so we get it + assert len(event_catcher.caught_events) == 1 + + # Clear caught events + event_catcher.caught_events = [] + + # Run again with silencing + with patch_microbatch_end_time("2020-01-01 13:57:00"): + _ = run_dbt( + ["run", "--warn-error-options", "{'silence': ['InvalidConcurrentBatchesConfig']}"], + callbacks=[event_catcher.catch], + ) + # Because we silenced the warning, it shouldn't get fired + assert len(event_catcher.caught_events) == 0