diff --git a/config/config.py b/config/config.py index 2764dfac2..c43e589b9 100644 --- a/config/config.py +++ b/config/config.py @@ -12,7 +12,9 @@ class Config(object): - SLACK_NOTIFICATION_WEBHOOK = 'slack_notification_webhook' + SLACK = 'slack' + NOTIFICATION_WEBHOOK = 'notification_webhook' + WORKFLOWS = 'workflows' CONFIG_FILE_NAME = 'config.yml' def __init__(self, config_dir: str, profiles_dir: str, profile_name: str) -> None: @@ -47,7 +49,11 @@ def anonymous_tracking_enabled(self) -> bool: @property def slack_notification_webhook(self) -> Union[str, None]: - return self.config_dict.get(self.SLACK_NOTIFICATION_WEBHOOK) + return self.config_dict.get(self.SLACK).get(self.NOTIFICATION_WEBHOOK) if self.config_dict.get(self.SLACK) else None + + @property + def is_slack_workflow(self) -> bool: + return True if self.config_dict.get(self.SLACK) and self.config_dict.get(self.SLACK).get(self.WORKFLOWS) else False @property def target_dir(self) -> str: diff --git a/monitor/alerts.py b/monitor/alerts.py index 299e6d531..11e02d366 100644 --- a/monitor/alerts.py +++ b/monitor/alerts.py @@ -30,8 +30,14 @@ def send(webhook: str, data: dict, content_types: str = "application/json"): def to_slack_message(self) -> dict: pass - def send_to_slack(self, webhook: str): - data = self.to_slack_message() + def to_slack_workflows_message(self) -> dict: + pass + + def send_to_slack(self, webhook: str, is_slack_workflow: bool = False): + if is_slack_workflow: + data = self.to_slack_workflows_message() + else: + data = self.to_slack_message() self.send(webhook, data) @property @@ -91,6 +97,15 @@ def to_slack_message(self) -> dict: ] } + def to_slack_workflows_message(self) -> dict: + return { + "alert_description": self.ALERT_DESCRIPTION, + "table_name": self.table_name, + "detected_at": self.detected_at, + "type": self.change_type, + "description": self.description + } + class AnomalyDetectionAlert(Alert): ALERT_DESCRIPTION = "Data anomaly detected" @@ -143,3 +158,12 @@ def to_slack_message(self) -> dict: } ] } + + def to_slack_workflows_message(self) -> dict: + return { + "alert_description": self.ALERT_DESCRIPTION, + "table_name": self.table_name, + "detected_at": self.detected_at, + "type": self.anomaly_type, + "description": self.description + } diff --git a/monitor/data_monitoring.py b/monitor/data_monitoring.py index f1410dd25..49629e2cd 100644 --- a/monitor/data_monitoring.py +++ b/monitor/data_monitoring.py @@ -70,7 +70,7 @@ def _send_to_slack(self, alerts: [Alert]) -> None: if slack_webhook is not None: sent_alerts = [] for alert in alerts: - alert.send_to_slack(slack_webhook) + alert.send_to_slack(slack_webhook, self.config.is_slack_workflow) sent_alerts.append(alert.id) sent_alert_count = len(sent_alerts) diff --git a/tests/config/test_config.py b/tests/config/test_config.py index 44ed59817..238f7fb6d 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -6,7 +6,9 @@ FILE_DIR = os.path.dirname(__file__) -CONFIG = {'slack_notification_webhook': 'test_slack_webhook', 'dbt_projects': [FILE_DIR]} +CONFIG = {'slack': { 'notification_webhook': 'test_slack_webhook', 'workflows': False }, 'dbt_projects': [FILE_DIR]} + +WORKFLOWS_CONFIG = {'slack': { 'notification_webhook': 'test_slack_webhook', 'workflows': True }, 'dbt_projects': [FILE_DIR]} SOURCES = {'sources': [{'name': 'unit_tests', @@ -53,15 +55,15 @@ 'query_tag': 'best_tag'}}}} -def create_config_files(): +def create_config_files(config: dict): yml = OrderedYaml() - yml.dump(CONFIG, os.path.join(FILE_DIR, 'config.yml')) + yml.dump(config, os.path.join(FILE_DIR, 'config.yml')) schema_path = os.path.join(FILE_DIR, 'models') if not os.path.exists(schema_path): os.makedirs(schema_path) yml.dump(SOURCES, os.path.join(schema_path, 'schema.yml')) yml.dump(DBT_PROJECT, os.path.join(FILE_DIR, 'dbt_project.yml')) - yml.dump(PROFILES, os.path.join(FILE_DIR, 'profiles.yml')) + yml.dump(PROFILES, os.path.join(FILE_DIR, 'profiles.yml')) def read_csv(csv_path): @@ -75,12 +77,20 @@ def read_csv(csv_path): @pytest.fixture def config(): - create_config_files() + create_config_files(CONFIG) + return Config(config_dir=FILE_DIR, profiles_dir=FILE_DIR, profile_name='elementary') + +@pytest.fixture +def slack_workflows_config(): + create_config_files(WORKFLOWS_CONFIG) return Config(config_dir=FILE_DIR, profiles_dir=FILE_DIR, profile_name='elementary') def test_config_get_slack_notification_webhook(config): - assert config.slack_notification_webhook == CONFIG['slack_notification_webhook'] + assert config.slack_notification_webhook == CONFIG['slack']['notification_webhook'] + +def test_slack_workflows_config_get_workflows(slack_workflows_config): + assert slack_workflows_config.is_slack_workflow == WORKFLOWS_CONFIG['slack']['workflows'] def test_config__get_sources_from_all_dbt_projects(config): diff --git a/tests/monitor/test_alerts.py b/tests/monitor/test_alerts.py index 0b3619bff..ea4e1ddf8 100644 --- a/tests/monitor/test_alerts.py +++ b/tests/monitor/test_alerts.py @@ -23,3 +23,14 @@ def test_schema_change_alert_to_slack_message(): assert alert.description.lower() in schema_change_slack_msg_str local_time = convert_utc_time_to_local_time(alert_time) assert local_time.strftime('%Y-%m-%d %H:%M:%S') in schema_change_slack_msg_str + + +def test_schema_change_alert_to_slack_workflow_message(): + alert_time = datetime.now() + alert = SchemaChangeAlert('123', 'db', 'sc', 't1', alert_time, 'column_added', 'Column was added') + schema_change_slack_msg_str = json.dumps(alert.to_slack_workflows_message()).lower() + assert alert.table_name.lower() in schema_change_slack_msg_str + assert alert.change_type.lower() in schema_change_slack_msg_str + assert alert.description.lower() in schema_change_slack_msg_str + local_time = convert_utc_time_to_local_time(alert_time) + assert local_time.strftime('%Y-%m-%d %H:%M:%S') in schema_change_slack_msg_str diff --git a/tests/monitor/test_data_monitoring.py b/tests/monitor/test_data_monitoring.py index 370baaef4..0909f5d05 100644 --- a/tests/monitor/test_data_monitoring.py +++ b/tests/monitor/test_data_monitoring.py @@ -29,9 +29,18 @@ def config_mock(): config_mock = mock.create_autospec(Config) config_mock.profiles_dir = 'profiles_dir_path' config_mock.slack_notification_webhook = WEBHOOK_URL + config_mock.is_slack_workflow = False config_mock.monitoring_configuration_in_dbt_sources_to_csv.return_value = None return config_mock +@pytest.fixture +def slack_workflows_config_mock(): + config_mock = mock.create_autospec(Config) + config_mock.profiles_dir = 'profiles_dir_path' + config_mock.slack_notification_webhook = WEBHOOK_URL + config_mock.is_slack_workflow = True + config_mock.monitoring_configuration_in_dbt_sources_to_csv.return_value = None + return config_mock @pytest.fixture def dbt_runner_mock(): @@ -50,8 +59,7 @@ def snowflake_data_monitoring_with_empty_config_in_db(config_mock, snowflake_con return snowflake_data_mon -@pytest.fixture -def snowflake_data_monitoring(config_mock, snowflake_con_mock, dbt_runner_mock): +def create_snowflake_data_mon(config_mock, snowflake_con_mock, dbt_runner_mock): # This cursor mock will use the side effect to return non empty configuration snowflake_cursor_context_manager_return_value = snowflake_con_mock.cursor.return_value.__enter__.return_value @@ -65,7 +73,17 @@ def execute_query_side_effect(*args, **kwargs): snowflake_data_mon = SnowflakeDataMonitoring(config_mock, snowflake_con_mock) snowflake_data_mon.dbt_runner = dbt_runner_mock - return snowflake_data_mon + return snowflake_data_mon + + +@pytest.fixture +def snowflake_data_monitoring(config_mock, snowflake_con_mock, dbt_runner_mock): + return create_snowflake_data_mon(config_mock, snowflake_con_mock, dbt_runner_mock) + + +@pytest.fixture +def snowflake_data_monitoring_slack_workflow(slack_workflows_config_mock, snowflake_con_mock, dbt_runner_mock): + return create_snowflake_data_mon(slack_workflows_config_mock, snowflake_con_mock, dbt_runner_mock) @pytest.fixture @@ -203,6 +221,12 @@ def test_data_monitoring_send_alert_to_slack(requests_post_mock, snowflake_data_ requests_post_mock.assert_called_once_with(url=WEBHOOK_URL, headers={'Content-type': 'application/json'}, data=json.dumps(alert.to_slack_message())) +@mock.patch('requests.post') +def test_data_monitoring_send_alert_to_slack_workflows(requests_post_mock, snowflake_data_monitoring_slack_workflow): + alert = Alert.create_alert_from_row(ALERT_ROW) + snowflake_data_monitoring_slack_workflow._send_to_slack([alert]) + requests_post_mock.assert_called_once_with(url=WEBHOOK_URL, headers={'Content-type': 'application/json'}, + data=json.dumps(alert.to_slack_workflows_message())) def test_data_monitoring_send_alerts(snowflake_data_monitoring_with_alerts_in_db): snowflake_data_monitoring = snowflake_data_monitoring_with_alerts_in_db @@ -211,3 +235,5 @@ def test_data_monitoring_send_alerts(snowflake_data_monitoring_with_alerts_in_db assert len(alerts) == len(expected_alerts) assert alerts[0].id == expected_alerts[0].id assert type(alerts[0]) == type(expected_alerts[0]) + +