From e99c10cf8c8e6d04c6a83ff79cb14e94aee67b06 Mon Sep 17 00:00:00 2001 From: Vinicius Dalpiccol Date: Wed, 3 Jan 2024 13:49:04 +0100 Subject: [PATCH 1/2] added retry policy arg to dbt assets decorator --- .../dagster-dbt/dagster_dbt/asset_decorator.py | 15 +++++++++++++-- .../dagster_dbt_tests/test_asset_decorator.py | 18 +++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index 6a2646b05f8a5..b5fa4bfb11f7d 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -20,6 +20,7 @@ DagsterInvalidDefinitionError, Nothing, PartitionsDefinition, + RetryPolicy, multi_asset, ) from dagster._utils.warnings import ( @@ -35,7 +36,11 @@ get_deps, has_self_dependency, ) -from .dagster_dbt_translator import DagsterDbtTranslator, DbtManifestWrapper, validate_translator +from .dagster_dbt_translator import ( + DagsterDbtTranslator, + DbtManifestWrapper, + validate_translator, +) from .dbt_manifest import DbtManifestParam, validate_manifest from .utils import ( ASSET_RESOURCE_TYPES, @@ -55,6 +60,7 @@ def dbt_assets( partitions_def: Optional[PartitionsDefinition] = None, dagster_dbt_translator: DagsterDbtTranslator = DagsterDbtTranslator(), backfill_policy: Optional[BackfillPolicy] = None, + retry_policy: Optional[RetryPolicy] = None, op_tags: Optional[Mapping[str, Any]] = None, ) -> Callable[..., AssetsDefinition]: """Create a definition for how to compute a set of dbt resources, described by a manifest.json. @@ -81,6 +87,7 @@ def dbt_assets( dbt models, seeds, etc. to asset keys and asset metadata. backfill_policy (Optional[BackfillPolicy]): If a partitions_def is defined, this determines how to execute backfills that target multiple partitions. + retry_policy (Optional[RetryPolicy]): The retry policy for the underlying asset function. op_tags (Optional[Dict[str, Any]]): A dictionary of tags for the op that computes the assets. Frameworks may expect and require certain metadata to be attached to a op. Values that are not strings will be json encoded and must meet the criteria that @@ -311,6 +318,7 @@ def inner(fn) -> AssetsDefinition: op_tags=resolved_op_tags, check_specs=check_specs, backfill_policy=backfill_policy, + retry_policy=retry_policy, )(fn) return asset_definition @@ -368,7 +376,10 @@ def get_dbt_multi_asset_args( for test_unique_id in test_unique_ids: test_resource_props = manifest["nodes"][test_unique_id] check_spec = default_asset_check_fn( - asset_key, unique_id, dagster_dbt_translator.settings, test_resource_props + asset_key, + unique_id, + dagster_dbt_translator.settings, + test_resource_props, ) if check_spec: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py index 0944afde1cbb8..627853db706b4 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py @@ -17,6 +17,7 @@ NodeInvocation, PartitionMapping, PartitionsDefinition, + RetryPolicy, TimeWindowPartitionMapping, asset, ) @@ -165,7 +166,9 @@ def my_dbt_assets(): ], ) def test_selections( - select: Optional[str], exclude: Optional[str], expected_asset_names: AbstractSet[str] + select: Optional[str], + exclude: Optional[str], + expected_asset_names: AbstractSet[str], ) -> None: @dbt_assets( manifest=manifest, @@ -231,6 +234,19 @@ def my_dbt_assets(): assert my_dbt_assets.backfill_policy == backfill_policy +def test_retry_policy(): + retry_policy = RetryPolicy(max_retries=2) + + @dbt_assets( + manifest=manifest, + retry_policy=retry_policy, + ) + def my_dbt_assets(): + ... + + assert my_dbt_assets.op.retry_policy == retry_policy + + def test_op_tags(): @dbt_assets(manifest=manifest, op_tags={"a": "b", "c": "d"}) def my_dbt_assets(): From e0b77644db762987be47f079367f33ac0b27c61d Mon Sep 17 00:00:00 2001 From: Vinicius Dalpiccol Date: Wed, 3 Jan 2024 13:55:45 +0100 Subject: [PATCH 2/2] formatting --- .../dagster-dbt/dagster_dbt/asset_decorator.py | 11 ++--------- .../dagster_dbt_tests/test_asset_decorator.py | 4 +--- 2 files changed, 3 insertions(+), 12 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py index b5fa4bfb11f7d..11c52d762d293 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/asset_decorator.py @@ -36,11 +36,7 @@ get_deps, has_self_dependency, ) -from .dagster_dbt_translator import ( - DagsterDbtTranslator, - DbtManifestWrapper, - validate_translator, -) +from .dagster_dbt_translator import DagsterDbtTranslator, DbtManifestWrapper, validate_translator from .dbt_manifest import DbtManifestParam, validate_manifest from .utils import ( ASSET_RESOURCE_TYPES, @@ -376,10 +372,7 @@ def get_dbt_multi_asset_args( for test_unique_id in test_unique_ids: test_resource_props = manifest["nodes"][test_unique_id] check_spec = default_asset_check_fn( - asset_key, - unique_id, - dagster_dbt_translator.settings, - test_resource_props, + asset_key, unique_id, dagster_dbt_translator.settings, test_resource_props ) if check_spec: diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py index 627853db706b4..5e95d168ce4ff 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/test_asset_decorator.py @@ -166,9 +166,7 @@ def my_dbt_assets(): ], ) def test_selections( - select: Optional[str], - exclude: Optional[str], - expected_asset_names: AbstractSet[str], + select: Optional[str], exclude: Optional[str], expected_asset_names: AbstractSet[str] ) -> None: @dbt_assets( manifest=manifest,