Skip to content

Commit

Permalink
Fix asset check selection for required asset checks (#24781)
Browse files Browse the repository at this point in the history
This is a stopgap measure. This solution would cause undesired behavior
in some edge cases where users are automating asset checks separately
from their associated asset (essentially, the check would double-fire in
some circumstances.

NOCHANGELOG

- [ ] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
OwenKephart authored and gibsondan committed Sep 26, 2024
1 parent c91ee94 commit c4bbfb4
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ def _build_run_requests_from_partitions_def_mapping(
tags.update({**partitions_def.get_tags_for_partition_key(partition_key)})

for entity_keys_for_repo in asset_graph.split_entity_keys_by_repository(entity_keys):
asset_check_keys = [k for k in entity_keys_for_repo if isinstance(k, AssetCheckKey)]
run_requests.append(
# Do not call run_request.with_resolved_tags_and_config as the partition key is
# valid and there is no config.
Expand All @@ -282,9 +283,11 @@ def _build_run_requests_from_partitions_def_mapping(
asset_selection=[k for k in entity_keys_for_repo if isinstance(k, AssetKey)],
partition_key=partition_key,
tags=tags,
asset_check_keys=[
k for k in entity_keys_for_repo if isinstance(k, AssetCheckKey)
],
# for now, we explicitly set asset_check_keys to None if none are selected,
# which allows for required asset check keys to be grouped as part of the
# run if any exist, avoiding errors. however, this should actually be handled
# in the AutomationConditionEvaluator class in the future.
asset_check_keys=asset_check_keys if len(asset_check_keys) > 0 else None,
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import dagster as dg
from dagster._core.definitions.asset_check_spec import AssetCheckSpec

any_dep_newly_updated = dg.AutomationCondition.any_deps_match(
dg.AutomationCondition.newly_updated() | dg.AutomationCondition.will_be_requested()
)


@dg.asset(
check_specs=[AssetCheckSpec(asset="asset_w_check", name="row_count")],
automation_condition=dg.AutomationCondition.eager(),
)
def asset_w_check() -> dg.MaterializeResult:
return dg.MaterializeResult(check_results=[dg.AssetCheckResult(passed=True)])


defs = dg.Definitions(assets=[asset_w_check])
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,18 @@ def test_cross_location_checks() -> None:
AssetCheckKey(AssetKey("processed_files"), "no_nulls")
}
assert len(runs[1].asset_selection or []) == 0


def test_non_subsettable_check() -> None:
with get_workspace_request_context(
["check_not_subsettable"]
) as context, get_threadpool_executor() as executor:
time = get_current_datetime()
with freeze_time(time):
_execute_ticks(context, executor)

# eager asset materializes
runs = _get_runs_for_latest_ticks(context)
assert len(runs) == 1
assert runs[0].asset_selection == {AssetKey("asset_w_check")}
assert runs[0].asset_check_selection is None

0 comments on commit c4bbfb4

Please sign in to comment.