Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-powerbi] Use get_asset_spec().key in DagsterPowerBITranslator #26051

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,15 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
check.assert_never(data.content_type)

def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(
[
"dashboard",
_clean_asset_name(_remove_file_ext(data.properties["displayName"])),
]
)
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_asset_key(self.workspace_data.reports_by_id[report_id])
self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key
for report_id in tile_report_ids
]
url = (
Expand All @@ -196,23 +191,28 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
)

return AssetSpec(
key=self.get_dashboard_asset_key(data),
key=AssetKey(
[
"dashboard",
_clean_asset_name(_remove_file_ext(data.properties["displayName"])),
]
),
deps=report_keys,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="dashboard")},
kinds={"powerbi", "dashboard"},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["report", _clean_asset_name(data.properties["name"])])
return self.get_report_spec(data).key

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
report_id = data.properties["id"]
dataset_id = data.properties.get("datasetId")
dataset_data = (
self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
)
dataset_key = self.get_semantic_model_asset_key(dataset_data) if dataset_data else None
dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
Expand All @@ -221,7 +221,7 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
owner = data.properties.get("createdBy")

return AssetSpec(
key=self.get_report_asset_key(data),
key=AssetKey(["report", _clean_asset_name(data.properties["name"])]),
deps=[dataset_key] if dataset_key else None,
metadata={**PowerBIMetadataSet(web_url=MetadataValue.url(url) if url else None)},
tags={**PowerBITagSet(asset_type="report")},
Expand All @@ -230,13 +230,13 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
return AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])])
return self.get_semantic_model_spec(data).key

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_asset_key(self.workspace_data.data_sources_by_id[source_id])
self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key
for source_id in source_ids
]
url = (
Expand Down Expand Up @@ -266,7 +266,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
}

return AssetSpec(
key=self.get_semantic_model_asset_key(data),
key=AssetKey(["semantic_model", _clean_asset_name(data.properties["name"])]),
deps=source_keys,
metadata={
**PowerBIMetadataSet(
Expand All @@ -280,20 +280,22 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
connection_name = (
data.properties["connectionDetails"].get("path")
or data.properties["connectionDetails"].get("url")
or data.properties["connectionDetails"].get("database")
)
if not connection_name:
return AssetKey([_clean_asset_name(data.properties["datasourceId"])])

obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name))
return AssetKey(path=[_clean_asset_name(obj_name)])
asset_key = AssetKey([_clean_asset_name(data.properties["datasourceId"])])
else:
obj_name = _get_last_filepath_component(urllib.parse.unquote(connection_name))
asset_key = AssetKey(path=[_clean_asset_name(obj_name)])

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
key=asset_key,
tags={**PowerBITagSet(asset_type="data_source")},
kinds={"powerbi"},
)
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor


class MyCustomTranslator(DagsterPowerBITranslator):
def get_dashboard_spec(self, dashboard: PowerBIContentData) -> AssetSpec:
return super().get_dashboard_spec(dashboard)._replace(metadata={"custom": "metadata"})
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)


def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None:
Expand All @@ -127,8 +131,9 @@ def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> Non
translator = MyCustomTranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)

assert asset_spec.metadata == {"custom": "metadata"}
assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]


def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) -> None:
Expand Down