Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-fivetran] Support infer_missing_tables in FivetranWorkspace.sync_and_poll #26111

Draft
wants to merge 1 commit into
base: maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,6 +30,7 @@
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.utils import imap
from dagster._record import record
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -64,11 +65,13 @@
# Connector endpoint with a trailing slash appended.
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

# Tag key that sync_and_poll looks up on `context.op.tags`; the value is stored
# in its local `fetch_column_metadata`.
DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
# NOTE(review): this name is assigned again a few lines below with the same
# value; presumably one of the two is the removed side of a diff — confirm and
# keep only one.
DEFAULT_MAX_THREADPOOL_WORKERS = 10
# Tag key that sync_and_poll looks up on `context.op.tags`. When the tag value
# is truthy, each selected asset that was not materialized gets an
# `Output(value=None, ...)`; otherwise a DagsterStepOutputNotFoundError is
# raised for the first unmaterialized asset.
# NOTE(review): the name says "METADATA_KEY" but it is read from op tags.
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables"

# Default polling interval (in seconds).
DEFAULT_POLL_INTERVAL = 10

# Presumably the default worker count for a thread pool; its use is not visible
# here — TODO confirm against the call site.
DEFAULT_MAX_THREADPOOL_WORKERS = 10

# Presumably the key prefix under which reconstruction metadata is stored; its
# use is not visible here — TODO confirm.
FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


Expand Down Expand Up @@ -1028,6 +1031,9 @@ def sync_and_poll(

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)
infer_missing_tables = context.op.tags.get(
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY
)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
Expand Down Expand Up @@ -1076,8 +1082,19 @@ def sync_and_poll(
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
if infer_missing_tables:
for asset_key in unmaterialized_asset_keys:
yield Output(value=None, output_name=asset_key.to_python_identifier())
else:
if unmaterialized_asset_keys:
asset_key = next(iter(unmaterialized_asset_keys))
output_name = "_".join(asset_key.path)
raise DagsterStepOutputNotFoundError(
f"Core compute for {context.op_def.name} did not return an output for"
f' non-optional output "{output_name}".',
step_key=context.get_step_execution_context().step.key,
output_name=output_name,
)

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
Expand Down