Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-fivetran] Implement get_columns_config_for_table in FivetranClient #26181

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,24 @@ def update_schedule_type_for_connector(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

def get_columns_config_for_table(
self, connector_id: str, schema_name: str, table_name: str
) -> Mapping[str, Any]:
"""Fetches the source table columns config for a given table from the Fivetran API.

Args:
connector_id (str): The Fivetran Connector ID.
schema_name (str): The Fivetran Schema name.
table_name (str): The Fivetran Table name.

Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(
method="GET",
endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
)

def start_sync(self, connector_id: str) -> None:
"""Initiates a sync of a Fivetran connector.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class FivetranTable:

enabled: bool
name_in_destination: str
# We keep the raw data for columns to add it as `column_info in the metadata.
# We keep the raw data for columns to add it as `column_info` in the metadata.
columns: Optional[Mapping[str, Any]]

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
TEST_API_SECRET = "test_api_secret"
TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id"

TEST_SCHEMA_NAME = "schema_name_in_destination_1"
TEST_TABLE_NAME = "table_name_in_destination_1"
TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1"

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/groups/list-all-groups
SAMPLE_GROUPS = {
Expand Down Expand Up @@ -401,16 +405,47 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]


SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector(
table_name="table_name_in_destination_1"
table_name=TEST_TABLE_NAME
)

# We change the name of the original example to test the sync and poll materialization method
ALTERED_SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = get_sample_schema_config_for_connector(
table_name="another_table_name_in_destination_1"
table_name=TEST_ANOTHER_TABLE_NAME
)

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG = {
"code": "Success",
"message": "Operation performed.",
"data": {
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination_1",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
}
},
}


def get_fivetran_connector_api_url(connector_id: str) -> str:
return (
Expand Down Expand Up @@ -480,40 +515,47 @@ def all_api_mocks_fixture(
group_id: str,
fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> Iterator[responses.RequestsMock]:
test_connector_api_url = get_fivetran_connector_api_url(connector_id)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=get_fivetran_connector_api_url(connector_id),
url=test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=get_fivetran_connector_api_url(connector_id),
url=test_connector_api_url,
json=get_sample_connection_details(
succeeded_at=TEST_MAX_TIME_STR, failed_at=TEST_PREVIOUS_MAX_TIME_STR
),
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/force",
url=f"{test_connector_api_url}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/resync",
url=f"{test_connector_api_url}/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{get_fivetran_connector_api_url(connector_id)}/schemas/tables/resync",
url=f"{test_connector_api_url}/schemas/tables/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.GET,
url=f"{test_connector_api_url}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns",
json=SAMPLE_SOURCE_TABLE_COLUMNS_CONFIG,
status=200,
)
yield fetch_workspace_data_api_mocks


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
TEST_API_SECRET,
TEST_MAX_TIME_STR,
TEST_PREVIOUS_MAX_TIME_STR,
TEST_SCHEMA_NAME,
TEST_TABLE_NAME,
get_fivetran_connector_api_url,
get_sample_connection_details,
)
Expand Down Expand Up @@ -58,6 +60,17 @@ def test_basic_resource_request(
assert connector_id in all_api_mocks.calls[1].request.url
assert all_api_mocks.calls[1].request.method == "PATCH"

# columns config calls
all_api_mocks.calls.reset()
client.get_columns_config_for_table(
connector_id=connector_id, schema_name=TEST_SCHEMA_NAME, table_name=TEST_TABLE_NAME
)
assert len(all_api_mocks.calls) == 1
assert (
f"{connector_id}/schemas/{TEST_SCHEMA_NAME}/tables/{TEST_TABLE_NAME}/columns"
in all_api_mocks.calls[0].request.url
)

# sync calls
all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
Expand Down