Skip to content

Commit

Permalink
[11/n][dagster-fivetran] Implement materialization method in Fivetran…
Browse files Browse the repository at this point in the history
…Workspace
  • Loading branch information
maximearmstrong committed Nov 19, 2024
1 parent 6e115f1 commit 78fe129
Showing 1 changed file with 144 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import lru_cache, partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
Expand All @@ -10,11 +11,13 @@
import requests
from dagster import (
AssetExecutionContext,
AssetMaterialization,
Definitions,
Failure,
InitResourceContext,
MetadataValue,
OpExecutionContext,
Output,
__version__,
_check as check,
get_dagster_logger,
Expand All @@ -24,7 +27,11 @@
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.errors import DagsterStepOutputNotFoundError
from dagster._core.utils import imap
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
Expand All @@ -37,21 +44,31 @@
FivetranConnector,
FivetranConnectorScheduleType,
FivetranDestination,
FivetranMetadataSet,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
get_fivetran_logs_url,
)

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY = "dagster-fivetran/infer_missing_tables"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

DEFAULT_MAX_THREADPOOL_WORKERS = 10

FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


Expand Down Expand Up @@ -577,6 +594,25 @@ def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any
"""
return self._make_request("GET", f"connectors/{connector_id}/schemas")

def get_columns_for_table(
self, connector_id: str, schema_name: str, table_name: str
) -> Mapping[str, Any]:
"""Fetches the connector schema config for a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID.
schema_name (str): The Fivetran Schema name.
table_name (str): The Fivetran Table name.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
self._make_connector_request(
method="GET",
endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
)
return self._make_request("GET", f"connectors/{connector_id}/schemas")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API.
Expand Down Expand Up @@ -831,6 +867,7 @@ class FivetranWorkspace(ConfigurableResource):

_client: FivetranClient = PrivateAttr(default=None)

@cached_method
def get_client(self) -> FivetranClient:
return FivetranClient(
api_key=self.api_key,
Expand Down Expand Up @@ -891,10 +928,115 @@ def fetch_fivetran_workspace_data(
schema_configs_by_connector_id=schema_configs_by_connector_id,
)

def _fetch_and_attach_col_metadata(
self, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
"""Subroutine to fetch column metadata for a given table from the Fivetran API and attach it to the
materialization.
"""
try:
schema_source_name = materialization.metadata["schema_source_name"].value
table_source_name = materialization.metadata["table_source_name"].value

table_conn_data = self.get_client().get_columns_for_table(
connector_id=connector_id,
schema_name=schema_source_name,
table_name=table_source_name,
)
columns = check.dict_elem(table_conn_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return materialization.with_metadata(
{
**materialization.metadata,
**TableMetadataSet(column_schema=TableSchema(table_columns)),
}
)
except Exception as e:
self._log.warning(
"An error occurred while fetching column metadata for table %s",
f"Exception: {e}",
exc_info=True,
)
return materialization

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
raise NotImplementedError()
# TODO: Add docstrings
assets_def = context.assets_def

# TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)
infer_missing_tables = context.op.tags.get(
DAGSTER_FIVETRAN_INFER_MISSING_TABLES_METADATA_KEY
)

connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

client = self.get_client()
fivetran_output = client.sync_and_poll(
connector_id=connector_id,
)

materialized_asset_keys = set()

_map_fn: Callable[[AssetMaterialization], AssetMaterialization] = (
lambda materialization: self._fetch_and_attach_col_metadata(
connector_id, materialization
)
if fetch_column_metadata
else materialization
)
with ThreadPoolExecutor(
max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
thread_name_prefix=f"fivetran_{connector_id}",
) as executor:
for materialization in imap(
executor=executor,
# TODO: Create new asset materialization fn with assets and not asset key prefix
iterable=generate_materializations(
fivetran_output,
asset_key_prefix=[],
),
func=_map_fn,
):
# scan through all tables actually created, if it was expected then emit an Output.
# otherwise, emit a runtime AssetMaterialization
if materialization.asset_key in context.selected_asset_keys:
yield Output(
value=None,
output_name=materialization.asset_key.to_python_identifier(),
metadata=materialization.metadata,
)
materialized_asset_keys.add(materialization.asset_key)

else:
yield materialization

unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
if infer_missing_tables:
for asset_key in unmaterialized_asset_keys:
yield Output(value=None, output_name=asset_key.to_python_identifier())
else:
if unmaterialized_asset_keys:
asset_key = next(iter(unmaterialized_asset_keys))
output_name = "_".join(asset_key.path)
raise DagsterStepOutputNotFoundError(
f"Core compute for {context.op_def.name} did not return an output for"
f' non-optional output "{output_name}".',
step_key=context.get_step_execution_context().step.key,
output_name=output_name,
)

def __eq__(self, other):
return (
Expand Down

0 comments on commit 78fe129

Please sign in to comment.