Skip to content

Commit

Permalink
Merge pull request #89 from elementary-data/no_failures_on_singular_t…
Browse files Browse the repository at this point in the history
…ests

Fixed failure on singular tests
  • Loading branch information
oravi authored Jun 20, 2022
2 parents 0fbd55b + 1e78beb commit 39383b6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 24 deletions.
11 changes: 8 additions & 3 deletions monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,11 @@ def monitor(ctx, days_back, slack_webhook, config_dir, profiles_dir, update_dbt_
track_cli_start(anonymous_tracking, 'monitor', get_cli_properties(), ctx.command.name)
try:
data_monitoring = DataMonitoring(config, update_dbt_package, slack_webhook)
data_monitoring.run(days_back, full_refresh_dbt_package)
success = data_monitoring.run(days_back, full_refresh_dbt_package)
track_cli_end(anonymous_tracking, 'monitor', data_monitoring.properties(), ctx.command.name)
if not success:
return 1
return 0
except Exception as exc:
track_cli_exception(anonymous_tracking, 'monitor', exc, ctx.command.name)
raise
Expand Down Expand Up @@ -132,12 +135,14 @@ def report(ctx, config_dir, profiles_dir, update_dbt_package, profile_target):
track_cli_start(anonymous_tracking, 'monitor-report', get_cli_properties(), ctx.command.name)
try:
data_monitoring = DataMonitoring(config, update_dbt_package)
data_monitoring.generate_report()
success = data_monitoring.generate_report()
track_cli_end(anonymous_tracking, 'monitor-report', data_monitoring.properties(), ctx.command.name)
if not success:
return 1
return 0
except Exception as exc:
track_cli_exception(anonymous_tracking, 'monitor-report', exc, ctx.command.name)
raise
return


if __name__ == "__main__":
Expand Down
42 changes: 30 additions & 12 deletions monitor/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, config: Config, force_update_dbt_package: bool = False,
self.execution_properties = {}
self.slack_webhook = slack_webhook or self.config.slack_notification_webhook
self._download_dbt_package_if_needed(force_update_dbt_package)
self.success = True

def _dbt_package_exists(self) -> bool:
return os.path.exists(self.DBT_PROJECT_PACKAGES_PATH) or os.path.exists(self.DBT_PROJECT_MODULES_PATH)
Expand All @@ -55,7 +56,12 @@ def _query_alerts(self, days_back: int) -> list:
test_result_alert_dicts = json.loads(results[0])
self.execution_properties['alert_rows'] = len(test_result_alert_dicts)
for test_result_alert_dict in test_result_alert_dicts:
test_result_alerts.append(TestResult.create_test_result_from_dict(test_result_alert_dict))
test_result_object = TestResult.create_test_result_from_dict(test_result_alert_dict)
if test_result_object:
test_result_alerts.append(test_result_object)
else:
self.success = False

return test_result_alerts

def _send_to_slack(self, test_result_alerts: [TestResult]) -> None:
Expand All @@ -71,6 +77,7 @@ def _send_to_slack(self, test_result_alerts: [TestResult]) -> None:
if sent_alert_count > 0:
self._update_sent_alerts(sent_alerts)
else:
self.success = False
logger.info("Alerts found but slack webhook is not configured (see documentation on how to configure "
"a slack webhook)")

Expand All @@ -84,6 +91,7 @@ def _download_dbt_package_if_needed(self, force_update_dbt_packages: bool):
self.execution_properties['package_downloaded'] = package_downloaded
if not package_downloaded:
logger.info('Could not download internal dbt package')
self.success = False
return

def _send_alerts(self, days_back: int):
Expand All @@ -93,18 +101,23 @@ def _send_alerts(self, days_back: int):
if alert_count > 0:
self._send_to_slack(alerts)

def run(self, days_back: int, dbt_full_refresh: bool = False) -> None:
def run(self, days_back: int, dbt_full_refresh: bool = False) -> bool:

logger.info("Running internal dbt run to aggregate alerts")
success = self.dbt_runner.run(models='alerts', full_refresh=dbt_full_refresh)
self.execution_properties['alerts_run_success'] = success
if not success:
logger.info('Could not aggregate alerts successfully')
return
self.success = False
self.execution_properties['success'] = self.success
return self.success

self._send_alerts(days_back)
self.execution_properties['run_end'] = True
self.execution_properties['success'] = self.success
return self.success

def generate_report(self):
def generate_report(self) -> bool:
elementary_output = {}
elementary_output['creation_time'] = get_now_utc_str()
test_results, test_result_totals = self._get_test_results_and_totals()
Expand Down Expand Up @@ -133,7 +146,9 @@ def generate_report(self):

elementary_html_file_path = 'file://' + elementary_html_file_path
webbrowser.open_new_tab(elementary_html_file_path)
self.execution_properties['report_success'] = True
self.execution_properties['report_end'] = True
self.execution_properties['success'] = self.success
return self.success

def _get_test_results_and_totals(self):
results = self.dbt_runner.run_operation(macro_name='get_test_results')
Expand All @@ -144,14 +159,17 @@ def _get_test_results_and_totals(self):
for test_result_dict in test_result_dicts:
days_diff = test_result_dict.pop('days_diff')
test_result_object = TestResult.create_test_result_from_dict(test_result_dict)
model_unique_id = test_result_object.model_unique_id
if model_unique_id in test_results_api_dict:
test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict())
if test_result_object:
model_unique_id = test_result_object.model_unique_id
if model_unique_id in test_results_api_dict:
test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict())
else:
test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()]

self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff,
test_result_object.status)
else:
test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()]

self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff,
test_result_object.status)
self.success = False

self.execution_properties['test_results'] = len(test_result_dicts)
return test_results_api_dict, test_result_totals_api_dict
Expand Down
24 changes: 15 additions & 9 deletions monitor/test_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
from utils.time import convert_utc_time_to_local_time
from datetime import datetime
from utils.json_utils import try_load_json
from typing import Optional
from utils.log import get_logger
import re

logger = get_logger(__name__)

#TODO: handle last min, max in metrics graph, last or anomalous?
class TestResult(object):
def __init__(self, id, model_unique_id, test_unique_id, status) -> None:
self.id = id
Expand All @@ -15,12 +17,16 @@ def __init__(self, id, model_unique_id, test_unique_id, status) -> None:
self.status = status

@staticmethod
def create_test_result_from_dict(test_result_dict: dict) -> 'TestResult':
def create_test_result_from_dict(test_result_dict: dict) -> Optional['TestResult']:
test_type = test_result_dict.get('test_type')
if test_type == 'dbt_test':
return DbtTestResult(**test_result_dict)
else:
return ElementaryTestResult(**test_result_dict)
try:
if test_type == 'dbt_test':
return DbtTestResult(**test_result_dict)
else:
return ElementaryTestResult(**test_result_dict)
except Exception:
logger.exception(f"Failed parsing test result - {json.dumps(test_result_dict)}")
return None

def to_slack_message(self, slack_workflows: bool = False) -> dict:
pass
Expand Down Expand Up @@ -62,7 +68,7 @@ def send_to_slack(self, webhook: str, is_slack_workflow: bool = False):
requests.post(url=webhook, headers={'Content-type': "application/json"}, data=json.dumps(data))

@staticmethod
def display_name(str_value):
def display_name(str_value: str) -> str:
return ' '.join([word[0].upper() + word[1:] for word in str_value.split('_')])

@staticmethod
Expand Down Expand Up @@ -98,8 +104,8 @@ def __init__(self, id, model_unique_id, test_unique_id, detected_at, database_na
self.test_name = test_name
self.test_display_name = self.display_name(test_name)
self.other = other
self.test_sub_type = test_sub_type
self.test_sub_type_display_name = self.display_name(test_sub_type)
self.test_sub_type = test_sub_type if test_sub_type else ''
self.test_sub_type_display_name = self.display_name(test_sub_type) if test_sub_type else ''
self.test_results_query = test_results_query.strip() if test_results_query else ''
self.test_rows_sample = test_rows_sample if test_rows_sample else ''
self.test_params = test_params
Expand Down

0 comments on commit 39383b6

Please sign in to comment.