From 0d85b0bf13b8120b17e6181c45c016b7f4197ff3 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Wed, 27 Nov 2024 08:43:58 -0500 Subject: [PATCH] [dagster-fivetran] Implement base resync method in FivetranClient (#26059) ## Summary & Motivation This PR reworks legacy resync method and implements it in the `FivetranClient`: - `start_resync` is added based on legacy `start_resync` - `start_resync` leverages `_start_sync` introduced in #25911 - a [resync in Fivetran](https://fivetran.com/docs/rest-api/api-reference/connectors/resync-connector) is historical data sync - the endpoint and result is different, but logic around how to call and handle a resync is the same as a sync. - a resync can be done with or without resync parameters, using a different endpoint. Tests mock the request API calls and make sure that all calls are made. ## How I Tested These Changes Additional unit tests with BK --- .../dagster_fivetran/resources.py | 23 +++++++++++++++++++ .../experimental/conftest.py | 12 ++++++++++ .../experimental/test_resources.py | 12 ++++++++++ 3 files changed, 47 insertions(+) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5366c4bfe900f..2c9072934bfd6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -631,6 +631,29 @@ def start_sync(self, connector_id: str) -> None: ) self._start_sync(request_fn=request_fn, connector_id=connector_id) + def start_resync( + self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None + ) -> None: + """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. + An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 + """ + request_fn = partial( + self._make_connector_request, + method="POST", + endpoint=( + f"{connector_id}/schemas/tables/resync" + if resync_parameters is not None + else f"{connector_id}/resync" + ), + data=json.dumps(resync_parameters) if resync_parameters is not None else None, + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 6e5a35593627b..267b536263d13 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -467,4 +467,16 @@ def all_api_mocks_fixture( json=SAMPLE_SUCCESS_MESSAGE, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 882bcf2861222..43b1bd744918e 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -47,3 +47,15 @@ def test_basic_resource_request( client.start_sync(connector_id=connector_id) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url + + # resync calls + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters=None) + assert len(all_api_mocks.calls) == 3 + assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url + + # resync calls with parameters + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) + assert len(all_api_mocks.calls) == 3 + assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url