Skip to content

Commit

Permalink
Add detection delay
Browse files Browse the repository at this point in the history
  • Loading branch information
francoisforster committed Sep 18, 2023
1 parent 41a1180 commit a4f8514
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 39 deletions.
49 changes: 45 additions & 4 deletions integration_tests/tests/test_freshness_anomalies.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timedelta

Expand All @@ -16,22 +17,39 @@ class FreshnessAnomaliesConfig:
step: timedelta
days_back: int
backfill_days: int
detection_delay_hours: int


HOURLY_CONFIG = FreshnessAnomaliesConfig(
period="hour", step=timedelta(minutes=10), days_back=14, backfill_days=2
period="hour",
step=timedelta(minutes=10),
days_back=14,
backfill_days=2,
detection_delay_hours=0,
)

DAILY_CONFIG = FreshnessAnomaliesConfig(
period="day", step=timedelta(hours=2), days_back=30, backfill_days=3
period="day",
step=timedelta(hours=2),
days_back=30,
backfill_days=3,
detection_delay_hours=0,
)

WEEKLY_CONFIG = FreshnessAnomaliesConfig(
period="week", step=timedelta(hours=12), days_back=7 * 15, backfill_days=14
period="week",
step=timedelta(hours=12),
days_back=7 * 15,
backfill_days=14,
detection_delay_hours=0,
)

MONTHLY_CONFIG = FreshnessAnomaliesConfig(
period="month", step=timedelta(days=2), days_back=30 * 15, backfill_days=60
period="month",
step=timedelta(days=2),
days_back=30 * 15,
backfill_days=60,
detection_delay_hours=0,
)


Expand All @@ -47,6 +65,7 @@ def _get_test_config(self, config: FreshnessAnomaliesConfig) -> dict:
days_back=config.days_back,
backfill_days=config.backfill_days,
time_bucket=dict(period=config.period, count=1),
detection_delay=dict(period='hour', count=config.detection_delay_hours),
)

def _skip_redshift_monthly(
Expand Down Expand Up @@ -94,6 +113,28 @@ def test_stop(
)
assert result["status"] == "fail"

def test_stop_with_delay(
    self,
    test_id: str,
    dbt_project: DbtProject,
    config: FreshnessAnomaliesConfig,
    target: str,
):
    """Check that a stale table passes when the detection delay covers the gap.

    Rows are generated up to ``backfill_days`` ago and then stop. The test
    is run with a detection delay of the same length (expressed in hours),
    and the result is expected to be ``"pass"``.
    """
    self._skip_redshift_monthly(target, config)

    # Last moment at which the table received data.
    last_update = datetime.now() - timedelta(days=config.backfill_days)
    rows = []
    for timestamp in generate_dates(
        last_update, step=config.step, days_back=config.days_back
    ):
        rows.append({TIMESTAMP_COLUMN: timestamp.strftime(DATE_FORMAT)})

    # Work on a shallow copy so the shared module-level config is not mutated.
    config_with_delay = copy(config)
    config_with_delay.detection_delay_hours = 24 * config.backfill_days
    test_args = self._get_test_config(config_with_delay)

    outcome = dbt_project.test(test_id, TEST_NAME, test_args, data=rows)
    assert outcome["status"] == "pass"

def test_slower_rate(
self,
test_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
{%- else %}
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
{%- endif %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(metric_properties, test_configuration.days_back) %}
{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}

{%- set anomaly_scores_query %}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
{% macro get_trunc_min_bucket_start_expr(metric_properties, days_back) %}
{%- set untruncated_min = (elementary.get_run_started_at() - modules.datetime.timedelta(days_back | int)).strftime("%Y-%m-%d 00:00:00") %}
{% macro get_detection_end(detection_delay) %}
    {#
      Return the run start time shifted back by the configured detection delay.

      detection_delay: mapping with `period` and `count`. The period is
      pluralised ("hour" -> "hours") to form the timedelta keyword argument,
      so only periods that timedelta accepts as keywords will work.
    #}
    {%- set timedelta_args = {detection_delay.period + 's': detection_delay.count} %}
    {%- set delay = modules.datetime.timedelta(**timedelta_args) %}
    {% do return(elementary.get_run_started_at() - delay) %}
{% endmacro %}

{% macro get_trunc_min_bucket_start_expr(detection_end, metric_properties, days_back) %}
{#
  Build the expression for the earliest bucket start of the monitored window.

  detection_end:     value supporting datetime arithmetic and strftime;
                     the window is measured backwards from it.
  metric_properties: must expose time_bucket.period, used as the truncation unit.
  days_back:         number of days to go back; coerced with the `int` filter.

  Returns the result of elementary.edr_date_trunc applied to the quoted,
  timestamp-cast lower bound (presumably a SQL expression string - confirm
  against the helper definitions).
#}
{# Go back days_back days from detection_end and pin the time part to midnight. #}
{%- set untruncated_min = (detection_end - modules.datetime.timedelta(days_back | int)).strftime("%Y-%m-%d 00:00:00") %}
{# Quote the literal, cast it to a timestamp, then truncate to the bucket period. #}
{%- set trunc_min_bucket_start_expr = elementary.edr_date_trunc(metric_properties.time_bucket.period, elementary.edr_cast_as_timestamp(elementary.edr_quote(untruncated_min)))%}
{{ return(trunc_min_bucket_start_expr) }}
{% endmacro %}

{# This macro can't be used without truncating to full buckets #}
{% macro get_backfill_bucket_start(backfill_days, metric_properties) %}
{% do return((elementary.get_run_started_at() - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00")) %}
{% macro get_backfill_bucket_start(detection_end, backfill_days) %}
{% do return((detection_end - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00")) %}
{% endmacro %}


{% macro get_test_buckets_min_and_max(model_relation, backfill_days, days_back, monitors=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}
{% macro get_test_buckets_min_and_max(model_relation, backfill_days, days_back, detection_delay, monitors=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}

{%- set run_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.run_started_at_as_string())) %}
{%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(metric_properties, days_back) %}
{%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(backfill_days))) %}
{%- set detection_end = elementary.get_detection_end(detection_delay) %}
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) %}
{%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, days_back) %}
{%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(detection_end, backfill_days))) %}
{%- set full_table_name = elementary.relation_to_full_name(model_relation) %}
{%- set force_metrics_backfill = elementary.get_config_var('force_metrics_backfill') %}

Expand All @@ -32,11 +39,11 @@
with bucket_times as (
select
{{ trunc_min_bucket_start_expr }} as days_back_start
, {{ run_start_expr }} as run_started
, {{ detection_end_expr }} as detection_end
),
full_buckets_calc as (
select *,
floor({{ elementary.edr_datediff('days_back_start', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} as periods_until_max
floor({{ elementary.edr_datediff('days_back_start', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} as periods_until_max
from bucket_times
)
select
Expand All @@ -52,7 +59,7 @@
{{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, -1 * metric_properties.time_bucket.count, 'min(first_bucket_end)')) }} as min_existing_bucket_start,
{{ trunc_min_bucket_start_expr }} as days_back_start,
{{ backfill_bucket_start }} as backfill_start,
{{ run_start_expr }} as run_started
{{ detection_end_expr }} as detection_end
from {{ monitors_runs_relation }}
where upper(full_table_name) = upper('{{ full_table_name }}')
and metric_properties = {{ elementary.dict_to_quoted_json(metric_properties) }}
Expand All @@ -74,8 +81,8 @@
{# How many periods we need to add to last run time to get only full time buckets #}
case
when max_existing_bucket_end is not null and max_existing_bucket_end > days_back_start and min_existing_bucket_start <= days_back_start
then floor({{ elementary.edr_datediff('max_existing_bucket_end', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
else floor({{ elementary.edr_datediff('days_back_start', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
then floor({{ elementary.edr_datediff('max_existing_bucket_end', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
else floor({{ elementary.edr_datediff('days_back_start', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
end as periods_until_max
from bucket_times
)
Expand Down Expand Up @@ -112,4 +119,4 @@
{{ exceptions.raise_compiler_error("Failed to calc test buckets min and max") }}
{%- endif %}

{% endmacro %}
{% endmacro %}
6 changes: 4 additions & 2 deletions macros/edr/tests/test_all_columns_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity) %}
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -34,7 +34,8 @@
days_back=days_back,
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity) %}
sensitivity=sensitivity,
detection_delay=detection_delay) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand All @@ -56,6 +57,7 @@
{%- set min_bucket_start, max_bucket_end = elementary.get_test_buckets_min_and_max(model_relation=model_relation,
backfill_days=test_configuration.backfill_days,
days_back=test_configuration.days_back,
detection_delay=test_configuration.detection_delay,
monitors=column_monitors,
column_name=column_name,
metric_properties=metric_properties) %}
Expand Down
6 changes: 4 additions & 2 deletions macros/edr/tests/test_column_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity) %}
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -32,7 +32,8 @@
days_back=days_back,
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity) %}
sensitivity=sensitivity,
detection_delay=detection_delay) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand All @@ -50,6 +51,7 @@
{%- set min_bucket_start, max_bucket_end = elementary.get_test_buckets_min_and_max(model_relation=model_relation,
backfill_days=test_configuration.backfill_days,
days_back=test_configuration.days_back,
detection_delay=test_configuration.detection_delay,
monitors=column_monitors,
column_name=column_name,
metric_properties=metric_properties) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
event_timestamp_column,
dimensions,
sensitivity,
ignore_small_changes) %}
ignore_small_changes,
detection_delay) %}

{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}

Expand All @@ -34,6 +35,7 @@
{%- set time_bucket = elementary.get_time_bucket(time_bucket, model_graph_node) %}
{%- set days_back = elementary.get_days_back(days_back, model_graph_node, seasonality) %}
{%- set seasonality = elementary.get_seasonality(seasonality, model_graph_node, time_bucket, timestamp_column) %}
{%- set detection_delay = elementary.get_detection_delay(detection_delay, model_graph_node) %}

{%- set ignore_small_changes = elementary.get_test_argument('ignore_small_changes', ignore_small_changes, model_graph_node) %}
{# Validate ignore_small_changes #}
Expand All @@ -51,7 +53,8 @@
'freshness_column': freshness_column,
'event_timestamp_column': event_timestamp_column,
'dimensions': dimensions,
'ignore_small_changes': ignore_small_changes
'ignore_small_changes': ignore_small_changes,
'detection_delay': detection_delay
} %}
{%- set test_configuration = elementary.empty_dict_keys_to_none(test_configuration) -%}
{%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}
Expand Down
72 changes: 72 additions & 0 deletions macros/edr/tests/test_configuration/get_detection_delay.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{% macro get_no_detection_delay() %}
    {# A detection delay of zero hours, i.e. no delay at all. #}
    {%- set zero_delay = {"period": "hour", "count": 0} %}
    {{ return(zero_delay) }}
{% endmacro %}

{% macro get_default_detection_delay() %}
    {# The default detection delay is "no delay". #}
    {%- set default_delay = elementary.get_no_detection_delay() %}
    {{ return(default_delay) }}
{% endmacro %}

{% macro get_detection_delay_supported_periods() %}
    {# Period names accepted for detection_delay.period. #}
    {%- set supported_periods = ["hour", "day", "week"] %}
    {{ return(supported_periods) }}
{% endmacro %}

{% macro get_detection_delay(detection_delay, model_graph_node) %}
    {#
      Resolve the effective detection delay for a test.

      The configured value is looked up through elementary.get_test_argument,
      validated, and then layered on top of the default delay so that any key
      the user omitted keeps its default. When nothing is configured the
      default delay is returned as is.
    #}
    {%- set user_delay = elementary.get_test_argument('detection_delay', detection_delay, model_graph_node) %}
    {%- do elementary.validate_detection_delay(user_delay) %}
    {%- set resolved_delay = elementary.get_default_detection_delay() %}

    {%- if user_delay %}
        {# Copy before updating so the default mapping itself is left untouched. #}
        {%- set resolved_delay = resolved_delay.copy() %}
        {%- do resolved_delay.update(user_delay) %}
    {%- endif %}
    {% do return(resolved_delay) %}
{% endmacro %}

{% macro validate_detection_delay(detection_delay) %}
    {#
      Validate a user-supplied detection_delay and raise a compiler error on
      the first problem found. An empty / unset value is accepted as is.

      Checks, in order:
        - the value is a mapping
        - it only contains the keys `period` and `count`
        - `count`, when provided, is a non-negative integer
        - `period`, when provided, is one of the supported periods

      `count` and `period` are both optional here; the caller fills in
      defaults for whichever one is missing.
    #}
    {% if detection_delay %}
        {%- if detection_delay is not mapping %}
            {% do exceptions.raise_compiler_error(
"
Invalid detection_delay format. Expected format:
detection_delay:
count: int
period: string
") %}
        {%- endif %}

        {%- set valid_keys = ['period', 'count'] %}
        {%- set invalid_keys = [] %}
        {%- for key in detection_delay.keys() %}
            {%- if key not in valid_keys %}
                {%- do invalid_keys.append(key) -%}
            {%- endif %}
        {%- endfor %}
        {%- if invalid_keys | length > 0 %}
            {% do exceptions.raise_compiler_error(
("
Found invalid keys in detection_delay: {0}.
Supported keys: {1}.
Expected format:
detection_delay:
count: int
period: string
").format(invalid_keys, valid_keys)) %}
        {%- endif %}

        {# Use .get() so a missing key yields none instead of an undefined value
           that would blow up in the numeric comparison below. #}
        {%- set count = detection_delay.get('count') %}
        {%- if count is not none %}
            {# Checked for every provided value, including falsy ones such as "". #}
            {%- if count is not integer %}
                {% do exceptions.raise_compiler_error("detection_delay.count expects valid integer, got: {} (If it's an integer, try to remove quotes)".format(count)) %}
            {%- endif %}
            {%- if count < 0 %}
                {% do exceptions.raise_compiler_error("detection_delay.count can't be negative, got: {}".format(count)) %}
            {%- endif %}
        {%- endif %}

        {%- set period = detection_delay.get('period') %}
        {%- set supported_periods = elementary.get_detection_delay_supported_periods() %}
        {%- if period and period not in supported_periods %}
            {% do exceptions.raise_compiler_error("detection_delay.period value should be one of {0}, got: {1}".format(supported_periods, period)) %}
        {%- endif %}
    {% endif %}
{% endmacro %}
Loading

4 comments on commit a4f8514

@devtrev-o
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason that "month" is excluded from get_detection_delay_supported_periods() when the documentation lists 'month' as a supported period? @francoisforster

@francoisforster
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only implemented it for hours, days and weeks as shown in get_detection_delay_supported_periods. Not sure why the documentation is different.

@devtrev-o
Copy link
Contributor

@devtrev-o devtrev-o commented on a4f8514 Oct 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking of implementing month as a supported period using 'relativedelta' instead of 'timedelta' for determining detection_end. Does that sound like a reasonable approach to you?

@francoisforster
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds reasonable

Please sign in to comment.