Skip to content

Commit

Permalink
Use translator instance in asset decorator, factory and cached method
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 25, 2024
1 parent de5b336 commit 1cc89f8
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def fivetran_assets(
workspace: FivetranWorkspace,
name: Optional[str] = None,
group_name: Optional[str] = None,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to sync the tables of a given Fivetran connector.
Expand All @@ -24,7 +24,7 @@ def fivetran_assets(
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
name (Optional[str], optional): The name of the op.
group_name (Optional[str], optional): The name of the asset group.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Expand Down Expand Up @@ -89,7 +89,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator,
dagster_fivetran_translator=CustomDagsterFivetranTranslator(),
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
Expand All @@ -107,7 +107,7 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
specs=[
spec
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,14 +740,15 @@ def load_assets_from_fivetran_instance(
def build_fivetran_assets_definitions(
*,
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetsDefinition]:
"""The list of AssetsDefinition for all connectors in the Fivetran workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace.
Expand Down Expand Up @@ -803,7 +804,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
fivetran_assets = build_fivetran_assets_definitions(
workspace=workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator
dagster_fivetran_translator=CustomDagsterFivetranTranslator()
)
defs = dg.Definitions(
Expand All @@ -812,6 +813,8 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
)
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

all_asset_specs = workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,13 +895,14 @@ def fetch_fivetran_workspace_data(
@cached_method
def load_asset_specs(
self,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Fivetran content in the workspace.
Args:
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use
to convert Fivetran content into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterFivetranTranslator`.
Returns:
List[AssetSpec]: The set of assets representing the Fivetran content in the workspace.
Expand All @@ -923,8 +924,10 @@ def load_asset_specs(
fivetran_specs = fivetran_workspace.load_asset_specs()
defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace}
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return load_fivetran_asset_specs(
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator
workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__
)

def sync_and_poll(
Expand Down

0 comments on commit 1cc89f8

Please sign in to comment.