diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 0443de1bdc1e1..a21783690767c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -122,9 +122,9 @@ def _build_fivetran_assets( tracked_asset_keys = { table: AssetKey([*asset_key_prefix, *table.split(".")]) if not translator_instance or not connection_metadata - else translator_instance.get_asset_key( + else translator_instance.get_asset_spec( FivetranConnectorTableProps(table=table, **connection_metadata._asdict()) - ) + ).key for table in destination_tables } user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index c8bc2e05a8419..4b93bb83fe588 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -64,10 +64,6 @@ class DagsterFivetranTranslator: Subclass this class to implement custom logic for each type of Fivetran content. """ - def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey: - """Get the AssetKey for a table synced by a Fivetran connector.""" - return AssetKey(props.table.split(".")) - def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: """Get the AssetSpec for a table synced by a Fivetran connector.""" schema_name, table_name = props.table.split(".") @@ -91,7 +87,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: ) return AssetSpec( - key=self.get_asset_key(props), + key=AssetKey(props.table.split(".")), metadata=metadata, kinds={"fivetran", *({props.service} if props.service else set())}, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index b6bc8e5e06811..3e19d14d61cba 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -12,7 +12,7 @@ asset, io_manager, ) -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes from dagster._core.definitions.materialize import materialize from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema @@ -269,12 +269,13 @@ def downstream_asset(xyz): class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): - def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey: - return super().get_asset_key(props).with_prefix("my_prefix") - def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: asset_spec = super().get_asset_spec(props) - return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata}) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + metadata={"foo": "bar", **asset_spec.metadata}, + ) @responses.activate