From 771792c7480341a3dff9c56e4e66461b0de91b98 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Wed, 13 Nov 2024 16:23:51 -0500 Subject: [PATCH] fire GenericExceptionOnRun for batch-level exception --- core/dbt/task/run.py | 10 ++++++++-- tests/functional/microbatch/test_microbatch.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index d59e9c6cf72..9c65a7d8438 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -27,11 +27,11 @@ from dbt.contracts.graph.manifest import Manifest from dbt.contracts.graph.nodes import HookNode, ModelNode, ResultNode from dbt.events.types import ( + GenericExceptionOnRun, LogHookEndLine, LogHookStartLine, LogModelResult, LogStartLine, - RunningOperationCaughtError, ) from dbt.exceptions import CompilationError, DbtInternalError, DbtRuntimeError from dbt.graph import ResourceTypeSelector @@ -275,7 +275,13 @@ def print_batch_result_line( level=level, ) if exception: - fire_event(RunningOperationCaughtError(exc=str(exception))) + fire_event( + GenericExceptionOnRun( + unique_id=self.node.unique_id, + exc=f"Exception on worker thread. {str(exception)}", + node_info=self.node.node_info, + ) + ) def print_batch_start_line( self, batch_start: Optional[datetime], batch_idx: int, batch_total: int diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index 3de48225f44..c0f687f0d18 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -3,6 +3,7 @@ import pytest from dbt.events.types import ( + GenericExceptionOnRun, LogModelResult, MicrobatchMacroOutsideOfBatchesDeprecation, MicrobatchModelNoEventTimeInputs, @@ -526,9 +527,15 @@ def models(self): } def test_run_with_event_time(self, project): - # run all partitions from start - 2 expected rows in output, one failed + event_catcher = EventCatcher( + GenericExceptionOnRun, predicate=lambda event: event.data.node_info is not None + ) + with patch_microbatch_end_time("2020-01-03 13:57:00"): - run_dbt(["run"], expect_pass=False) + run_dbt(["run"], callbacks=[event_catcher.catch], expect_pass=False) + + assert len(event_catcher.caught_events) == 1 + # run all partitions from start - 2 expected rows in output, one failed self.assert_row_count(project, "microbatch_model", 2) run_results = get_artifact(project.project_root, "target", "run_results.json")