
Commit

Warn if concurrent_batches config is set to True, but the available adapter doesn't support it (#11145) (#11154)

* Begin producing warning when attempting to force concurrent batches without adapter support

Batches of microbatch models can be executed sequentially or concurrently. We try to figure out which to do intelligently. As part of that, we implemented an override, the model config `concurrent_batches`, to allow the user to bypass _some_ of our automatic detection. However, a user _cannot_ force batches to run concurrently if the adapter doesn't support concurrent batches (declaring support is opt-in). Thus, if an adapter _doesn't_ support running batches concurrently and a user tries to force concurrent execution via `concurrent_batches`, we need to warn the user that concurrent execution isn't happening (a sketch of the adapter-side opt-in follows the core/dbt/parser/manifest.py changes below).

* Add custom event type for warning about invalid `concurrent_batches` config

* Fire `InvalidConcurrentBatchesConfig` warning via `warn_or_error` so it can be silenced

(cherry picked from commit 6c61cb7)

Co-authored-by: Quigley Malcolm <[email protected]>
github-actions[bot] and QMalcolm authored Dec 16, 2024
1 parent dbd8ef3 commit 7fdd92f
Showing 8 changed files with 608 additions and 448 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20241212-113611.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Warn about invalid usages of `concurrent_batches` config
time: 2024-12-12T11:36:11.451962-06:00
custom:
Author: QMalcolm
Issue: "11122"
12 changes: 12 additions & 0 deletions core/dbt/events/core_types.proto
@@ -928,6 +928,18 @@ message MicrobatchModelNoEventTimeInputsMsg {
}


// I075
message InvalidConcurrentBatchesConfig {
int32 num_models = 1;
string adapter_type = 2;
}

message InvalidConcurrentBatchesConfigMsg {
CoreEventInfo info = 1;
InvalidConcurrentBatchesConfig data = 2;
}


// M - Deps generation


898 changes: 451 additions & 447 deletions core/dbt/events/core_types_pb2.py

(Generated file; large diff not shown.)

10 changes: 10 additions & 0 deletions core/dbt/events/types.py
@@ -967,6 +967,16 @@ def message(self) -> str:
return warning_tag(msg)


class InvalidConcurrentBatchesConfig(WarnLevel):
def code(self) -> str:
return "I075"

def message(self) -> str:
maybe_plural_count_of_models = pluralize(self.num_models, "microbatch model")
description = f"Found {maybe_plural_count_of_models} with the `concurrent_batches` config set to true, but the {self.adapter_type} adapter does not support running batches concurrently. Batches will be run sequentially."
return line_wrap_message(warning_tag(description))


# =======================================================
# M - Deps generation
# =======================================================
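
Not part of the diff: a minimal sketch of how the new event renders, constructed with the same keyword arguments used in check_forcing_batch_concurrency below. The example values are made up.

from dbt.events.types import InvalidConcurrentBatchesConfig

# Construct the event with the fields added in core_types.proto above.
event = InvalidConcurrentBatchesConfig(num_models=2, adapter_type="postgres")
print(event.code())     # "I075"
print(event.message())  # "[WARNING]: Found 2 microbatch models with the `concurrent_batches` config set to true, ..."
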
24 changes: 24 additions & 0 deletions core/dbt/parser/manifest.py
@@ -17,6 +17,7 @@
import dbt.utils
import dbt_common.utils
from dbt import plugins
from dbt.adapters.capability import Capability
from dbt.adapters.factory import (
get_adapter,
get_adapter_package_names,
@@ -66,6 +67,7 @@
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidConcurrentBatchesConfig,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
@@ -510,6 +512,7 @@ def load(self) -> Manifest:
self.check_for_model_deprecations()
self.check_for_spaces_in_resource_names()
self.check_for_microbatch_deprecations()
self.check_forcing_batch_concurrency()

return self.manifest

@@ -1484,6 +1487,27 @@ def check_valid_microbatch_config(self):
if not has_input_with_event_time_config:
fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name))

def check_forcing_batch_concurrency(self) -> None:
if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name):
adapter = get_adapter(self.root_project)

if not adapter.supports(Capability.MicrobatchConcurrency):
models_forcing_concurrent_batches = 0
for node in self.manifest.nodes.values():
if (
hasattr(node.config, "concurrent_batches")
and node.config.concurrent_batches is True
):
models_forcing_concurrent_batches += 1

if models_forcing_concurrent_batches > 0:
warn_or_error(
InvalidConcurrentBatchesConfig(
num_models=models_forcing_concurrent_batches,
adapter_type=adapter.type(),
)
)

def write_perf_info(self, target_path: str):
path = os.path.join(target_path, PERF_INFO_FILE_NAME)
write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4))
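
As the commit message notes, declaring concurrency support is opt-in on the adapter side. A rough sketch of that opt-in, assuming the adapter exposes it through the CapabilityDict/CapabilitySupport types in dbt.adapters.capability (only Capability itself appears in this diff):

from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support

# What an adapter class typically declares when it supports concurrent batches
# (assumed pattern; exact wiring varies by adapter).
_capabilities = CapabilityDict(
    {Capability.MicrobatchConcurrency: CapabilitySupport(support=Support.Full)}
)

# check_forcing_batch_concurrency() above only warns when this evaluates to False.
print(bool(_capabilities[Capability.MicrobatchConcurrency]))  # True

With a declaration like that in place, adapter.supports(Capability.MicrobatchConcurrency) returns True and the warning is never fired.
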
45 changes: 45 additions & 0 deletions tests/functional/microbatch/test_microbatch.py
@@ -7,6 +7,7 @@
ArtifactWritten,
EndOfRunSummary,
GenericExceptionOnRun,
InvalidConcurrentBatchesConfig,
JinjaLogDebug,
LogBatchResult,
LogModelResult,
@@ -71,6 +72,11 @@
select * from {{ ref('input_model') }}
"""

microbatch_model_force_concurrent_batches_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0), concurrent_batches=true) }}
select * from {{ ref('input_model') }}
"""

microbatch_yearly_model_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', unique_key='id', event_time='event_time', batch_size='year', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
@@ -1083,3 +1089,42 @@ def test_microbatch(

# we had a bug where having only one batch caused a generic exception
assert len(generic_exception_catcher.caught_events) == 0


class TestCanSilenceInvalidConcurrentBatchesConfigWarning(BaseMicrobatchTest):
@pytest.fixture(scope="class")
def models(self):
return {
"input_model.sql": input_model_sql,
"microbatch_model.sql": microbatch_model_force_concurrent_batches_sql,
}

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(event_to_catch=InvalidConcurrentBatchesConfig) # type: ignore

def test_microbatch(
self,
project,
event_catcher: EventCatcher,
) -> None:
# This test works because postgres doesn't support concurrent batch execution
# If the postgres adapter starts supporting concurrent batch execution we'll
# need to start mocking the return value of `adapter.supports()`

with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(["run"], callbacks=[event_catcher.catch])
# We didn't silence the warning, so we get it
assert len(event_catcher.caught_events) == 1

# Clear caught events
event_catcher.caught_events = []

# Run again with silencing
with patch_microbatch_end_time("2020-01-01 13:57:00"):
_ = run_dbt(
["run", "--warn-error-options", "{'silence': ['InvalidConcurrentBatchesConfig']}"],
callbacks=[event_catcher.catch],
)
# Because we silenced the warning, it shouldn't get fired
assert len(event_catcher.caught_events) == 0
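
Not part of the diff: the same silencing shown in the test also works outside of pytest. A hedged sketch using dbt's programmatic entry point (dbtRunner), assuming it is invoked from inside a dbt project:

from dbt.cli.main import dbtRunner

# Mirrors the --warn-error-options flag used in the functional test above.
result = dbtRunner().invoke(
    ["run", "--warn-error-options", "{'silence': ['InvalidConcurrentBatchesConfig']}"]
)
print(result.success)
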
60 changes: 59 additions & 1 deletion tests/unit/parser/test_manifest.py
@@ -1,18 +1,21 @@
from argparse import Namespace
from typing import Optional
from unittest.mock import MagicMock, patch

import pytest
from pytest_mock import MockerFixture

from dbt.adapters.postgres import PostgresAdapter
from dbt.artifacts.resources.base import FileHash
from dbt.config import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest, ManifestStateCheck
from dbt.events.types import UnusedResourceConfigPath
from dbt.events.types import InvalidConcurrentBatchesConfig, UnusedResourceConfigPath
from dbt.flags import set_from_args
from dbt.parser.manifest import ManifestLoader, _warn_for_unused_resource_config_paths
from dbt.parser.read_files import FileDiff
from dbt.tracking import User
from dbt_common.events.event_manager_client import add_callback_to_manager
from tests.unit.fixtures import model_node
from tests.utils import EventCatcher


@@ -238,3 +241,58 @@ def test_warn_for_unused_resource_config_paths(
else:
assert len(catcher.caught_events) == 1
assert f"{resource_type}.{path}" in str(catcher.caught_events[0].data)


class TestCheckForcingConcurrentBatches:
@pytest.fixture
@patch("dbt.parser.manifest.ManifestLoader.build_manifest_state_check")
@patch("dbt.parser.manifest.os.path.exists")
@patch("dbt.parser.manifest.open")
def manifest_loader(
self, patched_open, patched_os_exist, patched_state_check
) -> ManifestLoader:
mock_project = MagicMock(RuntimeConfig)
mock_project.project_target_path = "mock_target_path"
mock_project.project_name = "mock_project_name"
return ManifestLoader(mock_project, {})

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(InvalidConcurrentBatchesConfig) # type: ignore

@pytest.mark.parametrize(
"adapter_support,concurrent_batches_config,expect_warning",
[
(False, True, True),
(False, False, False),
(False, None, False),
(True, True, False),
(True, False, False),
(True, None, False),
],
)
def test_check_forcing_concurrent_batches(
self,
mocker: MockerFixture,
manifest_loader: ManifestLoader,
postgres_adapter: PostgresAdapter,
event_catcher: EventCatcher,
adapter_support: bool,
concurrent_batches_config: Optional[bool],
expect_warning: bool,
):
add_callback_to_manager(event_catcher.catch)
model = model_node()
model.config.concurrent_batches = concurrent_batches_config
mocker.patch.object(postgres_adapter, "supports").return_value = adapter_support
mocker.patch("dbt.parser.manifest.get_adapter").return_value = postgres_adapter
mocker.patch.object(manifest_loader.manifest, "use_microbatch_batches").return_value = True

manifest_loader.manifest.add_node_nofile(model)
manifest_loader.check_forcing_batch_concurrency()

if expect_warning:
assert len(event_catcher.caught_events) == 1
assert "Batches will be run sequentially" in event_catcher.caught_events[0].info.msg # type: ignore
else:
assert len(event_catcher.caught_events) == 0
1 change: 1 addition & 0 deletions tests/unit/test_events.py
@@ -289,6 +289,7 @@ def test_event_codes(self):
core_types.FreshnessConfigProblem(msg=""),
core_types.SemanticValidationFailure(msg=""),
core_types.MicrobatchModelNoEventTimeInputs(model_name=""),
core_types.InvalidConcurrentBatchesConfig(num_models=1, adapter_type=""),
# M - Deps generation ======================
core_types.GitSparseCheckoutSubdirectory(subdir=""),
core_types.GitProgressCheckoutRevision(revision=""),
