From 022fd1dba7efde5e419af895ed950ba87e8e4760 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 13:11:04 -0500 Subject: [PATCH 1/2] [dagster-powerbi] Use get_asset_spec().key in DagsterPowerBITranslator --- .../dagster_powerbi/translator.py | 42 ++++++++++--------- .../dagster_powerbi_tests/test_translator.py | 16 ++++--- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 3dd2764bfcd59..bd4d683358406 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -174,12 +174,7 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: check.assert_never(data.content_type) def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey: - return AssetKey( - [ - "dashboard", - _clean_asset_name(_remove_file_ext(data.properties["displayName"])), - ] - ) + return self.get_dashboard_spec(data).key def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: dashboard_id = data.properties["id"] @@ -187,7 +182,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile ] report_keys = [ - self.get_report_asset_key(self.workspace_data.reports_by_id[report_id]) + self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key for report_id in tile_report_ids ] url = ( @@ -196,7 +191,12 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: ) return AssetSpec( - key=self.get_dashboard_asset_key(data), + key=AssetKey( + [ + "dashboard", + _clean_asset_name(_remove_file_ext(data.properties["displayName"])), + ] + ), deps=report_keys, metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, tags={**PowerBITagSet(asset_type="dashboard")}, @@ -204,7 +204,7 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: ) def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: - return AssetKey(["report", _clean_asset_name(data.properties["name"])]) + return self.get_report_spec(data).key def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: report_id = data.properties["id"] @@ -212,7 +212,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: dataset_data = ( self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None ) - dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None + dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None url = ( data.properties.get("webUrl") or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}" @@ -221,7 +221,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: owner = data.properties.get("createdBy") return AssetSpec( - key=self.get_report_asset_key(data), + key=AssetKey(["report", _clean_asset_name(data.properties["name"])]), deps=[dataset_key] if dataset_key else None, metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)}, tags={**PowerBITagSet(asset_type="report")}, @@ -230,13 +230,13 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: ) def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: - return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])]) + return self.get_semantic_model_spec(data).key def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: dataset_id = data.properties["id"] source_ids = data.properties.get("sources", []) source_keys = [ - self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id]) + self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key for source_id in source_ids ] url = ( @@ -266,7 +266,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: } return AssetSpec( - key=self.get_semantic_model_asset_key(data), + key=AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])]), deps=source_keys, metadata={ **PowerBIMetadataSet( @@ -280,20 +280,22 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: ) def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: + return self.get_data_source_spec(data).key + + def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: connection_name = ( data.properties["connectionDetails"].get("path") or data.properties["connectionDetails"].get("url") or data.properties["connectionDetails"].get("database") ) if not connection_name: - return AssetKey([_clean_asset_name(data.properties["datasourceId"])]) - - obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name)) - return AssetKey(path=[_clean_asset_name(obj_name)]) + asset_key = AssetKey([_clean_asset_name(data.properties["datasourceId"])]) + else: + obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name)) + asset_key = AssetKey(path=[_clean_asset_name(obj_name)]) - def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: return AssetSpec( - key=self.get_data_source_asset_key(data), + key=asset_key, tags={**PowerBITagSet(asset_type="data_source")}, kinds={"powerbi"}, ) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index 83848a40d70bb..f6dfb6131fab9 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -1,5 +1,5 @@ from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag @@ -117,8 +117,13 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor class MyCustomTranslator(DagsterPowerBITranslator): - def get_dashboard_spec(self, dashboard: PowerBIContentData) -> AssetSpec: - return super().get_dashboard_spec(dashboard)._replace(metadata={"custom": "metadata"}) + def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + default_spec = super().get_asset_spec(data) + return replace_attributes( + default_spec, + key=default_spec.key.with_prefix("prefix"), + metadata={**default_spec.metadata, "custom": "metadata"}, + ) def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None: @@ -127,8 +132,9 @@ def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> Non translator = MyCustomTranslator(workspace_data) asset_spec = translator.get_asset_spec(dashboard) - assert asset_spec.metadata == {"custom": "metadata"} - assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"] def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) -> None: From 9aab0972b5972c425796221185561e7dc4a899ff Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 14:25:10 -0500 Subject: [PATCH 2/2] Use AssetSpec.replace_attributes --- .../dagster-powerbi/dagster_powerbi_tests/test_translator.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index f6dfb6131fab9..cb43e33c7f00a 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -1,5 +1,5 @@ from dagster._core.definitions.asset_key import AssetKey -from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag @@ -119,8 +119,7 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor class MyCustomTranslator(DagsterPowerBITranslator): def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: default_spec = super().get_asset_spec(data) - return replace_attributes( - default_spec, + return default_spec.replace_attributes( key=default_spec.key.with_prefix("prefix"), metadata={**default_spec.metadata, "custom": "metadata"}, )