diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 85a65b37bb5d3..61fd3bb3ca2e4 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index e4595fb122329..5d1768af29bbd 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index 1d6ae3c49784c..255819e998ecb 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx index 8bb08785362d5..0e26552874fbc 100644 --- a/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx +++ b/docs/content/integrations/airlift/federation-tutorial/federated-execution.mdx @@ -37,8 +37,7 @@ Ultimately, we would like to kick off a run of `customer_metrics` whenever `load ```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager from dagster import AutomationCondition -customer_metrics_dag_asset = replace_attributes( - customer_metrics_dag_asset, +customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes( automation_condition=AutomationCondition.eager(), ) ``` @@ -77,7 +76,6 @@ from dagster import ( MaterializeResult, multi_asset, ) -from dagster._core.definitions.asset_spec import replace_attributes from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) @@ -114,16 +112,15 @@ load_customers_dag_asset = next( ) ) ) -customer_metrics_dag_asset = replace_attributes( - next( - iter( - load_airflow_dag_asset_specs( - airflow_instance=metrics_airflow_instance, - dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", - ) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) - # Add a dependency on the load_customers_dag_asset - ), + ) + # Add a dependency on the load_customers_dag_asset +).replace_attributes( deps=[load_customers_dag_asset], automation_condition=AutomationCondition.eager(), ) diff --git a/docs/content/integrations/airlift/federation-tutorial/observe.mdx b/docs/content/integrations/airlift/federation-tutorial/observe.mdx index 416d56093a32d..1b0915ea89349 100644 --- a/docs/content/integrations/airlift/federation-tutorial/observe.mdx +++ b/docs/content/integrations/airlift/federation-tutorial/observe.mdx @@ -190,10 +190,7 @@ defs = Definitions( Now that we have both DAGs loaded into Dagster, we can observe the cross-dag lineage between them. To do this, we'll use the `replace_attributes` function to add a dependency from the `load_customers` asset to the `customer_metrics` asset: ```python file=../../airlift-federation-tutorial/snippets/observe.py startafter=start_lineage endbefore=end_lineage -from dagster._core.definitions.asset_spec import replace_attributes - -customer_metrics_dag_asset = replace_attributes( - customer_metrics_dag_asset, +customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes( deps=[load_customers], ) ``` diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index 13c20d8258739..f2d55b4609921 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py index ff91d7544f4cf..f2403005a7a92 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py @@ -5,7 +5,6 @@ MaterializeResult, multi_asset, ) -from dagster._core.definitions.asset_spec import replace_attributes from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) @@ -42,16 +41,15 @@ ) ) ) -customer_metrics_dag_asset = replace_attributes( - next( - iter( - load_airflow_dag_asset_specs( - airflow_instance=metrics_airflow_instance, - dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", - ) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) - # Add a dependency on the load_customers_dag_asset - ), + ) + # Add a dependency on the load_customers_dag_asset +).replace_attributes( deps=[load_customers_dag_asset], automation_condition=AutomationCondition.eager(), ) diff --git a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py index 69a2888046618..bb655b6b3191c 100644 --- a/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py +++ b/examples/airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/observe_with_deps.py @@ -1,5 +1,4 @@ from dagster import Definitions -from dagster._core.definitions.asset_spec import replace_attributes from dagster_airlift.core import ( AirflowBasicAuthBackend, AirflowInstance, @@ -33,16 +32,15 @@ ) ) ) -customer_metrics_dag_asset = replace_attributes( - next( - iter( - load_airflow_dag_asset_specs( - airflow_instance=metrics_airflow_instance, - dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", - ) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) - # Add a dependency on the load_customers_dag_asset - ), + ) + # Add a dependency on the load_customers_dag_asset +).replace_attributes( deps=[load_customers_dag_asset], ) diff --git a/examples/airlift-federation-tutorial/snippets/federated_execution.py b/examples/airlift-federation-tutorial/snippets/federated_execution.py index d28e126679b94..4bbf24d8387b8 100644 --- a/examples/airlift-federation-tutorial/snippets/federated_execution.py +++ b/examples/airlift-federation-tutorial/snippets/federated_execution.py @@ -5,7 +5,6 @@ MaterializeResult, multi_asset, ) -from dagster._core.definitions.asset_spec import replace_attributes from dagster._core.definitions.declarative_automation.automation_condition import ( AutomationCondition, ) @@ -42,15 +41,14 @@ ) ) ) -customer_metrics_dag_asset = replace_attributes( - next( - iter( - load_airflow_dag_asset_specs( - airflow_instance=metrics_airflow_instance, - dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", - ) +customer_metrics_dag_asset = next( + iter( + load_airflow_dag_asset_specs( + airflow_instance=metrics_airflow_instance, + dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics", ) - ), + ) +).replace_attributes( deps=[load_customers_dag_asset], automation_condition=AutomationCondition.eager(), ) @@ -88,8 +86,7 @@ def run_customer_metrics() -> MaterializeResult: # start_eager from dagster import AutomationCondition -customer_metrics_dag_asset = replace_attributes( - customer_metrics_dag_asset, +customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes( automation_condition=AutomationCondition.eager(), ) # end_eager diff --git a/examples/airlift-federation-tutorial/snippets/observe.py b/examples/airlift-federation-tutorial/snippets/observe.py index 8af35b8350b2d..8a09c5953b2b3 100644 --- a/examples/airlift-federation-tutorial/snippets/observe.py +++ b/examples/airlift-federation-tutorial/snippets/observe.py @@ -72,10 +72,7 @@ ) # start_lineage -from dagster._core.definitions.asset_spec import replace_attributes - -customer_metrics_dag_asset = replace_attributes( - customer_metrics_dag_asset, +customer_metrics_dag_asset = customer_metrics_dag_asset.replace_attributes( deps=[load_customers], ) # end_lineage diff --git a/python_modules/dagster/dagster/_core/definitions/asset_out.py b/python_modules/dagster/dagster/_core/definitions/asset_out.py index 835bc644ac8d0..050e1d9e03ea8 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_out.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_out.py @@ -8,7 +8,7 @@ public, ) from dagster._core.definitions.asset_dep import AssetDep -from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.declarative_automation.automation_condition import ( @@ -219,8 +219,7 @@ def to_out(self) -> Out: def to_spec( self, key: AssetKey, deps: Sequence[AssetDep], additional_tags: Mapping[str, str] = {} ) -> AssetSpec: - return replace_attributes( - self._spec, + return self._spec.replace_attributes( key=key, tags={**additional_tags, **self.tags} if self.tags else additional_tags, deps=[*self._spec.deps, *deps], diff --git a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 7b6bde4f1ac48..b8214bb2c9de5 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -293,81 +293,92 @@ def with_io_manager_key(self, io_manager_key: str) -> "AssetSpec": metadata={**self.metadata, SYSTEM_METADATA_KEY_IO_MANAGER_KEY: io_manager_key} ) + @public + def replace_attributes( + self, + *, + key: CoercibleToAssetKey = ..., + deps: Optional[Iterable["CoercibleToAssetDep"]] = ..., + description: Optional[str] = ..., + metadata: Optional[Mapping[str, Any]] = ..., + skippable: bool = ..., + group_name: Optional[str] = ..., + code_version: Optional[str] = ..., + automation_condition: Optional[AutomationCondition] = ..., + owners: Optional[Sequence[str]] = ..., + tags: Optional[Mapping[str, str]] = ..., + kinds: Optional[Set[str]] = ..., + partitions_def: Optional[PartitionsDefinition] = ..., + ) -> "AssetSpec": + """Returns a new AssetSpec with the specified attributes replaced.""" + current_tags_without_kinds = { + tag_key: tag_value + for tag_key, tag_value in self.tags.items() + if not tag_key.startswith(KIND_PREFIX) + } + with disable_dagster_warnings(): + return self.dagster_internal_init( + key=key if key is not ... else self.key, + deps=deps if deps is not ... else self.deps, + description=description if description is not ... else self.description, + metadata=metadata if metadata is not ... else self.metadata, + skippable=skippable if skippable is not ... else self.skippable, + group_name=group_name if group_name is not ... else self.group_name, + code_version=code_version if code_version is not ... else self.code_version, + freshness_policy=self.freshness_policy, + automation_condition=automation_condition + if automation_condition is not ... + else self.automation_condition, + owners=owners if owners is not ... else self.owners, + tags=tags if tags is not ... else current_tags_without_kinds, + kinds=kinds if kinds is not ... else self.kinds, + partitions_def=partitions_def if partitions_def is not ... else self.partitions_def, + ) -def replace_attributes( - spec: AssetSpec, - *, - key: CoercibleToAssetKey = ..., - deps: Optional[Iterable["CoercibleToAssetDep"]] = ..., - description: Optional[str] = ..., - metadata: Optional[Mapping[str, Any]] = ..., - skippable: bool = ..., - group_name: Optional[str] = ..., - code_version: Optional[str] = ..., - freshness_policy: Optional[FreshnessPolicy] = ..., - automation_condition: Optional[AutomationCondition] = ..., - owners: Optional[Sequence[str]] = ..., - tags: Optional[Mapping[str, str]] = ..., - kinds: Optional[Set[str]] = ..., - partitions_def: Optional[PartitionsDefinition] = ..., -) -> "AssetSpec": - """Returns a new AssetSpec with the specified attributes replaced.""" - current_tags_without_kinds = { - tag_key: tag_value - for tag_key, tag_value in spec.tags.items() - if not tag_key.startswith(KIND_PREFIX) - } - with disable_dagster_warnings(): - return spec.dagster_internal_init( - key=key if key is not ... else spec.key, - deps=deps if deps is not ... else spec.deps, - description=description if description is not ... else spec.description, - metadata=metadata if metadata is not ... else spec.metadata, - skippable=skippable if skippable is not ... else spec.skippable, - group_name=group_name if group_name is not ... else spec.group_name, - code_version=code_version if code_version is not ... else spec.code_version, - freshness_policy=freshness_policy - if freshness_policy is not ... - else spec.freshness_policy, - automation_condition=automation_condition - if automation_condition is not ... - else spec.automation_condition, - owners=owners if owners is not ... else spec.owners, - tags=tags if tags is not ... else current_tags_without_kinds, - kinds=kinds if kinds is not ... else spec.kinds, - partitions_def=partitions_def if partitions_def is not ... else spec.partitions_def, - ) + @public + def merge_attributes( + self, + *, + deps: Iterable["CoercibleToAssetDep"] = ..., + metadata: Mapping[str, Any] = ..., + owners: Sequence[str] = ..., + tags: Mapping[str, str] = ..., + kinds: Set[str] = ..., + ) -> "AssetSpec": + """Returns a new AssetSpec with the specified attributes merged with the current attributes. + Args: + deps (Optional[Iterable[CoercibleToAssetDep]]): A set of asset dependencies to add to + the asset self. + metadata (Optional[Mapping[str, Any]]): A set of metadata to add to the asset self. + Will overwrite any existing metadata with the same key. + owners (Optional[Sequence[str]]): A set of owners to add to the asset self. + tags (Optional[Mapping[str, str]]): A set of tags to add to the asset self. + Will overwrite any existing tags with the same key. + kinds (Optional[Set[str]]): A set of kinds to add to the asset self. -def merge_attributes( - spec: AssetSpec, - *, - deps: Iterable["CoercibleToAssetDep"] = ..., - metadata: Mapping[str, Any] = ..., - owners: Sequence[str] = ..., - tags: Mapping[str, str] = ..., - kinds: Set[str] = ..., -) -> "AssetSpec": - """Returns a new AssetSpec with the specified attributes merged with the current attributes.""" - current_tags_without_kinds = { - tag_key: tag_value - for tag_key, tag_value in spec.tags.items() - if not tag_key.startswith(KIND_PREFIX) - } - with disable_dagster_warnings(): - return spec.dagster_internal_init( - key=spec.key, - deps=[*spec.deps, *(deps if deps is not ... else [])], - description=spec.description, - metadata={**spec.metadata, **(metadata if metadata is not ... else {})}, - skippable=spec.skippable, - group_name=spec.group_name, - code_version=spec.code_version, - freshness_policy=spec.freshness_policy, - automation_condition=spec.automation_condition, - owners=[*spec.owners, *(owners if owners is not ... else [])], - tags={**current_tags_without_kinds, **(tags if tags is not ... else {})}, - kinds={*spec.kinds, *(kinds if kinds is not ... else {})}, - auto_materialize_policy=spec.auto_materialize_policy, - partitions_def=spec.partitions_def, - ) + Returns: + AssetSpec + """ + current_tags_without_kinds = { + tag_key: tag_value + for tag_key, tag_value in self.tags.items() + if not tag_key.startswith(KIND_PREFIX) + } + with disable_dagster_warnings(): + return self.dagster_internal_init( + key=self.key, + deps=[*self.deps, *(deps if deps is not ... else [])], + description=self.description, + metadata={**self.metadata, **(metadata if metadata is not ... else {})}, + skippable=self.skippable, + group_name=self.group_name, + code_version=self.code_version, + freshness_policy=self.freshness_policy, + automation_condition=self.automation_condition, + owners=[*self.owners, *(owners if owners is not ... else [])], + tags={**current_tags_without_kinds, **(tags if tags is not ... else {})}, + kinds={*self.kinds, *(kinds if kinds is not ... else {})}, + auto_materialize_policy=self.auto_materialize_policy, + partitions_def=self.partitions_def, + ) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py index bd8b8e129aad0..71d036e590db9 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_asset_spec.py @@ -2,7 +2,6 @@ from dagster import AssetSpec, AutoMaterializePolicy, AutomationCondition from dagster._core.definitions.asset_dep import AssetDep from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.asset_spec import merge_attributes, replace_attributes from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError @@ -46,13 +45,13 @@ def test_replace_attributes_basic() -> None: spec = AssetSpec(key="foo") assert spec.key == AssetKey("foo") - new_spec = replace_attributes(spec, key="bar") + new_spec = spec.replace_attributes(key="bar") assert new_spec.key == AssetKey("bar") spec_with_metadata = AssetSpec(key="foo", metadata={"foo": "bar"}) assert spec_with_metadata.metadata == {"foo": "bar"} - spec_with_replace_metadata = replace_attributes(spec_with_metadata, metadata={"bar": "baz"}) + spec_with_replace_metadata = spec_with_metadata.replace_attributes(metadata={"bar": "baz"}) assert spec_with_replace_metadata.metadata == {"bar": "baz"} @@ -61,19 +60,19 @@ def test_replace_attributes_kinds() -> None: assert spec.kinds == {"foo"} assert spec.tags == {"a": "b", "dagster/kind/foo": ""} - new_spec = replace_attributes(spec, kinds={"bar"}, tags={"c": "d"}) + new_spec = spec.replace_attributes(kinds={"bar"}, tags={"c": "d"}) assert new_spec.kinds == {"bar"} assert new_spec.tags == {"c": "d", "dagster/kind/bar": ""} with pytest.raises(DagsterInvalidDefinitionError): - replace_attributes(spec, kinds={"a", "b", "c", "d", "e"}) + spec.replace_attributes(kinds={"a", "b", "c", "d", "e"}) def test_replace_attributes_deps_coercion() -> None: spec = AssetSpec(key="foo", deps={AssetKey("bar")}) assert spec.deps == [AssetDep(AssetKey("bar"))] - new_spec = replace_attributes(spec, deps={AssetKey("baz")}) + new_spec = spec.replace_attributes(deps={AssetKey("baz")}) assert new_spec.deps == [AssetDep(AssetKey("baz"))] @@ -81,10 +80,10 @@ def test_replace_attributes_group() -> None: spec = AssetSpec(key="foo", group_name="group1") assert spec.group_name == "group1" - new_spec = replace_attributes(spec, group_name="group2") + new_spec = spec.replace_attributes(group_name="group2") assert new_spec.group_name == "group2" - new_spec_no_group = replace_attributes(spec, group_name=None) + new_spec_no_group = spec.replace_attributes(group_name=None) assert new_spec_no_group.group_name is None @@ -92,14 +91,14 @@ def test_merge_attributes_metadata() -> None: spec = AssetSpec(key="foo") assert spec.key == AssetKey("foo") - new_spec = merge_attributes(spec, metadata={"bar": "baz"}) + new_spec = spec.merge_attributes(metadata={"bar": "baz"}) assert new_spec.key == AssetKey("foo") assert new_spec.metadata == {"bar": "baz"} - spec_new_meta_key = merge_attributes(new_spec, metadata={"baz": "qux"}) + spec_new_meta_key = new_spec.merge_attributes(metadata={"baz": "qux"}) assert spec_new_meta_key.metadata == {"bar": "baz", "baz": "qux"} - spec_replace_meta = merge_attributes(spec_new_meta_key, metadata={"bar": "qux"}) + spec_replace_meta = spec_new_meta_key.merge_attributes(metadata={"bar": "qux"}) assert spec_replace_meta.metadata == {"bar": "qux", "baz": "qux"} @@ -107,14 +106,14 @@ def test_merge_attributes_tags() -> None: spec = AssetSpec(key="foo") assert spec.key == AssetKey("foo") - new_spec = merge_attributes(spec, tags={"bar": "baz"}) + new_spec = spec.merge_attributes(tags={"bar": "baz"}) assert new_spec.key == AssetKey("foo") assert new_spec.tags == {"bar": "baz"} - spec_new_tags_key = merge_attributes(new_spec, tags={"baz": "qux"}) + spec_new_tags_key = new_spec.merge_attributes(tags={"baz": "qux"}) assert spec_new_tags_key.tags == {"bar": "baz", "baz": "qux"} - spec_replace_tags = merge_attributes(spec_new_tags_key, tags={"bar": "qux"}) + spec_replace_tags = spec_new_tags_key.merge_attributes(tags={"bar": "qux"}) assert spec_replace_tags.tags == {"bar": "qux", "baz": "qux"} @@ -122,24 +121,24 @@ def test_merge_attributes_owners() -> None: spec = AssetSpec(key="foo") assert spec.key == AssetKey("foo") - new_spec = merge_attributes(spec, owners=["owner1@dagsterlabs.com"]) + new_spec = spec.merge_attributes(owners=["owner1@dagsterlabs.com"]) assert new_spec.key == AssetKey("foo") assert new_spec.owners == ["owner1@dagsterlabs.com"] - spec_new_owner = merge_attributes(new_spec, owners=["owner2@dagsterlabs.com"]) + spec_new_owner = new_spec.merge_attributes(owners=["owner2@dagsterlabs.com"]) assert spec_new_owner.owners == ["owner1@dagsterlabs.com", "owner2@dagsterlabs.com"] with pytest.raises(DagsterInvalidDefinitionError): - merge_attributes(spec_new_owner, owners=["notvalid"]) + spec_new_owner.merge_attributes(owners=["notvalid"]) def test_merge_attributes_deps() -> None: spec = AssetSpec(key="foo") assert spec.key == AssetKey("foo") - new_spec = merge_attributes(spec, deps={AssetKey("bar")}) + new_spec = spec.merge_attributes(deps={AssetKey("bar")}) assert new_spec.key == AssetKey("foo") assert new_spec.deps == [AssetDep(AssetKey("bar"))] - spec_new_dep = merge_attributes(new_spec, deps={AssetKey("baz")}) + spec_new_dep = new_spec.merge_attributes(deps={AssetKey("baz")}) assert spec_new_dep.deps == [AssetDep(AssetKey("bar")), AssetDep(AssetKey("baz"))] diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index 3e19d14d61cba..ba47d39a56fd1 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -12,7 +12,7 @@ asset, io_manager, ) -from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.materialize import materialize from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema @@ -271,8 +271,7 @@ def downstream_asset(xyz): class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: asset_spec = super().get_asset_spec(props) - return replace_attributes( - asset_spec, + return asset_spec.replace_attributes( key=asset_spec.key.with_prefix("my_prefix"), metadata={"foo": "bar", **asset_spec.metadata}, )