Skip to content

Commit

Permalink
fix(ingest): limit number of upstreams generated by sql parsing aggregator (datahub-project#11267)
Browse files Browse the repository at this point in the history

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
mayurinehate and hsheth2 authored Aug 30, 2024
1 parent 07a6eed commit 3e5c18f
Showing 1 changed file with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class QueryLogSetting(enum.Enum):
# Default query-log setting: the DATAHUB_SQL_AGG_QUERY_LOG environment variable
# is looked up as a QueryLogSetting member name; when it is unset or empty the
# lookup falls back to the DISABLED member.
_DEFAULT_QUERY_LOG_SETTING = QueryLogSetting[
os.getenv("DATAHUB_SQL_AGG_QUERY_LOG") or QueryLogSetting.DISABLED.name
]
# Cap on table-level upstreams kept for one downstream: a longer list is
# truncated to its first MAX_UPSTREAM_TABLES_COUNT entries and a warning is
# logged.
MAX_UPSTREAM_TABLES_COUNT = 300
# Cap on fine-grained (column-level) lineage entries kept for one downstream,
# enforced the same way as the table-level cap.
MAX_FINEGRAINEDLINEAGE_COUNT = 2000


@dataclasses.dataclass
Expand Down Expand Up @@ -229,6 +231,8 @@ class SqlAggregatorReport(Report):
num_unique_query_fingerprints: Optional[int] = None
num_urns_with_lineage: Optional[int] = None
num_lineage_skipped_due_to_filters: int = 0
num_table_lineage_trimmed_due_to_large_size: int = 0
num_column_lineage_trimmed_due_to_large_size: int = 0

# Queries.
num_queries_entities_generated: int = 0
Expand Down Expand Up @@ -1154,6 +1158,26 @@ def _gen_lineage_for_downstream(
confidenceScore=queries_map[query_id].confidence_score,
)
)

if len(upstream_aspect.upstreams) > MAX_UPSTREAM_TABLES_COUNT:
logger.warning(
f"Too many upstream tables for {downstream_urn}: {len(upstream_aspect.upstreams)}"
f"Keeping only {MAX_UPSTREAM_TABLES_COUNT} table level upstreams/"
)
upstream_aspect.upstreams = upstream_aspect.upstreams[
:MAX_UPSTREAM_TABLES_COUNT
]
self.report.num_table_lineage_trimmed_due_to_large_size += 1
if len(upstream_aspect.fineGrainedLineages) > MAX_FINEGRAINEDLINEAGE_COUNT:
logger.warning(
f"Too many upstream columns for {downstream_urn}: {len(upstream_aspect.fineGrainedLineages)}"
f"Keeping only {MAX_FINEGRAINEDLINEAGE_COUNT} column level upstreams/"
)
upstream_aspect.fineGrainedLineages = upstream_aspect.fineGrainedLineages[
:MAX_FINEGRAINEDLINEAGE_COUNT
]
self.report.num_column_lineage_trimmed_due_to_large_size += 1

upstream_aspect.fineGrainedLineages = (
upstream_aspect.fineGrainedLineages or None
)
Expand Down

0 comments on commit 3e5c18f

Please sign in to comment.