From d175fa06a1b226a94fcb2eb3a73c70814e2140be Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 15 Nov 2024 09:31:44 -0500 Subject: [PATCH] Clean code --- .../dagster_fivetran/resources.py | 47 +++++++------------ .../dagster_fivetran/translator.py | 30 +++++++----- .../experimental/test_resources.py | 8 ++-- 3 files changed, 40 insertions(+), 45 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 0433a7209049b..5e553783c78f8 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -1,8 +1,8 @@ -import datetime import json import logging import os import time +from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin @@ -171,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str): if connector_details["status"]["setup_state"] != "connected": raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup") - def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]: + def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]: """Gets details about the status of the most recent Fivetran sync operation for a given connector. @@ -296,7 +296,7 @@ def start_resync( def poll_sync( self, connector_id: str, - initial_last_sync_completion: datetime.datetime, + initial_last_sync_completion: datetime, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: @@ -318,7 +318,7 @@ def poll_sync( Returns: Dict[str, Any]: Parsed json data representing the API response. """ - poll_start = datetime.datetime.now() + poll_start = datetime.now() while True: ( curr_last_sync_completion, @@ -330,12 +330,10 @@ def poll_sync( if curr_last_sync_completion > initial_last_sync_completion: break - if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( - seconds=poll_timeout - ): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Sync for connector '{connector_id}' timed out after " - f"{datetime.datetime.now() - poll_start}." + f"{datetime.now() - poll_start}." ) # Sleep for the configured time interval before polling again. @@ -664,9 +662,6 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None: ) connector.assert_syncable() request_fn() - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) self._log.info( f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" " UI: " + connector.url @@ -675,7 +670,7 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None: def poll_sync( self, connector_id: str, - initial_last_sync_completion: datetime.datetime, + previous_sync_completed_at: datetime, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: @@ -688,7 +683,7 @@ def poll_sync( Args: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync + previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync (successful or otherwise) for this connector, prior to running this method. poll_interval (float): The time (in seconds) that will be waited between successive polls. poll_timeout (float): The maximum time that will wait before this operation is timed @@ -697,34 +692,27 @@ def poll_sync( Returns: Dict[str, Any]: Parsed json data representing the API response. """ - poll_start = datetime.datetime.now() + poll_start = datetime.now() while True: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - ( - curr_last_sync_completion, - curr_last_sync_succeeded, - curr_sync_state, - ) = connector.sync_status - self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]") + self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]") - if curr_last_sync_completion > initial_last_sync_completion: + if connector.last_sync_completed_at > previous_sync_completed_at: break - if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( - seconds=poll_timeout - ): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Sync for connector '{connector_id}' timed out after " - f"{datetime.datetime.now() - poll_start}." + f"{datetime.now() - poll_start}." ) # Sleep for the configured time interval before polling again. time.sleep(poll_interval) post_raw_connector_details = self.get_connector_details(connector_id) - if not curr_last_sync_succeeded: + if not connector.is_last_sync_successful: raise Failure( f"Sync for connector '{connector_id}' failed!", metadata={ @@ -801,11 +789,10 @@ def _sync_and_poll( connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - init_last_sync_timestamp, _, _ = connector.sync_status - sync_fn(connector_id) + sync_fn(connector_id=connector_id) final_details = self.poll_sync( - connector_id, - init_last_sync_timestamp, + connector_id=connector_id, + previous_sync_completed_at=connector.last_sync_completed_at, poll_interval=poll_interval, poll_timeout=poll_timeout, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 1c92fe760c8fc..a653e9a88464c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple +from typing import Any, List, Mapping, NamedTuple, Optional, Sequence from dagster import Failure from dagster._core.definitions.asset_key import AssetKey @@ -80,22 +80,30 @@ def is_paused(self) -> bool: return self.paused @property - def sync_status(self) -> Tuple[datetime, bool, str]: - """Gets details about the status of the Fivetran connector. + def last_sync_completed_at(self) -> datetime: + """Gets the datetime of the last completed sync of the Fivetran connector. Returns: - Tuple[datetime.datetime, bool, str]: - Tuple representing the timestamp of the last completed sync, if it succeeded, and - the currently reported sync status. + datetime.datetime: + The datetime of the last completed sync of the Fivetran connector. """ succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) failed_at = parser.parse(self.failed_at or MIN_TIME_STR) - return ( - max(succeeded_at, failed_at), - succeeded_at > failed_at, - self.sync_state, - ) + return max(succeeded_at, failed_at) + + @property + def is_last_sync_successful(self) -> bool: + """Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not. + + Returns: + bool: + Whether the last completed sync of the Fivetran connector was successful or not. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return succeeded_at > failed_at def assert_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 457ecc6070a4e..1e73d8535a224 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -45,21 +45,21 @@ def test_basic_resource_request( all_api_mocks.calls.reset() client.start_sync(connector_id=connector_id) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters=None) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.poll_sync( - connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR) + connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) ) assert len(all_api_mocks.calls) == 2