def poll_sync(
    self,
    connector_id: str,
    previous_sync_completed_at: datetime,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
    """Poll a Fivetran connector until a sync newer than the previous one completes.

    The previous sync completion time is necessary because the only way to tell when a sync
    completes is when this value changes.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        previous_sync_completed_at (datetime.datetime): The datetime of the previous completed
            sync (successful or otherwise) for this connector, prior to running this method.
            It is compared directly against the connector's parsed completion timestamp.
            NOTE(review): presumably this must be timezone-aware to be comparable with that
            timestamp - confirm against callers.
        poll_interval (float): The time (in seconds) that will be waited between successive
            polls.
        poll_timeout (Optional[float]): The maximum time (in seconds) to wait before this
            operation is timed out. A falsy value (``None``, the default, or ``0``) disables
            the timeout.

    Returns:
        Mapping[str, Any]: Parsed json data representing the API response, fetched after
            the sync was observed to complete.

    Raises:
        Failure: If the timeout elapses before a newer sync completes, or if the last
            completed sync of the connector was not successful.
    """
    # Measure elapsed time on the monotonic clock: unlike `datetime.now()`, it cannot jump
    # when the wall clock is adjusted, so the timeout can neither fire early nor be missed.
    poll_start = time.monotonic()
    while True:
        connector = FivetranConnector.from_connector_details(
            connector_details=self.get_connector_details(connector_id)
        )
        self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

        # A completion timestamp later than the previous one means a new sync has finished.
        if connector.last_sync_completed_at > previous_sync_completed_at:
            break

        # The timeout is only evaluated after a poll that did not observe a completed sync.
        elapsed_seconds = time.monotonic() - poll_start
        if poll_timeout and elapsed_seconds > poll_timeout:
            raise Failure(
                f"Sync for connector '{connector_id}' timed out after "
                f"{timedelta(seconds=elapsed_seconds)}."
            )

        # Sleep for the configured time interval before polling again.
        time.sleep(poll_interval)

    # Fetch the details once more for the return value and failure metadata; the success
    # check below still uses the connector snapshot from the final poll.
    post_raw_connector_details = self.get_connector_details(connector_id)
    if not connector.is_last_sync_successful:
        raise Failure(
            f"Sync for connector '{connector_id}' failed!",
            metadata={
                "connector_details": MetadataValue.json(post_raw_connector_details),
                "log_url": MetadataValue.url(connector.url),
            },
        )
    return post_raw_connector_details
@property
def is_last_sync_successful(self) -> bool:
    """Report whether the connector's most recently completed sync succeeded.

    A missing ``succeeded_at`` or ``failed_at`` value is replaced by ``MIN_TIME_STR``
    before parsing, so a connector with neither timestamp is reported as not successful.

    Returns:
        bool: True when the parsed ``succeeded_at`` is strictly later than the parsed
            ``failed_at``; False otherwise.
    """
    # Parse the success timestamp first, then the failure timestamp, falling back to the
    # minimum time string whenever a value is absent or empty.
    last_success = parser.parse(self.succeeded_at if self.succeeded_at else MIN_TIME_STR)
    last_failure = parser.parse(self.failed_at if self.failed_at else MIN_TIME_STR)

    # The last sync counts as successful only if success is strictly more recent.
    if last_success > last_failure:
        return True
    return False
"group_id": "my_group_destination_id", "connected_by": "user_id", @@ -75,7 +75,7 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:43:29.013729Z", + "created_at": "2024-12-01T15:41:29.013729Z", "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "string", "proxy_agent_id": "string", @@ -181,7 +181,7 @@ "rescheduled_for": "2024-12-01T15:43:29.013729Z", }, "daily_sync_time": "14:00", - "succeeded_at": "2024-03-17T12:31:40.870504Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 1440, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -195,8 +195,8 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2023-12-01T15:43:29.013729Z", - "failed_at": "2024-04-01T18:13:25.043659Z", + "created_at": "2024-12-01T15:41:29.013729Z", + "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "private_link_id", "proxy_agent_id": "proxy_agent_id", "networking_method": "Directly", diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 43b1bd744918e..ff77c6e46d0de 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,5 +1,7 @@ import responses +from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace +from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, @@ -59,3 +61,10 @@ def test_basic_resource_request( client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url + + # poll calls + 
all_api_mocks.calls.reset() + client.poll_sync( + connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) + ) + assert len(all_api_mocks.calls) == 2