diff --git a/monitor/cli.py b/monitor/cli.py index a2f717212..8130e9080 100644 --- a/monitor/cli.py +++ b/monitor/cli.py @@ -90,8 +90,11 @@ def monitor(ctx, days_back, slack_webhook, config_dir, profiles_dir, update_dbt_ track_cli_start(anonymous_tracking, 'monitor', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package, slack_webhook) - data_monitoring.run(days_back, full_refresh_dbt_package) + success = data_monitoring.run(days_back, full_refresh_dbt_package) track_cli_end(anonymous_tracking, 'monitor', data_monitoring.properties(), ctx.command.name) + if not success: + return 1 + return 0 except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor', exc, ctx.command.name) raise @@ -132,12 +135,14 @@ def report(ctx, config_dir, profiles_dir, update_dbt_package, profile_target): track_cli_start(anonymous_tracking, 'monitor-report', get_cli_properties(), ctx.command.name) try: data_monitoring = DataMonitoring(config, update_dbt_package) - data_monitoring.generate_report() + success = data_monitoring.generate_report() track_cli_end(anonymous_tracking, 'monitor-report', data_monitoring.properties(), ctx.command.name) + if not success: + return 1 + return 0 except Exception as exc: track_cli_exception(anonymous_tracking, 'monitor-report', exc, ctx.command.name) raise - return if __name__ == "__main__": diff --git a/monitor/data_monitoring.py b/monitor/data_monitoring.py index 46c0dfe75..b4f3ae640 100644 --- a/monitor/data_monitoring.py +++ b/monitor/data_monitoring.py @@ -30,6 +30,7 @@ def __init__(self, config: Config, force_update_dbt_package: bool = False, self.execution_properties = {} self.slack_webhook = slack_webhook or self.config.slack_notification_webhook self._download_dbt_package_if_needed(force_update_dbt_package) + self.success = True def _dbt_package_exists(self) -> bool: return os.path.exists(self.DBT_PROJECT_PACKAGES_PATH) or os.path.exists(self.DBT_PROJECT_MODULES_PATH) @@ -55,7 +56,12 @@ def _query_alerts(self, days_back: int) -> list: test_result_alert_dicts = json.loads(results[0]) self.execution_properties['alert_rows'] = len(test_result_alert_dicts) for test_result_alert_dict in test_result_alert_dicts: - test_result_alerts.append(TestResult.create_test_result_from_dict(test_result_alert_dict)) + test_result_object = TestResult.create_test_result_from_dict(test_result_alert_dict) + if test_result_object: + test_result_alerts.append(test_result_object) + else: + self.success = False + return test_result_alerts def _send_to_slack(self, test_result_alerts: [TestResult]) -> None: @@ -71,6 +77,7 @@ def _send_to_slack(self, test_result_alerts: [TestResult]) -> None: if sent_alert_count > 0: self._update_sent_alerts(sent_alerts) else: + self.success = False logger.info("Alerts found but slack webhook is not configured (see documentation on how to configure " "a slack webhook)") @@ -84,6 +91,7 @@ def _download_dbt_package_if_needed(self, force_update_dbt_packages: bool): self.execution_properties['package_downloaded'] = package_downloaded if not package_downloaded: logger.info('Could not download internal dbt package') + self.success = False return def _send_alerts(self, days_back: int): @@ -93,18 +101,23 @@ def _send_alerts(self, days_back: int): if alert_count > 0: self._send_to_slack(alerts) - def run(self, days_back: int, dbt_full_refresh: bool = False) -> None: + def run(self, days_back: int, dbt_full_refresh: bool = False) -> bool: logger.info("Running internal dbt run to aggregate alerts") success = self.dbt_runner.run(models='alerts', full_refresh=dbt_full_refresh) self.execution_properties['alerts_run_success'] = success if not success: logger.info('Could not aggregate alerts successfully') - return + self.success = False + self.execution_properties['success'] = self.success + return self.success self._send_alerts(days_back) + self.execution_properties['run_end'] = True + self.execution_properties['success'] = self.success + return self.success - def generate_report(self): + def generate_report(self) -> bool: elementary_output = {} elementary_output['creation_time'] = get_now_utc_str() test_results, test_result_totals = self._get_test_results_and_totals() @@ -133,7 +146,9 @@ def generate_report(self): elementary_html_file_path = 'file://' + elementary_html_file_path webbrowser.open_new_tab(elementary_html_file_path) - self.execution_properties['report_success'] = True + self.execution_properties['report_end'] = True + self.execution_properties['success'] = self.success + return self.success def _get_test_results_and_totals(self): results = self.dbt_runner.run_operation(macro_name='get_test_results') @@ -144,14 +159,17 @@ def _get_test_results_and_totals(self): for test_result_dict in test_result_dicts: days_diff = test_result_dict.pop('days_diff') test_result_object = TestResult.create_test_result_from_dict(test_result_dict) - model_unique_id = test_result_object.model_unique_id - if model_unique_id in test_results_api_dict: - test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict()) + if test_result_object: + model_unique_id = test_result_object.model_unique_id + if model_unique_id in test_results_api_dict: + test_results_api_dict[model_unique_id].append(test_result_object.to_test_result_api_dict()) + else: + test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()] + + self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff, + test_result_object.status) else: - test_results_api_dict[model_unique_id] = [test_result_object.to_test_result_api_dict()] - - self._update_test_results_totals(test_result_totals_api_dict, model_unique_id, days_diff, - test_result_object.status) + self.success = False self.execution_properties['test_results'] = len(test_result_dicts) return test_results_api_dict, test_result_totals_api_dict diff --git a/monitor/test_result.py b/monitor/test_result.py index 00ac3909b..933d14212 100644 --- a/monitor/test_result.py +++ b/monitor/test_result.py @@ -3,10 +3,12 @@ from utils.time import convert_utc_time_to_local_time from datetime import datetime from utils.json_utils import try_load_json +from typing import Optional +from utils.log import get_logger import re +logger = get_logger(__name__) -#TODO: handle last min, max in metrics graph, last or anomalous? class TestResult(object): def __init__(self, id, model_unique_id, test_unique_id, status) -> None: self.id = id @@ -15,12 +17,16 @@ def __init__(self, id, model_unique_id, test_unique_id, status) -> None: self.status = status @staticmethod - def create_test_result_from_dict(test_result_dict: dict) -> 'TestResult': + def create_test_result_from_dict(test_result_dict: dict) -> Optional['TestResult']: test_type = test_result_dict.get('test_type') - if test_type == 'dbt_test': - return DbtTestResult(**test_result_dict) - else: - return ElementaryTestResult(**test_result_dict) + try: + if test_type == 'dbt_test': + return DbtTestResult(**test_result_dict) + else: + return ElementaryTestResult(**test_result_dict) + except Exception: + logger.exception(f"Failed parsing test result - {json.dumps(test_result_dict)}") + return None def to_slack_message(self, slack_workflows: bool = False) -> dict: pass @@ -62,7 +68,7 @@ def send_to_slack(self, webhook: str, is_slack_workflow: bool = False): requests.post(url=webhook, headers={'Content-type': "application/json"}, data=json.dumps(data)) @staticmethod - def display_name(str_value): + def display_name(str_value: str) -> str: return ' '.join([word[0].upper() + word[1:] for word in str_value.split('_')]) @staticmethod @@ -98,8 +104,8 @@ def __init__(self, id, model_unique_id, test_unique_id, detected_at, database_na self.test_name = test_name self.test_display_name = self.display_name(test_name) self.other = other - self.test_sub_type = test_sub_type - self.test_sub_type_display_name = self.display_name(test_sub_type) + self.test_sub_type = test_sub_type if test_sub_type else '' + self.test_sub_type_display_name = self.display_name(test_sub_type) if test_sub_type else '' self.test_results_query = test_results_query.strip() if test_results_query else '' self.test_rows_sample = test_rows_sample if test_rows_sample else '' self.test_params = test_params