Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[11/n][dagster-fivetran] Implement materialization method in FivetranWorkspace #25961

Open
wants to merge 6 commits into
base: maxime/use-translator-instance-in-load-fivetran-asset-specs
Choose a base branch
from

Conversation

maximearmstrong
Copy link
Contributor

@maximearmstrong maximearmstrong commented Nov 15, 2024

Summary & Motivation

This PR implements FivetranWorkspace.sync_and_poll, the materialization method for Fivetran assets. This method:

  • calls FivetranClient.sync_and_poll
  • takes the FivetranOutput returned by FivetranClient.sync_and_poll and generates the asset materializations
  • yields MaterializeResult for each expected asset and AssetMaterialization for each unexpected asset
    • a connector table that was not in the connector at definitions loading time can be in the FivetranOutput. Eg. the table was added after definitions loading time and before sync.
  • logs a warning for each unmaterialized table
    • a connector table can be created at definitions loading time, but can be missing in the FivetranOutput. Eg. the table was deleted after definitions loading time and before sync.

Can be leveraged like:

from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_API_KEY"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)


@fivetran_assets(
    connector_id="connector_id",
    name="connector_id",
    group_name="connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
    yield from fivetran.sync_and_poll(context=context)


defs = dg.Definitions(
    assets=[fivetran_connector_assets],
    resources={"fivetran": fivetran_workspace},
)

How I Tested These Changes

Additional tests with BK

Tested with a live Fivetran instance:

Screenshot 2024-11-26 at 5 46 31 PM

Changelog

[dagster-fivetran]

Copy link
Contributor Author

maximearmstrong commented Nov 15, 2024

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

Comment on lines 610 to 614
self._make_connector_request(
method="GET",
endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
)
return self._make_request("GET", f"connectors/{connector_id}/schemas")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There appears to be a mismatch between the API call and its return value. The method makes a request for column data via _make_connector_request() but then discards that result and returns schema config data instead. To fix this, replace the two calls with a single return:

return self._make_connector_request(
    method='GET',
    endpoint=f'{connector_id}/schemas/{schema_name}/tables/{table_name}/columns'
)

This will properly return the column data that was requested.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

Comment on lines 962 to 1000
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The string formatting appears incorrect here. The %s placeholder expects a value to substitute, but the second argument is an f-string that's already been evaluated. To properly log both the table info and exception, consider:

self._log.warning("An error occurred while fetching column metadata: %s", str(e))

or if table information is needed:

self._log.warning(
    "An error occurred while fetching column metadata for table %s: %s",
    f"{schema_source_name}.{table_source_name}",
    str(e)
)

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

@maximearmstrong maximearmstrong force-pushed the maxime/rework-fivetran-11 branch 4 times, most recently from b417e4e to 3150e3e Compare November 22, 2024 14:22
@maximearmstrong maximearmstrong marked this pull request as ready for review November 26, 2024 23:06
@maximearmstrong maximearmstrong self-assigned this Nov 26, 2024
@maximearmstrong maximearmstrong changed the base branch from maxime/rework-fivetran-10 to maxime/use-translator-instance-in-load-fivetran-asset-specs November 27, 2024 19:28
@maximearmstrong maximearmstrong force-pushed the maxime/use-translator-instance-in-load-fivetran-asset-specs branch from d2fa78b to 67d0fcd Compare November 27, 2024 20:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant