From f7bf80c00dfc7132c6aa366ddc3a1263edc31341 Mon Sep 17 00:00:00 2001 From: oravi Date: Sun, 19 Jun 2022 23:32:14 +0300 Subject: [PATCH 1/4] Fixed failure on singular tests --- monitor/cli.py | 8 ++++---- monitor/data_monitoring.py | 41 +++++++++++++++++++++++++++----------- monitor/test_result.py | 18 +++++++++++------ 3 files changed, 45 insertions(+), 22 deletions(-) diff --git a/monitor/cli.py b/monitor/cli.py index a2f717212..5112a75a7 100644 --- a/monitor/cli.py +++ b/monitor/cli.py @@ -90,12 +90,12 @@ def monitor(ctx, days_back, slack_webhook, config_dir, profiles_dir, update_dbt_ track_cli_start(anonymous_tracking, 'monitor', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package, slack_webhook) - data_monitoring.run(days_back, full_refresh_dbt_package) + status_code = data_monitoring.run(days_back, full_refresh_dbt_package) track_cli_end(anonymous_tracking, 'monitor', data_monitoring.properties(), ctx.command.name) except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor', exc, ctx.command.name) raise - + return status_code @monitor.command() @click.option( @@ -132,12 +132,12 @@ def report(ctx, config_dir, profiles_dir, update_dbt_package, profile_target): track_cli_start(anonymous_tracking, 'monitor-report', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package) - data_monitoring.generate_report() + status_code = data_monitoring.generate_report() track_cli_end(anonymous_tracking, 'monitor-report', data_monitoring.properties(), ctx.command.name) except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor-report', exc, ctx.command.name) raise - return + return status_code if __name__ == "__main__": diff --git a/monitor/data_monitoring.py b/monitor/data_monitoring.py index 46c0dfe75..813679f97 100644 --- a/monitor/data_monitoring.py +++ b/monitor/data_monitoring.py @@ -30,6 +30,7 @@ def __init__(self, config: Config, force_update_dbt_package: bool = False, self.execution_properties = {} self.slack_webhook = slack_webhook or self.config.slack_notification_webhook self._download_dbt_package_if_needed(force_update_dbt_package) + self.status_code = 0 def _dbt_package_exists(self) -> bool: return os.path.exists(self.DBT_PROJECT_PACKAGES_PATH) or os.path.exists(self.DBT_PROJECT_MODULES_PATH) @@ -55,7 +56,12 @@ def _query_alerts(self, days_back: int) -> list: test_result_alert_dicts = json.loads(results[0]) self.execution_properties['alert_rows'] = len(test_result_alert_dicts) for test_result_alert_dict in test_result_alert_dicts: - test_result_alerts.append(TestResult.create_test_result_from_dict(test_result_alert_dict)) + test_result_object = TestResult.create_test_result_from_dict(test_result_alert_dict) + if test_result_object: + test_result_alerts.append(test_result_object) + else: + self.status_code = 1 + return test_result_alerts def _send_to_slack(self, test_result_alerts: [TestResult]) -> None: @@ -71,6 +77,7 @@ def _send_to_slack(self, test_result_alerts: [TestResult]) -> None: if sent_alert_count > 0: self._update_sent_alerts(sent_alerts) else: + self.status_code = 1 logger.info("Alerts found but slack webhook is not configured (see documentation on how to configure " "a slack webhook)") @@ -84,6 +91,7 @@ def _download_dbt_package_if_needed(self, force_update_dbt_packages: bool): self.execution_properties['package_downloaded'] = package_downloaded if not package_downloaded: logger.info('Could not download internal dbt package') + self.status_code = 1 return def _send_alerts(self, days_back: int): @@ -93,18 +101,22 @@ def _send_alerts(self, days_back: int): if alert_count > 0: self._send_to_slack(alerts) - def run(self, days_back: int, dbt_full_refresh: bool = False) -> None: + def run(self, days_back: int, dbt_full_refresh: bool = False) -> int: logger.info("Running internal dbt run to aggregate alerts") success = self.dbt_runner.run(models='alerts', full_refresh=dbt_full_refresh) self.execution_properties['alerts_run_success'] = success if not success: logger.info('Could not aggregate alerts successfully') - return + self.execution_properties['run_status_code'] = 1 + return 1 self._send_alerts(days_back) + self.execution_properties['run_end'] = True + self.execution_properties['run_status_code'] = self.status_code + return self.status_code - def generate_report(self): + def generate_report(self) -> int: elementary_output = {} elementary_output['creation_time'] = get_now_utc_str() test_results, test_result_totals = self._get_test_results_and_totals() @@ -133,7 +145,9 @@ def generate_report(self): elementary_html_file_path = 'file://' + elementary_html_file_path webbrowser.open_new_tab(elementary_html_file_path) - self.execution_properties['report_success'] = True + self.execution_properties['report_end'] = True + self.execution_properties['report_status_code'] = self.status_code + return self.status_code def _get_test_results_and_totals(self): results = self.dbt_runner.run_operation(macro_name='get_test_results') @@ -144,14 +158,17 @@ def _get_test_results_and_totals(self): for test_result_dict in test_result_dicts: days_diff = test_result_dict.pop('days_diff') test_result_object = TestResult.create_test_result_from_dict(test_result_dict) - model_unique_id = test_result_object.model_unique_id - if model_unique_id in test_results_api_dict: - test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict()) + if test_result_object: + model_unique_id = test_result_object.model_unique_id + if model_unique_id in test_results_api_dict: + test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict()) + else: + test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()] + + self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff, + test_result_object.status) else: - test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()] - - self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff, - test_result_object.status) + self.status_code = 1 self.execution_properties['test_results'] = len(test_result_dicts) return test_results_api_dict, test_result_totals_api_dict diff --git a/monitor/test_result.py b/monitor/test_result.py index 00ac3909b..8949e70be 100644 --- a/monitor/test_result.py +++ b/monitor/test_result.py @@ -3,10 +3,12 @@ from utils.time import convert_utc_time_to_local_time from datetime import datetime from utils.json_utils import try_load_json +from typing import Optional +from utils.log import get_logger import re +logger = get_logger(__name__) -#TODO: handle last min, max in metrics graph, last or anomalous? class TestResult(object): def __init__(self, id, model_unique_id, test_unique_id, status) -> None: self.id = id @@ -15,12 +17,16 @@ def __init__(self, id, model_unique_id, test_unique_id, status) -> None: self.status = status @staticmethod - def create_test_result_from_dict(test_result_dict: dict) -> 'TestResult': + def create_test_result_from_dict(test_result_dict: dict) -> Optional['TestResult']: test_type = test_result_dict.get('test_type') - if test_type == 'dbt_test': - return DbtTestResult(**test_result_dict) - else: - return ElementaryTestResult(**test_result_dict) + try: + if test_type == 'dbt_test': + return DbtTestResult(**test_result_dict) + else: + return ElementaryTestResult(**test_result_dict) + except Exception: + logger.error(f"Failed parsing test result - {json.dumps(test_result_dict)}") + return None def to_slack_message(self, slack_workflows: bool = False) -> dict: pass From f4044a13f6f6541b4c2b098c3ad9e4bb6a1e80b2 Mon Sep 17 00:00:00 2001 From: oravi Date: Mon, 20 Jun 2022 17:18:36 +0300 Subject: [PATCH 2/4] Fixed failure on singular tests --- monitor/test_result.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/monitor/test_result.py b/monitor/test_result.py index 8949e70be..492792645 100644 --- a/monitor/test_result.py +++ b/monitor/test_result.py @@ -68,7 +68,7 @@ def send_to_slack(self, webhook: str, is_slack_workflow: bool = False): requests.post(url=webhook, headers={'Content-type': "application/json"}, data=json.dumps(data)) @staticmethod - def display_name(str_value): + def display_name(str_value: Optional[str]) -> str: return ' '.join([word[0].upper() + word[1:] for word in str_value.split('_')]) @staticmethod @@ -104,8 +104,12 @@ def __init__(self, id, model_unique_id, test_unique_id, detected_at, database_na self.test_name = test_name self.test_display_name = self.display_name(test_name) self.other = other - self.test_sub_type = test_sub_type - self.test_sub_type_display_name = self.display_name(test_sub_type) + if test_sub_type: + self.test_sub_type = test_sub_type + self.test_sub_type_display_name = self.display_name(test_sub_type) + else: + self.test_sub_type = '' + self.test_sub_type_display_name = '' self.test_results_query = test_results_query.strip() if test_results_query else '' self.test_rows_sample = test_rows_sample if test_rows_sample else '' self.test_params = test_params From 405f06730281312bfae78e990d422f6b7636df4e Mon Sep 17 00:00:00 2001 From: oravi Date: Mon, 20 Jun 2022 17:21:43 +0300 Subject: [PATCH 3/4] Fixed typing --- monitor/test_result.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/monitor/test_result.py b/monitor/test_result.py index 492792645..ee47cb7f3 100644 --- a/monitor/test_result.py +++ b/monitor/test_result.py @@ -68,7 +68,7 @@ def send_to_slack(self, webhook: str, is_slack_workflow: bool = False): requests.post(url=webhook, headers={'Content-type': "application/json"}, data=json.dumps(data)) @staticmethod - def display_name(str_value: Optional[str]) -> str: + def display_name(str_value: str) -> str: return ' '.join([word[0].upper() + word[1:] for word in str_value.split('_')]) @staticmethod From 1e78beba36a05ee48eda7fa06735ca3e317438a5 Mon Sep 17 00:00:00 2001 From: oravi Date: Mon, 20 Jun 2022 19:50:15 +0300 Subject: [PATCH 4/4] CR fixes --- monitor/cli.py | 13 +++++++++---- monitor/data_monitoring.py | 27 ++++++++++++++------------- monitor/test_result.py | 12 ++++-------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/monitor/cli.py b/monitor/cli.py index 5112a75a7..8130e9080 100644 --- a/monitor/cli.py +++ b/monitor/cli.py @@ -90,12 +90,15 @@ def monitor(ctx, days_back, slack_webhook, config_dir, profiles_dir, update_dbt_ track_cli_start(anonymous_tracking, 'monitor', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package, slack_webhook) - status_code = data_monitoring.run(days_back, full_refresh_dbt_package) + success = data_monitoring.run(days_back, full_refresh_dbt_package) track_cli_end(anonymous_tracking, 'monitor', data_monitoring.properties(), ctx.command.name) + if not success: + return 1 + return 0 except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor', exc, ctx.command.name) raise - return status_code + @monitor.command() @click.option( @@ -132,12 +135,14 @@ def report(ctx, config_dir, profiles_dir, update_dbt_package, profile_target): track_cli_start(anonymous_tracking, 'monitor-report', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package) - status_code = data_monitoring.generate_report() + success = data_monitoring.generate_report() track_cli_end(anonymous_tracking, 'monitor-report', data_monitoring.properties(), ctx.command.name) + if not success: + return 1 + return 0 except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor-report', exc, ctx.command.name) raise - return status_code if __name__ == "__main__": diff --git a/monitor/data_monitoring.py b/monitor/data_monitoring.py index 813679f97..b4f3ae640 100644 --- a/monitor/data_monitoring.py +++ b/monitor/data_monitoring.py @@ -30,7 +30,7 @@ def __init__(self, config: Config, force_update_dbt_package: bool = False, self.execution_properties = {} self.slack_webhook = slack_webhook or self.config.slack_notification_webhook self._download_dbt_package_if_needed(force_update_dbt_package) - self.status_code = 0 + self.success = True def _dbt_package_exists(self) -> bool: return os.path.exists(self.DBT_PROJECT_PACKAGES_PATH) or os.path.exists(self.DBT_PROJECT_MODULES_PATH) @@ -60,7 +60,7 @@ def _query_alerts(self, days_back: int) -> list: if test_result_object: test_result_alerts.append(test_result_object) else: - self.status_code = 1 + self.success = False return test_result_alerts @@ -77,7 +77,7 @@ def _send_to_slack(self, test_result_alerts: [TestResult]) -> None: if sent_alert_count > 0: self._update_sent_alerts(sent_alerts) else: - self.status_code = 1 + self.success = False logger.info("Alerts found but slack webhook is not configured (see documentation on how to configure " "a slack webhook)") @@ -91,7 +91,7 @@ def _download_dbt_package_if_needed(self, force_update_dbt_packages: bool): self.execution_properties['package_downloaded'] = package_downloaded if not package_downloaded: logger.info('Could not download internal dbt package') - self.status_code = 1 + self.success = False return def _send_alerts(self, days_back: int): @@ -101,22 +101,23 @@ def _send_alerts(self, days_back: int): if alert_count > 0: self._send_to_slack(alerts) - def run(self, days_back: int, dbt_full_refresh: bool = False) -> int: + def run(self, days_back: int, dbt_full_refresh: bool = False) -> bool: logger.info("Running internal dbt run to aggregate alerts") success = self.dbt_runner.run(models='alerts', full_refresh=dbt_full_refresh) self.execution_properties['alerts_run_success'] = success if not success: logger.info('Could not aggregate alerts successfully') - self.execution_properties['run_status_code'] = 1 - return 1 + self.success = False + self.execution_properties['success'] = self.success + return self.success self._send_alerts(days_back) self.execution_properties['run_end'] = True - self.execution_properties['run_status_code'] = self.status_code - return self.status_code + self.execution_properties['success'] = self.success + return self.success - def generate_report(self) -> int: + def generate_report(self) -> bool: elementary_output = {} elementary_output['creation_time'] = get_now_utc_str() test_results, test_result_totals = self._get_test_results_and_totals() @@ -146,8 +147,8 @@ def generate_report(self) -> int: elementary_html_file_path = 'file://' + elementary_html_file_path webbrowser.open_new_tab(elementary_html_file_path) self.execution_properties['report_end'] = True - self.execution_properties['report_status_code'] = self.status_code - return self.status_code + self.execution_properties['success'] = self.success + return self.success def _get_test_results_and_totals(self): results = self.dbt_runner.run_operation(macro_name='get_test_results') @@ -168,7 +169,7 @@ def _get_test_results_and_totals(self): self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff, test_result_object.status) else: - self.status_code = 1 + self.success = False self.execution_properties['test_results'] = len(test_result_dicts) return test_results_api_dict, test_result_totals_api_dict diff --git a/monitor/test_result.py b/monitor/test_result.py index ee47cb7f3..933d14212 100644 --- a/monitor/test_result.py +++ b/monitor/test_result.py @@ -25,8 +25,8 @@ def create_test_result_from_dict(test_result_dict: dict) -> Optional['TestResult else: return ElementaryTestResult(**test_result_dict) except Exception: - logger.error(f"Failed parsing test result - {json.dumps(test_result_dict)}") - return None + logger.exception(f"Failed parsing test result - {json.dumps(test_result_dict)}") + return None def to_slack_message(self, slack_workflows: bool = False) -> dict: pass @@ -104,12 +104,8 @@ def __init__(self, id, model_unique_id, test_unique_id, detected_at, database_na self.test_name = test_name self.test_display_name = self.display_name(test_name) self.other = other - if test_sub_type: - self.test_sub_type = test_sub_type - self.test_sub_type_display_name = self.display_name(test_sub_type) - else: - self.test_sub_type = '' - self.test_sub_type_display_name = '' + self.test_sub_type = test_sub_type if test_sub_type else '' + self.test_sub_type_display_name = self.display_name(test_sub_type) if test_sub_type else '' self.test_results_query = test_results_query.strip() if test_results_query else '' self.test_rows_sample = test_rows_sample if test_rows_sample else '' self.test_params = test_params