Skip to content

Commit

Permalink
Add custom event type for warning about invalid concurrent_batches
Browse files Browse the repository at this point in the history
…config
  • Loading branch information
QMalcolm committed Dec 12, 2024
1 parent 1351472 commit d81e7d3
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 455 deletions.
12 changes: 12 additions & 0 deletions core/dbt/events/core_types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,18 @@ message MicrobatchModelNoEventTimeInputsMsg {
}


// I075
message InvalidConcurrentBatchesConfig {
int32 num_models = 1;
string adapter_type = 2;
}

message InvalidConcurrentBatchesConfigMsg {
CoreEventInfo info = 1;
InvalidConcurrentBatchesConfig data = 2;
}


// M - Deps generation


Expand Down
898 changes: 451 additions & 447 deletions core/dbt/events/core_types_pb2.py

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,16 @@ def message(self) -> str:
return warning_tag(msg)


class InvalidConcurrentBatchesConfig(WarnLevel):
def code(self) -> str:
return "I075"

def message(self) -> str:
maybe_plural_count_of_models = pluralize(self.num_models, "microbatch model")
description = f"Found {maybe_plural_count_of_models} with the `concurrent_batches` config set to true, but the {self.adapter_type} adapter does not support running batches concurrently. Batches will be run sequentially."
return line_wrap_message(warning_tag(description))


# =======================================================
# M - Deps generation
# =======================================================
Expand Down
9 changes: 5 additions & 4 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
ArtifactWritten,
DeprecatedModel,
DeprecatedReference,
InvalidConcurrentBatchesConfig,
InvalidDisabledTargetInTestNode,
MicrobatchModelNoEventTimeInputs,
NodeNotFoundOrDisabled,
Expand Down Expand Up @@ -1498,10 +1499,10 @@ def check_forcing_batch_concurrency(self):

if models_forcing_concurrent_batches > 0:
fire_event(
Note(
msg=f"Found {models_forcing_concurrent_batches} microbatch model(s) with the `concurrent_batches` config set to true, but the {adapter.type()} adapter does not support running batches concurrently. Batches will be run sequentially."
),
level=EventLevel.WARN,
InvalidConcurrentBatchesConfig(
num_models=models_forcing_concurrent_batches,
adapter_type=adapter.type(),
)
)

def write_perf_info(self, target_path: str):
Expand Down
7 changes: 3 additions & 4 deletions tests/unit/parser/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
from dbt.artifacts.resources.base import FileHash
from dbt.config import RuntimeConfig
from dbt.contracts.graph.manifest import Manifest, ManifestStateCheck
from dbt.events.types import UnusedResourceConfigPath
from dbt.events.types import InvalidConcurrentBatchesConfig, UnusedResourceConfigPath
from dbt.flags import set_from_args
from dbt.parser.manifest import ManifestLoader, _warn_for_unused_resource_config_paths
from dbt.parser.read_files import FileDiff
from dbt.tracking import User
from dbt_common.events.event_manager_client import add_callback_to_manager
from dbt_common.events.types import Note
from tests.unit.fixtures import model_node
from tests.utils import EventCatcher

Expand Down Expand Up @@ -260,7 +259,7 @@ def manifest_loader(

@pytest.fixture
def event_catcher(self) -> EventCatcher:
return EventCatcher(Note) # type: ignore
return EventCatcher(InvalidConcurrentBatchesConfig) # type: ignore

@pytest.mark.parametrize(
"adapter_support,concurrent_batches_config,expect_warning",
Expand Down Expand Up @@ -295,6 +294,6 @@ def test_check_forcing_concurrent_batches(

if expect_warning:
assert len(event_catcher.caught_events) == 1
assert "Batches will be run sequentially" in event_catcher.caught_events[0].data.msg # type: ignore
assert "Batches will be run sequentially" in event_catcher.caught_events[0].info.msg # type: ignore
else:
assert len(event_catcher.caught_events) == 0

0 comments on commit d81e7d3

Please sign in to comment.