From 34a6e9fbc945c4907347d4e65aebb3a03cf9ab86 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 16:27:52 -0500 Subject: [PATCH] [dagster-fivetran] Implement base resync method in FivetranClient --- .../dagster_fivetran/resources.py | 23 +++++++++++++++++++ .../experimental/conftest.py | 6 +++++ .../experimental/test_resources.py | 6 +++++ 3 files changed, 35 insertions(+) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5366c4bfe900f..9a77c317ade89 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -631,6 +631,29 @@ def start_sync(self, connector_id: str) -> None: ) self._start_sync(request_fn=request_fn, connector_id=connector_id) + def start_resync( + self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None + ) -> None: + """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. + An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 + """ + request_fn = partial( + self._make_connector_request, + method="POST", + endpoint=( + f"{connector_id}/schemas/tables/resync" + if resync_parameters is not None + else f"{connector_id}/resync" + ), + data=json.dumps(resync_parameters) if resync_parameters is not None else None, + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 6e5a35593627b..664caab3fa02d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -467,4 +467,10 @@ def all_api_mocks_fixture( json=SAMPLE_SUCCESS_MESSAGE, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 882bcf2861222..bd99c892b9aea 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -47,3 +47,9 @@ def test_basic_resource_request( client.start_sync(connector_id=connector_id) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url + + # resync calls + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters=None) + assert len(all_api_mocks.calls) == 3 + assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url \ No newline at end of file