From a11ac8d104649c953037d4b85afac67d6828ad18 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 29 Oct 2024 01:13:34 -0700 Subject: [PATCH] feat(ingest/fivetran): avoid duplicate table lineage entries (#11712) --- .../ingestion/source/fivetran/config.py | 4 +- .../ingestion/source/fivetran/fivetran.py | 49 ++++-- .../source/fivetran/fivetran_log_api.py | 9 +- .../source/fivetran/fivetran_query.py | 85 ++++++---- ...nowflake_empty_connection_user_golden.json | 140 ++++++++-------- .../fivetran/fivetran_snowflake_golden.json | 156 +++++++++--------- 6 files changed, 249 insertions(+), 194 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 1e15f6b395ca5..e40e284d6e0a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -144,8 +144,8 @@ class FivetranSourceReport(StaleEntityRemovalSourceReport): def report_connectors_scanned(self, count: int = 1) -> None: self.connectors_scanned += count - def report_connectors_dropped(self, model: str) -> None: - self.filtered_connectors.append(model) + def report_connectors_dropped(self, connector: str) -> None: + self.filtered_connectors.append(connector) class PlatformDetail(ConfigModel): diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index 907bfa3a167aa..21c967e162891 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -76,7 +76,7 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext): self.audit_log = FivetranLogAPI(self.config.fivetran_log_config) - def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: + def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]: input_dataset_urn_list: List[DatasetUrn] = [] output_dataset_urn_list: List[DatasetUrn] = [] fine_grained_lineage: List[FineGrainedLineage] = [] @@ -93,8 +93,11 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: connector.connector_type ] else: - logger.info( - f"Fivetran connector source type: {connector.connector_type} is not supported to mapped with Datahub dataset entity." + self.report.info( + title="Guessing source platform for lineage", + message="We encountered a connector type that we don't fully support yet. " + "We will attempt to guess the platform based on the connector type.", + context=f"{connector.connector_name} (connector_id: {connector.connector_id}, connector_type: {connector.connector_type})", ) source_details.platform = connector.connector_type @@ -170,7 +173,19 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None: datajob.inlets.extend(input_dataset_urn_list) datajob.outlets.extend(output_dataset_urn_list) datajob.fine_grained_lineages.extend(fine_grained_lineage) - return None + + return dict( + **{ + f"source.{k}": str(v) + for k, v in source_details.dict().items() + if v is not None + }, + **{ + f"destination.{k}": str(v) + for k, v in destination_details.dict().items() + if v is not None + }, + ) def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow: return DataFlow( @@ -196,23 +211,23 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob: owners={owner_email} if owner_email else set(), ) - job_property_bag: Dict[str, str] = {} - allowed_connection_keys = [ - Constant.PAUSED, - Constant.SYNC_FREQUENCY, - Constant.DESTINATION_ID, - ] - for key in allowed_connection_keys: - if hasattr(connector, key) and getattr(connector, key) is not None: - job_property_bag[key] = repr(getattr(connector, key)) - datajob.properties = job_property_bag - # Map connector source and destination table with dataset entity # Also extend the fine grained lineage of column if include_column_lineage is True - self._extend_lineage(connector=connector, datajob=datajob) - + lineage_properties = self._extend_lineage(connector=connector, datajob=datajob) # TODO: Add fine grained lineages of dataset after FineGrainedLineageDownstreamType.DATASET enabled + connector_properties: Dict[str, str] = { + "connector_id": connector.connector_id, + "connector_type": connector.connector_type, + "paused": str(connector.paused), + "sync_frequency": str(connector.sync_frequency), + "destination_id": connector.destination_id, + } + datajob.properties = { + **connector_properties, + **lineage_properties, + } + return datajob def _generate_dpi_from_job(self, job: Job, datajob: DataJob) -> DataProcessInstance: diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 79f9d513bfb7c..529002270cdd9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -259,20 +259,23 @@ def get_allowed_connectors_list( logger.info("Fetching connector list") connector_list = self._query(self.fivetran_log_query.get_connectors_query()) for connector in connector_list: + connector_id = connector[Constant.CONNECTOR_ID] connector_name = connector[Constant.CONNECTOR_NAME] if not connector_patterns.allowed(connector_name): - report.report_connectors_dropped(connector_name) + report.report_connectors_dropped( + f"{connector_name} (connector_id: {connector_id}, dropped due to filter pattern)" + ) continue if not destination_patterns.allowed( destination_id := connector[Constant.DESTINATION_ID] ): report.report_connectors_dropped( - f"{connector_name} (destination_id: {destination_id})" + f"{connector_name} (connector_id: {connector_id}, destination_id: {destination_id})" ) continue connectors.append( Connector( - connector_id=connector[Constant.CONNECTOR_ID], + connector_id=connector_id, connector_name=connector_name, connector_type=connector[Constant.CONNECTOR_TYPE_ID], paused=connector[Constant.PAUSED], diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 39c4d7712b4fc..65378928b244d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -1,8 +1,8 @@ from typing import List # Safeguards to prevent fetching massive amounts of data. -MAX_TABLE_LINEAGE_PER_CONNECTOR = 50 -MAX_COLUMN_LINEAGE_PER_CONNECTOR = 500 +MAX_TABLE_LINEAGE_PER_CONNECTOR = 120 +MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000 MAX_JOBS_PER_CONNECTOR = 500 @@ -33,6 +33,7 @@ def get_connectors_query(self) -> str: FROM {self.db_clause}connector WHERE _fivetran_deleted = FALSE +QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1 """ def get_users_query(self) -> str: @@ -86,21 +87,29 @@ def get_table_lineage_query(self, connector_ids: List[str]) -> str: return f"""\ SELECT - stm.connector_id as connector_id, - stm.id as source_table_id, - stm.name as source_table_name, - ssm.name as source_schema_name, - dtm.id as destination_table_id, - dtm.name as destination_table_name, - dsm.name as destination_schema_name -FROM {self.db_clause}table_lineage as tl -JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id -JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id -JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id -JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id -WHERE stm.connector_id IN ({formatted_connector_ids}) -QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY tl.created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} -ORDER BY stm.connector_id, tl.created_at DESC + * +FROM ( + SELECT + stm.connector_id as connector_id, + stm.id as source_table_id, + stm.name as source_table_name, + ssm.name as source_schema_name, + dtm.id as destination_table_id, + dtm.name as destination_table_name, + dsm.name as destination_schema_name, + tl.created_at as created_at, + ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn + FROM {self.db_clause}table_lineage as tl + JOIN {self.db_clause}source_table_metadata as stm on tl.source_table_id = stm.id + JOIN {self.db_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id + JOIN {self.db_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id + JOIN {self.db_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id + WHERE stm.connector_id IN ({formatted_connector_ids}) +) +-- Ensure that we only get back one entry per source and destination pair. +WHERE table_combo_rn = 1 +QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} +ORDER BY connector_id, created_at DESC """ def get_column_lineage_query(self, connector_ids: List[str]) -> str: @@ -109,19 +118,31 @@ def get_column_lineage_query(self, connector_ids: List[str]) -> str: return f"""\ SELECT - scm.table_id as source_table_id, - dcm.table_id as destination_table_id, - scm.name as source_column_name, - dcm.name as destination_column_name -FROM {self.db_clause}column_lineage as cl -JOIN {self.db_clause}source_column_metadata as scm - ON cl.source_column_id = scm.id -JOIN {self.db_clause}destination_column_metadata as dcm - ON cl.destination_column_id = dcm.id --- Only joining source_table_metadata to get the connector_id. -JOIN {self.db_clause}source_table_metadata as stm - ON scm.table_id = stm.id -WHERE stm.connector_id IN ({formatted_connector_ids}) -QUALIFY ROW_NUMBER() OVER (PARTITION BY stm.connector_id ORDER BY cl.created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} -ORDER BY stm.connector_id, cl.created_at DESC + source_table_id, + destination_table_id, + source_column_name, + destination_column_name +FROM ( + SELECT + stm.connector_id as connector_id, + scm.table_id as source_table_id, + dcm.table_id as destination_table_id, + scm.name as source_column_name, + dcm.name as destination_column_name, + cl.created_at as created_at, + ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn + FROM {self.db_clause}column_lineage as cl + JOIN {self.db_clause}source_column_metadata as scm + ON cl.source_column_id = scm.id + JOIN {self.db_clause}destination_column_metadata as dcm + ON cl.destination_column_id = dcm.id + -- Only joining source_table_metadata to get the connector_id. + JOIN {self.db_clause}source_table_metadata as stm + ON scm.table_id = stm.id + WHERE stm.connector_id IN ({formatted_connector_ids}) +) +-- Ensure that we only get back one entry per (connector, source column, destination column) pair. +WHERE column_combo_rn = 1 +QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} +ORDER BY connector_id, created_at DESC """ diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 29b186978a76a..0f8f4cc64e7ca 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -17,6 +17,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", @@ -62,9 +78,17 @@ "aspect": { "json": { "customProperties": { + "connector_id": "calendar_elected", + "connector_type": "postgres", "paused": "False", "sync_frequency": "1440", - "destination_id": "'interval_unconstitutional'" + "destination_id": "interval_unconstitutional", + "source.platform": "postgres", + "source.env": "DEV", + "source.database": "postgres_db", + "destination.platform": "snowflake", + "destination.env": "PROD", + "destination.database": "test_database" }, "name": "postgres", "type": { @@ -79,6 +103,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", @@ -150,13 +190,18 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", "changeType": "UPSERT", - "aspectName": "status", + "aspectName": "ownership", "aspect": { "json": { - "removed": false + "owners": [], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } } }, "systemMetadata": { @@ -166,13 +211,13 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", "changeType": "UPSERT", - "aspectName": "status", + "aspectName": "globalTags", "aspect": { "json": { - "removed": false + "tags": [] } }, "systemMetadata": { @@ -182,18 +227,13 @@ } }, { - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", "changeType": "UPSERT", - "aspectName": "ownership", + "aspectName": "status", "aspect": { "json": { - "owners": [], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:fivetran" - } + "removed": false } }, "systemMetadata": { @@ -203,13 +243,13 @@ } }, { - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", "changeType": "UPSERT", - "aspectName": "globalTags", + "aspectName": "status", "aspect": { "json": { - "tags": [] + "removed": false } }, "systemMetadata": { @@ -304,8 +344,8 @@ "json": { "timestampMillis": 1695191853000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -325,8 +365,8 @@ "json": { "timestampMillis": 1695191885000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -427,8 +467,8 @@ "json": { "timestampMillis": 1696343730000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -448,8 +488,8 @@ "json": { "timestampMillis": 1696343732000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -550,8 +590,8 @@ "json": { "timestampMillis": 1696343755000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -571,8 +611,8 @@ "json": { "timestampMillis": 1696343790000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -587,38 +627,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index 0cd3bb83f90f5..22933f3483e76 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -17,6 +17,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", @@ -62,9 +78,17 @@ "aspect": { "json": { "customProperties": { + "connector_id": "calendar_elected", + "connector_type": "postgres", "paused": "False", "sync_frequency": "1440", - "destination_id": "'interval_unconstitutional'" + "destination_id": "interval_unconstitutional", + "source.platform": "postgres", + "source.env": "DEV", + "source.database": "postgres_db", + "destination.platform": "snowflake", + "destination.env": "PROD", + "destination.database": "test_database" }, "name": "postgres", "type": { @@ -79,6 +103,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", @@ -150,13 +190,26 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", "changeType": "UPSERT", - "aspectName": "status", + "aspectName": "ownership", "aspect": { "json": { - "removed": false + "owners": [ + { + "owner": "urn:li:corpuser:abc.xyz@email.com", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } } }, "systemMetadata": { @@ -166,13 +219,13 @@ } }, { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", "changeType": "UPSERT", - "aspectName": "status", + "aspectName": "globalTags", "aspect": { "json": { - "removed": false + "tags": [] } }, "systemMetadata": { @@ -182,26 +235,13 @@ } }, { - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", "changeType": "UPSERT", - "aspectName": "ownership", + "aspectName": "status", "aspect": { "json": { - "owners": [ - { - "owner": "urn:li:corpuser:abc.xyz@email.com", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:fivetran" - } + "removed": false } }, "systemMetadata": { @@ -211,13 +251,13 @@ } }, { - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", "changeType": "UPSERT", - "aspectName": "globalTags", + "aspectName": "status", "aspect": { "json": { - "tags": [] + "removed": false } }, "systemMetadata": { @@ -312,8 +352,8 @@ "json": { "timestampMillis": 1695191853000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -333,8 +373,8 @@ "json": { "timestampMillis": 1695191885000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -435,8 +475,8 @@ "json": { "timestampMillis": 1696343730000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -456,8 +496,8 @@ "json": { "timestampMillis": 1696343732000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -558,8 +598,8 @@ "json": { "timestampMillis": 1696343755000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "STARTED" } @@ -579,8 +619,8 @@ "json": { "timestampMillis": 1696343790000, "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" }, "status": "COMPLETE", "result": { @@ -595,38 +635,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataFlow", - "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",