Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test detection delay #541

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ jobs:

- name: Test
working-directory: "${{ env.TESTS_DIR }}/tests"
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report.html --self-contained-html
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report.html --self-contained-html -k "test_stop_with_delay"

- name: Upload test results
if: always()
Expand Down
54 changes: 50 additions & 4 deletions integration_tests/tests/test_freshness_anomalies.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timedelta

Expand All @@ -16,22 +17,39 @@ class FreshnessAnomaliesConfig:
step: timedelta
days_back: int
backfill_days: int
detection_delay_hours: int


HOURLY_CONFIG = FreshnessAnomaliesConfig(
period="hour", step=timedelta(minutes=10), days_back=14, backfill_days=2
period="hour",
step=timedelta(minutes=10),
days_back=14,
backfill_days=2,
detection_delay_hours=0,
)

DAILY_CONFIG = FreshnessAnomaliesConfig(
period="day", step=timedelta(hours=2), days_back=30, backfill_days=3
period="day",
step=timedelta(hours=2),
days_back=30,
backfill_days=3,
detection_delay_hours=0,
)

WEEKLY_CONFIG = FreshnessAnomaliesConfig(
period="week", step=timedelta(hours=12), days_back=7 * 15, backfill_days=14
period="week",
step=timedelta(hours=12),
days_back=7 * 15,
backfill_days=14,
detection_delay_hours=0,
)

MONTHLY_CONFIG = FreshnessAnomaliesConfig(
period="month", step=timedelta(days=2), days_back=30 * 15, backfill_days=60
period="month",
step=timedelta(days=2),
days_back=30 * 15,
backfill_days=60,
detection_delay_hours=0,
)


Expand All @@ -47,6 +65,7 @@ def _get_test_config(self, config: FreshnessAnomaliesConfig) -> dict:
days_back=config.days_back,
backfill_days=config.backfill_days,
time_bucket=dict(period=config.period, count=1),
detection_delay=dict(period="hour", count=config.detection_delay_hours),
)

def _skip_redshift_monthly(
Expand Down Expand Up @@ -94,6 +113,33 @@ def test_stop(
)
assert result["status"] == "fail"

def test_stop_with_delay(
    self,
    test_id: str,
    dbt_project: DbtProject,
    config: FreshnessAnomaliesConfig,
    target: str,
):
    """A freshness anomaly (data stopped arriving `backfill_days` ago) must
    NOT be reported when `detection_delay` pushes the detection window's end
    back past the point where the data stopped.

    The delay is set to `backfill_days` worth of hours plus one, so the
    detection end falls just before the stop date and the test should pass.
    """
    self._skip_redshift_monthly(target, config)
    now = datetime.utcnow()
    # Fix: derive the anomaly date from the same UTC `now` that is passed as
    # custom_run_started_at. The original used datetime.now() (local time)
    # here, which skews the generated data window by the local UTC offset in
    # non-UTC timezones and could flip the test outcome.
    anomaly_date = now - timedelta(days=config.backfill_days)
    data = [
        {TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT)}
        for date in generate_dates(
            anomaly_date, step=config.step, days_back=config.days_back
        )
    ]
    # Copy so the shared module-level config objects are not mutated.
    delayed_config = copy(config)
    delayed_config.detection_delay_hours = 24 * config.backfill_days + 1
    result = dbt_project.test(
        test_id,
        TEST_NAME,
        self._get_test_config(delayed_config),
        data=data,
        test_vars={"custom_run_started_at": now.isoformat()},
    )
    assert result["status"] == "pass"

def test_slower_rate(
self,
test_id: str,
Expand Down
8 changes: 7 additions & 1 deletion integration_tests/tests/test_volume_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def test_volume_anomalies_with_sensitivity(test_id: str, dbt_project: DbtProject

def test_volume_anomalies_no_timestamp(test_id: str, dbt_project: DbtProject):
data = [{"hello": "world"}]
now = datetime.utcnow()
min_training_set_size = 4
test_args = {
# Using smaller training set size to avoid needing to run many tests.
Expand All @@ -179,7 +180,12 @@ def test_volume_anomalies_no_timestamp(test_id: str, dbt_project: DbtProject):
}
dbt_project.seed(data, test_id)
for _ in range(min_training_set_size):
test_result = dbt_project.test(test_id, DBT_TEST_NAME, test_args)
test_result = dbt_project.test(
test_id,
DBT_TEST_NAME,
test_args,
test_vars={"custom_run_started_at": now.isoformat()},
)
assert test_result["status"] == "pass"

dbt_project.seed(data * 2, test_id)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% macro get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, monitors, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
{% macro get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, monitors, min_bucket_start, max_bucket_end, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
{%- set full_table_name = elementary.model_node_to_full_name(model_graph_node) %}
{%- set test_execution_id = elementary.get_test_execution_id() %}
Expand Down Expand Up @@ -28,7 +28,6 @@
{%- else %}
{%- set bucket_seasonality_expr = elementary.const_as_text('no_seasonality') %}
{%- endif %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(metric_properties, test_configuration.days_back) %}

{%- set anomaly_scores_query %}

Expand All @@ -51,7 +50,7 @@
from {{ data_monitoring_metrics_table }}
{# We use bucket_end because non-timestamp tests have only bucket_end field. #}
where
bucket_end > {{ min_bucket_start_expr }}
bucket_end > {{ min_bucket_start }} and bucket_end <= {{ max_bucket_end }}
and metric_properties = {{ elementary.dict_to_quoted_json(metric_properties) }}
{% if latest_full_refresh %}
and updated_at > {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(latest_full_refresh)) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
{% macro get_trunc_min_bucket_start_expr(metric_properties, days_back) %}
{%- set untruncated_min = (elementary.get_run_started_at() - modules.datetime.timedelta(days_back | int)).strftime("%Y-%m-%d 00:00:00") %}
{# Returns the end of the anomaly-detection window: the run start time minus
   the configured detection delay.
   `detection_delay` is a dict with `period` (e.g. "hour", "day") and `count`;
   the period name is pluralized to form a timedelta kwarg (e.g. hours=3). #}
{% macro get_detection_end(detection_delay) %}
{%- set kwargs = {detection_delay.period+'s': detection_delay.count} %}
{%- set detection_end = elementary.get_run_started_at() - modules.datetime.timedelta(**kwargs) %}
{{ return(detection_end) }}
{% endmacro %}

{# Builds a SQL expression for the earliest bucket start of the detection
   window: `detection_end` minus `days_back` days, floored to midnight and
   then truncated to the metric's time-bucket period (so only full buckets
   are considered). Returns the SQL expression string, not a value. #}
{% macro get_trunc_min_bucket_start_expr(detection_end, metric_properties, days_back) %}
{%- set untruncated_min = (detection_end - modules.datetime.timedelta(days_back | int)).strftime("%Y-%m-%d 00:00:00") %}
{%- set trunc_min_bucket_start_expr = elementary.edr_date_trunc(metric_properties.time_bucket.period, elementary.edr_cast_as_timestamp(elementary.edr_quote(untruncated_min)))%}
{{ return(trunc_min_bucket_start_expr) }}
{% endmacro %}

{# This macro can't be used without truncating to full buckets #}
{% macro get_backfill_bucket_start(backfill_days, metric_properties) %}
{% do return((elementary.get_run_started_at() - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00")) %}
{# Returns the start of the backfill window as a "%Y-%m-%d 00:00:00" string:
   `detection_end` minus `backfill_days` days, floored to midnight. Callers
   are expected to truncate further to full time buckets before use. #}
{% macro get_backfill_bucket_start(detection_end, backfill_days) %}
{% do return((detection_end - modules.datetime.timedelta(backfill_days)).strftime("%Y-%m-%d 00:00:00")) %}
{% endmacro %}


{% macro get_test_buckets_min_and_max(model_relation, backfill_days, days_back, monitors=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}
{% macro get_test_buckets_min_and_max(model_relation, backfill_days, days_back, detection_delay, monitors=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}

{%- set run_start_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.run_started_at_as_string())) %}
{%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(metric_properties, days_back) %}
{%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(backfill_days))) %}
{%- set detection_end = elementary.get_detection_end(detection_delay) %}
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) %}
{%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, days_back) %}
{%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(detection_end, backfill_days))) %}
{%- set full_table_name = elementary.relation_to_full_name(model_relation) %}
{%- set force_metrics_backfill = elementary.get_config_var('force_metrics_backfill') %}

Expand All @@ -32,11 +39,11 @@
with bucket_times as (
select
{{ trunc_min_bucket_start_expr }} as days_back_start
, {{ run_start_expr }} as run_started
, {{ detection_end_expr }} as detection_end
),
full_buckets_calc as (
select *,
floor({{ elementary.edr_datediff('days_back_start', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} as periods_until_max
floor({{ elementary.edr_datediff('days_back_start', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} as periods_until_max
from bucket_times
)
select
Expand All @@ -52,7 +59,7 @@
{{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, -1 * metric_properties.time_bucket.count, 'min(first_bucket_end)')) }} as min_existing_bucket_start,
{{ trunc_min_bucket_start_expr }} as days_back_start,
{{ backfill_bucket_start }} as backfill_start,
{{ run_start_expr }} as run_started
{{ detection_end_expr }} as detection_end
from {{ monitors_runs_relation }}
where upper(full_table_name) = upper('{{ full_table_name }}')
and metric_properties = {{ elementary.dict_to_quoted_json(metric_properties) }}
Expand All @@ -74,8 +81,8 @@
{# How many periods we need to add to last run time to get only full time buckets #}
case
when max_existing_bucket_end is not null and max_existing_bucket_end > days_back_start and min_existing_bucket_start <= days_back_start
then floor({{ elementary.edr_datediff('max_existing_bucket_end', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
else floor({{ elementary.edr_datediff('days_back_start', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
then floor({{ elementary.edr_datediff('max_existing_bucket_end', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
else floor({{ elementary.edr_datediff('days_back_start', 'detection_end', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }}
end as periods_until_max
from bucket_times
)
Expand Down Expand Up @@ -112,4 +119,4 @@
{{ exceptions.raise_compiler_error("Failed to calc test buckets min and max") }}
{%- endif %}

{% endmacro %}
{% endmacro %}
6 changes: 4 additions & 2 deletions macros/edr/tests/test_all_columns_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity) %}
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -34,7 +34,8 @@
days_back=days_back,
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity) %}
sensitivity=sensitivity,
detection_delay=detection_delay) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand All @@ -56,6 +57,7 @@
{%- set min_bucket_start, max_bucket_end = elementary.get_test_buckets_min_and_max(model_relation=model_relation,
backfill_days=test_configuration.backfill_days,
days_back=test_configuration.days_back,
detection_delay=test_configuration.detection_delay,
monitors=column_monitors,
column_name=column_name,
metric_properties=metric_properties) %}
Expand Down
6 changes: 4 additions & 2 deletions macros/edr/tests/test_column_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity) %}
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -32,7 +32,8 @@
days_back=days_back,
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity) %}
sensitivity=sensitivity,
detection_delay=detection_delay) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand All @@ -50,6 +51,7 @@
{%- set min_bucket_start, max_bucket_end = elementary.get_test_buckets_min_and_max(model_relation=model_relation,
backfill_days=test_configuration.backfill_days,
days_back=test_configuration.days_back,
detection_delay=test_configuration.detection_delay,
monitors=column_monitors,
column_name=column_name,
metric_properties=metric_properties) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
event_timestamp_column,
dimensions,
sensitivity,
ignore_small_changes) %}
ignore_small_changes,
detection_delay) %}

{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}

Expand All @@ -34,6 +35,7 @@
{%- set time_bucket = elementary.get_time_bucket(time_bucket, model_graph_node) %}
{%- set days_back = elementary.get_days_back(days_back, model_graph_node, seasonality) %}
{%- set seasonality = elementary.get_seasonality(seasonality, model_graph_node, time_bucket, timestamp_column) %}
{%- set detection_delay = elementary.get_detection_delay(detection_delay, model_graph_node) %}

{%- set ignore_small_changes = elementary.get_test_argument('ignore_small_changes', ignore_small_changes, model_graph_node) %}
{# Validate ignore_small_changes #}
Expand All @@ -51,7 +53,8 @@
'freshness_column': freshness_column,
'event_timestamp_column': event_timestamp_column,
'dimensions': dimensions,
'ignore_small_changes': ignore_small_changes
'ignore_small_changes': ignore_small_changes,
'detection_delay': detection_delay
} %}
{%- set test_configuration = elementary.empty_dict_keys_to_none(test_configuration) -%}
{%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}
Expand Down
Loading
Loading