Skip to content

Commit

Permalink
feat(ingest/mssql): allow filtering by procedure_pattern (datahub-pro…
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Nov 26, 2024
1 parent 07033a7 commit 2e3b429
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
BasicSQLAlchemyConfig,
make_sqlalchemy_uri,
)
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.metadata.schema_classes import (
BooleanTypeClass,
NumberTypeClass,
Expand Down Expand Up @@ -78,6 +79,11 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
include_stored_procedures_code: bool = Field(
default=True, description="Include information about object code."
)
# Allow/deny regex filter for stored procedures. Patterns are written against
# the fully qualified name in database.schema.procedure_name form.
procedure_pattern: AllowDenyPattern = Field(
    default=AllowDenyPattern.allow_all(),
    # Adjacent string literals are joined with no separator, so the first
    # fragment must end with a space to keep the two sentences apart.
    description="Regex patterns for stored procedures to filter in ingestion. "
    "Specify regex to match the entire procedure name in database.schema.procedure_name format. e.g. to match all procedures starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
include_jobs: bool = Field(
default=True,
description="Include ingest of MSSQL Jobs. Requires access to the 'msdb' and 'sys' schema.",
Expand Down Expand Up @@ -164,6 +170,8 @@ class SQLServerSource(SQLAlchemySource):
If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure.
"""

report: SQLSourceReport

def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
super().__init__(config, ctx, "mssql")
# Cache the table and column descriptions
Expand Down Expand Up @@ -416,10 +424,16 @@ def loop_stored_procedures( # noqa: C901
data_flow = MSSQLDataFlow(entity=mssql_default_job)
with inspector.engine.connect() as conn:
procedures_data_list = self._get_stored_procedures(conn, db_name, schema)
procedures = [
StoredProcedure(flow=mssql_default_job, **procedure_data)
for procedure_data in procedures_data_list
]
procedures: List[StoredProcedure] = []
for procedure_data in procedures_data_list:
procedure_full_name = f"{db_name}.{schema}.{procedure_data['name']}"
if not self.config.procedure_pattern.allowed(procedure_full_name):
self.report.report_dropped(procedure_full_name)
continue
procedures.append(
StoredProcedure(flow=mssql_default_job, **procedure_data)
)

if procedures:
yield from self.construct_flow_workunits(data_flow=data_flow)
for procedure in procedures:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "4130c37d-146c-43da-a671-dd9a413a44b3",
"job_id": "2a055367-5e6a-4162-b3a9-dd60f52c79a8",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2024-11-22 12:58:03.260000",
"date_modified": "2024-11-22 12:58:03.440000",
"date_created": "2024-11-26 07:22:19.640000",
"date_modified": "2024-11-26 07:22:19.773000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand Down Expand Up @@ -2282,8 +2282,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2024-11-22 12:58:03.137000",
"date_modified": "2024-11-22 12:58:03.137000"
"date_created": "2024-11-26 07:22:19.510000",
"date_modified": "2024-11-26 07:22:19.510000"
},
"externalUrl": "",
"name": "DemoData.Foo.Proc.With.SpecialChar",
Expand All @@ -2298,34 +2298,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"procedure_depends_on": "{'DemoData.Foo.age_dist': 'USER_TABLE', 'DemoData.Foo.Items': 'USER_TABLE', 'DemoData.Foo.Persons': 'USER_TABLE', 'DemoData.Foo.SalesReason': 'USER_TABLE'}",
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2024-11-22 12:58:03.140000",
"date_modified": "2024-11-22 12:58:03.140000"
},
"externalUrl": "",
"name": "DemoData.Foo.NewProc",
"type": {
"string": "MSSQL_STORED_PROCEDURE"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
Expand Down Expand Up @@ -2713,22 +2685,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ source:
database_pattern:
deny:
- NewData
procedure_pattern:
deny:
- DemoData.Foo.NewProc

sink:
type: file
Expand Down

0 comments on commit 2e3b429

Please sign in to comment.