Skip to content

Commit

Permalink
[dagster-fivetran] Implement sync_and_poll method in FivetranClient (#…
Browse files Browse the repository at this point in the history
…26061)

## Summary & Motivation

This PR uses the sync and poll methods implemented in previous PRs to
implement `sync_and_poll` in `FivetranClient`. This method will be used
in a subsequent PR to materialize Fivetran assets.

Tests are added to test the full sync and poll behavior.

## How I Tested These Changes

Additional unit tests with BK
  • Loading branch information
maximearmstrong authored Nov 27, 2024
1 parent 9fdb658 commit d1742d2
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,52 @@ def poll_sync(
)
return connector_details

def sync_and_poll(
self,
connector_id: str,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> FivetranOutput:
"""Initializes a sync operation for the given connector, and polls until it completes.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will wait before this operation is timed
out. By default, this will never time out.
Returns:
:py:class:`~FivetranOutput`:
Object containing details about the connector and the tables it updates
"""
return self._sync_and_poll(
sync_fn=self.start_sync,
connector_id=connector_id,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
)

def _sync_and_poll(
self,
sync_fn: Callable,
connector_id: str,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> FivetranOutput:
schema_config_details = self.get_schema_config_for_connector(connector_id)
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
sync_fn(connector_id=connector_id)
final_details = self.poll_sync(
connector_id=connector_id,
previous_sync_completed_at=connector.last_sync_completed_at,
poll_interval=poll_interval,
poll_timeout=poll_timeout,
)
return FivetranOutput(connector_details=final_details, schema_config=schema_config_details)


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,12 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[
SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}


def get_fivetran_connector_api_url(connector_id: str) -> str:
return (
f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}"
)


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
return "connector_id"
Expand Down Expand Up @@ -442,7 +448,7 @@ def fetch_workspace_data_api_mocks_fixture(

response.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas",
url=f"{get_fivetran_connector_api_url(connector_id)}/schemas",
json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
status=200,
)
Expand All @@ -461,35 +467,35 @@ def all_api_mocks_fixture(
) -> Iterator[responses.RequestsMock]:
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
url=get_fivetran_connector_api_url(connector_id),
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
url=get_fivetran_connector_api_url(connector_id),
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force",
url=f"{get_fivetran_connector_api_url(connector_id)}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync",
url=f"{get_fivetran_connector_api_url(connector_id)}/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync",
url=f"{get_fivetran_connector_api_url(connector_id)}/schemas/tables/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
import responses
from dagster import Failure
from dagster._vendored.dateutil import parser
from dagster_fivetran import FivetranWorkspace
from dagster_fivetran import FivetranOutput, FivetranWorkspace
from dagster_fivetran.translator import MIN_TIME_STR

from dagster_fivetran_tests.experimental.conftest import (
FIVETRAN_API_BASE,
FIVETRAN_API_VERSION,
FIVETRAN_CONNECTOR_ENDPOINT,
SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
SAMPLE_SUCCESS_MESSAGE,
TEST_ACCOUNT_ID,
TEST_API_KEY,
TEST_API_SECRET,
TEST_MAX_TIME_STR,
TEST_PREVIOUS_MAX_TIME_STR,
get_fivetran_connector_api_url,
get_sample_connection_details,
)

Expand Down Expand Up @@ -95,7 +95,7 @@ def test_basic_resource_request(
# Replace the mock API call and set `failed_at` as more recent that `succeeded_at`
all_api_mocks.replace(
method_or_response=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
url=get_fivetran_connector_api_url(connector_id),
json=get_sample_connection_details(
succeeded_at=TEST_PREVIOUS_MAX_TIME_STR, failed_at=TEST_MAX_TIME_STR
),
Expand All @@ -108,3 +108,74 @@ def test_basic_resource_request(
poll_timeout=2,
poll_interval=1,
)


@pytest.mark.parametrize(
"n_polls, succeed_at_end",
[(0, True), (0, False), (4, True), (4, False), (30, True)],
ids=["short_success", "short_failure", "medium_success", "medium_failure", "long_success"],
)
def test_sync_and_poll(n_polls, succeed_at_end, connector_id):
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET
)
client = resource.get_client()

test_connector_api_url = get_fivetran_connector_api_url(connector_id)

test_succeeded_at = TEST_MAX_TIME_STR
test_failed_at = TEST_PREVIOUS_MAX_TIME_STR
# Set `failed_at` as more recent that `succeeded_at` if the sync and poll process is expected to fail
if not succeed_at_end:
test_succeeded_at = TEST_PREVIOUS_MAX_TIME_STR
test_failed_at = TEST_MAX_TIME_STR

# Create mock responses to mock full sync and poll behavior, used only in this test
def _mock_interaction():
with responses.RequestsMock() as response:
response.add(
responses.GET,
f"{test_connector_api_url}/schemas",
json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
)
response.add(responses.PATCH, test_connector_api_url, json=SAMPLE_SUCCESS_MESSAGE)
response.add(
responses.POST, f"{test_connector_api_url}/force", json=SAMPLE_SUCCESS_MESSAGE
)
# initial state
response.add(
responses.GET,
test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR
),
)
# n polls before updating
for _ in range(n_polls):
response.add(
responses.GET,
test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR
),
)
# final state will be updated
response.add(
responses.GET,
test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=test_succeeded_at, failed_at=test_failed_at
),
)
return client.sync_and_poll(connector_id, poll_interval=0.1)

if succeed_at_end:
assert _mock_interaction() == FivetranOutput(
connector_details=get_sample_connection_details(
succeeded_at=test_succeeded_at, failed_at=test_failed_at
)["data"],
schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"],
)
else:
with pytest.raises(Failure, match="failed!"):
_mock_interaction()

0 comments on commit d1742d2

Please sign in to comment.