diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 50349698a6537..f7086e348cc63 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -723,6 +723,52 @@ def poll_sync( ) return connector_details + def sync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + """Initializes a sync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=self.start_sync, + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def _sync_and_poll( + self, + sync_fn: Callable, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + schema_config_details = self.get_schema_config_for_connector(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + sync_fn(connector_id=connector_id) + final_details = self.poll_sync( + connector_id=connector_id, + previous_sync_completed_at=connector.last_sync_completed_at, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + return FivetranOutput(connector_details=final_details, schema_config=schema_config_details) + @experimental class FivetranWorkspace(ConfigurableResource):