added retry policy param to dbt assets decorator #18990
Conversation
Requesting changes for your feedback: I'm wondering if this is the right abstraction we should be exposing to our users to accomplish a "retry" in dbt.
To my knowledge, there are three cases in which a retry could be triggered:
- Syntax error
- Business logic error (e.g. a test assertion is failing)
- Connection flakiness
An existing Dagster retry policy accommodates (3), at the expense of (1) and (2). The entire materialization function will be run again, only for the user to encounter (1) and (2) again. (3) is accommodated, but ideally, the materialization function should only run from the point of failure (the flaky dbt model/test execution). Otherwise, a retry could potentially be incredibly expensive.
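For concreteness, here is a minimal sketch of the abstraction under discussion. The `retry_policy` parameter on `@dbt_assets` is hypothetical (it is what this PR proposed and was never merged); `RetryPolicy` itself is existing Dagster API, and `dbt_manifest_path` is assumed to be defined elsewhere:

```python
from dagster import AssetExecutionContext, Backoff, RetryPolicy
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest=dbt_manifest_path,
    # Hypothetical: the parameter proposed by this PR. It handles (3) by
    # retrying, but the whole function re-runs, so every model is rebuilt
    # from scratch -- the expense described above.
    retry_policy=RetryPolicy(max_retries=3, delay=30, backoff=Backoff.EXPONENTIAL),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```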
With the emergence of `dbt retry` (link), I think this retry is better served by users handling it on their own, in their decorated function. We should add documentation on how to accomplish this. `dbt retry` resumes from the point of failure, which alleviates the concerns about using the built-in Dagster retry policy.
Hey Rex, thanks for the comment. I was actually not aware of the `dbt retry` command. The purpose of this PR was really to solve flaky connections that we've been seeing recently. I agree that retrying with the command should be left to the users, so I'll just close this.
@askvinni Great, I'll add some documentation on the `dbt retry` approach.
Important: You'll need to be on `dbt>=1.7.9`.

As a breadcrumb for anyone who sees this pull request, I'm providing a small code snippet to add `dbt retry` to a `@dbt_assets` definition. If the dbt command fails, we issue a `dbt retry`:

```python
from dataclasses import replace

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=dbt_manifest_path)
def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    dbt_invocation = dbt.cli(["build"], context=context)

    try:
        yield from dbt_invocation.stream()
    except:
        # Retry from the point of failure, reusing the original invocation's
        # manifest, translator, and target path.
        dbt_retry_invocation = dbt.cli(
            ["retry"],
            manifest=dbt_invocation.manifest,
            dagster_dbt_translator=dbt_invocation.dagster_dbt_translator,
            target_path=dbt_invocation.target_path,
        )
        # Attach the execution context so the emitted events map to this step's assets.
        dbt_retry_invocation = replace(dbt_retry_invocation, context=context)

        yield from dbt_retry_invocation.stream()
```
Hey @rexledesma, I finally got around to trying this and sadly, it seems like the `dbt retry` approach isn't working for me.
@askvinni Did you try the code snippet that I provided above? It doesn't use the …

It works on my machine, running the following commands on a modified jaffle_shop project.
@rexledesma this might be something that's fixed in a newer dbt version.
@askvinni Can you give a more detailed description of how you solved this problem?
@Baksbany22 dbt has a target folder where it generates its `run_results.json` artifact.
@rexledesma I have tried this as well, with the same result as @askvinni.
Are you on `dbt>=1.7.9`? If not, could you try Vinni's workaround?
@toddy86 There are two ways to solve this problem:

```python
import os
import shutil
import time

from dagster import AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(manifest=dbt_manifest_path)
def test_bi_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    command = dbt.cli(["build"], context=context)  # , target_path=dbt_target_path)
    try:
        yield from command.stream()
    except:
        time.sleep(10)
        # Copy run_results.json from this invocation's unique target path into
        # the dbt project's default target folder, so `dbt retry` can find it.
        source_file = os.path.join(command.target_path, "run_results.json")
        if os.path.exists(source_file):
            shutil.copy(source_file, r"/opt/dagster/app/dbt_project/target/")
        else:
            raise Exception("source_file does not exist")
        yield from dbt.cli(
            ["retry"],
            manifest=command.manifest,
            dagster_dbt_translator=command.dagster_dbt_translator,
            target_path=command.target_path,
        ).stream()
```
I’m on dbt 1.7.x. It isn’t critical for us to have these retries on individual dbt assets, so we are testing just brute-forcing this with a job-level retry. I haven’t tested it yet, but I’m presuming the job-level retry will only pick up the failed dbt assets (perhaps a wrong assumption, and we shall see).
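For anyone exploring that job-level route: a minimal sketch, assuming run retries are enabled on the Dagster instance (`run_retries` in `dagster.yaml`). The `dagster/max_retries` and `dagster/retry_strategy` run tags are existing Dagster API; with `FROM_FAILURE`, the retried run re-executes only the failed steps and their downstreams, which is the behavior being presumed above. The job name and selection are illustrative:

```python
from dagster import define_asset_job

# Assumes run retries are enabled in the instance's dagster.yaml:
#   run_retries:
#     enabled: true
dbt_job = define_asset_job(
    name="dbt_job",  # illustrative
    selection="*",
    tags={
        "dagster/max_retries": 1,
        # Retry from failure instead of re-running all steps.
        "dagster/retry_strategy": "FROM_FAILURE",
    },
)
```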
Also hitting this issue: noticed this …
Can confirm this is now working as expected after bumping to dbt v1.7.9.
Thanks for confirming the fix @the4thamigo-uk and @toddy86. I've updated #18990 (comment) with a disclaimer to be on `dbt>=1.7.9`.
@rexledesma There is a hidden gremlin in using the `dbt retry` approach. If you have a job which splits the dbt asset materializations into multiple steps (e.g. a job with partitioned and non-partitioned assets), and the parent task initially fails and retries, then some of the downstream dbt tasks can be skipped, as Dagster interprets the upstream assets as being skipped.

Screenshots:
- Initial run where some models succeeded, but others failed.
- The dbt retry kicks in and all failed and skipped models are successfully built on the second try.
- Downstream dbt steps are incorrectly skipped, as assets successfully materialized in the retry run are incorrectly labelled as skipped.
@toddy86 Are you yielding Dagster events from the `dbt retry` invocation? i.e.:

```diff
 @dbt_assets(manifest=dbt_manifest_path)
 def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
     dbt_invocation = dbt.cli(["build"], context=context)

     try:
         yield from dbt_invocation.stream()
     except:
+        yield from dbt.cli(
+            ["retry"],
+            manifest=dbt_invocation.manifest,
+            dagster_dbt_translator=dbt_invocation.dagster_dbt_translator,
+            target_path=dbt_invocation.target_path,
+        ).stream()
```
@rexledesma yep, we are yielding. Full dbt asset code below (we have a thin wrapper around the `@dbt_assets` decorator):

```python
def build_dbt_assets(  # noqa: PLR0913
    select: str = "fqn:*",
    exclude: str = "",
    mode: str = "build",
    name: Optional[str] = None,
    partitions_def: Optional[PartitionsDefinition] = None,
    backfill_policy: Optional[BackfillPolicy] = None,
    dbt_retry: bool = False,
) -> list[AssetsDefinition]:
    _exclude = exclude + " tag:exclude_dagster"

    @dbt_assets(
        name=name,
        manifest=dbt_manifest_path,
        select=select,
        exclude=_exclude,
        partitions_def=partitions_def,
        backfill_policy=backfill_policy,
        dagster_dbt_translator=CustomDagsterDbtTranslator(
            settings=DagsterDbtTranslatorSettings(enable_asset_checks=True),
        ),
    )
    def _assets(
        context: OpExecutionContext,
    ):
        dbt_build_args = [mode]
        if partitions_def:
            dbt_vars = {
                "start_date": context.partition_key_range.start,
                "end_date": context.partition_key_range.end,
            }
            dbt_build_args.extend(["--vars", json.dumps(dbt_vars)])

        command = dbt_resource.cli(dbt_build_args, context=context)
        try:
            yield from command.stream()
        except:  # noqa: E722
            if dbt_retry:
                yield from dbt_resource.cli(
                    ["retry"],
                    manifest=command.manifest,
                    dagster_dbt_translator=command.dagster_dbt_translator,
                    target_path=command.target_path,
                ).stream()
            else:
                raise

    return [_assets]
```
@toddy86 Ah, I think this is because we need to pass the `context` to the retry invocation. Here's a workaround (under test in #20395) to ensure that the dbt invocation doesn't have the subsetting arguments, but the emitted events still use the `context`:

```diff
+from dataclasses import replace

 from dagster import AssetExecutionContext
 from dagster_dbt import DbtCliResource, dbt_assets


 @dbt_assets(manifest=dbt_manifest_path)
 def jaffle_shop_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
     dbt_invocation = dbt.cli(["build"], context=context)

     try:
         yield from dbt_invocation.stream()
     except:
+        dbt_retry_invocation = dbt.cli(
+            ["retry"],
+            manifest=dbt_invocation.manifest,
+            dagster_dbt_translator=dbt_invocation.dagster_dbt_translator,
+            target_path=dbt_invocation.target_path,
+        )
+        dbt_retry_invocation = replace(dbt_retry_invocation, context=context)
+
+        yield from dbt_retry_invocation.stream()
```

On my end, I'll see if I can have a fix out so we don't need to call `dataclasses.replace`.
…vents (#20395) — Summary & Motivation: put #18990 (comment) under test. How I Tested These Changes: pytest.
Thanks @rexledesma. I’m on leave for a few weeks, but I’ll give this a try once I’m back.
Hi @rexledesma, I am trying to get this to work. I am seeing the following, though: …

In this case, a dbt test failed, causing a …
@the4thamigo-uk I assume you're modeling your dbt tests as Dagster asset checks (cc @johannkm). If that's the case, then what happened is:

1. the failed dbt tests were emitted as failed `AssetCheckResult`s in the event stream, and
2. the subsequent `dbt retry` emitted results for those same checks again.

If you want to do this retry scheme with Dagster asset checks, you'll need to ensure that the failed tests in (1) are not emitted in the event stream. Only the final result from the `dbt retry` invocation should be emitted.
@rexledesma Out of curiosity, would calling `dbt retry` from a Dagster-requested retry also work? Something like:

```python
@dbt_assets(...)
def dbt_assets(context: AssetExecutionContext):
    dbt_command = "build" if context.retry_number == 0 else "retry"
    dbt_cli_invocation = dbt.cli([dbt_command], context=context, raise_on_error=True)
    try:
        yield from dbt_cli_invocation.stream()
    except DagsterDbtCliRuntimeError as err:
        raise RetryRequested(max_retries=1, seconds_to_wait=300) from err
```

Currently we do:

```python
@dbt_assets(...)
def dbt_assets(context: AssetExecutionContext):
    dbt_cli_invocation = dbt.cli(["build"], context=context, raise_on_error=True)

    failed_test_events = {}
    try:
        for dagster_event in dbt_cli_invocation.stream():
            if isinstance(dagster_event, AssetCheckResult) and not dagster_event.passed:
                failed_test_events[dagster_event.check_name] = dagster_event
                continue
            yield dagster_event
        if failed_test_events:
            # Only some tests failed; if something else had failed, it would
            # have already raised before getting here.
            raise DagsterDbtCliRuntimeError(description="failed_tests")
    except DagsterDbtCliRuntimeError as err:
        # Save run_results before the retry potentially overwrites it.
        build_run_results = dbt_cli_invocation.get_artifact("run_results.json")

        dbt_retry_invocation = dbt.cli(
            ["retry"],
            manifest=dbt_cli_invocation.manifest,
            dagster_dbt_translator=dbt_cli_invocation.dagster_dbt_translator,
            target_path=dbt_cli_invocation.target_path,
        )
        dbt_retry_invocation = replace(dbt_retry_invocation, context=context)
        # (Technically you could add another try/except and invoke the built-in
        # Dagster retry, in case the issue is network-related.)
        yield from dbt_retry_invocation.stream()
```

Since we deploy on K8s with Docker, `run_results.json` etc. are not overwritten by another run (and I know nowadays `dbt_assets` saves to a unique target folder anyway), so `dbt retry` can be run just fine even if other dbt commands ran in between the retry.
Some thoughts that come to mind: …
Hi @rexledesma. To better understand how this works: using your snippet, will the retry run on the first run of the code? I would expect it to run only when I re-execute my job from failure, not on every regular run. I am not sure how Dagster parses that function, but from the plain-Python side it looks like the retry would execute whenever there is an error, and I don't understand the use case for this.
@lokofoko See #18990 (review) for what is happening here. The point is that we are doing a `dbt retry` from the point of failure.
@rexledesma Yes, I think this is what happened. Can you provide an example of how to do this? Perhaps we need a final canonical example posted in this issue, or ideally in the docs?
@the4thamigo-uk To materialize `AssetObservation` events instead of `AssetCheckResult` events, you have to set the `enable_asset_checks` setting of the `DagsterDbtTranslator` to `False`.
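A minimal sketch of that setting, using the `DagsterDbtTranslatorSettings` API already shown in the wrapper code above (`dbt_manifest_path` is assumed from the earlier snippets):

```python
from dagster import AssetExecutionContext
from dagster_dbt import (
    DagsterDbtTranslator,
    DagsterDbtTranslatorSettings,
    DbtCliResource,
    dbt_assets,
)


# With asset checks disabled, dbt test results are emitted as
# AssetObservations rather than AssetCheckResults.
@dbt_assets(
    manifest=dbt_manifest_path,
    dagster_dbt_translator=DagsterDbtTranslator(
        settings=DagsterDbtTranslatorSettings(enable_asset_checks=False),
    ),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
```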
Thanks for the code above. However, this means we never generate `AssetCheckResult`s at all.
Summary & Motivation
Noticed that the parameter for retry policies is currently missing from the `@dbt_assets` decorator.
How I Tested These Changes
Test suite; added a test for the new attribute.
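For illustration, a hypothetical sketch of what such a test could look like, assuming the proposed `retry_policy` parameter existed on `@dbt_assets` (it did not land, since this PR was closed); `RetryPolicy` and the `op.retry_policy` property are existing Dagster API:

```python
from dagster import AssetExecutionContext, RetryPolicy
from dagster_dbt import DbtCliResource, dbt_assets


def test_dbt_assets_retry_policy() -> None:
    retry_policy = RetryPolicy(max_retries=2)

    # Hypothetical: `retry_policy` is the parameter this PR proposed.
    @dbt_assets(manifest=dbt_manifest_path, retry_policy=retry_policy)
    def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): ...

    # The policy should be attached to the underlying op definition.
    assert my_dbt_assets.op.retry_policy == retry_policy
```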