Skip to content

Commit

Permalink
Update materialization logic; add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 27, 2024
1 parent 4982b3b commit b0e8961
Show file tree
Hide file tree
Showing 4 changed files with 273 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
)
"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return multi_asset(
name=name,
group_name=group_name,
Expand All @@ -111,7 +113,6 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
)
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
Expand All @@ -14,9 +14,9 @@
Definitions,
Failure,
InitResourceContext,
MaterializeResult,
MetadataValue,
OpExecutionContext,
Output,
__version__,
_check as check,
get_dagster_logger,
Expand All @@ -27,7 +27,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
Expand Down Expand Up @@ -977,25 +977,37 @@ def _generate_materialization(

yield AssetMaterialization(
asset_key=asset_key,
description=f"Table generated via Fivetran sync: {schema.name}.{table.name}",
description=(
f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}"
),
metadata={
**metadata_for_table(
table,
as_dict(table),
get_fivetran_connector_url(fivetran_output.connector_details),
include_column_info=True,
database=None,
schema=schema.name,
table=table.name,
schema=schema.name_in_destination,
table=table.name_in_destination,
),
"schema_source_name": schema_source_name,
"table_source_name": table_source_name,
},
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
# TODO: Add docstrings
self, context: Union[OpExecutionContext, AssetExecutionContext]
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
"""Executes a sync and poll process to materialize Fivetran assets.
Args:
context (Union[OpExecutionContext, AssetExecutionContext]): The execution context
from within `@fivetran_assets`. If an AssetExecutionContext is passed,
its underlying OpExecutionContext will be used.
Returns:
Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
or AssetMaterialization.
"""
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

Expand All @@ -1013,13 +1025,11 @@ def sync_and_poll(
for materialization in self._generate_materialization(
fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
):
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
# Otherwise, emit a runtime AssetMaterialization.
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
yield MaterializeResult(
asset_key=materialization.asset_key, metadata=materialization.metadata
)
materialized_asset_keys.add(materialization.asset_key)
else:
Expand Down
Loading

0 comments on commit b0e8961

Please sign in to comment.