From 8b9efafe489f18839fe09e6b523a2b7db97ca0b8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 18:15:04 -0500 Subject: [PATCH] Move materialization logic to method --- .../dagster_fivetran/resources.py | 73 ++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index fb139928037b9..1581f110edc73 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -941,22 +941,11 @@ def load_asset_specs( workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__ ) - def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + def _generate_materialization( + self, + fivetran_output: FivetranOutput, + dagster_fivetran_translator: DagsterFivetranTranslator, ): - # TODO: Add docstrings - assets_def = context.assets_def - dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) - - connector_id = next( - check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) - for spec in assets_def.specs - ) - - client = self.get_client() - fivetran_output = client.sync_and_poll( - connector_id=connector_id, - ) connector = FivetranConnector.from_connector_details( connector_details=fivetran_output.connector_details ) @@ -964,7 +953,6 @@ def sync_and_poll( schema_config_details=fivetran_output.schema_config ) - materialized_asset_keys = set() for schema_source_name, schema in schema_config.schemas.items(): if not schema.enabled: continue @@ -988,7 +976,7 @@ def sync_and_poll( ) ).key - materialization = AssetMaterialization( + yield AssetMaterialization( asset_key=asset_key, description=f"Table generated via Fivetran sync: {schema.name}.{table.name}", metadata={ @@ -1004,23 +992,42 @@ def sync_and_poll( "table_source_name": table_source_name, }, ) - if not materialization: - continue - # scan through all tables actually created, if it was expected then emit an Output. - # otherwise, emit a runtime AssetMaterialization - if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, - ) - materialized_asset_keys.add(materialization.asset_key) - else: - context.log.warning( - f"An unexpected asset was materialized: {materialization.asset_key}" - ) - yield materialization + def sync_and_poll( + self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + ): + # TODO: Add docstrings + assets_def = context.assets_def + dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator + ): + # scan through all tables actually created, if it was expected then emit an Output. + # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}" + ) + yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys if unmaterialized_asset_keys: