Skip to content

Commit

Permalink
[dagster-powerbi] Use get_asset_spec().key in DagsterPowerBITranslator (
Browse files Browse the repository at this point in the history
#26051)

## Summary & Motivation

Like #26027 but for Power BI

## How I Tested These Changes

Updated unit test with BK
  • Loading branch information
maximearmstrong authored Nov 25, 2024
1 parent d360e33 commit 323d5a3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,15 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
check.assert_never(data.content_type)

def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(
[
"dashboard",
_clean_asset_name(_remove_file_ext(data.properties["displayName"])),
]
)
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_asset_key(self.workspace_data.reports_by_id[report_id])
self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key
for report_id in tile_report_ids
]
url = (
Expand All @@ -196,23 +191,28 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
)

return AssetSpec(
key=self.get_dashboard_asset_key(data),
key=AssetKey(
[
"dashboard",
_clean_asset_name(_remove_file_ext(data.properties["displayName"])),
]
),
deps=report_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="dashboard")},
kinds={"powerbi", "dashboard"},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["report", _clean_asset_name(data.properties["name"])])
return self.get_report_spec(data).key

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
report_id = data.properties["id"]
dataset_id = data.properties.get("datasetId")
dataset_data = (
self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
)
dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None
dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
Expand All @@ -221,7 +221,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
owner = data.properties.get("createdBy")

return AssetSpec(
key=self.get_report_asset_key(data),
key=AssetKey(["report", _clean_asset_name(data.properties["name"])]),
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
Expand All @@ -230,13 +230,13 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])
return self.get_semantic_model_spec(data).key

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key
for source_id in source_ids
]
url = (
Expand Down Expand Up @@ -266,7 +266,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
}

return AssetSpec(
key=self.get_semantic_model_asset_key(data),
key=AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])]),
deps=source_keys,
metadata={
**PowerBIMetadataSet(
Expand All @@ -280,20 +280,22 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
connection_name = (
data.properties["connectionDetails"].get("path")
or data.properties["connectionDetails"].get("url")
or data.properties["connectionDetails"].get("database")
)
if not connection_name:
return AssetKey([_clean_asset_name(data.properties["datasourceId"])])

obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name))
return AssetKey(path=[_clean_asset_name(obj_name)])
asset_key = AssetKey([_clean_asset_name(data.properties["datasourceId"])])
else:
obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name))
asset_key = AssetKey(path=[_clean_asset_name(obj_name)])

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
key=asset_key,
tags={**PowerBITagSet(asset_type="data_source")},
kinds={"powerbi"},
)
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor


class MyCustomTranslator(DagsterPowerBITranslator):
def get_dashboard_spec(self, dashboard: PowerBIContentData) -> AssetSpec:
return super().get_dashboard_spec(dashboard)._replace(metadata={"custom": "metadata"})
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None:
Expand All @@ -127,8 +131,9 @@ def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> Non
translator = MyCustomTranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)

assert asset_spec.metadata == {"custom": "metadata"}
assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]


def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) -> None:
Expand Down

0 comments on commit 323d5a3

Please sign in to comment.