Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-fivetran] Implement base poll method in FivetranClient #26060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,61 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id:
" UI: " + connector.url
)

def poll_sync(
self,
connector_id: str,
previous_sync_completed_at: datetime,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
"""Given a Fivetran connector and the timestamp at which the previous sync completed, poll
until the next sync completes.

The previous sync completion time is necessary because the only way to tell when a sync
completes is when this value changes.

Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync
(successful or otherwise) for this connector, prior to running this method.
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will wait before this operation is timed
out. By default, this will never time out.

Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
poll_start = datetime.now()
while True:
connector_details = self.get_connector_details(connector_id)
connector = FivetranConnector.from_connector_details(
connector_details=connector_details
)
self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

if connector.last_sync_completed_at > previous_sync_completed_at:
break

if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[1]

raise Failure(
f"Sync for connector '{connector_id}' timed out after "
f"{datetime.now() - poll_start}."
)

# Sleep for the configured time interval before polling again.
time.sleep(poll_interval)

if not connector.is_last_sync_successful:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[2]

raise Failure(
f"Sync for connector '{connector_id}' failed!",
metadata={
"connector_details": MetadataValue.json(connector_details),
"log_url": MetadataValue.url(connector.url),
},
)
return connector_details


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

Expand All @@ -7,9 +8,12 @@
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser

from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table

MIN_TIME_STR = "0001-01-01 00:00:00+00"


class FivetranConnectorTableProps(NamedTuple):
table: str
Expand Down Expand Up @@ -46,7 +50,10 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
sync_state: str
paused: bool
succeeded_at: Optional[str]
failed_at: Optional[str]

@property
def url(self) -> str:
Expand All @@ -64,6 +71,32 @@ def is_connected(self) -> bool:
def is_paused(self) -> bool:
return self.paused

@property
def last_sync_completed_at(self) -> datetime:
"""Gets the datetime of the last completed sync of the Fivetran connector.

Returns:
datetime.datetime:
The datetime of the last completed sync of the Fivetran connector.
"""
succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR)
failed_at = parser.parse(self.failed_at or MIN_TIME_STR)

return max(succeeded_at, failed_at)

@property
def is_last_sync_successful(self) -> bool:
"""Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not.

Returns:
bool:
Whether the last completed sync of the Fivetran connector was successful or not.
"""
succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR)
failed_at = parser.parse(self.failed_at or MIN_TIME_STR)

return succeeded_at > failed_at

def validate_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
the connector is either paused or not fully set up.
Expand All @@ -85,7 +118,10 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
sync_state=connector_details["status"]["sync_state"],
paused=connector_details["paused"],
succeeded_at=connector_details.get("succeeded_at"),
failed_at=connector_details.get("failed_at"),
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterator
from typing import Any, Iterator, Mapping

import pytest
import responses
Expand All @@ -8,6 +8,13 @@
FIVETRAN_CONNECTOR_ENDPOINT,
)

TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z"
TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z"

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups
SAMPLE_GROUPS = {
Expand Down Expand Up @@ -61,7 +68,7 @@
},
"config": {"property1": {}, "property2": {}},
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 360,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -75,7 +82,7 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:43:29.013729Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "string",
"proxy_agent_id": "string",
Expand Down Expand Up @@ -148,72 +155,79 @@
},
}


# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/connectors/connector-details
SAMPLE_CONNECTOR_DETAILS = {
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "connector_id",
"service": "15five",
"schema": "schema.table",
"paused": False,
"status": {
"tasks": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"warnings": [
# The sample is parameterized to test the poll method
def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[str, Any]:
return {
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "connector_id",
"service": "15five",
"schema": "schema.table",
"paused": False,
"status": {
"tasks": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"warnings": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
}
],
"schema_status": "ready",
"update_state": "delayed",
"setup_state": "connected",
"sync_state": "scheduled",
"is_historical_sync": False,
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": succeeded_at,
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
"code": "resync_table_warning",
"message": "Resync Table Warning",
"details": "string",
"title": "Test Title",
"status": "PASSED",
"message": "Test Passed",
"details": "Test Details",
}
],
"schema_status": "ready",
"update_state": "delayed",
"setup_state": "connected",
"sync_state": "scheduled",
"is_historical_sync": False,
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
"title": "Test Title",
"status": "PASSED",
"message": "Test Passed",
"details": "Test Details",
}
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2023-12-01T15:43:29.013729Z",
"failed_at": "2024-04-01T18:13:25.043659Z",
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
"connect_card": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU",
"uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...",
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": failed_at,
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
"connect_card": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU",
"uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...",
},
"pause_after_trial": False,
"data_delay_threshold": 0,
"data_delay_sensitivity": "NORMAL",
"schedule_type": "auto",
"local_processing_agent_id": "local_processing_agent_id",
"connect_card_config": {
"redirect_uri": "https://your.site/path",
"hide_setup_guide": True,
},
"hybrid_deployment_agent_id": "hybrid_deployment_agent_id",
"config": {"api_key": "your_15five_api_key"},
},
"pause_after_trial": False,
"data_delay_threshold": 0,
"data_delay_sensitivity": "NORMAL",
"schedule_type": "auto",
"local_processing_agent_id": "local_processing_agent_id",
"connect_card_config": {"redirect_uri": "https://your.site/path", "hide_setup_guide": True},
"hybrid_deployment_agent_id": "hybrid_deployment_agent_id",
"config": {"api_key": "your_15five_api_key"},
},
}
}


# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config
Expand Down Expand Up @@ -382,10 +396,6 @@

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand Down Expand Up @@ -452,13 +462,17 @@ def all_api_mocks_fixture(
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
Expand Down
Loading