feat(ingest/fivetran): avoid duplicate table lineage entries (datahub…
hsheth2 authored Oct 29, 2024
1 parent 20eed21 commit a11ac8d
Showing 6 changed files with 249 additions and 194 deletions.
@@ -144,8 +144,8 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport):
def report_connectors_scanned(self, count: int = 1) -> None:
self.connectors_scanned += count

def report_connectors_dropped(self, model: str) -> None:
self.filtered_connectors.append(model)
def report_connectors_dropped(self, connector: str) -> None:
self.filtered_connectors.append(connector)


class PlatformDetail(ConfigModel):
@@ -76,7 +76,7 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):

self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)

def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
input_dataset_urn_list: List[DatasetUrn] = []
output_dataset_urn_list: List[DatasetUrn] = []
fine_grained_lineage: List[FineGrainedLineage] = []
@@ -93,8 +93,11 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
connector.connector_type
]
else:
logger.info(
f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity."
self.report.info(
title="Guessing source platform for lineage",
message="We encountered a connector type that we don't fully support yet. "
"We will attempt to guess the platform based on the connector type.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})",
)
source_details.platform = connector.connector_type

@@ -170,7 +173,19 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
datajob.inlets.extend(input_dataset_urn_list)
datajob.outlets.extend(output_dataset_urn_list)
datajob.fine_grained_lineages.extend(fine_grained_lineage)
return None

return dict(
**{
f"source.{k}": str(v)
for k, v in source_details.dict().items()
if v is not None
},
**{
f"destination.{k}": str(v)
for k, v in destination_details.dict().items()
if v is not None
},
)

def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
return DataFlow(
@@ -196,23 +211,23 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
owners={owner_email} if owner_email else set(),
)

job_property_bag: Dict[str, str] = {}
allowed_connection_keys = [
Constant.PAUSED,
Constant.SYNC_FREQUENCY,
Constant.DESTINATION_ID,
]
for key in allowed_connection_keys:
if hasattr(connector, key) and getattr(connector, key) is not None:
job_property_bag[key] = repr(getattr(connector, key))
datajob.properties = job_property_bag

# Map connector source and destination table with dataset entity
# Also extend the fine grained lineage of column if include_column_lineage is True
self._extend_lineage(connector=connector, datajob=datajob)

lineage_properties = self._extend_lineage(connector=connector, datajob=datajob)
# TODO: Add fine grained lineages of dataset after FineGrainedLineageDownstreamType.DATASET enabled

connector_properties: Dict[str, str] = {
"connector_id": connector.connector_id,
"connector_type": connector.connector_type,
"paused": str(connector.paused),
"sync_frequency": str(connector.sync_frequency),
"destination_id": connector.destination_id,
}
datajob.properties = {
**connector_properties,
**lineage_properties,
}

return datajob

def _generate_dpi_from_job(self, job: Job, datajob: DataJob) -> DataProcessInstance:
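Note on the change above: the datajob's properties are now built by merging two flat string dicts, the static connector fields and the prefixed source/destination details returned by `_extend_lineage`. The sketch below illustrates that merging pattern only; `SimpleDetail`, `flatten_details`, and the example values are hypothetical stand-ins, not the actual DataHub classes.

```python
# Minimal sketch of the property-merging pattern above; SimpleDetail, flatten_details,
# and the example values are illustrative stand-ins, not the actual DataHub classes.
from dataclasses import dataclass, asdict
from typing import Dict, Optional


@dataclass
class SimpleDetail:
    platform: Optional[str] = None
    database: Optional[str] = None


def flatten_details(source: SimpleDetail, destination: SimpleDetail) -> Dict[str, str]:
    # Prefix every non-None field so source and destination keys cannot collide.
    return {
        **{f"source.{k}": str(v) for k, v in asdict(source).items() if v is not None},
        **{f"destination.{k}": str(v) for k, v in asdict(destination).items() if v is not None},
    }


connector_properties: Dict[str, str] = {
    "connector_id": "abc123",
    "connector_type": "postgres",
}
lineage_properties = flatten_details(
    SimpleDetail(platform="postgres"),
    SimpleDetail(platform="snowflake", database="analytics"),
)
# Later entries win on key collisions; the prefixes above make collisions unlikely.
datajob_properties = {**connector_properties, **lineage_properties}
print(datajob_properties)
```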
@@ -259,20 +259,23 @@ def get_allowed_connectors_list(
logger.info("Fetching connector list")
connector_list = self._query(self.fivetran_log_query.get_connectors_query())
for connector in connector_list:
connector_id = connector[Constant.CONNECTOR_ID]
connector_name = connector[Constant.CONNECTOR_NAME]
if not connector_patterns.allowed(connector_name):
report.report_connectors_dropped(connector_name)
report.report_connectors_dropped(
f"{connector_name} (connector_id: {connector_id}, dropped due to filter pattern)"
)
continue
if not destination_patterns.allowed(
destination_id := connector[Constant.DESTINATION_ID]
):
report.report_connectors_dropped(
f"{connector_name} (destination_id: {destination_id})"
f"{connector_name} (connector_id: {connector_id}, destination_id: {destination_id})"
)
continue
connectors.append(
Connector(
connector_id=connector[Constant.CONNECTOR_ID],
connector_id=connector_id,
connector_name=connector_name,
connector_type=connector[Constant.CONNECTOR_TYPE_ID],
paused=connector[Constant.PAUSED],
@@ -1,8 +1,8 @@
from typing import List

# Safeguards to prevent fetching massive amounts of data.
MAX_TABLE_LINEAGE_PER_CONNECTOR = 50
MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500
MAX_TABLE_LINEAGE_PER_CONNECTOR = 120
MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000
MAX_JOBS_PER_CONNECTOR = 500


@@ -33,6 +33,7 @@ def get_connectors_query(self) -> str:
FROM {self.db_clause}connector
WHERE
_fivetran_deleted = FALSE
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1
"""

def get_users_query(self) -> str:
@@ -86,21 +87,29 @@ def get_table_lineage_query(self, connector_ids: List[str]) -> str:

return f"""\
SELECT
stm.connector_id as connector_id,
stm.id as source_table_id,
stm.name as source_table_name,
ssm.name as source_schema_name,
dtm.id as destination_table_id,
dtm.name as destination_table_name,
dsm.name as destination_schema_name
FROM {self.db_clause}table_lineage as tl
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
ORDER BY stm.connector_id, tl.created_at DESC
*
FROM (
SELECT
stm.connector_id as connector_id,
stm.id as source_table_id,
stm.name as source_table_name,
ssm.name as source_schema_name,
dtm.id as destination_table_id,
dtm.name as destination_table_name,
dsm.name as destination_schema_name,
tl.created_at as created_at,
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn
FROM {self.db_clause}table_lineage as tl
JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id
JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id
JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id
JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
)
-- Ensure that we only get back one entry per source and destination pair.
WHERE table_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR}
ORDER BY connector_id, created_at DESC
"""

def get_column_lineage_query(self, connector_ids: List[str]) -> str:
@@ -109,19 +118,31 @@ def get_column_lineage_query(self, connector_ids: List[str]) -> str:

return f"""\
SELECT
scm.table_id as source_table_id,
dcm.table_id as destination_table_id,
scm.name as source_column_name,
dcm.name as destination_column_name
FROM {self.db_clause}column_lineage as cl
JOIN {self.db_clause}source_column_metadata as scm
ON cl.source_column_id = scm.id
JOIN {self.db_clause}destination_column_metadata as dcm
ON cl.destination_column_id = dcm.id
-- Only joining source_table_metadata to get the connector_id.
JOIN {self.db_clause}source_table_metadata as stm
ON scm.table_id = stm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
ORDER BY stm.connector_id, cl.created_at DESC
source_table_id,
destination_table_id,
source_column_name,
destination_column_name
FROM (
SELECT
stm.connector_id as connector_id,
scm.table_id as source_table_id,
dcm.table_id as destination_table_id,
scm.name as source_column_name,
dcm.name as destination_column_name,
cl.created_at as created_at,
ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn
FROM {self.db_clause}column_lineage as cl
JOIN {self.db_clause}source_column_metadata as scm
ON cl.source_column_id = scm.id
JOIN {self.db_clause}destination_column_metadata as dcm
ON cl.destination_column_id = dcm.id
-- Only joining source_table_metadata to get the connector_id.
JOIN {self.db_clause}source_table_metadata as stm
ON scm.table_id = stm.id
WHERE stm.connector_id IN ({formatted_connector_ids})
)
-- Ensure that we only get back one entry per (connector, source column, destination column) pair.
WHERE column_combo_rn = 1
QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR}
ORDER BY connector_id, created_at DESC
"""
