diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index a4fb0af2ce432..119695e23b365 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -723,6 +723,52 @@ def poll_sync( ) return connector_details + def sync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + """Initializes a sync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=self.start_sync, + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def _sync_and_poll( + self, + sync_fn: Callable, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + schema_config_details = self.get_schema_config_for_connector(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + sync_fn(connector_id=connector_id) + final_details = self.poll_sync( + connector_id=connector_id, + previous_sync_completed_at=connector.last_sync_completed_at, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + return FivetranOutput(connector_details=final_details, schema_config=schema_config_details) + @experimental class FivetranWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 86d10da45c82b..691af75950039 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -397,6 +397,12 @@ def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[ SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} +def get_fivetran_connector_api_url(connector_id: str) -> str: + return ( + f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}" + ) + + @pytest.fixture(name="connector_id") def connector_id_fixture() -> str: return "connector_id" @@ -442,7 +448,7 @@ def fetch_workspace_data_api_mocks_fixture( response.add( method=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas", + url=f"{get_fivetran_connector_api_url(connector_id)}/schemas", json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, status=200, ) @@ -461,7 +467,7 @@ def all_api_mocks_fixture( ) -> Iterator[responses.RequestsMock]: fetch_workspace_data_api_mocks.add( method=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + url=get_fivetran_connector_api_url(connector_id), json=get_sample_connection_details( succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR ), @@ -469,7 +475,7 @@ def all_api_mocks_fixture( ) fetch_workspace_data_api_mocks.add( method=responses.PATCH, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + url=get_fivetran_connector_api_url(connector_id), json=get_sample_connection_details( succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR ), @@ -477,19 +483,19 @@ def all_api_mocks_fixture( ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force", + url=f"{get_fivetran_connector_api_url(connector_id)}/force", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", + url=f"{get_fivetran_connector_api_url(connector_id)}/resync", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) fetch_workspace_data_api_mocks.add( method=responses.POST, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync", + url=f"{get_fivetran_connector_api_url(connector_id)}/schemas/tables/resync", json=SAMPLE_SUCCESS_MESSAGE, status=200, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index a2ee8f6f5fa02..dad8c12264953 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -2,18 +2,18 @@ import responses from dagster import Failure from dagster._vendored.dateutil import parser -from dagster_fivetran import FivetranWorkspace +from dagster_fivetran import FivetranOutput, FivetranWorkspace from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( - FIVETRAN_API_BASE, - FIVETRAN_API_VERSION, - FIVETRAN_CONNECTOR_ENDPOINT, + SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, + SAMPLE_SUCCESS_MESSAGE, TEST_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, TEST_MAX_TIME_STR, TEST_PREVIOUS_MAX_TIME_STR, + get_fivetran_connector_api_url, get_sample_connection_details, ) @@ -95,7 +95,7 @@ def test_basic_resource_request( # Replace the mock API call and set `failed_at` as more recent that `succeeded_at` all_api_mocks.replace( method_or_response=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + url=get_fivetran_connector_api_url(connector_id), json=get_sample_connection_details( succeeded_at=TEST_PREVIOUS_MAX_TIME_STR, failed_at=TEST_MAX_TIME_STR ), @@ -108,3 +108,74 @@ def test_basic_resource_request( poll_timeout=2, poll_interval=1, ) + + +@pytest.mark.parametrize( + "n_polls, succeed_at_end", + [(0, True), (0, False), (4, True), (4, False), (30, True)], + ids=["short_success", "short_failure", "medium_success", "medium_failure", "long_success"], +) +def test_sync_and_poll(n_polls, succeed_at_end, connector_id): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET + ) + client = resource.get_client() + + test_connector_api_url = get_fivetran_connector_api_url(connector_id) + + test_succeeded_at = TEST_MAX_TIME_STR + test_failed_at = TEST_PREVIOUS_MAX_TIME_STR + # Set `failed_at` as more recent that `succeeded_at` if the sync and poll process is expected to fail + if not succeed_at_end: + test_succeeded_at = TEST_PREVIOUS_MAX_TIME_STR + test_failed_at = TEST_MAX_TIME_STR + + # Create mock responses to mock full sync and poll behavior, used only in this test + def _mock_interaction(): + with responses.RequestsMock() as response: + response.add( + responses.GET, + f"{test_connector_api_url}/schemas", + json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, + ) + response.add(responses.PATCH, test_connector_api_url, json=SAMPLE_SUCCESS_MESSAGE) + response.add( + responses.POST, f"{test_connector_api_url}/force", json=SAMPLE_SUCCESS_MESSAGE + ) + # initial state + response.add( + responses.GET, + test_connector_api_url, + json=get_sample_connection_details( + succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR + ), + ) + # n polls before updating + for _ in range(n_polls): + response.add( + responses.GET, + test_connector_api_url, + json=get_sample_connection_details( + succeeded_at=MIN_TIME_STR, failed_at=MIN_TIME_STR + ), + ) + # final state will be updated + response.add( + responses.GET, + test_connector_api_url, + json=get_sample_connection_details( + succeeded_at=test_succeeded_at, failed_at=test_failed_at + ), + ) + return client.sync_and_poll(connector_id, poll_interval=0.1) + + if succeed_at_end: + assert _mock_interaction() == FivetranOutput( + connector_details=get_sample_connection_details( + succeeded_at=test_succeeded_at, failed_at=test_failed_at + )["data"], + schema_config=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR["data"], + ) + else: + with pytest.raises(Failure, match="failed!"): + _mock_interaction()