diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 2c72c311c6efc..e804495046fcd 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -1,4 +1,4 @@ -from typing import Iterator +from typing import Any, Iterator, Mapping import pytest import responses @@ -8,6 +8,13 @@ FIVETRAN_CONNECTOR_ENDPOINT, ) +TEST_MAX_TIME_STR = "2024-12-01T15:45:29.013729Z" +TEST_PREVIOUS_MAX_TIME_STR = "2024-12-01T15:43:29.013729Z" + +TEST_ACCOUNT_ID = "test_account_id" +TEST_API_KEY = "test_api_key" +TEST_API_SECRET = "test_api_secret" + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups SAMPLE_GROUPS = { @@ -150,70 +157,76 @@ # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connectors/connector-details -SAMPLE_CONNECTOR_DETAILS = { - "code": "Success", - "message": "Operation performed.", - "data": { - "id": "connector_id", - "service": "15five", - "schema": "schema.table", - "paused": False, - "status": { - "tasks": [ - { - "code": "resync_table_warning", - "message": "Resync Table Warning", - "details": "string", - } - ], - "warnings": [ +# The sample is parameterized to test the poll method +def get_sample_connection_details(succeeded_at: str, failed_at: str) -> Mapping[str, Any]: + return { + "code": "Success", + "message": "Operation performed.", + "data": { + "id": "connector_id", + "service": "15five", + "schema": "schema.table", + "paused": False, + "status": { + "tasks": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string", + } + ], + "warnings": [ + { + "code": "resync_table_warning", + "message": "Resync Table Warning", + "details": "string", + } + ], + "schema_status": "ready", + "update_state": "delayed", + "setup_state": "connected", + "sync_state": "scheduled", + "is_historical_sync": False, + "rescheduled_for": "2024-12-01T15:43:29.013729Z", + }, + "daily_sync_time": "14:00", + "succeeded_at": succeeded_at, + "sync_frequency": 1440, + "group_id": "my_group_destination_id", + "connected_by": "user_id", + "setup_tests": [ { - "code": "resync_table_warning", - "message": "Resync Table Warning", - "details": "string", + "title": "Test Title", + "status": "PASSED", + "message": "Test Passed", + "details": "Test Details", } ], - "schema_status": "ready", - "update_state": "delayed", - "setup_state": "connected", - "sync_state": "scheduled", - "is_historical_sync": False, - "rescheduled_for": "2024-12-01T15:43:29.013729Z", - }, - "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:45:29.013729Z", - "sync_frequency": 1440, - "group_id": "my_group_destination_id", - "connected_by": "user_id", - "setup_tests": [ - { - "title": "Test Title", - "status": "PASSED", - "message": "Test Passed", - "details": "Test Details", - } - ], - "source_sync_details": {}, - "service_version": 0, - "created_at": "2024-12-01T15:41:29.013729Z", - "failed_at": "2024-12-01T15:43:29.013729Z", - "private_link_id": "private_link_id", - "proxy_agent_id": "proxy_agent_id", - "networking_method": "Directly", - "connect_card": { - "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", - "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...", + "source_sync_details": {}, + "service_version": 0, + "created_at": "2024-12-01T15:41:29.013729Z", + "failed_at": failed_at, + "private_link_id": "private_link_id", + "proxy_agent_id": "proxy_agent_id", + "networking_method": "Directly", + "connect_card": { + "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJkIjp7ImxvZ2luIjp0cnVlLCJ1c2VyIjoiX2FjY291bnR3b3J0aHkiLCJhY2NvdW50IjoiX21vb25iZWFtX2FjYyIsImdyb3VwIjoiX21vb25iZWFtIiwiY29ubmVjdG9yIjoiY29iYWx0X2VsZXZhdGlvbiIsIm1ldGhvZCI6IlBiZkNhcmQiLCJpZGVudGl0eSI6ZmFsc2V9LCJpYXQiOjE2Njc4MzA2MzZ9.YUMGUbzxW96xsKJLo4bTorqzx8Q19GTrUi3WFRFM8BU", + "uri": "https://fivetran.com/connect-card/setup?auth=eyJ0eXAiOiJKV1QiLCJh...", + }, + "pause_after_trial": False, + "data_delay_threshold": 0, + "data_delay_sensitivity": "NORMAL", + "schedule_type": "auto", + "local_processing_agent_id": "local_processing_agent_id", + "connect_card_config": { + "redirect_uri": "https://your.site/path", + "hide_setup_guide": True, + }, + "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", + "config": {"api_key": "your_15five_api_key"}, }, - "pause_after_trial": False, - "data_delay_threshold": 0, - "data_delay_sensitivity": "NORMAL", - "schedule_type": "auto", - "local_processing_agent_id": "local_processing_agent_id", - "connect_card_config": {"redirect_uri": "https://your.site/path", "hide_setup_guide": True}, - "hybrid_deployment_agent_id": "hybrid_deployment_agent_id", - "config": {"api_key": "your_15five_api_key"}, - }, -} + } + # Taken from Fivetran API documentation # https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config @@ -382,10 +395,6 @@ SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} -TEST_ACCOUNT_ID = "test_account_id" -TEST_API_KEY = "test_api_key" -TEST_API_SECRET = "test_api_secret" - @pytest.fixture(name="connector_id") def connector_id_fixture() -> str: @@ -452,13 +461,17 @@ def all_api_mocks_fixture( fetch_workspace_data_api_mocks.add( method=responses.GET, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + json=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + ), status=200, ) fetch_workspace_data_api_mocks.add( method=responses.PATCH, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + json=get_sample_connection_details( + succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR + ), status=200, ) fetch_workspace_data_api_mocks.add( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 0d9fa410226a0..6f99ce709ae22 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,12 +1,20 @@ +import pytest import responses +from dagster import Failure from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( + FIVETRAN_API_BASE, + FIVETRAN_API_VERSION, + FIVETRAN_CONNECTOR_ENDPOINT, TEST_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, + TEST_MAX_TIME_STR, + TEST_PREVIOUS_MAX_TIME_STR, + get_sample_connection_details, ) @@ -63,8 +71,42 @@ def test_basic_resource_request( assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url # poll calls + # Succeeded poll all_api_mocks.calls.reset() client.poll_sync( connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) ) assert len(all_api_mocks.calls) == 1 + + # Timed out poll + all_api_mocks.calls.reset() + with pytest.raises(Failure) as e: + client.poll_sync( + connector_id=connector_id, + # The poll process will time out because the value of + # `FivetranConnector.last_sync_completed_at` does not change in the test + previous_sync_completed_at=parser.parse(TEST_MAX_TIME_STR), + poll_timeout=2, + poll_interval=1, + ) + assert f"Sync for connector '{connector_id}' timed out" in str(e.value) + + # Failed poll + all_api_mocks.calls.reset() + # Replace the mock API call and set `failed_at` as more recent that `succeeded_at` + all_api_mocks.replace( + method_or_response=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=get_sample_connection_details( + succeeded_at=TEST_PREVIOUS_MAX_TIME_STR, failed_at=TEST_MAX_TIME_STR + ), + status=200, + ) + with pytest.raises(Failure) as e: + client.poll_sync( + connector_id=connector_id, + previous_sync_completed_at=parser.parse(MIN_TIME_STR), + poll_timeout=2, + poll_interval=1, + ) + assert f"Sync for connector '{connector_id}' failed!" in str(e.value)