Skip to content

Commit

Permalink
[dagster-fivetran] Implement get_columns_config_for_table in Fivetran…
Browse files Browse the repository at this point in the history
…Client (#26181)

## Summary & Motivation

Implements `get_columns_config_for_table` with tests, to be used #26110.

## How I Tested These Changes

Additional test with BK
  • Loading branch information
maximearmstrong authored Dec 9, 2024
1 parent d1594bc commit dcabca7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ def update_schedule_type_for_connector(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

def get_columns_config_for_table(
self, connector_id: str, schema_name: str, table_name: str
) -> Mapping[str, Any]:
"""Fetches the source table columns config for a given table from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID.
schema_name (str): The Fivetran Schema name.
table_name (str): The Fivetran Table name.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(
method="GET",
endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
)

def start_sync(self, connector_id: str) -> None:
"""Initiates a sync of a Fivetran connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class FivetranTable:

enabled: bool
name_in_destination: str
# We keep the raw data for columns to add it as `column_info in the metadata.
# We keep the raw data for columns to add it as `column_info` in the metadata.
columns: Optional[Mapping[str, Any]]

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
TEST_API_SECRET = "test_api_secret"
TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id"

TEST_SCHEMA_NAME = "schema_name_in_destination_1"
TEST_TABLE_NAME = "table_name_in_destination_1"
TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups
SAMPLE_GROUPS = {
Expand Down Expand Up @@ -401,16 +405,47 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]


SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector(
table_name="table_name_in_destination_1"
table_name=TEST_TABLE_NAME
)

# We change the name of the original example to test the sync and poll materialization method
ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector(
table_name="another_table_name_in_destination_1"
table_name=TEST_ANOTHER_TABLE_NAME
)

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG = {
"code": "Success",
"message": "Operation performed.",
"data": {
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
}
},
}


def get_fivetran_connector_api_url(connector_id: str) -> str:
return (
Expand Down Expand Up @@ -480,40 +515,47 @@ def all_api_mocks_fixture(
group_id: str,
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> Iterator[responses.RequestsMock]:
test_connector_api_url = get_fivetran_connector_api_url(connector_id)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=get_fivetran_connector_api_url(connector_id),
url=test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=get_fivetran_connector_api_url(connector_id),
url=test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/force",
url=f"{test_connector_api_url}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/resync",
url=f"{test_connector_api_url}/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/schemas/tables/resync",
url=f"{test_connector_api_url}/schemas/tables/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{test_connector_api_url}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns",
json=SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG,
status=200,
)
yield fetch_workspace_data_api_mocks


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
TEST_API_SECRET,
TEST_MAX_TIME_STR,
TEST_PREVIOUS_MAX_TIME_STR,
TEST_SCHEMA_NAME,
TEST_TABLE_NAME,
get_fivetran_connector_api_url,
get_sample_connection_details,
)
Expand Down Expand Up @@ -58,6 +60,17 @@ def test_basic_resource_request(
assert connector_id in all_api_mocks.calls[1].request.url
assert all_api_mocks.calls[1].request.method == "PATCH"

# columns config calls
all_api_mocks.calls.reset()
client.get_columns_config_for_table(
connector_id=connector_id, schema_name=TEST_SCHEMA_NAME, table_name=TEST_TABLE_NAME
)
assert len(all_api_mocks.calls) == 1
assert (
f"{connector_id}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns"
in all_api_mocks.calls[0].request.url
)

# sync calls
all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
Expand Down

0 comments on commit dcabca7

Please sign in to comment.