Skip to content

Commit

Permalink
test(dbt): assert that dbt retry can be invoked to yield Output e…
Browse files Browse the repository at this point in the history
…vents
  • Loading branch information
rexledesma committed Mar 11, 2024
1 parent 0562933 commit 5101b91
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import shutil
from dataclasses import replace
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast

Expand All @@ -23,6 +24,8 @@
DbtCliResource,
)
from dagster_dbt.errors import DagsterDbtCliRuntimeError
from dbt.version import __version__ as dbt_version
from packaging import version
from pydantic import ValidationError
from pytest_mock import MockerFixture

Expand Down Expand Up @@ -303,6 +306,42 @@ def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
assert result.success


@pytest.mark.skipif(
version.parse(dbt_version) < version.parse("1.7.9"),
reason="`dbt retry` with `--target-path` support is only available in `dbt-core>=1.7.9`",
)
def test_dbt_retry_execution(
test_jaffle_shop_manifest: Dict[str, Any], dbt: DbtCliResource
) -> None:
test_jaffle_shop_path.joinpath(
os.getenv("DAGSTER_DBT_PYTEST_XDIST_DUCKDB_DBFILE_PATH", "")
).unlink()

@dbt_assets(manifest=test_jaffle_shop_manifest)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
dbt_invocation = dbt.cli(["run"], context=context, raise_on_error=False)

assert not dbt_invocation.is_successful()
assert not list(dbt_invocation.stream())

yield from dbt.cli(["seed"], context=context).stream()
yield from replace(
dbt.cli(
["retry"],
manifest=dbt_invocation.manifest,
dagster_dbt_translator=dbt_invocation.dagster_dbt_translator,
target_path=dbt_invocation.target_path,
),
context=context,
).stream()

result = materialize([my_dbt_assets], resources={"dbt": dbt})
assert result.success
assert len(result.filter_events(lambda event: event.is_successful_output)) == len(
my_dbt_assets.keys_by_output_name.values()
)


def test_dbt_source_freshness_execution(test_dbt_source_freshness_manifest: Dict[str, Any]) -> None:
@dbt_assets(manifest=test_dbt_source_freshness_manifest)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
Expand Down

0 comments on commit 5101b91

Please sign in to comment.