-
Notifications
You must be signed in to change notification settings - Fork 95
/
dbt_healthcheck.py
33 lines (30 loc) · 1.19 KB
/
dbt_healthcheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def project_healthcheck(
manifest_path, catalog_path=None, config_path=None, config=None
):
try:
import logging
import json
from datapilot.config.config import load_config
from datapilot.core.platforms.dbt.utils import load_catalog
from datapilot.core.platforms.dbt.utils import load_manifest
from datapilot.core.platforms.dbt.constants import MODEL
from datapilot.core.platforms.dbt.executor import DBTInsightGenerator
logging.basicConfig(level=logging.INFO)
manifest = load_manifest(manifest_path)
catalog = load_catalog(catalog_path) if catalog_path else None
if not config and config_path:
config = load_config(config_path)
insight_generator = DBTInsightGenerator(
manifest=manifest,
catalog=catalog,
config=config,
)
reports = insight_generator.run()
# package_insights = reports[PROJECT]
model_insights = {
k: [json.loads(item.json()) for item in v]
for k, v in reports[MODEL].items()
}
return {"model_insights": model_insights}
except Exception as e:
raise Exception(str(e))