diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 413cb6d70747f..5366c4bfe900f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -1,9 +1,10 @@ -import datetime import json import logging import os import time -from typing import Any, Mapping, Optional, Sequence, Tuple, Type +from datetime import datetime, timedelta +from functools import partial +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin import requests @@ -32,6 +33,7 @@ from dagster_fivetran.translator import ( DagsterFivetranTranslator, FivetranConnector, + FivetranConnectorScheduleType, FivetranDestination, FivetranSchemaConfig, FivetranWorkspaceData, @@ -169,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str): if connector_details["status"]["setup_state"] != "connected": raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup") - def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]: + def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]: """Gets details about the status of the most recent Fivetran sync operation for a given connector. @@ -294,7 +296,7 @@ def start_resync( def poll_sync( self, connector_id: str, - initial_last_sync_completion: datetime.datetime, + initial_last_sync_completion: datetime, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: @@ -316,7 +318,7 @@ def poll_sync( Returns: Dict[str, Any]: Parsed json data representing the API response. 
""" - poll_start = datetime.datetime.now() + poll_start = datetime.now() while True: ( curr_last_sync_completion, @@ -328,12 +330,10 @@ def poll_sync( if curr_last_sync_completion > initial_last_sync_completion: break - if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( - seconds=poll_timeout - ): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Sync for connector '{connector_id}' timed out after " - f"{datetime.datetime.now() - poll_start}." + f"{datetime.now() - poll_start}." ) # Sleep for the configured time interval before polling again. @@ -469,11 +469,13 @@ def __init__( api_secret: str, request_max_retries: int, request_retry_delay: float, + disable_schedule_on_trigger: bool, ): self.api_key = api_key self.api_secret = api_secret self.request_max_retries = request_max_retries self.request_retry_delay = request_retry_delay + self.disable_schedule_on_trigger = disable_schedule_on_trigger @property def _auth(self) -> HTTPBasicAuth: @@ -592,6 +594,57 @@ def get_groups(self) -> Mapping[str, Any]: """ return self._make_request("GET", "groups") + def update_schedule_type_for_connector( + self, connector_id: str, schedule_type: str + ) -> Mapping[str, Any]: + """Updates the schedule type property of the connector to either "auto" or "manual". + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to + turn it off). + + Returns: + Dict[str, Any]: Parsed json data representing the API response. 
+ """ + schedule_types = {s for s in FivetranConnectorScheduleType} + if schedule_type not in schedule_types: + check.failed( + f"The schedule_type for connector {connector_id} must be in {schedule_types}: " + f"got '{schedule_type}'" + ) + return self._make_connector_request( + method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type}) + ) + + def start_sync(self, connector_id: str) -> None: + """Initiates a sync of a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + + """ + request_fn = partial( + self._make_connector_request, method="POST", endpoint=f"{connector_id}/force" + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + + def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None: + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + connector.validate_syncable() + if self.disable_schedule_on_trigger: + self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.") + self.update_schedule_type_for_connector(connector_id, "manual") + request_fn() + self._log.info( + f"Sync initialized for connector {connector_id}. View this sync in the Fivetran" + " UI: " + connector.url + ) + @experimental class FivetranWorkspace(ConfigurableResource): @@ -613,6 +666,13 @@ class FivetranWorkspace(ConfigurableResource): default=0.25, description="Time (in seconds) to wait between each request retry.", ) + disable_schedule_on_trigger: bool = Field( + default=True, + description=( + "Whether to disable the schedule of a connector when it is synchronized using this resource." + "Defaults to True." 
+ ), + ) _client: FivetranClient = PrivateAttr(default=None) @@ -622,6 +682,7 @@ def get_client(self) -> FivetranClient: api_secret=self.api_secret, request_max_retries=self.request_max_retries, request_retry_delay=self.request_retry_delay, + disable_schedule_on_trigger=self.disable_schedule_on_trigger, ) def fetch_fivetran_workspace_data( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 554ae463fd768..253113051c7f5 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,6 +1,7 @@ from enum import Enum from typing import Any, List, Mapping, NamedTuple, Optional, Sequence +from dagster import Failure from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._record import as_dict, record @@ -20,6 +21,13 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +class FivetranConnectorScheduleType(str, Enum): + """Enum representing each schedule type for a connector in Fivetran's ontology.""" + + AUTO = "auto" + MANUAL = "manual" + + class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" @@ -38,6 +46,7 @@ class FivetranConnector: service: str group_id: str setup_state: str + paused: bool @property def url(self) -> str: @@ -51,6 +60,20 @@ def destination_id(self) -> str: def is_connected(self) -> bool: return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value + @property + def is_paused(self) -> bool: + return self.paused + + def validate_syncable(self) -> bool: + """Confirms that the connector can be synced. Will raise a Failure in the event that + the connector is either paused or not fully set up. 
+ """ + if self.is_paused: + raise Failure(f"Connector '{self.id}' cannot be synced as it is currently paused.") + if not self.is_connected: + raise Failure(f"Connector '{self.id}' cannot be synced as it has not been setup") + return True + @classmethod def from_connector_details( cls, @@ -62,6 +85,7 @@ def from_connector_details( service=connector_details["service"], group_id=connector_details["group_id"], setup_state=connector_details["status"]["setup_state"], + paused=connector_details["paused"], ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 82b44013d6a67..6e5a35593627b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -380,6 +380,8 @@ }, } +SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} + TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" @@ -453,4 +455,16 @@ def all_api_mocks_fixture( json=SAMPLE_CONNECTOR_DETAILS, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.PATCH, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 4a7b9db5298c2..882bcf2861222 100644 --- 
a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -17,19 +17,33 @@ def test_basic_resource_request( resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) - client = resource.get_client() - client.get_connector_details(connector_id=connector_id) + + # fetch workspace data calls client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() client.get_schema_config_for_connector(connector_id=connector_id) - assert len(all_api_mocks.calls) == 5 - + assert len(all_api_mocks.calls) == 4 assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"] + assert group_id in all_api_mocks.calls[0].request.url + assert destination_id in all_api_mocks.calls[1].request.url + assert "groups" in all_api_mocks.calls[2].request.url + assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url + + # connector details calls + all_api_mocks.calls.reset() + client.get_connector_details(connector_id=connector_id) + client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto") + + assert len(all_api_mocks.calls) == 2 assert connector_id in all_api_mocks.calls[0].request.url - assert group_id in all_api_mocks.calls[1].request.url - assert destination_id in all_api_mocks.calls[2].request.url - assert "groups" in all_api_mocks.calls[3].request.url - assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url + assert connector_id in all_api_mocks.calls[1].request.url + assert all_api_mocks.calls[1].request.method == "PATCH" + + # sync calls + all_api_mocks.calls.reset() + client.start_sync(connector_id=connector_id) + assert len(all_api_mocks.calls) == 3 + assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url