diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index 86c8929421dac..9c276d552b070 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -5,6 +5,7 @@ from dagster_fivetran.resources import FivetranWorkspace from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet +from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY @experimental @@ -100,15 +101,18 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet ) """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + return multi_asset( name=name, group_name=group_name, can_subset=True, specs=[ - spec + spec.merge_attributes( + metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator} + ) for spec in workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator - or DagsterFivetranTranslator() ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index effbc8432d286..a2c82215d0e46 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,15 +4,17 @@ import time from datetime import datetime, timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union from urllib.parse import urljoin import requests from dagster import ( AssetExecutionContext, + AssetMaterialization, Definitions, Failure, InitResourceContext, + MaterializeResult, MetadataValue, OpExecutionContext, __version__, @@ -25,7 +27,7 @@ from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._record import record +from dagster._record import as_dict, record from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr @@ -36,12 +38,20 @@ DagsterFivetranTranslator, FivetranConnector, FivetranConnectorScheduleType, + FivetranConnectorTableProps, FivetranDestination, + FivetranMetadataSet, FivetranSchemaConfig, FivetranWorkspaceData, ) from dagster_fivetran.types import FivetranOutput -from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url +from dagster_fivetran.utils import ( + get_fivetran_connector_table_name, + get_fivetran_connector_url, + get_fivetran_logs_url, + get_translator_from_fivetran_assets, + metadata_for_table, +) FIVETRAN_API_BASE = "https://api.fivetran.com" FIVETRAN_API_VERSION = "v1" @@ -832,6 +842,7 @@ class FivetranWorkspace(ConfigurableResource): _client: FivetranClient = PrivateAttr(default=None) + @cached_method def get_client(self) -> FivetranClient: return FivetranClient( api_key=self.api_key, @@ -929,10 +940,108 @@ def load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) - def sync_and_poll( - self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + def _generate_materialization( + self, + fivetran_output: FivetranOutput, + dagster_fivetran_translator: DagsterFivetranTranslator, ): - raise NotImplementedError() + connector = FivetranConnector.from_connector_details( + connector_details=fivetran_output.connector_details + ) + schema_config = FivetranSchemaConfig.from_schema_config_details( + schema_config_details=fivetran_output.schema_config + ) + + for schema_source_name, schema in schema_config.schemas.items(): + if not schema.enabled: + continue + + for table_source_name, table in schema.tables.items(): + if not table.enabled: + continue + + asset_key = dagster_fivetran_translator.get_asset_spec( + props=FivetranConnectorTableProps( + table=get_fivetran_connector_table_name( + schema_name=schema.name_in_destination, + table_name=table.name_in_destination, + ), + connector_id=connector.id, + name=connector.name, + connector_url=connector.url, + schema_config=schema_config, + database=None, + service=None, + ) + ).key + + yield AssetMaterialization( + asset_key=asset_key, + description=( + f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}" + ), + metadata={ + **metadata_for_table( + as_dict(table), + get_fivetran_connector_url(fivetran_output.connector_details), + include_column_info=True, + database=None, + schema=schema.name_in_destination, + table=table.name_in_destination, + ), + "schema_source_name": schema_source_name, + "table_source_name": table_source_name, + }, + ) + + def sync_and_poll( + self, context: Union[OpExecutionContext, AssetExecutionContext] + ) -> Iterator[Union[AssetMaterialization, MaterializeResult]]: + """Executes a sync and poll process to materialize Fivetran assets. + + Args: + context (Union[OpExecutionContext, AssetExecutionContext]): The execution context + from within `@fivetran_assets`. If an AssetExecutionContext is passed, + its underlying OpExecutionContext will be used. + + Returns: + Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult + or AssetMaterialization. + """ + assets_def = context.assets_def + dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def) + + connector_id = next( + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in assets_def.specs + ) + + client = self.get_client() + fivetran_output = client.sync_and_poll( + connector_id=connector_id, + ) + + materialized_asset_keys = set() + for materialization in self._generate_materialization( + fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator + ): + # Scan through all tables actually created, if it was expected then emit a MaterializeResult. + # Otherwise, emit a runtime AssetMaterialization. + if materialization.asset_key in context.selected_asset_keys: + yield MaterializeResult( + asset_key=materialization.asset_key, metadata=materialization.metadata + ) + materialized_asset_keys.add(materialization.asset_key) + else: + context.log.warning( + f"An unexpected asset was materialized: {materialization.asset_key}. " + f"Yielding a materialization event." + ) + yield materialization + + unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys + if unmaterialized_asset_keys: + context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}") @experimental diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py index 62589c4deb5b6..13c67b76135b0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/utils.py @@ -1,13 +1,23 @@ -from typing import Any, Dict, Iterator, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence import dagster._check as check -from dagster import AssetMaterialization, MetadataValue +from dagster import ( + AssetMaterialization, + AssetsDefinition, + DagsterInvariantViolationError, + MetadataValue, +) from dagster._core.definitions.metadata import RawMetadataMapping from dagster._core.definitions.metadata.metadata_set import TableMetadataSet from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster_fivetran.types import FivetranOutput +if TYPE_CHECKING: + from dagster_fivetran import DagsterFivetranTranslator + +DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator" + def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str: service = connector_details["service"] @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str: return f"{schema_name}.{table_name}" +def get_translator_from_fivetran_assets( + fivetran_assets: AssetsDefinition, +) -> "DagsterFivetranTranslator": + metadata_by_key = fivetran_assets.metadata_by_key or {} + first_asset_key = next(iter(fivetran_assets.metadata_by_key.keys())) + first_metadata = metadata_by_key.get(first_asset_key, {}) + dagster_fivetran_translator = first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY) + if dagster_fivetran_translator is None: + raise DagsterInvariantViolationError( + f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()}," + " but did not. Did you pass in assets that weren't generated by @fivetran_assets?" + ) + return dagster_fivetran_translator + + def metadata_for_table( table_data: Mapping[str, Any], connector_url: str, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 7fcdc69cc9cc2..3d8c5db724e0c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,5 @@ from typing import Any, Iterator, Mapping +from unittest.mock import patch import pytest import responses @@ -7,6 +8,7 @@ FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT, ) +from dagster_fivetran.types import FivetranOutput TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" @@ -232,168 +234,180 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[ # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config -SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { - "code": "Success", - "message": "Operation performed.", - "data": { - "enable_new_by_default": True, - "schemas": { - "property1": { - "name_in_destination": "schema_name_in_destination_1", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", +# The sample is parameterized to test the sync and poll materialization method +def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination_1", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": table_name, + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, - }, - "property2": { - "name_in_destination": "schema_name_in_destination_2", - "enabled": True, - "tables": { - "property1": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_1", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "schema_name_in_destination_2", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_1", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, - }, - "property2": { - "sync_mode": "SOFT_DELETE", - "name_in_destination": "table_name_in_destination_2", - "enabled": True, - "columns": { - "property1": { - "name_in_destination": "column_name_in_destination_1", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination_2", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination_1", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, - }, - "property2": { - "name_in_destination": "column_name_in_destination_2", - "enabled": True, - "hashed": False, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_COLUMN", + "property2": { + "name_in_destination": "column_name_in_destination_2", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, }, - "is_primary_key": True, }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, }, - "enabled_patch_settings": { - "allowed": False, - "reason": "...", - "reason_code": "SYSTEM_TABLE", - }, - "supports_columns_config": True, }, }, }, + "schema_change_handling": "ALLOW_ALL", }, - "schema_change_handling": "ALLOW_ALL", - }, -} + } + + +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="table_name_in_destination_1" +) + +# We change the name of the original example to test the sync and poll materialization method +ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector( + table_name="another_table_name_in_destination_1" +) SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} @@ -501,3 +515,25 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks + + +@pytest.fixture(name="sync_and_poll") +def sync_and_poll_fixture(): + with patch("dagster_fivetran.resources.FivetranClient.sync_and_poll") as mocked_function: + # Fivetran output where all sync'd tables match the workspace data that was used to create the assets def + expected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + # Fivetran output where a table is missing and an unexpected table is sync'd, + # compared to the workspace data that was used to create the assets def + unexpected_fivetran_output = FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + )["data"], + schema_config=ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + mocked_function.side_effect = [expected_fivetran_output, unexpected_fivetran_output] + yield mocked_function diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 9c4fb9594253b..a371a44feec57 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,8 +1,14 @@ +import re +from unittest.mock import MagicMock + import pytest import responses -from dagster import Failure +from dagster import AssetExecutionContext, AssetKey, Failure +from dagster._config.field_utils import EnvVar +from dagster._core.definitions.materialize import materialize +from dagster._core.test_utils import environ from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranOutput, FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace, fivetran_assets from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( @@ -137,7 +143,7 @@ def test_basic_resource_request( "resync_long_success", ], ) -def test_sync_and_poll_methods(method, n_polls, succeed_at_end, connector_id): +def test_sync_and_poll_client_methods(method, n_polls, succeed_at_end, connector_id): resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) @@ -205,3 +211,75 @@ def _mock_interaction(): else: with pytest.raises(Failure, match="failed!"): _mock_interaction() + + +def test_fivetran_sync_and_poll_materialization_method( + connector_id: str, + fetch_workspace_data_api_mocks: responses.RequestsMock, + sync_and_poll: MagicMock, + capsys: pytest.CaptureFixture, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets(connector_id=connector_id, workspace=workspace, name=connector_id) + def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + # Mocked FivetranClient.sync_and_poll returns API response where all connector tables are expected + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys == materialized_asset_keys + + # Mocked FivetranClient.sync_and_poll returns API response + # where one expected table is missing and an unexpected table is present + result = materialize( + [my_fivetran_assets], + resources={"fivetran": workspace}, + ) + + assert result.success + asset_materializations = [ + event + for event in result.events_for_node(connector_id) + if event.event_type_value == "ASSET_MATERIALIZATION" + ] + assert len(asset_materializations) == 4 + materialized_asset_keys = { + asset_materialization.asset_key for asset_materialization in asset_materializations + } + assert len(materialized_asset_keys) == 4 + assert my_fivetran_assets.keys != materialized_asset_keys + assert ( + AssetKey(["schema_name_in_destination_1", "another_table_name_in_destination_1"]) + in materialized_asset_keys + ) + assert ( + AssetKey(["schema_name_in_destination_1", "table_name_in_destination_1"]) + not in materialized_asset_keys + ) + + captured = capsys.readouterr() + assert re.search( + r"dagster - WARNING - (?s:.)+ - An unexpected asset was materialized", captured.err + ) + assert re.search( + r"dagster - WARNING - (?s:.)+ - Assets were not materialized", captured.err + )