diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py index 744d6ee39858e..4a5a54d892bba 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py @@ -1,16 +1,26 @@ import logging from typing import Any, Mapping, Optional +from urllib.parse import urljoin +import time +import os -from dagster import get_dagster_logger +import requests +from dagster import get_dagster_logger, __version__, Failure from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource from dagster._utils.cached_method import cached_method from pydantic import Field, PrivateAttr from requests.auth import HTTPBasicAuth +from requests.exceptions import RequestException from dagster_fivetran.v2.translator import FivetranWorkspaceData +FIVETRAN_API_BASE = "https://api.fivetran.com" +FIVETRAN_API_VERSION = "v1" +FIVETRAN_CONNECTOR_ENDPOINT = "connectors" + + @experimental class FivetranClient: """This class exposes methods on top of the Fivetran REST API.""" @@ -29,7 +39,7 @@ def __init__( @property def _auth(self) -> HTTPBasicAuth: - raise NotImplementedError() + return HTTPBasicAuth(self.api_key, self.api_secret) @property @cached_method @@ -38,37 +48,100 @@ def _log(self) -> logging.Logger: @property def api_base_url(self) -> str: - raise NotImplementedError() + return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}" @property def api_connector_url(self) -> str: - raise NotImplementedError() + return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}" - def make_connector_request( + def _make_connector_request( self, method: str, endpoint: str, data: Optional[str] = None ) -> Mapping[str, Any]: - raise NotImplementedError() + return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data) - def make_request( + def _make_request( self, method: str, endpoint: str, data: Optional[str] = None ) -> Mapping[str, Any]: - raise NotImplementedError() + """Creates and sends a request to the desired Fivetran API endpoint. + + Args: + method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH"). + endpoint (str): The Fivetran API endpoint to send this request to. + data (Optional[str]): JSON-formatted data string to be included in the request. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + url = f"{self.api_base_url}/{endpoint}" + headers = { + "User-Agent": f"dagster-fivetran/{__version__}", + "Content-Type": "application/json;version=2", + } + + num_retries = 0 + while True: + try: + response = requests.request( + method=method, + url=url, + headers=headers, + auth=self._auth, + data=data, + timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")), + ) + response.raise_for_status() + resp_dict = response.json() + return resp_dict["data"] if "data" in resp_dict else resp_dict + except RequestException as e: + self._log.error("Request to Fivetran API failed: %s", e) + if num_retries == self.request_max_retries: + break + num_retries += 1 + time.sleep(self.request_retry_delay) + + raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.") def get_connector_details(self, connector_id: str) -> Mapping[str, Any]: - """Fetches details about a given connector from the Fivetran API.""" - raise NotImplementedError() + """Gets details about a given connector from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_connector_request(method="GET", endpoint=connector_id) def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: - """Fetches all connectors for a given group from the Fivetran API.""" - raise NotImplementedError() + """Fetches all connectors for a given group from the Fivetran API. + + Args: + group_id (str): The Fivetran Group ID. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_request("GET", f"groups/{group_id}/connectors") def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: - """Fetches details about a given destination from the Fivetran API.""" - raise NotImplementedError() + """Fetches details about a given destination from the Fivetran API. + + Args: + destination_id (str): The Fivetran Destination ID. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_request("GET", f"destinations/{destination_id}") def get_groups(self) -> Mapping[str, Any]: - """Fetches all groups from the Fivetran API.""" - raise NotImplementedError() + """Fetches all groups from the Fivetran API. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_request("GET", f"groups") class FivetranWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/conftest.py new file mode 100644 index 0000000000000..7df08a674b5b1 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/conftest.py @@ -0,0 +1,269 @@ +from typing import Iterator, Callable +import contextlib + +import pytest + +import responses + +from dagster_fivetran.v2.resources import FIVETRAN_API_BASE, FIVETRAN_API_VERSION, FIVETRAN_CONNECTOR_ENDPOINT + +SAMPLE_GROUPS = { + "code": "Success", + "message": "Operation performed.", + "data": { + "items": [ + { + "id": "group_id", + "name": "Group_Name", + "created_at": "2024-01-01T00:00:00Z" + } + ], + "nextCursor": "cursor_value" + } +} + +SAMPLE_CONNECTORS_FOR_GROUP = { + "code": "Success", + "message": "Operation performed.", + "data": { + "items": [ + { + "id": "connector_id", + "service": "adls", + "schema": "gsheets.table", + "paused": False, + "status": { + "tasks": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string" + } + ], + "warnings": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string" + } + ], + "schema_status": "ready", + "update_state": "delayed", + "setup_state": "connected", + "sync_state": "scheduled", + "is_historical_sync": False, + "rescheduled_for": "2024-12-01T15:43:29.013729Z" + }, + "config": { + "property1": {}, + "property2": {} + }, + "daily_sync_time": "14:00", + "succeeded_at": "2024-12-01T15:43:29.013729Z", + "sync_frequency": 360, + "group_id": "group_id", + "connected_by": "user_id", + "setup_tests": [ + { + "title": "Test Title", + "status": "PASSED", + "message": "Test Passed", + "details": "Test Details" + } + ], + "source_sync_details": {}, + "service_version": 0, + "created_at": "2024-12-01T15:43:29.013729Z", + "failed_at": "2024-12-01T15:43:29.013729Z", + "private_link_id": "string", + "proxy_agent_id": "string", + "networking_method": "Directly", + "connect_card": { + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", + "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh..." + }, + "pause_after_trial": False, + "data_delay_threshold": 0, + "data_delay_sensitivity": "LOW", + "schedule_type": "auto", + "local_processing_agent_id": "string", + "connect_card_config": { + "redirect_uri": "https://your.site/path", + "hide_setup_guide": True + }, + "hybrid_deployment_agent_id": "string" + } + ], + "nextCursor": "cursor_value" + } +} + +SAMPLE_DESTINATION_DETAILS = { + "code": "Success", + "message": "Operation performed.", + "data": { + "id": "destination_id", + "service": "adls", + "region": "GCP_US_EAST4", + "networking_method": "Directly", + "setup_status": "CONNECTED", + "daylight_saving_time_enabled": True, + "group_id": "group_id", + "time_zone_offset": "+3", + "setup_tests": [ + { + "title": "Test Title", + "status": "PASSED", + "message": "Test Passed", + "details": "Test Details" + } + ], + "local_processing_agent_id": "local_processing_agent_id", + "private_link_id": "private_link_id", + "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", + "config": { + "tenant_id": "service_principal_tenant_id", + "auth_type": "PERSONAL_ACCESS_TOKEN | OAUTH2", + "storage_account_name": "adls_storage_account_name", + "connection_type": "Directly | PrivateLink | SshTunnel | ProxyAgent", + "catalog": "string", + "should_maintain_tables_in_databricks": True, + "http_path": "string", + "oauth2_secret": "string", + "snapshot_retention_period": "RETAIN_ALL_SNAPSHOTS | ONE_WEEK | TWO_WEEKS | FOUR_WEEKS | SIX_WEEKS", + "server_host_name": "string", + "client_id": "service_principal_client_id", + "prefix_path": "adls_container_path_prefix", + "container_name": "adls_container_name", + "port": 0, + "databricks_connection_type": "Directly | PrivateLink | SshTunnel | ProxyAgent", + "secret_value": "service_principal_secret_value", + "oauth2_client_id": "string", + "personal_access_token": "string" + } + } +} + +SAMPLE_CONNECTOR_DETAILS = { + "code": "Success", + "message": "Operation performed.", + "data": { + "id": "connector_id", + "service": "15five", + "schema": "schema.table", + "paused": False, + "status": { + "tasks": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string" + } + ], + "warnings": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string" + } + ], + "schema_status": "ready", + "update_state": "delayed", + "setup_state": "connected", + "sync_state": "scheduled", + "is_historical_sync": False, + "rescheduled_for": "2024-12-01T15:43:29.013729Z" + }, + "daily_sync_time": "14:00", + "succeeded_at": "2024-03-17T12:31:40.870504Z", + "sync_frequency": 1440, + "group_id": "group_id", + "connected_by": "user_id", + "setup_tests": [ + { + "title": "Test Title", + "status": "PASSED", + "message": "Test Passed", + "details": "Test Details" + } + ], + "source_sync_details": {}, + "service_version": 0, + "created_at": "2023-12-01T15:43:29.013729Z", + "failed_at": "2024-04-01T18:13:25.043659Z", + "private_link_id": "private_link_id", + "proxy_agent_id": "proxy_agent_id", + "networking_method": "Directly", + "connect_card": { + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", + "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh..." + }, + "pause_after_trial": False, + "data_delay_threshold": 0, + "data_delay_sensitivity": "NORMAL", + "schedule_type": "auto", + "local_processing_agent_id": "local_processing_agent_id", + "connect_card_config": { + "redirect_uri": "https://your.site/path", + "hide_setup_guide": True + }, + "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", + "config": { + "api_key": "your_15five_api_key" + } + } +} + +@pytest.fixture(name="connector_id") +def connector_id_fixture() -> str: + return "connector_id" + +@pytest.fixture(name="destination_id") +def destination_id_fixture() -> str: + return "destination_id" + +@pytest.fixture(name="group_id") +def group_id_fixture() -> str: + return "group_id" + + +@pytest.fixture( + name="workspace_data_api_mocks_fn", +) +def workspace_data_api_mocks_fn_fixture( + connector_id: str, destination_id: str, group_id: str +) -> Callable: + @contextlib.contextmanager + def _method() -> Iterator[responses.RequestsMock]: + with responses.RequestsMock() as response: + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/groups", + json=SAMPLE_GROUPS, + status=200, + ) + + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/groups/{group_id}/connectors", + json=SAMPLE_CONNECTORS_FOR_GROUP, + status=200, + ) + + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/destinations/{destination_id}", + json=SAMPLE_DESTINATION_DETAILS, + status=200, + ) + + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + + yield response + + return _method \ No newline at end of file diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/test_resources.py new file mode 100644 index 0000000000000..86a5d1450f250 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/v2/test_resources.py @@ -0,0 +1,34 @@ +import responses +import uuid +from typing import Callable + +from dagster_fivetran.v2 import FivetranWorkspace + + +@responses.activate +def test_basic_resource_request( + connector_id: str, + destination_id: str, + group_id: str, + workspace_data_api_mocks_fn: Callable +) -> None: + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret) + + with workspace_data_api_mocks_fn() as response: + + client = resource.get_client() + client.get_connector_details(connector_id=connector_id) + client.get_connectors_for_group(group_id=group_id) + client.get_destination_details(destination_id=destination_id) + client.get_groups() + + assert len(response.calls) == 4 + + assert "Basic" in response.calls[0].request.headers["Authorization"] + assert connector_id in response.calls[0].request.url + assert group_id in response.calls[1].request.url + assert destination_id in response.calls[2].request.url + assert "groups" in response.calls[3].request.url