From b3e4e7cf44d11c5a6fa9a7ed79e8c6697eb7a387 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Fri, 8 Nov 2024 12:40:38 -0500 Subject: [PATCH] [2/n][dagster-fivetran] Update DagsterFivetranTranslator and related classes for rework (#25751) ## Summary & Motivation ~~Builds out a very barebones translator class for the new version of the Fivetran integration.~~ ~~The implementation for this translator will be inspired by the `DagsterFivetranTranslator` introduced in https://github.com/dagster-io/dagster/pull/25557, but a new implementation is required to leverage the workspace context and state-backed definitions, which is incompatible with the current translator and way of building assets.~~ Edit after Ben's comment [here](https://github.com/dagster-io/dagster/pull/25751#discussion_r1830086548): Move things around under translator.py. This PR leverages the `DagsterFivetranTranslator` introduced in https://github.com/dagster-io/dagster/pull/25557. `FivetranWorkspaceData` will implement the method `to_fivetran_connector_table_props_data` in a subsequent PR, which will map raw connector and destination data fetched using the Fivetran API into `FivetranConnectorTableProps` objects that are compatible with the translator. This process will match what we currently do. ## How I Tested These Changes Tests will be added in subsequent PRs. 
--- .../dagster_fivetran/__init__.py | 2 +- .../dagster_fivetran/asset_defs.py | 49 +--------- .../dagster_fivetran/resources.py | 36 +------ .../dagster_fivetran/translator.py | 93 +++++++++++++++++++ .../test_load_from_instance.py | 11 ++- 5 files changed, 104 insertions(+), 87 deletions(-) create mode 100644 python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 2f0716a2e6c54..962a739de8c5c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -1,7 +1,6 @@ from dagster._core.libraries import DagsterLibraryRegistry from dagster_fivetran.asset_defs import ( - DagsterFivetranTranslator as DagsterFivetranTranslator, build_fivetran_assets as build_fivetran_assets, load_assets_from_fivetran_instance as load_assets_from_fivetran_instance, ) @@ -14,6 +13,7 @@ FivetranWorkspace as FivetranWorkspace, fivetran_resource as fivetran_resource, ) +from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator from dagster_fivetran.types import FivetranOutput as FivetranOutput from dagster_fivetran.version import __version__ as __version__ diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 0c72109c22414..a21783690767c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -43,6 +43,7 @@ from dagster._utils.log import get_dagster_logger from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource +from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps from dagster_fivetran.utils 
import ( generate_materializations, get_fivetran_connector_url, @@ -53,50 +54,6 @@ logger = get_dagster_logger() -class FivetranConnectorTableProps(NamedTuple): - table: str - connector_id: str - name: str - connector_url: str - schemas: Mapping[str, Any] - database: Optional[str] - service: Optional[str] - - -class DagsterFivetranTranslator: - def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey: - """Get the AssetKey for a table synced by a Fivetran connector.""" - return AssetKey(props.table.split(".")) - - def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: - """Get the AssetSpec for a table synced by a Fivetran connector.""" - schema_name, table_name = props.table.split(".") - schema_entry = next( - schema - for schema in props.schemas["schemas"].values() - if schema["name_in_destination"] == schema_name - ) - table_entry = next( - table_entry - for table_entry in schema_entry["tables"].values() - if table_entry["name_in_destination"] == table_name - ) - - metadata = metadata_for_table( - table_entry, - props.connector_url, - database=props.database, - schema=schema_name, - table=table_name, - ) - - return AssetSpec( - key=self.get_asset_key(props), - metadata=metadata, - kinds={"fivetran", *({props.service} if props.service else set())}, - ) - - def _fetch_and_attach_col_metadata( fivetran_resource: FivetranResource, connector_id: str, materialization: AssetMaterialization ) -> AssetMaterialization: @@ -165,9 +122,9 @@ def _build_fivetran_assets( tracked_asset_keys = { table: AssetKey([*asset_key_prefix, *table.split(".")]) if not translator_instance or not connection_metadata - else translator_instance.get_asset_key( + else translator_instance.get_asset_spec( FivetranConnectorTableProps(table=table, **connection_metadata._asdict()) - ) + ).key for table in destination_tables } user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys diff --git 
a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5bd61ffb1b86d..b5c7043415856 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,7 +3,6 @@ import logging import os import time -from enum import Enum from typing import Any, Mapping, Optional, Sequence, Tuple from urllib.parse import urljoin @@ -20,14 +19,13 @@ from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.resource_definition import dagster_maintained_resource -from dagster._record import record -from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser from pydantic import Field, PrivateAttr from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException +from dagster_fivetran.translator import FivetranWorkspaceData from dagster_fivetran.types import FivetranOutput from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url @@ -445,38 +443,6 @@ def my_fivetran_job(): # ------------------ # Reworked resources # ------------------ -class FivetranContentType(Enum): - """Enum representing each object in Fivetran's ontology.""" - - CONNECTOR = "connector" - DESTINATION = "destination" - - -@whitelist_for_serdes -@record -class FivetranContentData: - """A record representing a piece of content in a Fivetran workspace. - Includes the object's type and data as returned from the API. - """ - - content_type: FivetranContentType - properties: Mapping[str, Any] - - -@record -class FivetranWorkspaceData: - """A record representing all content in a Fivetran workspace. - Provided as context for the translator so that it can resolve dependencies between content. 
- """ - - connectors_by_id: Mapping[str, FivetranContentData] - destinations_by_id: Mapping[str, FivetranContentData] - - @classmethod - def from_content_data( - cls, content_data: Sequence[FivetranContentData] - ) -> "FivetranWorkspaceData": - raise NotImplementedError() @experimental diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py new file mode 100644 index 0000000000000..4b93bb83fe588 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -0,0 +1,93 @@ +from enum import Enum +from typing import Any, Mapping, NamedTuple, Optional, Sequence + +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._record import record +from dagster._serdes.serdes import whitelist_for_serdes + +from dagster_fivetran.utils import metadata_for_table + + +class FivetranConnectorTableProps(NamedTuple): + table: str + connector_id: str + name: str + connector_url: str + schemas: Mapping[str, Any] + database: Optional[str] + service: Optional[str] + + +class FivetranContentType(Enum): + """Enum representing each object in Fivetran's ontology.""" + + CONNECTOR = "connector" + DESTINATION = "destination" + + +@whitelist_for_serdes +@record +class FivetranContentData: + """A record representing a piece of content in a Fivetran workspace. + Includes the object's type and data as returned from the API. + """ + + content_type: FivetranContentType + properties: Mapping[str, Any] + + +@record +class FivetranWorkspaceData: + """A record representing all content in a Fivetran workspace. + Provided as context for the translator so that it can resolve dependencies between content. 
+ """ + + connectors_by_id: Mapping[str, FivetranContentData] + destinations_by_id: Mapping[str, FivetranContentData] + + @classmethod + def from_content_data( + cls, content_data: Sequence[FivetranContentData] + ) -> "FivetranWorkspaceData": + raise NotImplementedError() + + def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]: + """Method that converts a `FivetranWorkspaceData` object + to a collection of `FivetranConnectorTableProps` objects. + """ + raise NotImplementedError() + + +class DagsterFivetranTranslator: + """Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs. + Subclass this class to implement custom logic for each type of Fivetran content. + """ + + def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: + """Get the AssetSpec for a table synced by a Fivetran connector.""" + schema_name, table_name = props.table.split(".") + schema_entry = next( + schema + for schema in props.schemas["schemas"].values() + if schema["name_in_destination"] == schema_name + ) + table_entry = next( + table_entry + for table_entry in schema_entry["tables"].values() + if table_entry["name_in_destination"] == table_name + ) + + metadata = metadata_for_table( + table_entry, + props.connector_url, + database=props.database, + schema=schema_name, + table=table_name, + ) + + return AssetSpec( + key=AssetKey(props.table.split(".")), + metadata=metadata, + kinds={"fivetran", *({props.service} if props.service else set())}, + ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index b6bc8e5e06811..3e19d14d61cba 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -12,7 +12,7 @@ asset, 
io_manager, ) -from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes from dagster._core.definitions.materialize import materialize from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema @@ -269,12 +269,13 @@ def downstream_asset(xyz): class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): - def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey: - return super().get_asset_key(props).with_prefix("my_prefix") - def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: asset_spec = super().get_asset_spec(props) - return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata}) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + metadata={"foo": "bar", **asset_spec.metadata}, + ) @responses.activate