Skip to content

Commit

Permalink
retry policy for dbt assets
Browse files Browse the repository at this point in the history
  • Loading branch information
prha committed May 30, 2024
1 parent 1138555 commit 014e8f1
Showing 1 changed file with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
Definitions,
DependencyDefinition,
FreshnessPolicy,
Jitter,
LastPartitionMapping,
NodeInvocation,
OpDefinition,
PartitionMapping,
PartitionsDefinition,
RetryPolicy,
StaticPartitionsDefinition,
TimeWindowPartitionMapping,
asset,
Expand Down Expand Up @@ -305,6 +308,39 @@ def my_dbt_assets(): ...
assert my_dbt_assets.backfill_policy == expected_backfill_policy


@pytest.mark.parametrize(
"retry_policy",
[
None,
RetryPolicy(max_retries=1),
RetryPolicy(max_retries=2, delay=1, jitter=Jitter.FULL),
],
ids=[
"no retry policy",
"retry policy",
"retry policy with jitter",
],
)
def test_retry_policy(
test_jaffle_shop_manifest: Dict[str, Any],
retry_policy: Optional[RetryPolicy],
) -> None:
class CustomDagsterDbtTranslator(DagsterDbtTranslator):
def get_freshness_policy(self, _: Mapping[str, Any]) -> Optional[FreshnessPolicy]:
# Disable freshness policies when using static partitions
return None

@dbt_assets(
manifest=test_jaffle_shop_manifest,
retry_policy=retry_policy,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def my_dbt_assets(): ...

assert isinstance(my_dbt_assets.node_def, OpDefinition)
assert my_dbt_assets.node_def.retry_policy == retry_policy


def test_op_tags(test_jaffle_shop_manifest: Dict[str, Any]):
op_tags = {"a": "b", "c": "d"}

Expand Down

0 comments on commit 014e8f1

Please sign in to comment.