Skip to content

Commit

Permalink
[4/n][dagster-tableau] Implement fetch_fivetran_workspace_data
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 8, 2024
1 parent af34d01 commit 87a063b
Show file tree
Hide file tree
Showing 5 changed files with 269 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.translator import FivetranWorkspaceData
from dagster_fivetran.translator import (
FivetranContentData,
FivetranContentType,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

Expand Down Expand Up @@ -550,6 +554,17 @@ def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches the connector schema config for a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"connectors/{connector_id}/schemas")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API.
Expand Down Expand Up @@ -608,4 +623,38 @@ def fetch_fivetran_workspace_data(
Returns:
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
"""
raise NotImplementedError()
connectors = []
destinations = []

client = self.get_client()
groups = client.get_groups()["items"]

for group in groups:
group_id = group["id"]

destination_details = client.get_destination_details(destination_id=group_id)
destinations.append(
FivetranContentData(
content_type=FivetranContentType.DESTINATION, properties=destination_details
)
)

connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
for connector_details in connectors_details:
connector_id = connector_details["id"]

setup_state = connector_details.get("status", {}).get("setup_state")
if setup_state and setup_state in ("incomplete", "broken"):
continue

schema_config = client.get_schema_config_for_connector(connector_id=connector_id)

augmented_connector_details = {**connector_details, "schema_config": schema_config}
connectors.append(
FivetranContentData(
content_type=FivetranContentType.CONNECTOR,
properties=augmented_connector_details,
)
)

return FivetranWorkspaceData.from_content_data(connectors + destinations)
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,18 @@ class FivetranWorkspaceData:
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()
return cls(
connectors_by_id={
connector.properties["id"]: connector
for connector in content_data
if connector.content_type == FivetranContentType.CONNECTOR
},
destinations_by_id={
destination.properties["id"]: destination
for destination in content_data
if destination.content_type == FivetranContentType.DESTINATION
},
)

def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
"code": "Success",
"message": "Operation performed.",
"data": {
"items": [{"id": "group_id", "name": "Group_Name", "created_at": "2024-01-01T00:00:00Z"}],
"items": [
{
"id": "my_group_destination_id",
"name": "Group_Name",
"created_at": "2024-01-01T00:00:00Z",
}
],
"nextCursor": "cursor_value",
},
}
Expand Down Expand Up @@ -54,7 +60,7 @@
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"sync_frequency": 360,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
Expand Down Expand Up @@ -95,13 +101,13 @@
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "destination_id",
"id": "my_group_destination_id",
"service": "adls",
"region": "GCP_US_EAST4",
"networking_method": "Directly",
"setup_status": "CONNECTED",
"daylight_saving_time_enabled": True,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"time_zone_offset": "+3",
"setup_tests": [
{
Expand Down Expand Up @@ -170,7 +176,7 @@
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"sync_frequency": 1440,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
Expand Down Expand Up @@ -202,6 +208,169 @@
},
}

SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = {
"code": "Success",
"message": "Operation performed.",
"data": {
"enable_new_by_default": True,
"schemas": {
"property1": {
"name_in_destination": "schema_name_in_destination",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
},
},
"property2": {
"name_in_destination": "schema_name_in_destination",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
},
},
},
"schema_change_handling": "ALLOW_ALL",
},
}


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand All @@ -210,12 +379,12 @@ def connector_id_fixture() -> str:

@pytest.fixture(name="destination_id")
def destination_id_fixture() -> str:
return "destination_id"
return "my_group_destination_id"


@pytest.fixture(name="group_id")
def group_id_fixture() -> str:
return "group_id"
return "my_group_destination_id"


@pytest.fixture(
Expand All @@ -225,7 +394,7 @@ def workspace_data_api_mocks_fn_fixture(
connector_id: str, destination_id: str, group_id: str
) -> Callable:
@contextlib.contextmanager
def _method() -> Iterator[responses.RequestsMock]:
def _method(include_sync_endpoints: bool = True) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
response.add(
method=responses.GET,
Expand All @@ -250,11 +419,19 @@ def _method() -> Iterator[responses.RequestsMock]:

response.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas",
json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
status=200,
)

if include_sync_endpoints:
response.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)

yield response

return _method
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import uuid
from typing import Callable

from dagster_fivetran import FivetranWorkspace


def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None:
api_key = uuid.uuid4().hex
api_secret = uuid.uuid4().hex

resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret)

with workspace_data_api_mocks_fn(include_sync_endpoints=False):
actual_workspace_data = resource.fetch_fivetran_workspace_data()
assert len(actual_workspace_data.connectors_by_id) == 1
assert len(actual_workspace_data.destinations_by_id) == 1
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ def test_basic_resource_request(
client.get_connectors_for_group(group_id=group_id)
client.get_destination_details(destination_id=destination_id)
client.get_groups()
client.get_schema_config_for_connector(connector_id=connector_id)

assert len(response.calls) == 4
assert len(response.calls) == 5

assert "Basic" in response.calls[0].request.headers["Authorization"]
assert connector_id in response.calls[0].request.url
assert group_id in response.calls[1].request.url
assert destination_id in response.calls[2].request.url
assert "groups" in response.calls[3].request.url
assert f"{connector_id}/schemas" in response.calls[4].request.url

0 comments on commit 87a063b

Please sign in to comment.