diff --git a/core/dbt/parser/manifest.py b/core/dbt/parser/manifest.py index 72f328a0bbe..4eb71dbe7f2 100644 --- a/core/dbt/parser/manifest.py +++ b/core/dbt/parser/manifest.py @@ -17,6 +17,7 @@ import dbt.utils import dbt_common.utils from dbt import plugins +from dbt.adapters.capability import Capability from dbt.adapters.factory import ( get_adapter, get_adapter_package_names, @@ -510,6 +511,7 @@ def load(self) -> Manifest: self.check_for_model_deprecations() self.check_for_spaces_in_resource_names() self.check_for_microbatch_deprecations() + self.check_forcing_batch_concurrency() return self.manifest @@ -1484,6 +1486,24 @@ def check_valid_microbatch_config(self): if not has_input_with_event_time_config: fire_event(MicrobatchModelNoEventTimeInputs(model_name=node.name)) + def check_forcing_batch_concurrency(self): + if self.manifest.use_microbatch_batches(project_name=self.root_project.project_name): + adapter = get_adapter(self.root_project) + + if not adapter.supports(Capability.MicrobatchConcurrency): + models_forcing_concurrent_batches = 0 + for node in self.manifest.nodes.values(): + if node.config.concurrent_batches is True: + models_forcing_concurrent_batches += 1 + + if models_forcing_concurrent_batches > 0: + fire_event( + Note( + msg=f"Found {models_forcing_concurrent_batches} microbatch model(s) with the `concurrent_batches` config set to true, but the {adapter.type()} adapter does not support running batches concurrently. Batches will be run sequentially." + ), + level=EventLevel.WARN, + ) + def write_perf_info(self, target_path: str): path = os.path.join(target_path, PERF_INFO_FILE_NAME) write_file(path, json.dumps(self._perf_info, cls=dbt.utils.JSONEncoder, indent=4)) diff --git a/tests/unit/parser/test_manifest.py b/tests/unit/parser/test_manifest.py index e01b41ce5b2..c6809cab078 100644 --- a/tests/unit/parser/test_manifest.py +++ b/tests/unit/parser/test_manifest.py @@ -1,9 +1,11 @@ from argparse import Namespace +from typing import Optional from unittest.mock import MagicMock, patch import pytest from pytest_mock import MockerFixture +from dbt.adapters.postgres import PostgresAdapter from dbt.artifacts.resources.base import FileHash from dbt.config import RuntimeConfig from dbt.contracts.graph.manifest import Manifest, ManifestStateCheck @@ -13,6 +15,8 @@ from dbt.parser.read_files import FileDiff from dbt.tracking import User from dbt_common.events.event_manager_client import add_callback_to_manager +from dbt_common.events.types import Note +from tests.unit.fixtures import model_node from tests.utils import EventCatcher @@ -238,3 +242,59 @@ def test_warn_for_unused_resource_config_paths( else: assert len(catcher.caught_events) == 1 assert f"{resource_type}.{path}" in str(catcher.caught_events[0].data) + + +class TestCheckForcingConcurrentBatches: + @pytest.fixture + @patch("dbt.parser.manifest.ManifestLoader.build_manifest_state_check") + @patch("dbt.parser.manifest.os.path.exists") + @patch("dbt.parser.manifest.open") + def manifest_loader( + self, patched_open, patched_os_exist, patched_state_check + ) -> ManifestLoader: + mock_project = MagicMock(RuntimeConfig) + mock_project.project_target_path = "mock_target_path" + mock_project.project_name = "mock_project_name" + # patched_os_exist.return_value = True + return ManifestLoader(mock_project, {}) + + @pytest.fixture + def event_catcher(self) -> EventCatcher: + return EventCatcher(Note) # type: ignore + + @pytest.mark.parametrize( + "adapter_support,concurrent_batches_config,expect_warning", + [ + (False, True, True), + (False, False, False), + (False, None, False), + (True, True, False), + (True, False, False), + (True, None, False), + ], + ) + def test_check_forcing_concurrent_batches( + self, + mocker: MockerFixture, + manifest_loader: ManifestLoader, + postgres_adapter: PostgresAdapter, + event_catcher: EventCatcher, + adapter_support: bool, + concurrent_batches_config: Optional[bool], + expect_warning: bool, + ): + add_callback_to_manager(event_catcher.catch) + model = model_node() + model.config.concurrent_batches = concurrent_batches_config + mocker.patch.object(postgres_adapter, "supports").return_value = adapter_support + mocker.patch("dbt.parser.manifest.get_adapter").return_value = postgres_adapter + mocker.patch.object(manifest_loader.manifest, "use_microbatch_batches").return_value = True + + manifest_loader.manifest.add_node_nofile(model) + manifest_loader.check_forcing_batch_concurrency() + + if expect_warning: + assert len(event_catcher.caught_events) == 1 + assert "Batches will be run sequentially" in event_catcher.caught_events[0].data.msg # type: ignore + else: + assert len(event_catcher.caught_events) == 0