Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[11/n][dagster-fivetran] Implement materialization method in FivetranWorkspace #25961

Open
wants to merge 6 commits into
base: maxime/use-translator-instance-in-load-fivetran-asset-specs
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from dagster_fivetran.resources import FivetranWorkspace
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet
from dagster_fivetran.utils import DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY


@experimental
Expand Down Expand Up @@ -100,15 +101,18 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet
)

"""
dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator()

return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
spec.merge_attributes(
metadata={DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY: dagster_fivetran_translator}
)
for spec in workspace.load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator
or DagsterFivetranTranslator()
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
Definitions,
Failure,
InitResourceContext,
MaterializeResult,
MetadataValue,
OpExecutionContext,
__version__,
Expand All @@ -25,7 +27,7 @@
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._record import as_dict, record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
Expand All @@ -36,12 +38,20 @@
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranConnectorTableProps,
FivetranDestination,
FivetranMetadataSet,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url
from dagster_fivetran.utils import (
get_fivetran_connector_table_name,
get_fivetran_connector_url,
get_fivetran_logs_url,
get_translator_from_fivetran_assets,
metadata_for_table,
)

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
Expand Down Expand Up @@ -832,6 +842,7 @@ class FivetranWorkspace(ConfigurableResource):

_client: FivetranClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> FivetranClient:
return FivetranClient(
api_key=self.api_key,
Expand Down Expand Up @@ -929,10 +940,107 @@ def load_asset_specs(
dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(),
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
def _generate_materialization(
self,
fivetran_output: FivetranOutput,
dagster_fivetran_translator: DagsterFivetranTranslator,
):
raise NotImplementedError()
connector = FivetranConnector.from_connector_details(
connector_details=fivetran_output.connector_details
)
schema_config = FivetranSchemaConfig.from_schema_config_details(
schema_config_details=fivetran_output.schema_config
)

for schema_source_name, schema in schema_config.schemas.items():
if not schema.enabled:
continue

for table_source_name, table in schema.tables.items():
if not table.enabled:
continue

asset_key = dagster_fivetran_translator.get_asset_spec(
props=FivetranConnectorTableProps(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We reconstruct a FivetranConnectorTableProps to get the asset key using get_asset_spec(...).key

table=get_fivetran_connector_table_name(
schema_name=schema.name_in_destination,
table_name=table.name_in_destination,
),
connector_id=connector.id,
name=connector.name,
connector_url=connector.url,
schema_config=schema_config,
database=None,
service=None,
)
).key

yield AssetMaterialization(
asset_key=asset_key,
description=(
f"Table generated via Fivetran sync: {schema.name_in_destination}.{table.name_in_destination}"
),
metadata={
**metadata_for_table(
as_dict(table),
get_fivetran_connector_url(fivetran_output.connector_details),
include_column_info=True,
database=None,
schema=schema.name_in_destination,
table=table.name_in_destination,
),
"schema_source_name": schema_source_name,
"table_source_name": table_source_name,
},
)

def sync_and_poll(
self, context: Union[OpExecutionContext, AssetExecutionContext]
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
"""Executes a sync and poll process to materialize Fivetran assets.

Args:
context (Union[OpExecutionContext, AssetExecutionContext]): The execution context
from within `@fivetran_assets`. If an AssetExecutionContext is passed,
its underlying OpExecutionContext will be used.

Returns:
Iterator[Union[AssetMaterialization, MaterializeResult]]: An iterator of MaterializeResult
or AssetMaterialization.
"""
assets_def = context.assets_def
dagster_fivetran_translator = get_translator_from_fivetran_assets(assets_def)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

client = self.get_client()
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)

materialized_asset_keys = set()
for materialization in self._generate_materialization(
fivetran_output=fivetran_output, dagster_fivetran_translator=dagster_fivetran_translator
):
# Scan through all tables actually created, if it was expected then emit a MaterializeResult.
# Otherwise, emit a runtime AssetMaterialization.
if materialization.asset_key in context.selected_asset_keys:
yield MaterializeResult(
asset_key=materialization.asset_key, metadata=materialization.metadata
)
materialized_asset_keys.add(materialization.asset_key)
else:
context.log.warning(
f"An unexpected asset was materialized: {materialization.asset_key}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe note that we're still going to yield a materialization event.

)
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if unmaterialized_asset_keys:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from typing import Any, Dict, Iterator, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Dict, Iterator, Mapping, Optional, Sequence

import dagster._check as check
from dagster import AssetMaterialization, MetadataValue
from dagster import (
AssetMaterialization,
AssetsDefinition,
DagsterInvariantViolationError,
MetadataValue,
)
from dagster._core.definitions.metadata import RawMetadataMapping
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema

from dagster_fivetran.types import FivetranOutput

if TYPE_CHECKING:
from dagster_fivetran import DagsterFivetranTranslator

DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY = "dagster-fivetran/dagster_fivetran_translator"


def get_fivetran_connector_url(connector_details: Mapping[str, Any]) -> str:
service = connector_details["service"]
Expand All @@ -23,6 +33,21 @@ def get_fivetran_connector_table_name(schema_name: str, table_name: str) -> str:
return f"{schema_name}.{table_name}"


def get_translator_from_fivetran_assets(
fivetran_assets: AssetsDefinition,
) -> "DagsterFivetranTranslator":
metadata_by_key = fivetran_assets.metadata_by_key or {}
first_asset_key = next(iter(fivetran_assets.metadata_by_key.keys()))
first_metadata = metadata_by_key.get(first_asset_key, {})
dagster_fivetran_translator = first_metadata.get(DAGSTER_FIVETRAN_TRANSLATOR_METADATA_KEY)
if dagster_fivetran_translator is None:
raise DagsterInvariantViolationError(
f"Expected to find fivetran translator metadata on asset {first_asset_key.to_user_string()},"
" but did not. Did you pass in assets that weren't generated by @fivetran_assets?"
)
return dagster_fivetran_translator


def metadata_for_table(
table_data: Mapping[str, Any],
connector_url: str,
Expand Down
Loading