From 2cb2c96a18735f66e0fbf70d7472fc19c8b14586 Mon Sep 17 00:00:00 2001 From: Michelle Ark Date: Thu, 21 Nov 2024 11:21:51 -0500 Subject: [PATCH] make microbatch models skippable --- core/dbt/task/run.py | 9 ++++--- .../functional/microbatch/test_microbatch.py | 24 +++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/core/dbt/task/run.py b/core/dbt/task/run.py index 1b4be463e78..0b991fc9f29 100644 --- a/core/dbt/task/run.py +++ b/core/dbt/task/run.py @@ -2,7 +2,7 @@ import threading import time from copy import deepcopy -from dataclasses import asdict, field +from dataclasses import asdict from datetime import datetime from multiprocessing.pool import ThreadPool from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type @@ -335,7 +335,7 @@ def execute(self, model, manifest): class MicrobatchModelRunner(ModelRunner): batch_idx: Optional[int] = None - batches: Dict[int, BatchType] = field(default_factory=dict) + batches: Dict[int, BatchType] = {} relation_exists: bool = False def set_batch_idx(self, batch_idx: int) -> None: @@ -704,8 +704,11 @@ def handle_microbatch_model( runner: MicrobatchModelRunner, pool: ThreadPool, ) -> RunResult: - # Initial run computes batch metadata + # Initial run computes batch metadata, unless model is skipped result = self.call_runner(runner) + if result.status == RunStatus.Skipped: + return result + batch_results: List[RunResult] = [] # Execute batches serially until a relation exists, at which point future batches are run in parallel diff --git a/tests/functional/microbatch/test_microbatch.py b/tests/functional/microbatch/test_microbatch.py index c9b9db2ff4e..ddac8959a5a 100644 --- a/tests/functional/microbatch/test_microbatch.py +++ b/tests/functional/microbatch/test_microbatch.py @@ -33,6 +33,12 @@ select 3 as id, TIMESTAMP '2020-01-03 00:00:00-0' as event_time """ +input_model_invalid_sql = """ +{{ config(materialized='table', event_time='event_time') }} + +select invalid as event_time +""" + input_model_without_event_time_sql = """ {{ config(materialized='table') }} @@ -835,6 +841,24 @@ def test_microbatch( assert len(catch_aw.caught_events) == 1 +class TestMicrobatchModelSkipped(BaseMicrobatchTest): + @pytest.fixture(scope="class") + def models(self): + return { + "input_model.sql": input_model_invalid_sql, + "microbatch_model.sql": microbatch_model_sql, + } + + def test_microbatch_model_skipped(self, project) -> None: + run_dbt(["run"], expect_pass=False) + + run_results = get_artifact(project.project_root, "target", "run_results.json") + + microbatch_result = run_results["results"][1] + assert microbatch_result["status"] == "skipped" + assert microbatch_result["batch_results"] is None + + class TestMicrobatchCanRunParallelOrSequential(BaseMicrobatchTest): @pytest.fixture def batch_exc_catcher(self) -> EventCatcher: