Skip to content

Commit

Permalink
anomalies: add support for anomaly_exclude_metrics (#582)
Browse files Browse the repository at this point in the history
* anomalies: add support for anomalies_exclude_dates

* anomalies_exclude_dates: don't allow ranges without "before"

* anomalies_exclude_dates: bugfix

* anomalies_exclude_dates: several bugfixes, didn't work well for non-daily buckets

* test-warehouse: increase schema name prefix

* test_slower_rate_event_freshness: solve race hopefully

* test-warehouse: name the detailed report html like we name the artifact

* changed to "anomaly_exclude_metrics" - which is now a simple "where" expression

* get_anomaly_scores_query - remove unnecessary code

* get_test_argument: better logic

* add comment

* bugfixes
  • Loading branch information
haritamar authored Oct 24, 2023
1 parent 4113c0d commit 8fc09ad
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 34 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ jobs:
run: |
mkdir -p ~/.dbt
DBT_VERSION=$(pip show dbt-core | grep -i version | awk '{print $2}' | sed 's/\.//g')
UNDERSCORED_REF_NAME=$(echo "${{ inputs.warehouse-type }}_dbt_${DBT_VERSION}_${BRANCH_NAME}" | head -c 32 | sed "s/-/_/g")
UNDERSCORED_REF_NAME=$(echo "${{ inputs.warehouse-type }}_dbt_${DBT_VERSION}_${BRANCH_NAME}" | head -c 40 | sed "s/-/_/g")
echo "$PROFILES_YML" | base64 -d | sed "s/<SCHEMA_NAME>/dbt_pkg_$UNDERSCORED_REF_NAME/g" > ~/.dbt/profiles.yml
- name: Check DWH connection
Expand All @@ -115,7 +115,7 @@ jobs:
- name: Test
working-directory: "${{ env.TESTS_DIR }}/tests"
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report.html --self-contained-html
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html --self-contained-html

- name: Upload test results
if: always()
Expand All @@ -131,4 +131,4 @@ jobs:
uses: actions/upload-artifact@v3
with:
name: detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}
path: ${{ env.TESTS_DIR }}/tests/detailed_report.html
path: ${{ env.TESTS_DIR }}/tests/detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}.html
185 changes: 185 additions & 0 deletions integration_tests/tests/test_anomaly_exclude_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from datetime import datetime, timedelta
from typing import Any, Dict, List

from data_generator import DATE_FORMAT, generate_dates
from dbt_project import DbtProject
from parametrization import Parametrization

# Name of the timestamp column seeded into every generated data row.
TIMESTAMP_COLUMN = "updated_at"
# Fully-qualified dbt test exercised by all cases in this file.
DBT_TEST_NAME = "elementary.column_anomalies"
# Base test arguments shared by every case; individual tests extend this dict.
DBT_TEST_ARGS = {
    "timestamp_column": TIMESTAMP_COLUMN,
    "column_anomalies": ["sum"],
}


@Parametrization.autodetect_parameters()
@Parametrization.case(
    name="daily_buckets",
    time_bucket={"period": "day", "count": 1},
    dates_step=timedelta(days=1),
)
@Parametrization.case(
    name="six_hour_buckets",
    time_bucket={"period": "hour", "count": 6},
    dates_step=timedelta(hours=6),
)
def test_exclude_specific_dates(
    test_id: str, dbt_project: DbtProject, time_bucket: dict, dates_step: timedelta
):
    """
    Spike the metric on two specific training dates and verify that the anomaly
    test passes by default (the spikes mask the anomaly), but fails once those
    dates are filtered out via the "anomaly_exclude_metrics" where-expression.
    """
    utc_now = datetime.utcnow()
    test_bucket, *training_buckets = generate_dates(
        base_date=utc_now - timedelta(1), step=dates_step
    )

    # The two training dates whose metrics are spiked to the anomalous value.
    exclude_dates = [(utc_now - timedelta(offset)).date() for offset in (5, 3)]

    data: List[Dict[str, Any]] = [
        {TIMESTAMP_COLUMN: test_bucket.strftime(DATE_FORMAT), "metric": 10}
    ]
    for cur_bucket in training_buckets:
        # Buckets on excluded dates get the same value as the test bucket,
        # which hides the anomaly unless they are filtered out.
        metric_value = 10 if cur_bucket.date() in exclude_dates else 1
        data.append(
            {TIMESTAMP_COLUMN: cur_bucket.strftime(DATE_FORMAT), "metric": metric_value}
        )

    # Without the exclusion, the spiked training rows mask the anomaly.
    test_args = {**DBT_TEST_ARGS, "time_bucket": time_bucket}
    test_result = dbt_project.test(
        test_id, DBT_TEST_NAME, test_args, data=data, test_column="metric"
    )
    assert test_result["status"] == "pass"

    # Excluding the spiked dates restores a clean baseline, so the test fails.
    excluded_dates_str = ", ".join(f"'{cur_date}'" for cur_date in exclude_dates)
    test_args = {
        **DBT_TEST_ARGS,
        "time_bucket": time_bucket,
        "anomaly_exclude_metrics": f"metric_date in ({excluded_dates_str})",
    }
    test_result = dbt_project.test(
        test_id, DBT_TEST_NAME, test_args, test_column="metric"
    )
    assert test_result["status"] == "fail"


def test_exclude_specific_timestamps(test_id: str, dbt_project: DbtProject):
    """
    Spike the metric in two specific hourly buckets and verify that the anomaly
    test passes by default, but fails once those exact bucket timestamps are
    filtered out via the "anomaly_exclude_metrics" where-expression on
    "metric_time_bucket".
    """
    # To avoid races, set the "custom_started_at" to the beginning of the hour.
    # Fix: zero microseconds as well — without it the timestamp is not actually
    # hour-aligned, and the sub-second残remainder leaks into the isoformat value
    # passed as "custom_run_started_at".
    test_started_at = datetime.utcnow().replace(minute=0, second=0, microsecond=0)

    test_bucket, *training_buckets = generate_dates(
        base_date=test_started_at - timedelta(hours=1),
        step=timedelta(hours=1),
        days_back=1,
    )

    # The two hourly buckets whose metrics are spiked to the anomalous value.
    excluded_buckets = [
        test_started_at - timedelta(hours=22),
        test_started_at - timedelta(hours=20),
    ]

    data: List[Dict[str, Any]] = [
        {TIMESTAMP_COLUMN: test_bucket.strftime(DATE_FORMAT), "metric": 10}
    ]
    data += [
        {
            TIMESTAMP_COLUMN: cur_bucket.strftime(DATE_FORMAT),
            "metric": 1 if cur_bucket not in excluded_buckets else 10,
        }
        for cur_bucket in training_buckets
    ]

    # Without the exclusion, the spiked training buckets mask the anomaly.
    time_bucket = {"period": "hour", "count": 1}
    test_args = {**DBT_TEST_ARGS, "time_bucket": time_bucket, "days_back": 1}
    test_result = dbt_project.test(
        test_id, DBT_TEST_NAME, test_args, data=data, test_column="metric"
    )
    assert test_result["status"] == "pass"

    # Excluding the spiked bucket timestamps restores a clean baseline.
    excluded_buckets_str = ", ".join(
        ["'%s'" % cur_ts.strftime(DATE_FORMAT) for cur_ts in excluded_buckets]
    )
    test_args = {
        **DBT_TEST_ARGS,
        "time_bucket": time_bucket,
        "days_back": 1,
        "anomaly_exclude_metrics": f"metric_time_bucket in ({excluded_buckets_str})",
    }
    test_result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        test_args,
        test_column="metric",
        test_vars={"custom_run_started_at": test_started_at.isoformat()},
    )
    assert test_result["status"] == "fail"


def test_exclude_date_range(test_id: str, dbt_project: DbtProject):
    """
    Spike the metric across a contiguous range of training dates and verify the
    anomaly test passes by default, but fails once the range is filtered out
    with a "metric_date" range expression in "anomaly_exclude_metrics".
    """
    utc_today = datetime.utcnow().date()
    test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

    start_date = utc_today - timedelta(6)
    end_date = utc_today - timedelta(3)

    def metric_for(cur_date) -> int:
        # Dates inside [start_date, end_date] are spiked to the anomalous value.
        return 10 if start_date <= cur_date <= end_date else 1

    data: List[Dict[str, Any]] = [
        {TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT), "metric": 10}
    ]
    data.extend(
        {
            TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
            "metric": metric_for(cur_date),
        }
        for cur_date in training_dates
    )

    # Without the exclusion, the spiked range masks the anomaly.
    base_args = {**DBT_TEST_ARGS, "days_back": 30}
    result = dbt_project.test(
        test_id, DBT_TEST_NAME, base_args, data=data, test_column="metric"
    )
    assert result["status"] == "pass"

    # Excluding the spiked range restores a clean baseline, so the test fails.
    exclusion = f"metric_date >= '{start_date}' and metric_date <= '{end_date}'"
    result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        {**base_args, "anomaly_exclude_metrics": exclusion},
        test_column="metric",
    )
    assert result["status"] == "fail"


def test_exclude_by_metric_value(test_id: str, dbt_project: DbtProject):
    """
    Spike the metric on every third training day and verify the anomaly test
    passes by default, but fails once the spiked rows are filtered out by a
    "metric_value" threshold in "anomaly_exclude_metrics".
    """
    utc_today = datetime.utcnow().date()
    test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1))

    data: List[Dict[str, Any]] = [
        {TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT), "metric": 10}
    ]
    for cur_date in training_dates:
        # Every day whose day-of-month is divisible by 3 gets the spiked value.
        spiked = cur_date.day % 3 == 0
        data.append(
            {
                TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT),
                "metric": 10 if spiked else 1,
            }
        )

    # Without the exclusion, the spiked training rows mask the anomaly.
    base_args = {**DBT_TEST_ARGS, "days_back": 30}
    result = dbt_project.test(
        test_id, DBT_TEST_NAME, base_args, data=data, test_column="metric"
    )
    assert result["status"] == "pass"

    # Excluding training rows by value restores a clean baseline; the filter
    # is scoped to dates before the test date so the anomalous row survives.
    exclusion = f"metric_date < '{test_date}' and metric_value >= 5"
    result = dbt_project.test(
        test_id,
        DBT_TEST_NAME,
        {**base_args, "anomaly_exclude_metrics": exclusion},
        test_column="metric",
    )
    assert result["status"] == "fail"
8 changes: 6 additions & 2 deletions integration_tests/tests/test_event_freshness_anomalies.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def test_stop_event_freshness(test_id: str, dbt_project: DbtProject):


def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject):
anomaly_date = datetime.now() - timedelta(days=1)
# To avoid races, set the "custom_started_at" to the beginning of the hour
test_started_at = datetime.utcnow().replace(minute=0, second=0)

anomaly_date = test_started_at - timedelta(days=1)
data = [
{
EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT),
Expand All @@ -64,7 +67,7 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject):
EVENT_TIMESTAMP_COLUMN: date.strftime(DATE_FORMAT),
UPDATE_TIMESTAMP_COLUMN: (date + STEP).strftime(DATE_FORMAT),
}
for date in generate_dates(datetime.now(), step=STEP, days_back=1)
for date in generate_dates(test_started_at, step=STEP, days_back=1)
]
data.extend(slow_data)
result = dbt_project.test(
Expand All @@ -75,5 +78,6 @@ def test_slower_rate_event_freshness(test_id: str, dbt_project: DbtProject):
update_timestamp_column=UPDATE_TIMESTAMP_COLUMN,
),
data=data,
test_vars={"custom_run_started_at": test_started_at.isoformat()},
)
assert result["status"] == "fail"
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
{%- set detection_end = elementary.get_detection_end(test_configuration.detection_delay) %}
{%- set min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(detection_end, metric_properties, test_configuration.days_back) %}

{# For timestamped tests, this will be the bucket start, and for non-timestamped tests it will be the
bucket end (which is the actual time of the test) #}
{%- set metric_time_bucket_expr = 'case when bucket_start is not null then bucket_start else bucket_end end' %}

{%- set anomaly_scores_query %}

with data_monitoring_metrics as (
Expand Down Expand Up @@ -93,6 +97,11 @@
updated_at,
dimension,
dimension_value,

-- Fields added for the anomaly_exclude_metrics expression used below
{{ metric_time_bucket_expr }} as metric_time_bucket,
{{ elementary.edr_cast_as_date(elementary.edr_date_trunc('day', metric_time_bucket_expr))}} as metric_date,

row_number() over (partition by id order by updated_at desc) as row_number
from union_metrics

Expand All @@ -112,11 +121,11 @@
bucket_start,
bucket_end,
{{ bucket_seasonality_expr }} as bucket_seasonality,
{{ test_configuration.anomaly_exclude_metrics or 'FALSE' }} as is_excluded,
bucket_duration_hours,
updated_at
from grouped_metrics_duplicates
where row_number = 1

),

time_window_aggregation as (
Expand All @@ -141,6 +150,7 @@
last_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) training_end,
first_value(bucket_end) over (partition by metric_name, full_table_name, column_name, dimension, dimension_value, bucket_seasonality order by bucket_end asc rows between unbounded preceding and current row) as training_start
from grouped_metrics
where not is_excluded
{{ dbt_utils.group_by(13) }}
),

Expand Down
3 changes: 2 additions & 1 deletion macros/edr/system/system_utils/get_config_var.sql
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@
'drop_failure_percent_threshold': none
},
'include_other_warehouse_specific_columns': false,
'fail_on_zero': false
'fail_on_zero': false,
'anomaly_exclude_metrics': none
} %}
{{- return(default_config) -}}
{%- endmacro -%}
Expand Down
7 changes: 4 additions & 3 deletions macros/edr/system/system_utils/get_test_argument.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
{%- endif %}
{% endif %}
{% endif %}
{%- if elementary.get_config_var(argument_name) %}
{{ return(elementary.get_config_var(argument_name)) }}
{% set config_value = elementary.get_config_var(argument_name) %}
{% if config_value is defined %}
{% do return(config_value) %}
{% endif %}
{{ return(none) }}
{% do return(none) %}
{% endmacro %}
5 changes: 3 additions & 2 deletions macros/edr/tests/test_all_columns_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
{% test all_columns_anomalies(model, column_anomalies, exclude_prefix, exclude_regexp, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay, anomaly_exclude_metrics) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -35,7 +35,8 @@
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity,
detection_delay=detection_delay) %}
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand Down
5 changes: 3 additions & 2 deletions macros/edr/tests/test_column_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
{% test column_anomalies(model, column_name, column_anomalies, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay, anomaly_exclude_metrics) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -33,7 +33,8 @@
backfill_days=backfill_days,
seasonality=seasonality,
sensitivity=sensitivity,
detection_delay=detection_delay) %}
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
sensitivity,
ignore_small_changes,
fail_on_zero,
detection_delay) %}
detection_delay,
anomaly_exclude_metrics) %}

{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}

Expand All @@ -42,6 +43,8 @@
{%- set ignore_small_changes = elementary.get_test_argument('ignore_small_changes', ignore_small_changes, model_graph_node) %}
{# Validate ignore_small_changes #}

{% set anomaly_exclude_metrics = elementary.get_test_argument('anomaly_exclude_metrics', anomaly_exclude_metrics, model_graph_node) %}

{% set test_configuration =
{'timestamp_column': timestamp_column,
'where_expression': where_expression,
Expand All @@ -57,9 +60,10 @@
'dimensions': dimensions,
'ignore_small_changes': ignore_small_changes,
'fail_on_zero': fail_on_zero,
'detection_delay': detection_delay
'detection_delay': detection_delay,
'anomaly_exclude_metrics': anomaly_exclude_metrics
} %}
{%- set test_configuration = elementary.empty_dict_keys_to_none(test_configuration) -%}
{%- set test_configuration = elementary.undefined_dict_keys_to_none(test_configuration) -%}
{%- do elementary.validate_mandatory_configuration(test_configuration, mandatory_params) -%}
{%- do elementary.validate_ignore_small_changes(test_configuration) -%}

Expand All @@ -73,7 +77,7 @@
'event_timestamp_column': event_timestamp_column,
'dimensions': dimensions
} %}
{%- set metric_properties = elementary.empty_dict_keys_to_none(metric_properties) -%}
{%- set metric_properties = elementary.undefined_dict_keys_to_none(metric_properties) -%}

{# Adding to cache so test configuration will be available outside the test context #}
{%- set test_unique_id = elementary.get_test_unique_id() %}
Expand Down
5 changes: 3 additions & 2 deletions macros/edr/tests/test_dimension_anomalies.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{% test dimension_anomalies(model, dimensions, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay) %}
{% test dimension_anomalies(model, dimensions, timestamp_column, where_expression, anomaly_sensitivity, anomaly_direction, min_training_set_size, time_bucket, days_back, backfill_days, seasonality, sensitivity, detection_delay, anomaly_exclude_metrics) %}
-- depends_on: {{ ref('monitors_runs') }}
-- depends_on: {{ ref('data_monitoring_metrics') }}
-- depends_on: {{ ref('dbt_run_results') }}
Expand Down Expand Up @@ -36,7 +36,8 @@
seasonality=seasonality,
dimensions=dimensions,
sensitivity=sensitivity,
detection_delay=detection_delay,) %}
detection_delay=detection_delay,
anomaly_exclude_metrics=anomaly_exclude_metrics) %}
{%- if not test_configuration %}
{{ exceptions.raise_compiler_error("Failed to create test configuration dict for test `{}`".format(test_table_name)) }}
{%- endif %}
Expand Down
Loading

0 comments on commit 8fc09ad

Please sign in to comment.