Skip to content

Commit

Permalink
[dagster-sigma] Use get_asset_spec().key in DagsterSigmaTranslator
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 20, 2024
1 parent db070bc commit baecfec
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def organization_data(self) -> SigmaOrganizationData:

def get_asset_key(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetKey:
"""Get the AssetKey for a Sigma object, such as a workbook or dataset."""
return AssetKey(_coerce_input_to_valid_name(data.properties["name"]))
return self.get_asset_spec(data).key

def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
"""Get the AssetSpec for a Sigma object, such as a workbook or dataset."""
Expand All @@ -119,7 +119,7 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
]

return AssetSpec(
key=self.get_asset_key(data),
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "workbook"},
deps={
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
}

return AssetSpec(
key=self.get_asset_key(data),
key=AssetKey(_coerce_input_to_valid_name(data.properties["name"])),
metadata=metadata,
kinds={"sigma", "dataset"},
deps={
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import EnvVar, define_asset_job
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.env import environ
from dagster_sigma import (
Expand All @@ -16,8 +16,12 @@
fake_token = "fake_token"

class MyCoolTranslator(DagsterSigmaTranslator):
def get_asset_key(self, data) -> AssetKey:
return super().get_asset_key(data).with_prefix("my_prefix")
def get_asset_spec(self, data) -> AssetSpec:
spec = super().get_asset_spec(data)
return replace_attributes(
spec,
key=spec.key.with_prefix("my_prefix"),
)

resource = SigmaOrganization(
base_url=SigmaBaseUrl.AWS_US,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Union

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import build_kind_tag_key
from dagster_sigma.translator import (
Expand Down Expand Up @@ -88,13 +90,12 @@ def test_dataset_translation() -> None:

def test_dataset_translation_custom_translator() -> None:
class MyCustomTranslator(DagsterSigmaTranslator):
def get_asset_key(self, data: SigmaDataset) -> AssetKey:
return super().get_asset_key(data).with_prefix("sigma")

def get_asset_spec(self, data: SigmaDataset) -> AssetSpec:
def get_asset_spec(self, data: Union[SigmaDataset, SigmaWorkbook]) -> AssetSpec:
spec = super().get_asset_spec(data)
if isinstance(data, SigmaDataset):
return spec._replace(description="Custom description")
spec = replace_attributes(
spec, key=spec.key.with_prefix("sigma"), description="Custom description"
)
return spec

sample_dataset = SigmaDataset(
Expand Down

0 comments on commit baecfec

Please sign in to comment.