diff --git a/elementary/monitor/api/alerts/alerts.py b/elementary/monitor/api/alerts/alerts.py
index 8ea628b4f..c91986592 100644
--- a/elementary/monitor/api/alerts/alerts.py
+++ b/elementary/monitor/api/alerts/alerts.py
@@ -1,5 +1,5 @@
 from datetime import datetime
-from typing import Dict, List
+from typing import Dict, List, Optional
 
 from elementary.clients.api.api_client import APIClient
 from elementary.clients.dbt.dbt_runner import DbtRunner
@@ -24,13 +24,13 @@ def __init__(
             config=self.config,
         )
 
-    def get_new_alerts(self, days_back: int) -> List[PendingAlertSchema]:
-        pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back)
+    def get_new_alerts(self, days_back: int, hours_back: Optional[int] = None) -> List[PendingAlertSchema]:
+        pending_alerts = self.alerts_fetcher.query_pending_alerts(days_back=days_back, hours_back=hours_back)
         return pending_alerts
 
-    def get_alerts_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
+    def get_alerts_last_sent_times(self, days_back: int, hours_back: Optional[int] = None) -> Dict[str, datetime]:
         alerts_last_sent_times = self.alerts_fetcher.query_last_alert_times(
-            days_back=days_back
+            days_back=days_back, hours_back=hours_back
         )
         last_sent_times = dict()
         for alert_class_id, last_sent_time_as_string in alerts_last_sent_times.items():
diff --git a/elementary/monitor/cli.py b/elementary/monitor/cli.py
index c896977f5..da648c0a9 100644
--- a/elementary/monitor/cli.py
+++ b/elementary/monitor/cli.py
@@ -93,6 +93,15 @@ def decorator(func):
             default=1 if cmd == Command.MONITOR else 7,
             help="Set a limit to how far back should edr collect data.",
         )(func)
+        # --hours-back is only offered by the monitor command.
+        if cmd == Command.MONITOR:
+            func = click.option(
+                "--hours-back",
+                "-h",
+                type=int,
+                default=None,
+                help="Optionally set an hourly limit to how far back should edr collect data. If provided, it overrides --days-back.",
+            )(func)
         func = click.option(
             "--env",
             type=str,
@@ -166,6 +175,7 @@ def get_cli_properties() -> dict:
     full_refresh_dbt_package = params.get("full_refresh_dbt_package")
     select = params.get("select")
     days_back = params.get("days_back")
+    hours_back = params.get("hours_back")
     timezone = params.get("timezone")
     group_by = params.get("group_by")
     suppression_interval = params.get("suppression_interval")
@@ -177,6 +187,7 @@ def get_cli_properties() -> dict:
         "full_refresh_dbt_package": full_refresh_dbt_package,
         "select": select,
         "days_back": days_back,
+        "hours_back": hours_back,
         "timezone": timezone,
         "group_by": group_by,
         "suppression_interval": suppression_interval,
@@ -272,6 +283,7 @@ def get_cli_properties() -> dict:
 def monitor(
     ctx,
     days_back,
+    hours_back,
     slack_webhook,
     deprecated_slack_webhook,
     slack_token,
@@ -362,7 +374,7 @@ def monitor(
             Command.MONITOR, get_cli_properties(), ctx.command.name
         )
         success = data_monitoring.run_alerts(
-            days_back, full_refresh_dbt_package, dbt_vars=vars
+            days_back, full_refresh_dbt_package, dbt_vars=vars, hours_back=hours_back
         )
         anonymous_tracking.track_cli_end(
             Command.MONITOR, data_monitoring.properties(), ctx.command.name
diff --git a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py
index 0494b3e7e..5e058fbb8 100644
--- a/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py
+++ b/elementary/monitor/data_monitoring/alerts/data_monitoring_alerts.py
@@ -68,6 +68,7 @@ def _get_integration_client(self) -> BaseIntegration:
     def _populate_data(
         self,
         days_back: Optional[int] = None,
+        hours_back: Optional[int] = None,
         dbt_full_refresh: bool = False,
         dbt_vars: Optional[dict] = None,
     ) -> bool:
@@ -75,6 +76,8 @@ def _populate_data(
         vars = dbt_vars or dict()
         if days_back:
             vars.update(days_back=days_back)
+        if hours_back:
+            vars.update(hours_back=hours_back)
         success = self.internal_dbt_runner.run(
             models="elementary_cli.alerts.alerts_v2",
             full_refresh=dbt_full_refresh,
@@ -86,17 +89,17 @@ def _populate_data(
 
         return success
 
-    def _fetch_data(self, days_back: int) -> List[PendingAlertSchema]:
+    def _fetch_data(self, days_back: int, hours_back: Optional[int] = None) -> List[PendingAlertSchema]:
         return self.alerts_api.get_new_alerts(
-            days_back=days_back,
+            days_back=days_back, hours_back=hours_back,
         )
 
     def _filter_data(self, data: List[PendingAlertSchema]) -> List[PendingAlertSchema]:
         return filter_alerts(data, alerts_filter=self.selector_filter)
 
-    def _fetch_last_sent_times(self, days_back: int) -> Dict[str, datetime]:
+    def _fetch_last_sent_times(self, days_back: int, hours_back: Optional[int] = None) -> Dict[str, datetime]:
         return self.alerts_api.get_alerts_last_sent_times(
-            days_back=days_back,
+            days_back=days_back, hours_back=hours_back,
         )
 
     def _sort_alerts(
@@ -286,6 +289,8 @@ def _skip_alerts(self, alerts: List[PendingAlertSchema]):
     def run_alerts(
         self,
         days_back: int,
         dbt_full_refresh: bool = False,
         dbt_vars: Optional[dict] = None,
+        # Appended last and optional so existing positional callers keep working.
+        hours_back: Optional[int] = None,
     ) -> bool:
@@ -293,6 +298,7 @@ def run_alerts(
         if self.should_populate_data:
             popopulated_data_successfully = self._populate_data(
                 days_back=days_back,
+                hours_back=hours_back,
                 dbt_full_refresh=dbt_full_refresh,
                 dbt_vars=dbt_vars,
             )
@@ -302,9 +308,9 @@ def run_alerts(
                 return self.success
 
         # Fetch and filter data
-        alerts = self._fetch_data(days_back)
+        alerts = self._fetch_data(days_back=days_back, hours_back=hours_back)
         alerts = self._filter_data(alerts)
-        alerts_last_sent_times = self._fetch_last_sent_times(days_back)
+        alerts_last_sent_times = self._fetch_last_sent_times(days_back=days_back, hours_back=hours_back)
         sorted_alerts = self._sort_alerts(
             alerts=alerts, alerts_last_sent_times=alerts_last_sent_times
         )
diff --git a/elementary/monitor/dbt_project/macros/alerts/fetchers/get_alerts.sql b/elementary/monitor/dbt_project/macros/alerts/fetchers/get_alerts.sql
index 17af8b76c..49cc12daf 100644
--- a/elementary/monitor/dbt_project/macros/alerts/fetchers/get_alerts.sql
+++ b/elementary/monitor/dbt_project/macros/alerts/fetchers/get_alerts.sql
@@ -1,10 +1,15 @@
-{% macro get_pending_alerts(days_back, type=none) %}
+{% macro get_pending_alerts(days_back, type=none, hours_back=none) %}
     -- depends_on: {{ ref('alerts_v2') }}
+    {% if hours_back %}
+        {% set alerts_time_limit = elementary_cli.get_alerts_time_limit_hour(hours_back) %}
+    {% else %}
+        {% set alerts_time_limit = elementary_cli.get_alerts_time_limit(days_back) %}
+    {% endif %}
     {% set select_pending_alerts_query %}
         with alerts_in_time_limit as (
             select *
             from {{ ref('elementary_cli', 'alerts_v2') }}
-            where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }}
+            where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ alerts_time_limit }}
             {% if type %}
                 and lower(type) = {{ elementary.edr_quote(type | lower) }}
             {% endif %}
@@ -30,8 +35,13 @@
 
 {% endmacro %}
 
-{% macro get_last_alert_sent_times(days_back, type=none) %}
+{% macro get_last_alert_sent_times(days_back, type=none, hours_back=none) %}
     -- depends_on: {{ ref('alerts_v2') }}
+    {% if hours_back %}
+        {% set alerts_time_limit = elementary_cli.get_alerts_time_limit_hour(hours_back) %}
+    {% else %}
+        {% set alerts_time_limit = elementary_cli.get_alerts_time_limit(days_back) %}
+    {% endif %}
     {% set select_last_alert_sent_times_query %}
         with alerts_in_time_limit as (
             select
@@ -39,7 +49,7 @@
                 status,
                 sent_at
             from {{ ref('alerts_v2') }}
-            where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ elementary_cli.get_alerts_time_limit(days_back) }}
+            where {{ elementary.edr_cast_as_timestamp('detected_at') }} >= {{ alerts_time_limit }}
             {% if type %}
                 and lower(type) = {{ elementary.edr_quote(type | lower) }}
             {% endif %}
diff --git a/elementary/monitor/dbt_project/macros/get_alerts_time_limit_hour.sql b/elementary/monitor/dbt_project/macros/get_alerts_time_limit_hour.sql
new file mode 100644
index 000000000..330037f61
--- /dev/null
+++ b/elementary/monitor/dbt_project/macros/get_alerts_time_limit_hour.sql
@@ -0,0 +1,5 @@
+{% macro get_alerts_time_limit_hour(hours_back=24) %}
+    {% set nowtime = elementary.edr_current_timestamp() %}
+    {% set datetime_limit = elementary.edr_timeadd('hour', hours_back * -1, nowtime) %}
+    {{ return(datetime_limit) }}
+{% endmacro %}
diff --git a/elementary/monitor/fetchers/alerts/alerts.py b/elementary/monitor/fetchers/alerts/alerts.py
index e14768095..ca176490a 100644
--- a/elementary/monitor/fetchers/alerts/alerts.py
+++ b/elementary/monitor/fetchers/alerts/alerts.py
@@ -38,11 +38,11 @@ def skip_alerts(
         )
 
     def query_pending_alerts(
-        self, days_back: int, type: Optional[AlertTypes] = None
+        self, days_back: int, type: Optional[AlertTypes] = None, hours_back: Optional[int] = None
     ) -> List[PendingAlertSchema]:
         pending_alerts_results = self.dbt_runner.run_operation(
             macro_name="elementary_cli.get_pending_alerts",
-            macro_args={"days_back": days_back, "type": type.value if type else None},
+            macro_args={"days_back": days_back, "type": type.value if type else None, "hours_back": hours_back},
         )
         return [
             PendingAlertSchema(**result)
@@ -50,11 +50,11 @@ def query_pending_alerts(
         ]
 
     def query_last_alert_times(
-        self, days_back: int, type: Optional[AlertTypes] = None
+        self, days_back: int, type: Optional[AlertTypes] = None, hours_back: Optional[int] = None
     ) -> Dict[str, str]:
         response = self.dbt_runner.run_operation(
             macro_name="elementary_cli.get_last_alert_sent_times",
-            macro_args={"days_back": days_back, "type": type.value if type else None},
+            macro_args={"days_back": days_back, "type": type.value if type else None, "hours_back": hours_back},
         )
         return json.loads(response[0])
 
diff --git a/pyproject.toml b/pyproject.toml
index c24394e7a..70bb7a507 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "elementary-data"
-version = "0.15.1"
+version = "0.15.1+rv10"
 description = "Data monitoring and lineage"
 authors = ["Elementary"]
 keywords = ["data", "lineage", "data lineage", "data warehouse", "DWH", "observability", "data monitoring", "data observability", "Snowflake", "BigQuery", "Redshift", "data reliability", "analytics engineering"]