Skip to content

Commit

Permalink
[dagster-fivetran] Implement FivetranClient for rework
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 5, 2024
1 parent 7c954b2 commit da802b0
Show file tree
Hide file tree
Showing 4 changed files with 392 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
import logging
from typing import Any, Mapping, Optional
from urllib.parse import urljoin
import time
import os

from dagster import get_dagster_logger
import requests
from dagster import get_dagster_logger, __version__, Failure
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._utils.cached_method import cached_method
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.v2.translator import FivetranWorkspaceData


FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"


@experimental
class FivetranClient:
"""This class exposes methods on top of the Fivetran REST API."""
Expand All @@ -29,7 +39,7 @@ def __init__(

@property
def _auth(self) -> HTTPBasicAuth:
raise NotImplementedError()
return HTTPBasicAuth(self.api_key, self.api_secret)

@property
@cached_method
Expand All @@ -38,37 +48,100 @@ def _log(self) -> logging.Logger:

@property
def api_base_url(self) -> str:
raise NotImplementedError()
return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}"

@property
def api_connector_url(self) -> str:
raise NotImplementedError()
return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}"

def make_connector_request(
def _make_connector_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data)

def make_request(
def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
"""Creates and sends a request to the desired Fivetran API endpoint.
Args:
method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
endpoint (str): The Fivetran API endpoint to send this request to.
data (Optional[str]): JSON-formatted data string to be included in the request.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
url = f"{self.api_base_url}/{endpoint}"
headers = {
"User-Agent": f"dagster-fivetran/{__version__}",
"Content-Type": "application/json;version=2",
}

num_retries = 0
while True:
try:
response = requests.request(
method=method,
url=url,
headers=headers,
auth=self._auth,
data=data,
timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")),
)
response.raise_for_status()
resp_dict = response.json()
return resp_dict["data"] if "data" in resp_dict else resp_dict
except RequestException as e:
self._log.error("Request to Fivetran API failed: %s", e)
if num_retries == self.request_max_retries:
break
num_retries += 1
time.sleep(self.request_retry_delay)

raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.")

def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches details about a given connector from the Fivetran API."""
raise NotImplementedError()
"""Gets details about a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(method="GET", endpoint=connector_id)

def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""Fetches all connectors for a given group from the Fivetran API."""
raise NotImplementedError()
"""Fetches all connectors for a given group from the Fivetran API.
Args:
group_id (str): The Fivetran Group ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
raise NotImplementedError()
"""Fetches details about a given destination from the Fivetran API.
Args:
destination_id (str): The Fivetran Destination ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"destinations/{destination_id}")

def get_groups(self) -> Mapping[str, Any]:
"""Fetches all groups from the Fivetran API."""
raise NotImplementedError()
"""Fetches all groups from the Fivetran API.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"groups")


class FivetranWorkspace(ConfigurableResource):
Expand Down
Empty file.
Loading

0 comments on commit da802b0

Please sign in to comment.