Skip to content

Commit

Permalink
Log whether a batch will be run in parallel/concurrently or not
Browse files: browse the repository at this point in the history
  • Loading branch information
QMalcolm committed Nov 20, 2024
1 parent f127b7d commit 7b3e54a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 10 deletions.
11 changes: 11 additions & 0 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
LogHookStartLine,
LogModelResult,
LogStartLine,
MicrobatchExecutionDebug,
)
from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError
from dbt.graph import ResourceTypeSelector
Expand Down Expand Up @@ -719,8 +720,18 @@ def handle_microbatch_model(
batch_runner.set_batches(runner.batches)

if runner._should_run_in_parallel(relation_exists):
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run concurrently"
)
)
self._submit(pool, [batch_runner], batch_results.append)
else:
fire_event(
MicrobatchExecutionDebug(
msg=f"{batch_runner.describe_batch} is being run sequentially"
)
)
batch_results.append(self.call_runner(batch_runner))
relation_exists = batch_runner.relation_exists

Expand Down
38 changes: 28 additions & 10 deletions tests/functional/microbatch/test_microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
EndOfRunSummary,
GenericExceptionOnRun,
LogModelResult,
MicrobatchExecutionDebug,
MicrobatchMacroOutsideOfBatchesDeprecation,
MicrobatchModelNoEventTimeInputs,
)
Expand Down Expand Up @@ -835,23 +836,40 @@ def test_microbatch(


class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest):
@pytest.fixture
def batch_exc_catcher(self) -> EventCatcher:
return EventCatcher(MicrobatchExecutionDebug) # type: ignore

def test_microbatch(
self,
mocker: MockerFixture,
project,
self, mocker: MockerFixture, project, batch_exc_catcher: EventCatcher
) -> None:
mocked_srip = mocker.patch("dbt.task.run.MicrobatchModelRunner._should_run_in_parallel")

# Should run in parallel
# Should be run in parallel
mocked_srip.return_value = True
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run"])
assert "on Thread-" in logs
assert "on MainThread" not in logs
_ = run_dbt(["run"], callbacks=[batch_exc_catcher.catch])

assert len(batch_exc_catcher.caught_events) > 1
some_batches_run_concurrently = False
for caugh_event in batch_exc_catcher.caught_events:
if "is being run concurrently" in caugh_event.data.msg: # type: ignore
some_batches_run_concurrently = True
break
assert some_batches_run_concurrently, "Found no batches being run concurrently!"

# reset caught events
batch_exc_catcher.caught_events = []

# Should _not_ run in parallel
mocked_srip.return_value = False
with patch_microbatch_end_time("2020-01-03 13:57:00"):
_, logs = run_dbt_and_capture(["run"])
assert "on Thread-" not in logs
assert "on MainThread" in logs
_ = run_dbt(["run"], callbacks=[batch_exc_catcher.catch])

assert len(batch_exc_catcher.caught_events) > 1
some_batches_run_concurrently = False
for caugh_event in batch_exc_catcher.caught_events:
if "is being run concurrently" in caugh_event.data.msg: # type: ignore
some_batches_run_concurrently = True
break
assert not some_batches_run_concurrently, "Found a batch being run concurrently!"

0 comments on commit 7b3e54a

Please sign in to comment.