From 87afb91d7f6a71ef75fc5a0fbf43e0be729a1587 Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Mon, 11 Mar 2024 10:29:55 -0400 Subject: [PATCH] test(dbt): assert that `dbt retry` can be invoked to yield `Output` events (#20395) ## Summary & Motivation Put https://github.com/dagster-io/dagster/pull/18990#issuecomment-1987590960 under test. ## How I Tested These Changes pytest --- .../core/test_resources_v2.py | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py index 4ee92b0ebf500..837e0fd71c23c 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py @@ -1,5 +1,6 @@ import os import shutil +from dataclasses import replace from pathlib import Path from typing import Any, Dict, List, Optional, Union, cast @@ -23,6 +24,8 @@ DbtCliResource, ) from dagster_dbt.errors import DagsterDbtCliRuntimeError +from dbt.version import __version__ as dbt_version +from packaging import version from pydantic import ValidationError from pytest_mock import MockerFixture @@ -303,6 +306,42 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): assert result.success +@pytest.mark.skipif( + version.parse(dbt_version) < version.parse("1.7.9"), + reason="`dbt retry` with `--target-path` support is only available in `dbt-core>=1.7.9`", +) +def test_dbt_retry_execution( + test_jaffle_shop_manifest: Dict[str, Any], dbt: DbtCliResource +) -> None: + @dbt_assets(manifest=test_jaffle_shop_manifest) + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + test_jaffle_shop_path.joinpath( + os.environ["DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_PATH"] + ).unlink() + + dbt_invocation = dbt.cli(["run"], context=context, raise_on_error=False) + + assert not dbt_invocation.is_successful() + assert not list(dbt_invocation.stream()) + + yield from dbt.cli(["seed"], context=context).stream() + yield from replace( + dbt.cli( + ["retry"], + manifest=dbt_invocation.manifest, + dagster_dbt_translator=dbt_invocation.dagster_dbt_translator, + target_path=dbt_invocation.target_path, + ), + context=context, + ).stream() + + result = materialize([my_dbt_assets], resources={"dbt": dbt}) + assert result.success + assert len(result.filter_events(lambda event: event.is_successful_output)) == len( + my_dbt_assets.keys_by_output_name.values() + ) + + def test_dbt_source_freshness_execution(test_dbt_source_freshness_manifest: Dict[str, Any]) -> None: @dbt_assets(manifest=test_dbt_source_freshness_manifest) def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):