From c0743e0d7e5d406123845f7e6e20c15ab06e5e15 Mon Sep 17 00:00:00 2001 From: Quigley Malcolm Date: Fri, 6 Dec 2024 19:46:31 -0600 Subject: [PATCH] Update microbatch integrationt tests to catch batch specific event types --- .../functional/microbatch/test_microbatch.py | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 227d93a1d51..6d3eb960b76 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -8,11 +8,11 @@ EndOfRunSummary, GenericExceptionOnRun, JinjaLogDebug, + LogBatchResult, LogModelResult, MicrobatchExecutionDebug, MicrobatchMacroOutsideOfBatchesDeprecation, MicrobatchModelNoEventTimeInputs, - SkippingDetails, ) from dbt.tests.util import ( get_artifact, @@ -290,15 +290,18 @@ class TestMicrobatchCLI(BaseMicrobatchTest): def test_run_with_event_time(self, project): # run without --event-time-start or --event-time-end - 3 expected rows in output - catcher = EventCatcher(event_to_catch=LogModelResult) + + model_catcher = EventCatcher(event_to_catch=LogModelResult) + batch_catcher = EventCatcher(event_to_catch=LogBatchResult) with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt([self.CLI_COMMAND_NAME], callbacks=[catcher.catch]) + run_dbt([self.CLI_COMMAND_NAME], callbacks=[model_catcher.catch, batch_catcher.catch]) self.assert_row_count(project, "microbatch_model", 3) - assert len(catcher.caught_events) == 5 + assert len(model_catcher.caught_events) == 2 + assert len(batch_catcher.caught_events) == 3 batch_creation_events = 0 - for caught_event in catcher.caught_events: + for caught_event in batch_catcher.caught_events: if "batch 2020" in caught_event.data.description: batch_creation_events += 1 assert caught_event.data.execution_time > 0 @@ -705,20 +708,24 @@ def test_run_with_event_time(self, project): # be skipped and the model marked as failed (not _partial success_) general_exc_catcher = EventCatcher( - GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None + GenericExceptionOnRun, + predicate=lambda event: event.data.node_info is not None, + ) + batch_catcher = EventCatcher( + event_to_catch=LogBatchResult, + predicate=lambda event: event.data.status == "skipped", ) - skipping_catcher = EventCatcher(event_to_catch=SkippingDetails) # run all partitions from start - 2 expected rows in output, one failed with patch_microbatch_end_time("2020-01-03 13:57:00"): run_dbt( ["run"], expect_pass=False, - callbacks=[general_exc_catcher.catch, skipping_catcher.catch], + callbacks=[general_exc_catcher.catch, batch_catcher.catch], ) assert len(general_exc_catcher.caught_events) == 1 - assert len(skipping_catcher.caught_events) == 2 + assert len(batch_catcher.caught_events) == 2 # Because the first batch failed, and the rest of the batches were skipped, the table shouldn't # exist in the data warehosue