Skip to content

Commit

Permalink
Merge pull request #54 from aaron-westlake/add-slack-workflow-config
Browse files Browse the repository at this point in the history
add config to send alerts to slack workflow endpoints
  • Loading branch information
oravi authored Mar 10, 2022
2 parents c77cde1 + b91ceb7 commit 359b53a
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 14 deletions.
10 changes: 8 additions & 2 deletions config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@


class Config(object):
SLACK_NOTIFICATION_WEBHOOK = 'slack_notification_webhook'
SLACK = 'slack'
NOTIFICATION_WEBHOOK = 'notification_webhook'
WORKFLOWS = 'workflows'
CONFIG_FILE_NAME = 'config.yml'

def __init__(self, config_dir: str, profiles_dir: str, profile_name: str) -> None:
Expand Down Expand Up @@ -47,7 +49,11 @@ def anonymous_tracking_enabled(self) -> bool:

@property
def slack_notification_webhook(self) -> Union[str, None]:
return self.config_dict.get(self.SLACK_NOTIFICATION_WEBHOOK)
return self.config_dict.get(self.SLACK).get(self.NOTIFICATION_WEBHOOK) if self.config_dict.get(self.SLACK) else None

@property
def is_slack_workflow(self) -> bool:
return True if self.config_dict.get(self.SLACK) and self.config_dict.get(self.SLACK).get(self.WORKFLOWS) else False

@property
def target_dir(self) -> str:
Expand Down
28 changes: 26 additions & 2 deletions monitor/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ def send(webhook: str, data: dict, content_types: str = "application/json"):
def to_slack_message(self) -> dict:
pass

def send_to_slack(self, webhook: str):
data = self.to_slack_message()
def to_slack_workflows_message(self) -> dict:
pass

def send_to_slack(self, webhook: str, is_slack_workflow: bool = False):
if is_slack_workflow:
data = self.to_slack_workflows_message()
else:
data = self.to_slack_message()
self.send(webhook, data)

@property
Expand Down Expand Up @@ -91,6 +97,15 @@ def to_slack_message(self) -> dict:
]
}

def to_slack_workflows_message(self) -> dict:
return {
"alert_description": self.ALERT_DESCRIPTION,
"table_name": self.table_name,
"detected_at": self.detected_at,
"type": self.change_type,
"description": self.description
}


class AnomalyDetectionAlert(Alert):
ALERT_DESCRIPTION = "Data anomaly detected"
Expand Down Expand Up @@ -143,3 +158,12 @@ def to_slack_message(self) -> dict:
}
]
}

def to_slack_workflows_message(self) -> dict:
return {
"alert_description": self.ALERT_DESCRIPTION,
"table_name": self.table_name,
"detected_at": self.detected_at,
"type": self.anomaly_type,
"description": self.description
}
2 changes: 1 addition & 1 deletion monitor/data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _send_to_slack(self, alerts: [Alert]) -> None:
if slack_webhook is not None:
sent_alerts = []
for alert in alerts:
alert.send_to_slack(slack_webhook)
alert.send_to_slack(slack_webhook, self.config.is_slack_workflow)
sent_alerts.append(alert.id)

sent_alert_count = len(sent_alerts)
Expand Down
22 changes: 16 additions & 6 deletions tests/config/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

FILE_DIR = os.path.dirname(__file__)

CONFIG = {'slack_notification_webhook': 'test_slack_webhook', 'dbt_projects': [FILE_DIR]}
CONFIG = {'slack': { 'notification_webhook': 'test_slack_webhook', 'workflows': False }, 'dbt_projects': [FILE_DIR]}

WORKFLOWS_CONFIG = {'slack': { 'notification_webhook': 'test_slack_webhook', 'workflows': True }, 'dbt_projects': [FILE_DIR]}

SOURCES = {'sources':
[{'name': 'unit_tests',
Expand Down Expand Up @@ -53,15 +55,15 @@
'query_tag': 'best_tag'}}}}


def create_config_files():
def create_config_files(config: dict):
yml = OrderedYaml()
yml.dump(CONFIG, os.path.join(FILE_DIR, 'config.yml'))
yml.dump(config, os.path.join(FILE_DIR, 'config.yml'))
schema_path = os.path.join(FILE_DIR, 'models')
if not os.path.exists(schema_path):
os.makedirs(schema_path)
yml.dump(SOURCES, os.path.join(schema_path, 'schema.yml'))
yml.dump(DBT_PROJECT, os.path.join(FILE_DIR, 'dbt_project.yml'))
yml.dump(PROFILES, os.path.join(FILE_DIR, 'profiles.yml'))
yml.dump(PROFILES, os.path.join(FILE_DIR, 'profiles.yml'))


def read_csv(csv_path):
Expand All @@ -75,12 +77,20 @@ def read_csv(csv_path):

@pytest.fixture
def config():
create_config_files()
create_config_files(CONFIG)
return Config(config_dir=FILE_DIR, profiles_dir=FILE_DIR, profile_name='elementary')

@pytest.fixture
def slack_workflows_config():
create_config_files(WORKFLOWS_CONFIG)
return Config(config_dir=FILE_DIR, profiles_dir=FILE_DIR, profile_name='elementary')


def test_config_get_slack_notification_webhook(config):
assert config.slack_notification_webhook == CONFIG['slack_notification_webhook']
assert config.slack_notification_webhook == CONFIG['slack']['notification_webhook']

def test_slack_workflows_config_get_workflows(slack_workflows_config):
assert slack_workflows_config.is_slack_workflow == WORKFLOWS_CONFIG['slack']['workflows']


def test_config__get_sources_from_all_dbt_projects(config):
Expand Down
11 changes: 11 additions & 0 deletions tests/monitor/test_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ def test_schema_change_alert_to_slack_message():
assert alert.description.lower() in schema_change_slack_msg_str
local_time = convert_utc_time_to_local_time(alert_time)
assert local_time.strftime('%Y-%m-%d %H:%M:%S') in schema_change_slack_msg_str


def test_schema_change_alert_to_slack_workflow_message():
alert_time = datetime.now()
alert = SchemaChangeAlert('123', 'db', 'sc', 't1', alert_time, 'column_added', 'Column was added')
schema_change_slack_msg_str = json.dumps(alert.to_slack_workflows_message()).lower()
assert alert.table_name.lower() in schema_change_slack_msg_str
assert alert.change_type.lower() in schema_change_slack_msg_str
assert alert.description.lower() in schema_change_slack_msg_str
local_time = convert_utc_time_to_local_time(alert_time)
assert local_time.strftime('%Y-%m-%d %H:%M:%S') in schema_change_slack_msg_str
32 changes: 29 additions & 3 deletions tests/monitor/test_data_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ def config_mock():
config_mock = mock.create_autospec(Config)
config_mock.profiles_dir = 'profiles_dir_path'
config_mock.slack_notification_webhook = WEBHOOK_URL
config_mock.is_slack_workflow = False
config_mock.monitoring_configuration_in_dbt_sources_to_csv.return_value = None
return config_mock

@pytest.fixture
def slack_workflows_config_mock():
config_mock = mock.create_autospec(Config)
config_mock.profiles_dir = 'profiles_dir_path'
config_mock.slack_notification_webhook = WEBHOOK_URL
config_mock.is_slack_workflow = True
config_mock.monitoring_configuration_in_dbt_sources_to_csv.return_value = None
return config_mock

@pytest.fixture
def dbt_runner_mock():
Expand All @@ -50,8 +59,7 @@ def snowflake_data_monitoring_with_empty_config_in_db(config_mock, snowflake_con
return snowflake_data_mon


@pytest.fixture
def snowflake_data_monitoring(config_mock, snowflake_con_mock, dbt_runner_mock):
def create_snowflake_data_mon(config_mock, snowflake_con_mock, dbt_runner_mock):
# This cursor mock will use the side effect to return non empty configuration
snowflake_cursor_context_manager_return_value = snowflake_con_mock.cursor.return_value.__enter__.return_value

Expand All @@ -65,7 +73,17 @@ def execute_query_side_effect(*args, **kwargs):

snowflake_data_mon = SnowflakeDataMonitoring(config_mock, snowflake_con_mock)
snowflake_data_mon.dbt_runner = dbt_runner_mock
return snowflake_data_mon
return snowflake_data_mon


@pytest.fixture
def snowflake_data_monitoring(config_mock, snowflake_con_mock, dbt_runner_mock):
return create_snowflake_data_mon(config_mock, snowflake_con_mock, dbt_runner_mock)


@pytest.fixture
def snowflake_data_monitoring_slack_workflow(slack_workflows_config_mock, snowflake_con_mock, dbt_runner_mock):
return create_snowflake_data_mon(slack_workflows_config_mock, snowflake_con_mock, dbt_runner_mock)


@pytest.fixture
Expand Down Expand Up @@ -203,6 +221,12 @@ def test_data_monitoring_send_alert_to_slack(requests_post_mock, snowflake_data_
requests_post_mock.assert_called_once_with(url=WEBHOOK_URL, headers={'Content-type': 'application/json'},
data=json.dumps(alert.to_slack_message()))

@mock.patch('requests.post')
def test_data_monitoring_send_alert_to_slack_workflows(requests_post_mock, snowflake_data_monitoring_slack_workflow):
alert = Alert.create_alert_from_row(ALERT_ROW)
snowflake_data_monitoring_slack_workflow._send_to_slack([alert])
requests_post_mock.assert_called_once_with(url=WEBHOOK_URL, headers={'Content-type': 'application/json'},
data=json.dumps(alert.to_slack_workflows_message()))

def test_data_monitoring_send_alerts(snowflake_data_monitoring_with_alerts_in_db):
snowflake_data_monitoring = snowflake_data_monitoring_with_alerts_in_db
Expand All @@ -211,3 +235,5 @@ def test_data_monitoring_send_alerts(snowflake_data_monitoring_with_alerts_in_db
assert len(alerts) == len(expected_alerts)
assert alerts[0].id == expected_alerts[0].id
assert type(alerts[0]) == type(expected_alerts[0])


0 comments on commit 359b53a

Please sign in to comment.