From 24ed962486df46021953118380722f4707d1dbd8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 6 Nov 2024 16:10:35 -0500 Subject: [PATCH] Update folder structure --- .../dagster_fivetran/__init__.py | 1 + .../dagster_fivetran/experimental/__init__.py | 1 - .../experimental/resources.py | 147 ------------------ .../dagster_fivetran/resources.py | 145 ++++++++++++++++- 4 files changed, 145 insertions(+), 149 deletions(-) delete mode 100644 python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/__init__.py delete mode 100644 python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/resources.py diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 6e65515e20f45..2f0716a2e6c54 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -11,6 +11,7 @@ ) from dagster_fivetran.resources import ( FivetranResource as FivetranResource, + FivetranWorkspace as FivetranWorkspace, fivetran_resource as fivetran_resource, ) from dagster_fivetran.types import FivetranOutput as FivetranOutput diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/__init__.py deleted file mode 100644 index e06302e5a4e26..0000000000000 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from dagster_fivetran.experimental.resources import FivetranWorkspace as FivetranWorkspace diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/resources.py deleted file mode 100644 index c1be8948a9aee..0000000000000 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/experimental/resources.py +++ /dev/null @@ -1,147 +0,0 @@ -import logging -from enum import Enum -from typing import Any, Mapping, Optional, Sequence - -from dagster import get_dagster_logger -from dagster._annotations import experimental -from dagster._config.pythonic_config import ConfigurableResource -from dagster._record import record -from dagster._serdes.serdes import whitelist_for_serdes -from dagster._utils.cached_method import cached_method -from pydantic import Field, PrivateAttr -from requests.auth import HTTPBasicAuth - - -class FivetranContentType(Enum): - """Enum representing each object in Fivetran's ontology.""" - - CONNECTOR = "connector" - DESTINATION = "destination" - - -@whitelist_for_serdes -@record -class FivetranContentData: - """A record representing a piece of content in a Fivetran workspace. - Includes the object's type and data as returned from the API. - """ - - content_type: FivetranContentType - properties: Mapping[str, Any] - - -@record -class FivetranWorkspaceData: - """A record representing all content in a Fivetran workspace. - Provided as context for the translator so that it can resolve dependencies between content. - """ - - connectors_by_id: Mapping[str, FivetranContentData] - destinations_by_id: Mapping[str, FivetranContentData] - - @classmethod - def from_content_data( - cls, content_data: Sequence[FivetranContentData] - ) -> "FivetranWorkspaceData": - raise NotImplementedError() - - -@experimental -class FivetranClient: - """This class exposes methods on top of the Fivetran REST API.""" - - def __init__( - self, - api_key: str, - api_secret: str, - request_max_retries: int, - request_retry_delay: float, - ): - self.api_key = api_key - self.api_secret = api_secret - self.request_max_retries = request_max_retries - self.request_retry_delay = request_retry_delay - - @property - def _auth(self) -> HTTPBasicAuth: - raise NotImplementedError() - - @property - @cached_method - def _log(self) -> logging.Logger: - return get_dagster_logger() - - @property - def api_base_url(self) -> str: - raise NotImplementedError() - - @property - def api_connector_url(self) -> str: - raise NotImplementedError() - - def make_connector_request( - self, method: str, endpoint: str, data: Optional[str] = None - ) -> Mapping[str, Any]: - raise NotImplementedError() - - def make_request( - self, method: str, endpoint: str, data: Optional[str] = None - ) -> Mapping[str, Any]: - raise NotImplementedError() - - def get_connector_details(self, connector_id: str) -> Mapping[str, Any]: - """Fetches details about a given connector from the Fivetran API.""" - raise NotImplementedError() - - def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: - """Fetches all connectors for a given group from the Fivetran API.""" - raise NotImplementedError() - - def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: - """Fetches details about a given destination from the Fivetran API.""" - raise NotImplementedError() - - def get_groups(self) -> Mapping[str, Any]: - """Fetches all groups from the Fivetran API.""" - raise NotImplementedError() - - -class FivetranWorkspace(ConfigurableResource): - """This class represents a Fivetran workspace and provides utilities - to interact with Fivetran APIs. - """ - - api_key: str = Field(description="The Fivetran API key to use for this resource.") - api_secret: str = Field(description="The Fivetran API secret to use for this resource.") - request_max_retries: int = Field( - default=3, - description=( - "The maximum number of times requests to the Fivetran API should be retried " - "before failing." - ), - ) - request_retry_delay: float = Field( - default=0.25, - description="Time (in seconds) to wait between each request retry.", - ) - - _client: FivetranClient = PrivateAttr(default=None) - - def get_client(self) -> FivetranClient: - return FivetranClient( - api_key=self.api_key, - api_secret=self.api_secret, - request_max_retries=self.request_max_retries, - request_retry_delay=self.request_retry_delay, - ) - - def fetch_fivetran_workspace_data( - self, - ) -> FivetranWorkspaceData: - """Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object. - Future work will cache this data to avoid repeated calls to the Fivetran API. - - Returns: - FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. - """ - raise NotImplementedError() diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 2009bdea1ec37..5bd61ffb1b86d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,6 +3,7 @@ import logging import os import time +from enum import Enum from typing import Any, Mapping, Optional, Sequence, Tuple from urllib.parse import urljoin @@ -16,11 +17,14 @@ get_dagster_logger, resource, ) +from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource from dagster._core.definitions.resource_definition import dagster_maintained_resource +from dagster._record import record +from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method from dagster._vendored.dateutil import parser -from pydantic import Field +from pydantic import Field, PrivateAttr from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException @@ -436,3 +440,142 @@ def my_fivetran_job(): """ return FivetranResource.from_resource_context(context) + + +# ------------------ +# Reworked resources +# ------------------ +class FivetranContentType(Enum): + """Enum representing each object in Fivetran's ontology.""" + + CONNECTOR = "connector" + DESTINATION = "destination" + + +@whitelist_for_serdes +@record +class FivetranContentData: + """A record representing a piece of content in a Fivetran workspace. + Includes the object's type and data as returned from the API. + """ + + content_type: FivetranContentType + properties: Mapping[str, Any] + + +@record +class FivetranWorkspaceData: + """A record representing all content in a Fivetran workspace. + Provided as context for the translator so that it can resolve dependencies between content. + """ + + connectors_by_id: Mapping[str, FivetranContentData] + destinations_by_id: Mapping[str, FivetranContentData] + + @classmethod + def from_content_data( + cls, content_data: Sequence[FivetranContentData] + ) -> "FivetranWorkspaceData": + raise NotImplementedError() + + +@experimental +class FivetranClient: + """This class exposes methods on top of the Fivetran REST API.""" + + def __init__( + self, + api_key: str, + api_secret: str, + request_max_retries: int, + request_retry_delay: float, + ): + self.api_key = api_key + self.api_secret = api_secret + self.request_max_retries = request_max_retries + self.request_retry_delay = request_retry_delay + + @property + def _auth(self) -> HTTPBasicAuth: + raise NotImplementedError() + + @property + @cached_method + def _log(self) -> logging.Logger: + return get_dagster_logger() + + @property + def api_base_url(self) -> str: + raise NotImplementedError() + + @property + def api_connector_url(self) -> str: + raise NotImplementedError() + + def make_connector_request( + self, method: str, endpoint: str, data: Optional[str] = None + ) -> Mapping[str, Any]: + raise NotImplementedError() + + def make_request( + self, method: str, endpoint: str, data: Optional[str] = None + ) -> Mapping[str, Any]: + raise NotImplementedError() + + def get_connector_details(self, connector_id: str) -> Mapping[str, Any]: + """Fetches details about a given connector from the Fivetran API.""" + raise NotImplementedError() + + def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: + """Fetches all connectors for a given group from the Fivetran API.""" + raise NotImplementedError() + + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: + """Fetches details about a given destination from the Fivetran API.""" + raise NotImplementedError() + + def get_groups(self) -> Mapping[str, Any]: + """Fetches all groups from the Fivetran API.""" + raise NotImplementedError() + + +@experimental +class FivetranWorkspace(ConfigurableResource): + """This class represents a Fivetran workspace and provides utilities + to interact with Fivetran APIs. + """ + + api_key: str = Field(description="The Fivetran API key to use for this resource.") + api_secret: str = Field(description="The Fivetran API secret to use for this resource.") + request_max_retries: int = Field( + default=3, + description=( + "The maximum number of times requests to the Fivetran API should be retried " + "before failing." + ), + ) + request_retry_delay: float = Field( + default=0.25, + description="Time (in seconds) to wait between each request retry.", + ) + + _client: FivetranClient = PrivateAttr(default=None) + + def get_client(self) -> FivetranClient: + return FivetranClient( + api_key=self.api_key, + api_secret=self.api_secret, + request_max_retries=self.request_max_retries, + request_retry_delay=self.request_retry_delay, + ) + + def fetch_fivetran_workspace_data( + self, + ) -> FivetranWorkspaceData: + """Retrieves all Fivetran content from the workspace and returns it as a FivetranWorkspaceData object. + Future work will cache this data to avoid repeated calls to the Fivetran API. + + Returns: + FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. + """ + raise NotImplementedError()