Skip to content

Commit

Permalink
Clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 15, 2024
1 parent e8436f2 commit d175fa0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import datetime
import json
import logging
import os
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin
Expand Down Expand Up @@ -171,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str):
if connector_details["status"]["setup_state"] != "connected":
raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup")

def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]:
def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]:
"""Gets details about the status of the most recent Fivetran sync operation for a given
connector.
Expand Down Expand Up @@ -296,7 +296,7 @@ def start_resync(
def poll_sync(
self,
connector_id: str,
initial_last_sync_completion: datetime.datetime,
initial_last_sync_completion: datetime,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
Expand All @@ -318,7 +318,7 @@ def poll_sync(
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
poll_start = datetime.datetime.now()
poll_start = datetime.now()
while True:
(
curr_last_sync_completion,
Expand All @@ -330,12 +330,10 @@ def poll_sync(
if curr_last_sync_completion > initial_last_sync_completion:
break

if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta(
seconds=poll_timeout
):
if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
raise Failure(
f"Sync for connector '{connector_id}' timed out after "
f"{datetime.datetime.now() - poll_start}."
f"{datetime.now() - poll_start}."
)

# Sleep for the configured time interval before polling again.
Expand Down Expand Up @@ -664,9 +662,6 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None:
)
connector.assert_syncable()
request_fn()
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
self._log.info(
f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran"
" UI: " + connector.url
Expand All @@ -675,7 +670,7 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None:
def poll_sync(
self,
connector_id: str,
initial_last_sync_completion: datetime.datetime,
previous_sync_completed_at: datetime,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
Expand All @@ -688,7 +683,7 @@ def poll_sync(
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync
previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync
(successful or otherwise) for this connector, prior to running this method.
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will wait before this operation is timed
Expand All @@ -697,34 +692,27 @@ def poll_sync(
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
poll_start = datetime.datetime.now()
poll_start = datetime.now()
while True:
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
(
curr_last_sync_completion,
curr_last_sync_succeeded,
curr_sync_state,
) = connector.sync_status
self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]")
self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

if curr_last_sync_completion > initial_last_sync_completion:
if connector.last_sync_completed_at > previous_sync_completed_at:
break

if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta(
seconds=poll_timeout
):
if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
raise Failure(
f"Sync for connector '{connector_id}' timed out after "
f"{datetime.datetime.now() - poll_start}."
f"{datetime.now() - poll_start}."
)

# Sleep for the configured time interval before polling again.
time.sleep(poll_interval)

post_raw_connector_details = self.get_connector_details(connector_id)
if not curr_last_sync_succeeded:
if not connector.is_last_sync_successful:
raise Failure(
f"Sync for connector '{connector_id}' failed!",
metadata={
Expand Down Expand Up @@ -801,11 +789,10 @@ def _sync_and_poll(
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
init_last_sync_timestamp, _, _ = connector.sync_status
sync_fn(connector_id)
sync_fn(connector_id=connector_id)
final_details = self.poll_sync(
connector_id,
init_last_sync_timestamp,
connector_id=connector_id,
previous_sync_completed_at=connector.last_sync_completed_at,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
Expand Down Expand Up @@ -80,22 +80,30 @@ def is_paused(self) -> bool:
return self.paused

@property
def sync_status(self) -> Tuple[datetime, bool, str]:
"""Gets details about the status of the Fivetran connector.
def last_sync_completed_at(self) -> datetime:
"""Gets the datetime of the last completed sync of the Fivetran connector.
Returns:
Tuple[datetime.datetime, bool, str]:
Tuple representing the timestamp of the last completed sync, if it succeeded, and
the currently reported sync status.
datetime.datetime:
The datetime of the last completed sync of the Fivetran connector.
"""
succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR)
failed_at = parser.parse(self.failed_at or MIN_TIME_STR)

return (
max(succeeded_at, failed_at),
succeeded_at > failed_at,
self.sync_state,
)
return max(succeeded_at, failed_at)

@property
def is_last_sync_successful(self) -> bool:
"""Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not.
Returns:
bool:
Whether the last completed sync of the Fivetran connector was successful or not.
"""
succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR)
failed_at = parser.parse(self.failed_at or MIN_TIME_STR)

return succeeded_at > failed_at

def assert_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ def test_basic_resource_request(

all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 4
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters=None)
assert len(all_api_mocks.calls) == 4
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]})
assert len(all_api_mocks.calls) == 4
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.poll_sync(
connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR)
connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR)
)
assert len(all_api_mocks.calls) == 2

0 comments on commit d175fa0

Please sign in to comment.