From 35ae56ddd76ba5b76ecf86271b1142939ac58fb2 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 15 Nov 2024 15:42:09 -0500 Subject: [PATCH 1/6] [11/n][dagster-fivetran] Implement materialization method in FivetranWorkspace --- .../dagster_fivetran/resources.py | 146 +++++++++++++++++- 1 file changed, 144 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index effbc8432d286..2727339f05fce 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -2,6 +2,7 @@ import logging import os import time +from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union @@ -10,11 +11,13 @@ import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, Definitions, Failure, InitResourceContext, MetadataValue, OpExecutionContext, + Output, __version__, _check as check, get_dagster_logger, @@ -24,7 +27,11 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader +from dagster._core.definitions.metadata.metadata_set import TableMetadataSet +from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._core.errors import DagsterStepOutputNotFoundError +from dagster._core.utils import imap from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser @@ -37,11 +44,16 @@ FivetranConnector, FivetranConnectorScheduleType, FivetranDestination, + FivetranMetadataSet, FivetranSchemaConfig, FivetranWorkspaceData, ) from dagster_fivetran.types import FivetranOutput -from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url +from dagster_fivetran.utils import ( + generate_materializations, + get_fivetran_connector_url, + get_fivetran_logs_url, +) FIVETRAN_API_BASE = "https://api.fivetran.com" FIVETRAN_API_VERSION = "v1" @@ -49,9 +61,14 @@ FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/" FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" +DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata" +DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables" + # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 +DEFAULT_MAX_THREADPOOL_WORKERS = 10 + FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" @@ -577,6 +594,25 @@ def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any """ return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_columns_for_table( + self, connector_id: str, schema_name: str, table_name: str + ) -> Mapping[str, Any]: + """Fetches the connector schema config for a given connector from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. + schema_name (str): The Fivetran Schema name. + table_name (str): The Fivetran Table name. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. 
+ """ + self._make_connector_request( + method="GET", + endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns", + ) + return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: """Fetches details about a given destination from the Fivetran API. @@ -832,6 +868,7 @@ class FivetranWorkspace(ConfigurableResource): _client: FivetranClient = PrivateAttr(default=None) + @cached_method def get_client(self) -> FivetranClient: return FivetranClient( api_key=self.api_key, @@ -929,10 +966,115 @@ def load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) + def _fetch_and_attach_col_metadata( + self, connector_id: str, materialization: AssetMaterialization + ) -> AssetMaterialization: + """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the + materialization. + """ + try: + schema_source_name = materialization.metadata["schema_source_name"].value + table_source_name = materialization.metadata["table_source_name"].value + + table_conn_data = self.get_client().get_columns_for_table( + connector_id=connector_id, + schema_name=schema_source_name, + table_name=table_source_name, + ) + columns = check.dict_elem(table_conn_data, "columns") + table_columns = sorted( + [ + TableColumn(name=col["name_in_destination"], type="") + for col in columns.values() + if "name_in_destination" in col and col.get("enabled") + ], + key=lambda col: col.name, + ) + return materialization.with_metadata( + { + **materialization.metadata, + **TableMetadataSet(column_schema=TableSchema(table_columns)), + } + ) + except Exception as e: + self._log.warning( + "An error occurred while fetching column metadata for table %s", + f"Exception: {e}", + exc_info=True, + ) + return materialization + def sync_and_poll( self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None ): - raise NotImplementedError() + # TODO: Add docstrings + assets_def = context.assets_def + + # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory + fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY) + infer_missing_tables = context.op.tags.get( + DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY + ) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + + _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = ( + lambda materialization: self._fetch_and_attach_col_metadata( + connector_id, materialization + ) + if fetch_column_metadata + else materialization + ) + with ThreadPoolExecutor( + max_workers=DEFAULT_MAX_THREADPOOL_WORKERS, + thread_name_prefix=f"fivetran_{connector_id}", + ) as executor: + for materialization in imap( + executor=executor, + # TODO: Create new asset materialization fn with assets and not asset key prefix + iterable=generate_materializations( + fivetran_output, + asset_key_prefix=[], + ), + func=_map_fn, + ): + # scan through all tables actually created, if it was expected then emit an Output. 
+ # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + + else: + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if infer_missing_tables: + for asset_key in unmaterialized_asset_keys: + yield Output(value=None, output_name=asset_key.to_python_identifier()) + else: + if unmaterialized_asset_keys: + asset_key = next(iter(unmaterialized_asset_keys)) + output_name = "_".join(asset_key.path) + raise DagsterStepOutputNotFoundError( + f"Core compute for {context.op_def.name} did not return an output for" + f' non-optional output "{output_name}".', + step_key=context.get_step_execution_context().step.key, + output_name=output_name, + ) @experimental From 8713ef849e8c493da79f14685103ffaa86fe2c6a Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 22 Nov 2024 19:21:45 -0500 Subject: [PATCH 2/6] Remove fetch_column_metadata and infer_missing_tables --- .../dagster_fivetran/resources.py | 142 +++--------------- 1 file changed, 23 insertions(+), 119 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 2727339f05fce..c80a7e463a2b5 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -2,7 +2,7 @@ import logging import os import time -from concurrent.futures import ThreadPoolExecutor + from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union @@ -11,7 +11,7 @@ import requests from dagster import ( AssetExecutionContext, - AssetMaterialization, + Definitions, Failure, InitResourceContext, @@ -27,11 +27,9 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader -from dagster._core.definitions.metadata.metadata_set import TableMetadataSet -from dagster._core.definitions.metadata.table import TableColumn, TableSchema + from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._core.errors import DagsterStepOutputNotFoundError -from dagster._core.utils import imap + from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser @@ -61,14 +59,9 @@ FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/" FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/" -DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata" -DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables" - # default polling interval (in seconds) DEFAULT_POLL_INTERVAL = 10 -DEFAULT_MAX_THREADPOOL_WORKERS = 10 - FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata" @@ -594,25 +587,6 @@ def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any """ return self._make_request("GET", f"connectors/{connector_id}/schemas") - def get_columns_for_table( - self, connector_id: str, schema_name: str, table_name: str - ) -> 
Mapping[str, Any]: - """Fetches the connector schema config for a given connector from the Fivetran API. - - Args: - connector_id (str): The Fivetran Connector ID. - schema_name (str): The Fivetran Schema name. - table_name (str): The Fivetran Table name. - - Returns: - Dict[str, Any]: Parsed json data from the response to this request. - """ - self._make_connector_request( - method="GET", - endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns", - ) - return self._make_request("GET", f"connectors/{connector_id}/schemas") - def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: """Fetches details about a given destination from the Fivetran API. @@ -966,56 +940,12 @@ def load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) - def _fetch_and_attach_col_metadata( - self, connector_id: str, materialization: AssetMaterialization - ) -> AssetMaterialization: - """Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the - materialization. - """ - try: - schema_source_name = materialization.metadata["schema_source_name"].value - table_source_name = materialization.metadata["table_source_name"].value - - table_conn_data = self.get_client().get_columns_for_table( - connector_id=connector_id, - schema_name=schema_source_name, - table_name=table_source_name, - ) - columns = check.dict_elem(table_conn_data, "columns") - table_columns = sorted( - [ - TableColumn(name=col["name_in_destination"], type="") - for col in columns.values() - if "name_in_destination" in col and col.get("enabled") - ], - key=lambda col: col.name, - ) - return materialization.with_metadata( - { - **materialization.metadata, - **TableMetadataSet(column_schema=TableSchema(table_columns)), - } - ) - except Exception as e: - self._log.warning( - "An error occurred while fetching column metadata for table %s", - f"Exception: {e}", - exc_info=True, - ) - return materialization - def sync_and_poll( self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None ): # TODO: Add docstrings assets_def = context.assets_def - # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory - fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY) - infer_missing_tables = context.op.tags.get( - DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY - ) - connector_id = next( check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) for spec in assets_def.specs @@ -1028,53 +958,27 @@ def sync_and_poll( materialized_asset_keys = set() - _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = ( - lambda materialization: self._fetch_and_attach_col_metadata( - connector_id, materialization - ) - if fetch_column_metadata - else materialization - ) - with ThreadPoolExecutor( - max_workers=DEFAULT_MAX_THREADPOOL_WORKERS, - thread_name_prefix=f"fivetran_{connector_id}", - ) as executor: - for materialization in imap( - executor=executor, - # TODO: Create new asset materialization fn with assets and not asset key prefix - iterable=generate_materializations( - fivetran_output, - asset_key_prefix=[], - ), - func=_map_fn, - ): - # scan through all tables actually created, if it was expected then emit an Output. 
- # otherwise, emit a runtime AssetMaterialization - if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, - ) - materialized_asset_keys.add(materialization.asset_key) - - else: - yield materialization + # TODO: Create new asset materialization fn with assets and not asset key prefix + for materialization in generate_materializations( + fivetran_output, + asset_key_prefix=[], + ): + # scan through all tables actually created, if it was expected then emit an Output. + # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + + else: + yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys - if infer_missing_tables: - for asset_key in unmaterialized_asset_keys: - yield Output(value=None, output_name=asset_key.to_python_identifier()) - else: - if unmaterialized_asset_keys: - asset_key = next(iter(unmaterialized_asset_keys)) - output_name = "_".join(asset_key.path) - raise DagsterStepOutputNotFoundError( - f"Core compute for {context.op_def.name} did not return an output for" - f' non-optional output "{output_name}".', - step_key=context.get_step_execution_context().step.key, - output_name=output_name, - ) + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") @experimental From bf81d05adea5e7c7f54d6aaff3ccf3e517f4de0b Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 22 Nov 2024 19:23:09 -0500 Subject: [PATCH 3/6] Lint --- .../libraries/dagster-fivetran/dagster_fivetran/resources.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index c80a7e463a2b5..1289c1ccf0d70 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -2,7 +2,6 @@ import logging import os import time - from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union @@ -11,7 +10,6 @@ import requests from dagster import ( AssetExecutionContext, - Definitions, Failure, InitResourceContext, @@ -27,9 +25,7 @@ from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader - from dagster._core.definitions.resource_definition import dagster_maintained_resource - from dagster._record import record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser From 65059ef2bc2d1ae3c1621b41d0ca9213b4038182 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 18:01:18 -0500 Subject: [PATCH 4/6] Update materialization logic --- .../dagster_fivetran/asset_decorator.py | 5 +- .../dagster_fivetran/resources.py | 81 +++++++++++++++---- .../dagster_fivetran/utils.py | 29 ++++++- 3 files changed, 96 insertions(+), 19 deletions(-) diff --git 
a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index 86c8929421dac..e926912504953 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -5,6 +5,7 @@ from dagster_fivetran.resources import FivetranWorkspace from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet +from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY @experimental @@ -105,7 +106,9 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet group_name=group_name, can_subset=True, specs=[ - spec + spec.merge_attributes( + metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} + ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator() diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 1289c1ccf0d70..cfcd273553ee7 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -10,6 +10,7 @@ import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, Definitions, Failure, InitResourceContext, @@ -37,6 +38,7 @@ DagsterFivetranTranslator, FivetranConnector, FivetranConnectorScheduleType, + FivetranConnectorTableProps, FivetranDestination, FivetranMetadataSet, FivetranSchemaConfig, @@ -44,9 +46,11 @@ ) from dagster_fivetran.types import FivetranOutput from dagster_fivetran.utils import ( - generate_materializations, + get_fivetran_connector_table_name, get_fivetran_connector_url, get_fivetran_logs_url, + get_translator_from_fivetran_assets, + metadata_for_table, ) FIVETRAN_API_BASE = "https://api.fivetran.com" @@ -941,6 +945,7 @@ def sync_and_poll( ): # TODO: Add docstrings assets_def = context.assets_def + dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) connector_id = next( check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) @@ -951,26 +956,70 @@ def sync_and_poll( fivetran_output = client.sync_and_poll( connector_id=connector_id, ) + connector = FivetranConnector.from_connector_details( + connector_details=fivetran_output.connector_details + ) + schema_config = FivetranSchemaConfig.from_schema_config_details( + schema_config_details=fivetran_output.schema_config + ) materialized_asset_keys = set() + for schema_source_name, schema in schema_config.schemas.items(): + if not schema.enabled: + continue - # TODO: Create new asset materialization fn with assets and not asset key prefix - for materialization in generate_materializations( - fivetran_output, - asset_key_prefix=[], - ): - # scan through all tables actually created, if it was expected then emit an Output. 
- # otherwise, emit a runtime AssetMaterialization - if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, + for table_source_name, table in schema.tables.items(): + if not table.enabled: + continue + + asset_key = dagster_fivetran_translator.get_asset_spec( + props=FivetranConnectorTableProps( + table=get_fivetran_connector_table_name( + schema_name=schema.name_in_destination, + table_name=table.name_in_destination, + ), + connector_id=connector.id, + name=connector.name, + connector_url=connector.url, + schema_config=schema_config, + database=None, + service=None, + ) + ).key + + materialization = AssetMaterialization( + asset_key=asset_key, + description=f"Table generated via Fivetran sync: {schema.name}.{table.name}", + metadata={ + **metadata_for_table( + table, + get_fivetran_connector_url(fivetran_output.connector_details), + include_column_info=True, + database=None, + schema=schema.name, + table=table.name, + ), + "schema_source_name": schema_source_name, + "table_source_name": table_source_name, + }, ) - materialized_asset_keys.add(materialization.asset_key) + if not materialization: + continue - else: - yield materialization + # scan through all tables actually created, if it was expected then emit an Output. + # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}" + ) + yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys if unmaterialized_asset_keys: diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py index 62589c4deb5b6..13c67b76135b0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py @@ -1,13 +1,23 @@ -from typing import Any, Dict, Iterator, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence import dagster._check as check -from dagster import AssetMaterialization, MetadataValue +from dagster import ( + AssetMaterialization, + AssetsDefinition, + DagsterInvariantViolationError, + MetadataValue, +) from dagster._core.definitions.metadata import RawMetadataMapping from dagster._core.definitions.metadata.metadata_set import TableMetadataSet from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster_fivetran.types import FivetranOutput +if TYPE_CHECKING: + from dagster_fivetran import DagsterFivetranTranslator + +DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator" + def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str: service = connector_details["service"] @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str: return f"{schema_name}.{table_name}" +def get_translator_from_fivetran_assets( + fivetran_assets: AssetsDefinition, +) -> "DagsterFivetranTranslator": + metadata_by_key = fivetran_assets.metadata_by_key or {} + first_asset_key = 
next(iter(fivetran_assets.metadata_by_key.keys())) + first_metadata = metadata_by_key.get(first_asset_key, {}) + dagster_fivetran_translator = first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY) + if dagster_fivetran_translator is None: + raise DagsterInvariantViolationError( + f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()}," + " but did not. Did you pass in assets that weren't generated by @fivetran_assets?" + ) + return dagster_fivetran_translator + + def metadata_for_table( table_data: Mapping[str, Any], connector_url: str, From 4982b3b2afe999b360024ed761ba8317e414259b Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 18:15:04 -0500 Subject: [PATCH 5/6] Move materialization logic to method --- .../dagster_fivetran/resources.py | 73 ++++++++++--------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index cfcd273553ee7..9af774eb076b7 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -940,22 +940,11 @@ def load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) - def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + def _generate_materialization( + self, + fivetran_output: FivetranOutput, + dagster_fivetran_translator: DagsterFivetranTranslator, ): - # TODO: Add docstrings - assets_def = context.assets_def - dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) - - connector_id = next( - check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) - for spec in assets_def.specs - ) - - client = self.get_client() - fivetran_output = client.sync_and_poll( - connector_id=connector_id, - ) connector = FivetranConnector.from_connector_details( connector_details=fivetran_output.connector_details ) @@ -963,7 +952,6 @@ def sync_and_poll( schema_config_details=fivetran_output.schema_config ) - materialized_asset_keys = set() for schema_source_name, schema in schema_config.schemas.items(): if not schema.enabled: continue @@ -987,7 +975,7 @@ def sync_and_poll( ) ).key - materialization = AssetMaterialization( + yield AssetMaterialization( asset_key=asset_key, description=f"Table generated via Fivetran sync: {schema.name}.{table.name}", metadata={ @@ -1003,23 +991,42 @@ def sync_and_poll( "table_source_name": table_source_name, }, ) - if not materialization: - continue - # scan through all tables actually created, if it was expected then emit an Output. 
- # otherwise, emit a runtime AssetMaterialization - if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, - ) - materialized_asset_keys.add(materialization.asset_key) - else: - context.log.warning( - f"An unexpected asset was materialized: {materialization.asset_key}" - ) - yield materialization + def sync_and_poll( + self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + ): + # TODO: Add docstrings + assets_def = context.assets_def + dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator + ): + # scan through all tables actually created, if it was expected then emit an Output. + # otherwise, emit a runtime AssetMaterialization + if materialization.asset_key in context.selected_asset_keys: + yield Output( + value=None, + output_name=materialization.asset_key.to_python_identifier(), + metadata=materialization.metadata, + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}" + ) + yield materialization unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys if unmaterialized_asset_keys: From b0e89612e03bd8c61e6de1f9dd0a7779dfc540d8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Tue, 26 Nov 2024 17:47:20 -0500 Subject: [PATCH 6/6] Update materialization logic; add tests --- .../dagster_fivetran/asset_decorator.py | 3 +- .../dagster_fivetran/resources.py | 42 ++- .../experimental/conftest.py | 316 ++++++++++-------- .../experimental/test_resources.py | 71 +++- 4 files changed, 273 insertions(+), 159 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index e926912504953..9c276d552b070 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -101,6 +101,8 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + return multi_asset( name=name, group_name=group_name, @@ -111,7 +113,6 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator - or DagsterFivetranTranslator() ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 9af774eb076b7..3ea581594a96f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,7 +4,7 @@ import time from datetime import datetime, 
timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union from urllib.parse import urljoin import requests @@ -14,9 +14,9 @@ Definitions, Failure, InitResourceContext, + MaterializeResult, MetadataValue, OpExecutionContext, - Output, __version__, _check as check, get_dagster_logger, @@ -27,7 +27,7 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._record import record +from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr @@ -977,15 +977,17 @@ def _generate_materialization( yield AssetMaterialization( asset_key=asset_key, - description=f"Table generated via Fivetran sync: {schema.name}.{table.name}", + description=( + f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}" + ), metadata={ **metadata_for_table( - table, + as_dict(table), get_fivetran_connector_url(fivetran_output.connector_details), include_column_info=True, database=None, - schema=schema.name, - table=table.name, + schema=schema.name_in_destination, + table=table.name_in_destination, ), "schema_source_name": schema_source_name, "table_source_name": table_source_name, @@ -993,9 +995,19 @@ def _generate_materialization( ) def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None - ): - # TODO: Add docstrings + self, context: Union[OpExecutionContext, AssetExecutionContext] + ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]: + """Executes a sync and poll process to materialize Fivetran assets. + + Args: + context (Union[OpExecutionContext, AssetExecutionContext]): The execution context + from within `@fivetran_assets`. If an AssetExecutionContext is passed, + its underlying OpExecutionContext will be used. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. + """ assets_def = context.assets_def dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) @@ -1013,13 +1025,11 @@ def sync_and_poll( for materialization in self._generate_materialization( fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator ): - # scan through all tables actually created, if it was expected then emit an Output. - # otherwise, emit a runtime AssetMaterialization + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. 
if materialization.asset_key in context.selected_asset_keys: - yield Output( - value=None, - output_name=materialization.asset_key.to_python_identifier(), - metadata=materialization.metadata, + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata ) materialized_asset_keys.add(materialization.asset_key) else: diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 7fcdc69cc9cc2..2ae8e65742e8c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,5 @@ from typing import Any, Iterator, Mapping +from unittest.mock import patch import pytest import responses @@ -7,6 +8,7 @@ FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT, ) +from dagster_fivetran.types import FivetranOutput TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" @@ -232,168 +234,180 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[ # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config -SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { - "code": "Success", - "message": "Operation performed.", - "data": { - "enable_new_by_default": True, - "schemas": { - "property1": { - "name_in_destination": "schema_name_in_destination_1", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", +# The sample is parameterized to test the sync and poll materialization method +def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination_1", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": table_name, + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - 
"supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, - }, - "property2": { - "name_in_destination": "schema_name_in_destination_2", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "schema_name_in_destination_2", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_1", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": 
"column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, }, + "schema_change_handling": "ALLOW_ALL", }, - "schema_change_handling": "ALLOW_ALL", - }, -} + } + + +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="table_name_in_destination_1" +) + +# We change the name of the original example to test the sync and poll materialization method +ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="another_table_name_in_destination_1" +) SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} @@ -501,3 +515,25 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="sync_and_poll") +def poll_and_sync_fixture(): + with patch("dagster_fivetran.resources.FivetranClient.sync_and_poll") as mocked_function: + # Fivetran output where all sync'd tables match the workspace data that was used to create the assets def + expected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + # Fivetran output where a table is missing and an unexpected table is sync'd, + # compared to the workspace data that was used to create the assets def + unexpected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + mocked_function.side_effect = [expected_fivetran_output, unexpected_fivetran_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 9c4fb9594253b..b4e76ba9e892c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,8 +1,13 @@ +from unittest.mock import 
MagicMock + import pytest import responses -from dagster import Failure +from dagster import AssetExecutionContext, AssetKey, Failure +from dagster._config.field_utils import EnvVar +from dagster._core.definitions.materialize import materialize +from dagster._core.test_utils import environ from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranOutput, FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace, fivetran_assets from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( @@ -205,3 +210,65 @@ def _mock_interaction(): else: with pytest.raises(Failure, match="failed!"): _mock_interaction() + + +def test_fivetran_materialization( + connector_id: str, + fetch_workspace_data_api_mocks: responses.RequestsMock, + sync_and_poll: MagicMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets(connector_id=connector_id, workspace=workspace, name=connector_id) + def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + # Mocked FivetranClient.sync_and_poll returns API response where all connector tables are expected + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys == materialized_asset_keys + + # Mocked FivetranClient.sync_and_poll returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys != materialized_asset_keys + assert ( + AssetKey(["schema_name_in_destination_1", "another_table_name_in_destination_1"]) + in materialized_asset_keys + ) + assert ( + AssetKey(["schema_name_in_destination_1", "table_name_in_destination_1"]) + not in materialized_asset_keys + )
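
Note: as a minimal end-to-end sketch of the API this series introduces, the snippet below wires the new fivetran_assets decorator and FivetranWorkspace.sync_and_poll into a Definitions object. It mirrors the docstring example and tests added in patches 4-6; the connector id and environment variable names are illustrative placeholders, not values taken from this diff.

import dagster as dg
from dagster_fivetran import FivetranWorkspace, fivetran_assets

# Placeholder credentials pulled from the environment; swap in your own
# account id and API key/secret from the Fivetran UI.
fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


@fivetran_assets(
    connector_id="fivetran_connector_id",  # placeholder Fivetran connector id
    name="fivetran_connector_id",
    group_name="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
    # Kicks off a Fivetran sync, polls until it completes, and yields one
    # MaterializeResult per expected table; tables synced outside the selection
    # are reported as runtime AssetMaterializations, per sync_and_poll above.
    yield from fivetran.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[fivetran_connector_assets],
    resources={"fivetran": fivetran_workspace},
)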