diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 455249fbd089c..ef4acdc929d11 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -25,7 +25,11 @@ from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException -from dagster_fivetran.translator import FivetranWorkspaceData +from dagster_fivetran.translator import ( + FivetranContentData, + FivetranContentType, + FivetranWorkspaceData, +) from dagster_fivetran.types import FivetranOutput from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url @@ -550,6 +554,17 @@ def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: """ return self._make_request("GET", f"groups/{group_id}/connectors") + def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any]: + """Fetches the connector schema config for a given connector from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: """Fetches details about a given destination from the Fivetran API. @@ -608,4 +623,38 @@ def fetch_fivetran_workspace_data( Returns: FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. """ - raise NotImplementedError() + connectors = [] + destinations = [] + + client = self.get_client() + groups = client.get_groups()["items"] + + for group in groups: + group_id = group["id"] + + destination_details = client.get_destination_details(destination_id=group_id) + destinations.append( + FivetranContentData( + content_type=FivetranContentType.DESTINATION, properties=destination_details + ) + ) + + connectors_details = client.get_connectors_for_group(group_id=group_id)["items"] + for connector_details in connectors_details: + connector_id = connector_details["id"] + + setup_state = connector_details.get("status", {}).get("setup_state") + if setup_state and setup_state in ("incomplete", "broken"): + continue + + schema_config = client.get_schema_config_for_connector(connector_id=connector_id) + + augmented_connector_details = {**connector_details, "schema_config": schema_config} + connectors.append( + FivetranContentData( + content_type=FivetranContentType.CONNECTOR, + properties=augmented_connector_details, + ) + ) + + return FivetranWorkspaceData.from_content_data(connectors + destinations) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 4b93bb83fe588..65a23b82096ab 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -50,7 +50,18 @@ class FivetranWorkspaceData: def from_content_data( cls, content_data: Sequence[FivetranContentData] ) -> "FivetranWorkspaceData": - raise NotImplementedError() + return cls( + connectors_by_id={ + connector.properties["id"]: connector + for connector in content_data + if connector.content_type == FivetranContentType.CONNECTOR + }, + destinations_by_id={ + destination.properties["id"]: destination + for destination in content_data + if destination.content_type == FivetranContentType.DESTINATION + }, + ) def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]: """Method that converts a `FivetranWorkspaceData` object diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 54a4184c18a38..1b3ab546c353d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -13,7 +13,13 @@ "code": "Success", "message": "Operation performed.", "data": { - "items": [{"id": "group_id", "name": "Group_Name", "created_at": "2024-01-01T00:00:00Z"}], + "items": [ + { + "id": "my_group_destination_id", + "name": "Group_Name", + "created_at": "2024-01-01T00:00:00Z", + } + ], "nextCursor": "cursor_value", }, } @@ -54,7 +60,7 @@ "daily_sync_time": "14:00", "succeeded_at": "2024-12-01T15:43:29.013729Z", "sync_frequency": 360, - "group_id": "group_id", + "group_id": "my_group_destination_id", "connected_by": "user_id", "setup_tests": [ { @@ -95,13 +101,13 @@ "code": "Success", "message": "Operation performed.", "data": { - "id": "destination_id", + "id": "my_group_destination_id", "service": "adls", "region": "GCP_US_EAST4", "networking_method": "Directly", "setup_status": "CONNECTED", "daylight_saving_time_enabled": True, - "group_id": "group_id", + "group_id": "my_group_destination_id", "time_zone_offset": "+3", "setup_tests": [ { @@ -170,7 +176,7 @@ "daily_sync_time": "14:00", "succeeded_at": "2024-03-17T12:31:40.870504Z", "sync_frequency": 1440, - "group_id": "group_id", + "group_id": "my_group_destination_id", "connected_by": "user_id", "setup_tests": [ { @@ -202,6 +208,169 @@ }, } +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + }, + }, + "property2": { + "name_in_destination": "schema_name_in_destination", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + }, + }, + }, + "schema_change_handling": "ALLOW_ALL", + }, +} + @pytest.fixture(name="connector_id") def connector_id_fixture() -> str: @@ -210,12 +379,12 @@ def connector_id_fixture() -> str: @pytest.fixture(name="destination_id") def destination_id_fixture() -> str: - return "destination_id" + return "my_group_destination_id" @pytest.fixture(name="group_id") def group_id_fixture() -> str: - return "group_id" + return "my_group_destination_id" @pytest.fixture( @@ -225,7 +394,7 @@ def workspace_data_api_mocks_fn_fixture( connector_id: str, destination_id: str, group_id: str ) -> Callable: @contextlib.contextmanager - def _method() -> Iterator[responses.RequestsMock]: + def _method(include_sync_endpoints: bool = True) -> Iterator[responses.RequestsMock]: with responses.RequestsMock() as response: response.add( method=responses.GET, @@ -250,11 +419,19 @@ def _method() -> Iterator[responses.RequestsMock]: response.add( method=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas", + json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, status=200, ) + if include_sync_endpoints: + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + yield response return _method diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py new file mode 100644 index 0000000000000..56ec77d74a067 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -0,0 +1,16 @@ +import uuid +from typing import Callable + +from dagster_fivetran import FivetranWorkspace + + +def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None: + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret) + + with workspace_data_api_mocks_fn(include_sync_endpoints=False): + actual_workspace_data = resource.fetch_fivetran_workspace_data() + assert len(actual_workspace_data.connectors_by_id) == 1 + assert len(actual_workspace_data.destinations_by_id) == 1 diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 8b198669e9173..9d79a23dcfa19 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -18,11 +18,13 @@ def test_basic_resource_request( client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() + client.get_schema_config_for_connector(connector_id=connector_id) - assert len(response.calls) == 4 + assert len(response.calls) == 5 assert "Basic" in response.calls[0].request.headers["Authorization"] assert connector_id in response.calls[0].request.url assert group_id in response.calls[1].request.url assert destination_id in response.calls[2].request.url assert "groups" in response.calls[3].request.url + assert f"{connector_id}/schemas" in response.calls[4].request.url