Skip to content

Commit

Permalink
Move replace_attributes, merge_attributes onto AssetSpec, publicise (#…
Browse files Browse the repository at this point in the history
…25941)

## Summary

Now that they've baked for a little bit, this change moves the
`replace_attributes` and `merge_attributes` utilities directly onto
`AssetSpec` and marks them public:

```python
spec: AssetSpec = ...

updated_spec = spec.replace_attributes(description="A new description.")
```

```python
spec: AssetSpec = ...

# add new python kind, does not affect existing kinds
updated_spec = spec.merge_attributes(kinds={"python"})
```




## How I Tested These Changes

Update unit test.

## Changelog

> Introduced AssetSpec.replace_attributes and AssetSpec.merge_attributes
to easily alter properties of an asset spec.
  • Loading branch information
benpankow authored Nov 25, 2024
1 parent 6fae350 commit 0de7987
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 151 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ Ultimately, we would like to kick off a run of `customer_metrics` whenever `load
```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
automation_condition=AutomationCondition.eager(),
)
```
Expand Down Expand Up @@ -77,7 +76,6 @@ from dagster import (
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -114,16 +112,15 @@ load_customers_dag_asset = next(
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ defs = Definitions(
Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the `replace_attributes` method on the asset spec to make the `customer_metrics` asset depend on the `load_customers` asset:

```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
deps=[load_customers],
)
```
Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -42,16 +41,15 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dagster import Definitions
from dagster._core.definitions.asset_spec import replace_attributes
from dagster_airlift.core import (
AirflowBasicAuthBackend,
AirflowInstance,
Expand Down Expand Up @@ -33,16 +32,15 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
# Add a dependency on the load_customers_dag_asset
),
)
# Add a dependency on the load_customers_dag_asset
).replace_attributes(
deps=[load_customers_dag_asset],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
MaterializeResult,
multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
AutomationCondition,
)
Expand Down Expand Up @@ -42,15 +41,14 @@
)
)
)
customer_metrics_dag_asset = replace_attributes(
next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
customer_metrics_dag_asset = next(
iter(
load_airflow_dag_asset_specs(
airflow_instance=metrics_airflow_instance,
dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
)
),
)
).replace_attributes(
deps=[load_customers_dag_asset],
automation_condition=AutomationCondition.eager(),
)
Expand Down Expand Up @@ -88,8 +86,7 @@ def run_customer_metrics() -> MaterializeResult:
# start_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
automation_condition=AutomationCondition.eager(),
)
# end_eager
Expand Down
5 changes: 1 addition & 4 deletions examples/airlift-federation-tutorial/snippets/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@
)

# start_lineage
from dagster._core.definitions.asset_spec import replace_attributes

customer_metrics_dag_asset = replace_attributes(
customer_metrics_dag_asset,
customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes(
deps=[load_customers],
)
# end_lineage
Expand Down
5 changes: 2 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public,
)
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
Expand Down Expand Up @@ -219,8 +219,7 @@ def to_out(self) -> Out:
def to_spec(
self, key: AssetKey, deps: Sequence[AssetDep], additional_tags: Mapping[str, str] = {}
) -> AssetSpec:
return replace_attributes(
self._spec,
return self._spec.replace_attributes(
key=key,
tags={**additional_tags, **self.tags} if self.tags else additional_tags,
deps=[*self._spec.deps, *deps],
Expand Down
161 changes: 86 additions & 75 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,81 +293,92 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec":
metadata={**self.metadata, SYSTEM_METADATA_KEY_IO_MANAGER_KEY: io_manager_key}
)

@public
def replace_attributes(
self,
*,
key: CoercibleToAssetKey = ...,
deps: Optional[Iterable["CoercibleToAssetDep"]] = ...,
description: Optional[str] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
skippable: bool = ...,
group_name: Optional[str] = ...,
code_version: Optional[str] = ...,
automation_condition: Optional[AutomationCondition] = ...,
owners: Optional[Sequence[str]] = ...,
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes replaced."""
current_tags_without_kinds = {
tag_key: tag_value
for tag_key, tag_value in self.tags.items()
if not tag_key.startswith(KIND_PREFIX)
}
with disable_dagster_warnings():
return self.dagster_internal_init(
key=key if key is not ... else self.key,
deps=deps if deps is not ... else self.deps,
description=description if description is not ... else self.description,
metadata=metadata if metadata is not ... else self.metadata,
skippable=skippable if skippable is not ... else self.skippable,
group_name=group_name if group_name is not ... else self.group_name,
code_version=code_version if code_version is not ... else self.code_version,
freshness_policy=self.freshness_policy,
automation_condition=automation_condition
if automation_condition is not ...
else self.automation_condition,
owners=owners if owners is not ... else self.owners,
tags=tags if tags is not ... else current_tags_without_kinds,
kinds=kinds if kinds is not ... else self.kinds,
partitions_def=partitions_def if partitions_def is not ... else self.partitions_def,
)

def replace_attributes(
spec: AssetSpec,
*,
key: CoercibleToAssetKey = ...,
deps: Optional[Iterable["CoercibleToAssetDep"]] = ...,
description: Optional[str] = ...,
metadata: Optional[Mapping[str, Any]] = ...,
skippable: bool = ...,
group_name: Optional[str] = ...,
code_version: Optional[str] = ...,
freshness_policy: Optional[FreshnessPolicy] = ...,
automation_condition: Optional[AutomationCondition] = ...,
owners: Optional[Sequence[str]] = ...,
tags: Optional[Mapping[str, str]] = ...,
kinds: Optional[Set[str]] = ...,
partitions_def: Optional[PartitionsDefinition] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes replaced."""
current_tags_without_kinds = {
tag_key: tag_value
for tag_key, tag_value in spec.tags.items()
if not tag_key.startswith(KIND_PREFIX)
}
with disable_dagster_warnings():
return spec.dagster_internal_init(
key=key if key is not ... else spec.key,
deps=deps if deps is not ... else spec.deps,
description=description if description is not ... else spec.description,
metadata=metadata if metadata is not ... else spec.metadata,
skippable=skippable if skippable is not ... else spec.skippable,
group_name=group_name if group_name is not ... else spec.group_name,
code_version=code_version if code_version is not ... else spec.code_version,
freshness_policy=freshness_policy
if freshness_policy is not ...
else spec.freshness_policy,
automation_condition=automation_condition
if automation_condition is not ...
else spec.automation_condition,
owners=owners if owners is not ... else spec.owners,
tags=tags if tags is not ... else current_tags_without_kinds,
kinds=kinds if kinds is not ... else spec.kinds,
partitions_def=partitions_def if partitions_def is not ... else spec.partitions_def,
)
@public
def merge_attributes(
self,
*,
deps: Iterable["CoercibleToAssetDep"] = ...,
metadata: Mapping[str, Any] = ...,
owners: Sequence[str] = ...,
tags: Mapping[str, str] = ...,
kinds: Set[str] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes merged with the current attributes.
Args:
deps (Optional[Iterable[CoercibleToAssetDep]]): A set of asset dependencies to add to
the asset spec.
metadata (Optional[Mapping[str, Any]]): A set of metadata to add to the asset spec.
Will overwrite any existing metadata with the same key.
owners (Optional[Sequence[str]]): A set of owners to add to the asset spec.
tags (Optional[Mapping[str, str]]): A set of tags to add to the asset spec.
Will overwrite any existing tags with the same key.
kinds (Optional[Set[str]]): A set of kinds to add to the asset spec.
def merge_attributes(
spec: AssetSpec,
*,
deps: Iterable["CoercibleToAssetDep"] = ...,
metadata: Mapping[str, Any] = ...,
owners: Sequence[str] = ...,
tags: Mapping[str, str] = ...,
kinds: Set[str] = ...,
) -> "AssetSpec":
"""Returns a new AssetSpec with the specified attributes merged with the current attributes."""
current_tags_without_kinds = {
tag_key: tag_value
for tag_key, tag_value in spec.tags.items()
if not tag_key.startswith(KIND_PREFIX)
}
with disable_dagster_warnings():
return spec.dagster_internal_init(
key=spec.key,
deps=[*spec.deps, *(deps if deps is not ... else [])],
description=spec.description,
metadata={**spec.metadata, **(metadata if metadata is not ... else {})},
skippable=spec.skippable,
group_name=spec.group_name,
code_version=spec.code_version,
freshness_policy=spec.freshness_policy,
automation_condition=spec.automation_condition,
owners=[*spec.owners, *(owners if owners is not ... else [])],
tags={**current_tags_without_kinds, **(tags if tags is not ... else {})},
kinds={*spec.kinds, *(kinds if kinds is not ... else {})},
auto_materialize_policy=spec.auto_materialize_policy,
partitions_def=spec.partitions_def,
)
Returns:
AssetSpec
"""
current_tags_without_kinds = {
tag_key: tag_value
for tag_key, tag_value in self.tags.items()
if not tag_key.startswith(KIND_PREFIX)
}
with disable_dagster_warnings():
return self.dagster_internal_init(
key=self.key,
deps=[*self.deps, *(deps if deps is not ... else [])],
description=self.description,
metadata={**self.metadata, **(metadata if metadata is not ... else {})},
skippable=self.skippable,
group_name=self.group_name,
code_version=self.code_version,
freshness_policy=self.freshness_policy,
automation_condition=self.automation_condition,
owners=[*self.owners, *(owners if owners is not ... else [])],
tags={**current_tags_without_kinds, **(tags if tags is not ... else {})},
kinds={*self.kinds, *(kinds if kinds is not ... else {})},
auto_materialize_policy=self.auto_materialize_policy,
partitions_def=self.partitions_def,
)
Loading

1 comment on commit 0de7987

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-cgopkie4d-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 0de7987.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.