From 8fbc24f4e7597dc78599a52f33f9ea4e12797558 Mon Sep 17 00:00:00 2001 From: Itamar Hartstein Date: Tue, 12 Sep 2023 13:52:19 +0300 Subject: [PATCH] Ele 1694 bugfixes + tests for metric backfill logic (#522) * get_test_buckets_min_and_max: bugfixes + clearer names * get_config_var: make the min_training_set_size much smaller than days_back * test_table_anomalies: log fix * tests: add coverage for metrics backfill logic in various cases * test-warehouse: remove --pre * test fixes * test fixes * get_buckets_configuration: add cast as timestamp to deal with BQ issue when the value is null * tests fix: don't use ilike since BQ doesn't have it * tests: copy the dbt project to a temp dir, to avoid races * CI: allow running with --pre as a parameter * CI: bugfix * CI: bugfix * tests: only copy the dbt project dir once per process * CI: add daily workflow for running on pre-releases * CI: bugfix * CI: bugfix * tests: temp test - remove project copy to see if it speeds things up * temporary test number #2 - don't run incremental query * return to regular logic * add HTML report as artifact * make HTML report self contained * test-warehouse: make report artifact unique * tests: use UTC today not date.today() * test fixes * tests performance optimization + add force_metrics_backfill var --- .../test-all-warehouses-dbt-pre-releases.yml | 12 + .github/workflows/test-all-warehouses.yml | 16 +- .github/workflows/test-warehouse.yml | 17 +- integration_tests/requirements.txt | 1 + integration_tests/tests/conftest.py | 31 ++- integration_tests/tests/data_seeder.py | 12 +- integration_tests/tests/dbt_project.py | 64 +++-- integration_tests/tests/env.py | 8 +- .../tests/test_all_columns_anomalies.py | 23 +- .../tests/test_anomalies_backfill_logic.py | 262 ++++++++++++++++++ .../tests/test_column_anomalies.py | 25 +- .../tests/test_dimension_anomalies.py | 19 +- .../tests/test_volume_anomalies.py | 62 +++-- .../get_buckets_configuration.sql | 31 ++- .../system/system_utils/get_config_var.sql | 3 +- macros/edr/tests/test_table_anomalies.sql | 2 +- 16 files changed, 491 insertions(+), 97 deletions(-) create mode 100644 .github/workflows/test-all-warehouses-dbt-pre-releases.yml create mode 100644 integration_tests/tests/test_anomalies_backfill_logic.py diff --git a/.github/workflows/test-all-warehouses-dbt-pre-releases.yml b/.github/workflows/test-all-warehouses-dbt-pre-releases.yml new file mode 100644 index 000000000..7e50270a0 --- /dev/null +++ b/.github/workflows/test-all-warehouses-dbt-pre-releases.yml @@ -0,0 +1,12 @@ +name: Test all warehouse platforms on dbt pre-releases +on: + schedule: + - cron: "0 6 * * *" + workflow_dispatch: + +jobs: + test: + uses: ./.github/workflows/test-all-warehouses.yml + secrets: inherit + with: + dbt-version: latest_pre diff --git a/.github/workflows/test-all-warehouses.yml b/.github/workflows/test-all-warehouses.yml index 17ef46edb..0ac9033c3 100644 --- a/.github/workflows/test-all-warehouses.yml +++ b/.github/workflows/test-all-warehouses.yml @@ -20,12 +20,24 @@ on: required: false description: Branch or tag to checkout for 'dbt-data-reliability' repository + workflow_call: + inputs: + dbt-version: + type: string + required: false + elementary-ref: + type: string + required: false + dbt-data-reliability-ref: + type: string + required: false + jobs: test: strategy: fail-fast: false matrix: - dbt-version: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["1.3.0", null]') }} + dbt-version: ${{ inputs.dbt-version && 
fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["1.3.0", "latest_official"]') }} warehouse-type: [ postgres, @@ -58,4 +70,4 @@ jobs: with: result: "failure" run_id: ${{ github.run_id }} - workflow_name: "Test all warehouse platforms" + workflow_name: ${{ github.workflow }} diff --git a/.github/workflows/test-warehouse.yml b/.github/workflows/test-warehouse.yml index 768f005e3..3d5217bb0 100644 --- a/.github/workflows/test-warehouse.yml +++ b/.github/workflows/test-warehouse.yml @@ -25,6 +25,7 @@ on: dbt-version: type: string required: false + default: "latest_official" description: dbt's version to test with workflow_call: @@ -40,6 +41,7 @@ on: required: false dbt-version: type: string + default: "latest_official" required: false env: @@ -82,9 +84,10 @@ jobs: run: sudo apt-get install python-dev libsasl2-dev gcc - name: Install dbt - run: pip install --pre - "dbt-core${{ inputs.dbt-version && format('=={0}', inputs.dbt-version) }}" - "dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || inputs.warehouse-type }}${{ inputs.dbt-version && format('<={0}', inputs.dbt-version) }}" + run: + pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }} + "dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}" + "dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('<={0}', inputs.dbt-version)) || '' }}" - name: Install Elementary run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]" @@ -111,7 +114,7 @@ jobs: - name: Test working-directory: "${{ env.TESTS_DIR }}/tests" - run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml + run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report.html --self-contained-html - name: Upload test results if: always() @@ -121,3 +124,9 @@ jobs: summary: true display-options: fEX fail-on-empty: true + + - name: Upload HTML report + uses: actions/upload-artifact@v3 + with: + name: detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }} + path: ${{ env.TESTS_DIR }}/tests/detailed_report.html diff --git a/integration_tests/requirements.txt b/integration_tests/requirements.txt index 7b78ea5da..0be2602c5 100644 --- a/integration_tests/requirements.txt +++ b/integration_tests/requirements.txt @@ -1,5 +1,6 @@ pytest pytest-xdist pytest-parametrization +pytest-html filelock urllib3==2.0.4 \ No newline at end of file diff --git a/integration_tests/tests/conftest.py b/integration_tests/tests/conftest.py index d5e09b952..8bcd4ebcd 100644 --- a/integration_tests/tests/conftest.py +++ b/integration_tests/tests/conftest.py @@ -1,3 +1,7 @@ +import shutil +from pathlib import Path +from tempfile import mkdtemp + import env import pytest from dbt.version import __version__ as dbt_version @@ -5,16 +9,33 @@ from filelock import FileLock from packaging import version +DBT_PROJECT_PATH = Path(__file__).parent.parent / "dbt_project" + def pytest_addoption(parser): parser.addoption("--target", action="store", default="postgres") +@pytest.fixture(scope="session") +def project_dir_copy(): + dbt_project_copy_dir = mkdtemp(prefix="integration_tests_project_") + try: + shutil.copytree( + DBT_PROJECT_PATH, 
+ dbt_project_copy_dir, + dirs_exist_ok=True, + symlinks=True, + ) + yield dbt_project_copy_dir + finally: + shutil.rmtree(dbt_project_copy_dir) + + @pytest.fixture(scope="session", autouse=True) -def init_tests_env(target, tmp_path_factory, worker_id: str): +def init_tests_env(target, tmp_path_factory, worker_id: str, project_dir_copy: str): # Tests are not multi-threaded. if worker_id == "master": - env.init(target) + env.init(target, project_dir_copy) return # Temp dir shared by all workers. @@ -24,7 +45,7 @@ def init_tests_env(target, tmp_path_factory, worker_id: str): if env_ready_indicator_path.is_file(): return else: - env.init(target) + env.init(target, project_dir_copy) env_ready_indicator_path.touch() @@ -59,8 +80,8 @@ def requires_dbt_version(request): @pytest.fixture -def dbt_project(target: str) -> DbtProject: - return DbtProject(target) +def dbt_project(target: str, project_dir_copy: str) -> DbtProject: + return DbtProject(target, project_dir_copy) @pytest.fixture(scope="session") diff --git a/integration_tests/tests/data_seeder.py b/integration_tests/tests/data_seeder.py index 3deb7b3bf..720433fec 100644 --- a/integration_tests/tests/data_seeder.py +++ b/integration_tests/tests/data_seeder.py @@ -1,7 +1,7 @@ import csv +from pathlib import Path from typing import List -import dbt_project from elementary.clients.dbt.dbt_runner import DbtRunner from logger import get_logger @@ -11,14 +11,18 @@ class DbtDataSeeder: - def __init__(self, dbt_runner: DbtRunner): + def __init__( + self, dbt_runner: DbtRunner, dbt_project_path: Path, seeds_dir_path: Path + ): self.dbt_runner = dbt_runner + self.dbt_project_path = dbt_project_path + self.seeds_dir_path = seeds_dir_path def seed(self, data: List[dict], table_name: str): - seed_path = dbt_project.SEEDS_DIR_PATH.joinpath(f"{table_name}.csv") + seed_path = self.seeds_dir_path.joinpath(f"{table_name}.csv") try: with seed_path.open("w") as seed_file: - relative_seed_path = seed_path.relative_to(dbt_project.PATH) + relative_seed_path = seed_path.relative_to(self.dbt_project_path) writer = csv.DictWriter(seed_file, fieldnames=data[0].keys()) writer.writeheader() writer.writerows(data) diff --git a/integration_tests/tests/dbt_project.py b/integration_tests/tests/dbt_project.py index 4c5df9b4f..48ebdc90c 100644 --- a/integration_tests/tests/dbt_project.py +++ b/integration_tests/tests/dbt_project.py @@ -9,11 +9,6 @@ from logger import get_logger from ruamel.yaml import YAML -PATH = Path(__file__).parent.parent / "dbt_project" -MODELS_DIR_PATH = PATH / "models" -TMP_MODELS_DIR_PATH = MODELS_DIR_PATH / "tmp" -SEEDS_DIR_PATH = PATH / "data" - _DEFAULT_VARS = { "disable_dbt_invocation_autoupload": True, "disable_dbt_artifacts_autoupload": True, @@ -22,12 +17,22 @@ "collect_metrics": False, } +DUMMY_MODEL_FILE_PATTERN = """ +{{{{ + config ( + materialized = '{materialization}' + ) +}}}} + +SELECT 1 AS col +""" + logger = get_logger(__name__) -def get_dbt_runner(target: str) -> DbtRunner: +def get_dbt_runner(target: str, project_dir: str) -> DbtRunner: return DbtRunner( - str(PATH), + project_dir, target=target, vars=_DEFAULT_VARS.copy(), raise_on_failure=False, @@ -35,8 +40,13 @@ def get_dbt_runner(target: str) -> DbtRunner: class DbtProject: - def __init__(self, target: str): - self.dbt_runner = get_dbt_runner(target) + def __init__(self, target: str, project_dir: str): + self.dbt_runner = get_dbt_runner(target, project_dir) + + self.project_dir_path = Path(project_dir) + self.models_dir_path = self.project_dir_path / "models" + 
self.tmp_models_dir_path = self.models_dir_path / "tmp" + self.seeds_dir_path = self.project_dir_path / "data" def run_query(self, prerendered_query: str): results = json.loads( @@ -91,6 +101,8 @@ def test( data: Optional[List[dict]] = None, as_model: bool = False, table_name: Optional[str] = None, + materialization: str = "table", # Only relevant if as_model=True + test_vars: Optional[dict] = None, *, multiple_results: Literal[False] = False, ) -> Dict[str, Any]: @@ -107,6 +119,8 @@ def test( data: Optional[List[dict]] = None, as_model: bool = False, table_name: Optional[str] = None, + materialization: str = "table", # Only relevant if as_model=True + test_vars: Optional[dict] = None, *, multiple_results: Literal[True], ) -> List[Dict[str, Any]]: @@ -122,6 +136,8 @@ def test( data: Optional[List[dict]] = None, as_model: bool = False, table_name: Optional[str] = None, + materialization: str = "table", # Only relevant if as_model=True + test_vars: Optional[dict] = None, *, multiple_results: bool = False, ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: @@ -151,7 +167,9 @@ def test( "version": 2, "models": [table_yaml], } - temp_table_ctx = self.create_temp_model_for_existing_table(test_id) + temp_table_ctx = self.create_temp_model_for_existing_table( + test_id, materialization + ) else: props_yaml = { "version": 2, @@ -169,11 +187,15 @@ def test( self.seed(data, table_name) with temp_table_ctx: with NamedTemporaryFile( - dir=TMP_MODELS_DIR_PATH, suffix=".yaml" + dir=self.tmp_models_dir_path, + prefix="integration_tests_", + suffix=".yaml", ) as props_file: YAML().dump(props_yaml, props_file) - relative_props_path = Path(props_file.name).relative_to(PATH) - self.dbt_runner.test(select=str(relative_props_path)) + relative_props_path = Path(props_file.name).relative_to( + self.project_dir_path + ) + self.dbt_runner.test(select=str(relative_props_path), vars=test_vars) if multiple_results: return self._read_test_results(test_id) @@ -181,13 +203,19 @@ def test( return self._read_single_test_result(test_id) def seed(self, data: List[dict], table_name: str): - return DbtDataSeeder(self.dbt_runner).seed(data, table_name) + return DbtDataSeeder( + self.dbt_runner, self.project_dir_path, self.seeds_dir_path + ).seed(data, table_name) @contextmanager - def create_temp_model_for_existing_table(self, table_name: str): - model_path = TMP_MODELS_DIR_PATH.joinpath(f"{table_name}.sql") - model_path.write_text("SELECT 1 AS col") - relative_model_path = model_path.relative_to(PATH) + def create_temp_model_for_existing_table( + self, table_name: str, materialization: str + ): + model_path = self.tmp_models_dir_path.joinpath(f"{table_name}.sql") + model_path.write_text( + DUMMY_MODEL_FILE_PATTERN.format(materialization=materialization) + ) + relative_model_path = model_path.relative_to(self.project_dir_path) try: yield relative_model_path finally: diff --git a/integration_tests/tests/env.py b/integration_tests/tests/env.py index 4076172ec..f60cda7ed 100644 --- a/integration_tests/tests/env.py +++ b/integration_tests/tests/env.py @@ -1,15 +1,15 @@ import dbt_project -def init(target: str): - tests_env = Environment(target) +def init(target: str, project_dir: str): + tests_env = Environment(target, project_dir) tests_env.clear() tests_env.init() class Environment: - def __init__(self, target: str): - self.dbt_runner = dbt_project.get_dbt_runner(target) + def __init__(self, target: str, project_dir: str): + self.dbt_runner = dbt_project.get_dbt_runner(target, project_dir) def clear(self): 
self.dbt_runner.run_operation("elementary_tests.clear_env") diff --git a/integration_tests/tests/test_all_columns_anomalies.py b/integration_tests/tests/test_all_columns_anomalies.py index 282977faf..bb014752b 100644 --- a/integration_tests/tests/test_all_columns_anomalies.py +++ b/integration_tests/tests/test_all_columns_anomalies.py @@ -1,4 +1,4 @@ -from datetime import date, timedelta +from datetime import datetime, timedelta from typing import Any, Dict, List from data_generator import DATE_FORMAT, generate_dates @@ -13,12 +13,13 @@ def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ { TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "superhero": superhero, } - for cur_date in generate_dates(base_date=date.today() - timedelta(1)) + for cur_date in generate_dates(base_date=utc_today - timedelta(1)) for superhero in ["Superman", "Batman"] ] test_results = dbt_project.test( @@ -28,7 +29,8 @@ def test_anomalyless_all_columns_anomalies(test_id: str, dbt_project: DbtProject def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ {TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT), "superhero": None} @@ -53,7 +55,8 @@ def test_anomalous_all_columns_anomalies(test_id: str, dbt_project: DbtProject): def test_all_columns_anomalies_with_where_expression( test_id: str, dbt_project: DbtProject ): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ { @@ -95,13 +98,21 @@ def test_all_columns_anomalies_with_where_expression( params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") test_results = dbt_project.test( - test_id, DBT_TEST_NAME, params, multiple_results=True + test_id, + DBT_TEST_NAME, + params, + multiple_results=True, + test_vars={"force_metrics_backfill": True}, ) assert all([res["status"] == "pass" for res in test_results]) params = dict(DBT_TEST_ARGS, where="universe = 'DC'") test_results = dbt_project.test( - test_id, DBT_TEST_NAME, params, multiple_results=True + test_id, + DBT_TEST_NAME, + params, + multiple_results=True, + test_vars={"force_metrics_backfill": True}, ) col_to_status = {res["column_name"].lower(): res["status"] for res in test_results} assert col_to_status == { diff --git a/integration_tests/tests/test_anomalies_backfill_logic.py b/integration_tests/tests/test_anomalies_backfill_logic.py new file mode 100644 index 000000000..ed2075e93 --- /dev/null +++ b/integration_tests/tests/test_anomalies_backfill_logic.py @@ -0,0 +1,262 @@ +from datetime import datetime, timedelta + +import dateutil.parser +from data_generator import DATE_FORMAT, generate_dates +from dbt_project import DbtProject + +BACKFILL_DAYS = 2 +DAYS_BACK = 14 +TIMESTAMP_COLUMN = "updated_at" +DBT_TEST_NAME = "elementary.volume_anomalies" +DBT_TEST_ARGS = {"timestamp_column": TIMESTAMP_COLUMN} + +LATEST_METRICS_QUERY = """ + with metrics_ordered as ( + select + bucket_start, + metric_value, + row_number() over (partition by id order by updated_at desc) as row_number + from {{{{ ref("data_monitoring_metrics") }}}} + where metric_name = 'row_count' and 
lower(full_table_name) like '%{test_id}' + ) + select bucket_start, metric_value from metrics_ordered + where row_number = 1 +""" + + +def get_row_count_metrics(dbt_project: DbtProject, test_id: str): + results = dbt_project.run_query(LATEST_METRICS_QUERY.format(test_id=test_id)) + return { + dateutil.parser.parse(result["bucket_start"]).date(): int( + result["metric_value"] + ) + for result in results + } + + +def test_full_backfill_for_non_incremental_model(dbt_project: DbtProject, test_id: str): + utc_today = datetime.utcnow().date() + data_dates = generate_dates(base_date=utc_today - timedelta(1)) + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} + for cur_date in data_dates + for _ in range(5) + ] + test_result = dbt_project.test( + test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, as_model=True + ) + assert test_result["status"] == "pass" + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK) + } + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in data_dates + ] + test_result = dbt_project.test( + test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data, as_model=True + ) + assert test_result["status"] == "pass" + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 1 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK) + } + + +def test_partial_backfill_for_incremental_models(dbt_project: DbtProject, test_id: str): + utc_today = datetime.utcnow().date() + data_dates = generate_dates(base_date=utc_today - timedelta(1)) + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} + for cur_date in data_dates + for _ in range(5) + ] + test_result = dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + ) + assert test_result["status"] == "pass" + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK) + } + + # Reload data to the table with 1 row per date instead of 5. If the backfill logic is working, + # only metrics for the last 2 days should be updated and the test should fail because the metric + # drops from 5 to 1 in these days. 
+ data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in data_dates + ] + test_result = dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + ) + assert test_result["status"] == "fail" + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 if cur_date < utc_today - timedelta(BACKFILL_DAYS) else 1 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK) + } + + +def test_longer_backfill_in_case_of_a_gap(dbt_project: DbtProject, test_id: str): + date_gap_size = 5 + utc_today = datetime.utcnow().date() + data_dates = generate_dates(base_date=utc_today - timedelta(1)) + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} + for cur_date in data_dates + for _ in range(5) + if cur_date < utc_today - timedelta(date_gap_size) + ] + dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + test_vars={ + "custom_run_started_at": ( + datetime.utcnow() - timedelta(date_gap_size) + ).isoformat() + }, + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 + for cur_date in data_dates + if utc_today - timedelta(DAYS_BACK + date_gap_size) + <= cur_date + < utc_today - timedelta(date_gap_size) + } + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in data_dates + ] + dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 if cur_date < utc_today - timedelta(date_gap_size) else 1 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK + date_gap_size) + } + + +def test_full_backfill_if_metric_not_updated_for_a_long_time( + dbt_project: DbtProject, test_id: str +): + date_gap_size = 15 + utc_today = datetime.utcnow().date() + data_dates = generate_dates(base_date=utc_today - timedelta(1)) + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} + for cur_date in data_dates + for _ in range(5) + if cur_date < utc_today - timedelta(date_gap_size) + ] + dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + test_vars={ + "custom_run_started_at": ( + datetime.utcnow() - timedelta(date_gap_size) + ).isoformat() + }, + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 + for cur_date in data_dates + if utc_today - timedelta(DAYS_BACK + date_gap_size) + <= cur_date + < utc_today - timedelta(date_gap_size) + } + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in data_dates + ] + dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 if cur_date < utc_today - timedelta(DAYS_BACK) else 1 + for cur_date in data_dates + if ( + utc_today - timedelta(DAYS_BACK + date_gap_size) + <= cur_date + < utc_today - timedelta(date_gap_size) + or cur_date >= utc_today - timedelta(DAYS_BACK) + ) + } + + +def test_backfill_when_metric_doesnt_exist_back_enough( + dbt_project: DbtProject, test_id: str +): + utc_today = datetime.utcnow().date() + data_dates = generate_dates(base_date=utc_today - timedelta(1)) + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} + for cur_date in data_dates + for _ in range(5) + ] + 
dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 5 + for cur_date in data_dates + if cur_date >= utc_today - timedelta(DAYS_BACK) + } + + data = [ + {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in data_dates + ] + dbt_project.test( + test_id, + DBT_TEST_NAME, + DBT_TEST_ARGS, + data=data, + as_model=True, + materialization="incremental", + test_vars={"days_back": 21}, + ) + assert get_row_count_metrics(dbt_project, test_id) == { + cur_date: 1 for cur_date in data_dates if cur_date >= utc_today - timedelta(21) + } diff --git a/integration_tests/tests/test_column_anomalies.py b/integration_tests/tests/test_column_anomalies.py index b41b76279..cc145dd5e 100644 --- a/integration_tests/tests/test_column_anomalies.py +++ b/integration_tests/tests/test_column_anomalies.py @@ -1,4 +1,4 @@ -from datetime import date, timedelta +from datetime import datetime, timedelta from typing import Any, Dict, List from data_generator import DATE_FORMAT, generate_dates @@ -13,12 +13,13 @@ def test_anomalyless_column_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ { TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "superhero": superhero, } - for cur_date in generate_dates(base_date=date.today() - timedelta(1)) + for cur_date in generate_dates(base_date=utc_today - timedelta(1)) for superhero in ["Superman", "Batman"] ] test_result = dbt_project.test( @@ -28,7 +29,8 @@ def test_anomalyless_column_anomalies(test_id: str, dbt_project: DbtProject): def test_anomalous_column_anomalies(test_id: str, dbt_project: DbtProject): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ {TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT), "superhero": None} @@ -49,8 +51,9 @@ def test_anomalous_column_anomalies(test_id: str, dbt_project: DbtProject): assert test_result["status"] == "fail" -def test_column_anomalies_with_where_expression(test_id: str, dbt_project: DbtProject): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) +def test_column_anomalies_with_where_parameter(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ { @@ -87,12 +90,20 @@ def test_column_anomalies_with_where_expression(test_id: str, dbt_project: DbtPr params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, test_column="superhero" + test_id, + DBT_TEST_NAME, + params, + test_column="superhero", + test_vars={"force_metrics_backfill": True}, ) assert test_result["status"] == "pass" params = dict(DBT_TEST_ARGS, where="universe = 'DC'") test_result = dbt_project.test( - test_id, DBT_TEST_NAME, params, test_column="superhero" + test_id, + DBT_TEST_NAME, + params, + test_column="superhero", + test_vars={"force_metrics_backfill": True}, ) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_dimension_anomalies.py b/integration_tests/tests/test_dimension_anomalies.py index c70127a36..8d7d84d08 100644 --- a/integration_tests/tests/test_dimension_anomalies.py +++ 
b/integration_tests/tests/test_dimension_anomalies.py @@ -1,4 +1,4 @@ -from datetime import date, timedelta +from datetime import datetime, timedelta from typing import Any, Dict, List from data_generator import DATE_FORMAT, generate_dates @@ -10,12 +10,13 @@ def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data: List[Dict[str, Any]] = [ { TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT), "superhero": superhero, } - for cur_date in generate_dates(base_date=date.today() - timedelta(1)) + for cur_date in generate_dates(base_date=utc_today - timedelta(1)) for superhero in ["Superman", "Spiderman"] ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) @@ -23,7 +24,8 @@ def test_anomalyless_dimension_anomalies(test_id: str, dbt_project: DbtProject): def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ { @@ -48,7 +50,8 @@ def test_anomalous_dimension_anomalies(test_id: str, dbt_project: DbtProject): def test_dimensions_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject ): - test_date, *training_dates = generate_dates(base_date=date.today() - timedelta(1)) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(1)) data: List[Dict[str, Any]] = [ { @@ -77,9 +80,13 @@ def test_dimensions_anomalies_with_where_parameter( assert test_result["status"] == "fail" params = dict(DBT_TEST_ARGS, where="universe = 'Marvel'") - test_result = dbt_project.test(test_id, DBT_TEST_NAME, params) + test_result = dbt_project.test( + test_id, DBT_TEST_NAME, params, test_vars={"force_metrics_backfill": True} + ) assert test_result["status"] == "pass" params = dict(params, where="universe = 'DC'") - test_result = dbt_project.test(test_id, DBT_TEST_NAME, params) + test_result = dbt_project.test( + test_id, DBT_TEST_NAME, params, test_vars={"force_metrics_backfill": True} + ) assert test_result["status"] == "fail" diff --git a/integration_tests/tests/test_volume_anomalies.py b/integration_tests/tests/test_volume_anomalies.py index 53d4dd700..e8dabe421 100644 --- a/integration_tests/tests/test_volume_anomalies.py +++ b/integration_tests/tests/test_volume_anomalies.py @@ -1,4 +1,4 @@ -from datetime import date, datetime, timedelta +from datetime import datetime, timedelta from typing import Any, Dict, List import pytest @@ -12,19 +12,21 @@ def test_anomalyless_table_volume_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for cur_date in generate_dates(base_date=date.today()) + for cur_date in generate_dates(base_date=utc_today) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) assert test_result["status"] == "pass" def test_full_drop_table_volume_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for cur_date in generate_dates(base_date=date.today()) - if cur_date < cur_date.today() - timedelta(days=1) + for cur_date in generate_dates(base_date=utc_today) + if cur_date < utc_today - timedelta(days=1) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, 
DBT_TEST_ARGS, data=data) assert test_result["status"] == "fail" @@ -36,9 +38,8 @@ def test_full_drop_table_volume_anomalies(test_id: str, dbt_project: DbtProject) def test_volume_anomalies_with_where_parameter( test_id: str, dbt_project: DbtProject, as_model: bool ): - test_date, *training_dates = generate_dates( - base_date=date.today() - timedelta(days=1) - ) + utc_today = datetime.utcnow().date() + test_date, *training_dates = generate_dates(base_date=utc_today - timedelta(days=1)) data: List[Dict[str, Any]] = [ {TIMESTAMP_COLUMN: test_date.strftime(DATE_FORMAT), "payback": payback} @@ -57,11 +58,23 @@ def test_volume_anomalies_with_where_parameter( assert test_result["status"] == "fail" params = dict(DBT_TEST_ARGS, where="payback = 'karate'") - test_result = dbt_project.test(test_id, DBT_TEST_NAME, params, as_model=as_model) + test_result = dbt_project.test( + test_id, + DBT_TEST_NAME, + params, + as_model=as_model, + test_vars={"force_metrics_backfill": True}, + ) assert test_result["status"] == "pass" params = dict(DBT_TEST_ARGS, where="payback = 'ka-razy'") - test_result = dbt_project.test(test_id, DBT_TEST_NAME, params, as_model=as_model) + test_result = dbt_project.test( + test_id, + DBT_TEST_NAME, + params, + as_model=as_model, + test_vars={"force_metrics_backfill": True}, + ) assert test_result["status"] == "fail" @@ -88,11 +101,12 @@ def test_volume_anomalies_with_time_buckets(test_id: str, dbt_project: DbtProjec def test_volume_anomalies_with_direction_spike(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for cur_date in generate_dates(base_date=date.today()) - if cur_date < cur_date.today() - timedelta(days=1) - for _ in range(1 if cur_date < cur_date.today() - timedelta(days=1) else 2) + for cur_date in generate_dates(base_date=utc_today) + if cur_date < utc_today - timedelta(days=1) + for _ in range(1 if cur_date < utc_today - timedelta(days=1) else 2) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) assert test_result["status"] == "fail" @@ -103,10 +117,11 @@ def test_volume_anomalies_with_direction_spike(test_id: str, dbt_project: DbtPro def test_volume_anomalies_with_direction_drop(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for cur_date in generate_dates(base_date=date.today()) - for _ in range(1 if cur_date < cur_date.today() - timedelta(days=1) else 2) + for cur_date in generate_dates(base_date=utc_today) + for _ in range(1 if cur_date < utc_today - timedelta(days=1) else 2) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) assert test_result["status"] == "fail" @@ -117,15 +132,16 @@ def test_volume_anomalies_with_direction_drop(test_id: str, dbt_project: DbtProj def test_volume_anomalies_with_seasonality(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() dates = generate_dates( - base_date=date.today() - timedelta(days=1), + base_date=utc_today - timedelta(days=1), step=timedelta(weeks=1), days_back=7 * 14, ) data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} for cur_date in dates - if cur_date < cur_date.today() - timedelta(weeks=1) + if cur_date < utc_today - timedelta(weeks=1) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) assert test_result["status"] == "pass" @@ -136,15 +152,12 @@ def test_volume_anomalies_with_seasonality(test_id: str, 
dbt_project: DbtProject def test_volume_anomalies_with_sensitivity(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for i, cur_date in enumerate(generate_dates(base_date=date.today())) + for i, cur_date in enumerate(generate_dates(base_date=utc_today)) for _ in range( - 1 - if i % 2 == 0 - else 2 - if cur_date < cur_date.today() - timedelta(days=1) - else 3 + 1 if i % 2 == 0 else 2 if cur_date < utc_today - timedelta(days=1) else 3 ) ] test_result = dbt_project.test(test_id, DBT_TEST_NAME, DBT_TEST_ARGS, data=data) @@ -176,10 +189,11 @@ def test_volume_anomalies_no_timestamp(test_id: str, dbt_project: DbtProject): @pytest.mark.only_on_targets(["bigquery"]) def test_wildcard_name_table_volume_anomalies(test_id: str, dbt_project: DbtProject): + utc_today = datetime.utcnow().date() data = [ {TIMESTAMP_COLUMN: cur_date.strftime(DATE_FORMAT)} - for cur_date in generate_dates(base_date=date.today()) - if cur_date < cur_date.today() - timedelta(days=1) + for cur_date in generate_dates(base_date=utc_today) + if cur_date < utc_today - timedelta(days=1) ] wildcarded_table_name = test_id[:-1] + "*" test_result = dbt_project.test( diff --git a/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql b/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql index a69a4fbff..fbfaee6c1 100644 --- a/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql +++ b/macros/edr/data_monitoring/data_monitors_configuration/get_buckets_configuration.sql @@ -16,6 +16,7 @@ {%- set trunc_min_bucket_start_expr = elementary.get_trunc_min_bucket_start_expr(metric_properties, days_back) %} {%- set backfill_bucket_start = elementary.edr_cast_as_timestamp(elementary.edr_quote(elementary.get_backfill_bucket_start(backfill_days))) %} {%- set full_table_name = elementary.relation_to_full_name(model_relation) %} + {%- set force_metrics_backfill = elementary.get_config_var('force_metrics_backfill') %} {%- if monitors %} {%- set monitors_tuple = elementary.strings_list_to_tuple(monitors) %} @@ -47,8 +48,8 @@ {%- set incremental_bucket_times_query %} with bucket_times as ( - select min(last_bucket_end) as last_max_bucket_end, - min(first_bucket_end) as last_min_bucket_end, + select max(last_bucket_end) as max_existing_bucket_end, + {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, -1 * metric_properties.time_bucket.count, 'min(first_bucket_end)')) }} as min_existing_bucket_start, {{ trunc_min_bucket_start_expr }} as days_back_start, {{ backfill_bucket_start }} as backfill_start, {{ run_start_expr }} as run_started @@ -64,16 +65,16 @@ ), full_buckets_calc as ( select *, - {# How many periods we need to reduce from last_max_bucket_end to backfill full time buckets #} + {# How many periods we need to reduce from max_existing_bucket_end to backfill full time buckets #} case - when last_max_bucket_end is not null - then least(ceil({{ elementary.edr_datediff('last_max_bucket_end', 'backfill_start', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}), -1) * {{ metric_properties.time_bucket.count }} + when max_existing_bucket_end is not null + then least(ceil({{ elementary.edr_datediff('max_existing_bucket_end', 'backfill_start', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}), -1) * {{ metric_properties.time_bucket.count }} else 0 end as 
periods_to_backfill, {# How many periods we need to add to last run time to get only full time buckets #} case - when last_max_bucket_end is not null and last_max_bucket_end > days_back_start and last_min_bucket_end < days_back_start - then floor({{ elementary.edr_datediff('last_max_bucket_end', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} + when max_existing_bucket_end is not null and max_existing_bucket_end > days_back_start and min_existing_bucket_start <= days_back_start + then floor({{ elementary.edr_datediff('max_existing_bucket_end', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} else floor({{ elementary.edr_datediff('days_back_start', 'run_started', metric_properties.time_bucket.period) }} / {{ metric_properties.time_bucket.count }}) * {{ metric_properties.time_bucket.count }} end as periods_until_max from bucket_times @@ -81,23 +82,23 @@ select case {# This prevents gaps in buckets for the metric #} - when last_max_bucket_end is null then days_back_start {# When this is the first run of this metric #} - when last_max_bucket_end < days_back_start then days_back_start {# When the metric was not collected for a period longer than days_back #} - when last_min_bucket_end > days_back_start then days_back_start {# When the metric was collected recently, but for a period that is smaller than days_back #} - when last_max_bucket_end < backfill_start then last_max_bucket_end {# When the metric was not collected for a period longer than backfill_days #} - else {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, 'periods_to_backfill', 'last_max_bucket_end')) }} {# When backfill reduce full time buckets from last_max_bucket_end to backfill #} + when max_existing_bucket_end is null then days_back_start {# When this is the first run of this metric #} + when max_existing_bucket_end <= days_back_start then days_back_start {# When the metric was not collected for a period longer than days_back #} + when min_existing_bucket_start > days_back_start then days_back_start {# When the metric was collected recently, but for a period that is smaller than days_back #} + when max_existing_bucket_end <= backfill_start then max_existing_bucket_end {# When the metric was not collected for a period longer than backfill_days #} + else {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, 'periods_to_backfill', 'max_existing_bucket_end')) }} {# When backfill reduce full time buckets from max_existing_bucket_end to backfill #} end as min_bucket_start, case {# This makes sure we collect only full bucket #} - when last_max_bucket_end is null or last_max_bucket_end < days_back_start or last_min_bucket_end > days_back_start + when max_existing_bucket_end is null or max_existing_bucket_end <= days_back_start or min_existing_bucket_start > days_back_start then {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, 'periods_until_max', 'days_back_start')) }} {# Add full buckets to days_back_start #} - else {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, 'periods_until_max', 'last_max_bucket_end')) }} {# Add full buckets to last_max_bucket_end #} + else {{ elementary.edr_cast_as_timestamp(elementary.edr_timeadd(metric_properties.time_bucket.period, 
'periods_until_max', 'max_existing_bucket_end')) }} {# Add full buckets to max_existing_bucket_end #} end as max_bucket_end from full_buckets_calc {%- endset %} {# We assume we should also cosider sources as incremental #} - {% if not (elementary.is_incremental_model(elementary.get_model_graph_node(model), source_included=true) or unit_test) %} + {% if force_metrics_backfill or not (elementary.is_incremental_model(elementary.get_model_graph_node(model_relation), source_included=true) or unit_test) %} {%- set buckets = elementary.agate_to_dicts(elementary.run_query(regular_bucket_times_query))[0] %} {%- else %} {%- set buckets = elementary.agate_to_dicts(elementary.run_query(incremental_bucket_times_query))[0] %} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index cee835c2a..d71302efd 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -48,7 +48,7 @@ 'upload_artifacts_method': 'diff', 'project_name': none, 'elementary_full_refresh': false, - 'min_training_set_size': 14, + 'min_training_set_size': 7, 'cache_artifacts': true, 'anomaly_direction': 'both', 'store_result_rows_in_own_table': true, @@ -58,6 +58,7 @@ 'collect_metrics': true, 'upload_dbt_columns': false, 'clean_elementary_temp_tables': true, + 'force_metrics_backfill': false } %} {{- return(default_config) -}} {%- endmacro -%} diff --git a/macros/edr/tests/test_table_anomalies.sql b/macros/edr/tests/test_table_anomalies.sql index 4ad925c81..358526676 100644 --- a/macros/edr/tests/test_table_anomalies.sql +++ b/macros/edr/tests/test_table_anomalies.sql @@ -48,7 +48,7 @@ monitors=table_monitors, metric_properties=metric_properties) %} {%- endif %} - {{ elementary.debug_log('min_bucket_start: ' ~ min_bucket_start ~ ' | max_bucket_end: ' ~ min_bucket_start ) }} + {{ elementary.debug_log('min_bucket_start: ' ~ min_bucket_start ~ ' | max_bucket_end: ' ~ max_bucket_end ) }} {#- execute table monitors and write to temp test table -#} {{ elementary.test_log('start', full_table_name) }}
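
Note (not part of the patch): a minimal Python sketch of the incremental bucket-selection rules that the reworked get_buckets_configuration.sql encodes, simplified to daily, aligned buckets and showing only the min_bucket_start side. The constants mirror the defaults exercised by test_anomalies_backfill_logic.py (days_back=14, backfill_days=2); the function name and signature are illustrative, not the macro's API.

from datetime import date, timedelta
from typing import Optional

# Defaults exercised by test_anomalies_backfill_logic.py.
DAYS_BACK = 14
BACKFILL_DAYS = 2


def incremental_min_bucket_start(
    run_started: date,
    max_existing_bucket_end: Optional[date],
    min_existing_bucket_start: Optional[date],
) -> date:
    """Pick the first bucket to (re)collect for an incremental model,
    following the cases in get_buckets_configuration.sql, assuming daily buckets."""
    days_back_start = run_started - timedelta(days=DAYS_BACK)
    backfill_start = run_started - timedelta(days=BACKFILL_DAYS)

    if max_existing_bucket_end is None:
        # First run of this metric: collect the full training window.
        return days_back_start
    if max_existing_bucket_end <= days_back_start:
        # Metric was not collected for longer than days_back: start over.
        return days_back_start
    if min_existing_bucket_start is not None and min_existing_bucket_start > days_back_start:
        # Existing history is shorter than days_back: extend it backwards.
        return days_back_start
    if max_existing_bucket_end <= backfill_start:
        # Gap larger than backfill_days: resume from the last collected bucket.
        return max_existing_bucket_end
    # Otherwise step back whole buckets (at least one) from the last collected
    # bucket so the backfill window is fully re-collected.
    periods_to_backfill = min((backfill_start - max_existing_bucket_end).days, -1)
    return max_existing_bucket_end + timedelta(days=periods_to_backfill)


if __name__ == "__main__":
    today = date(2023, 9, 12)
    # Metric is up to date: only the last BACKFILL_DAYS buckets are recomputed.
    print(incremental_min_bucket_start(today, today, today - timedelta(days=DAYS_BACK)))
    # Metric last collected 5 days ago: backfill resumes from that bucket.
    print(incremental_min_bucket_start(today, today - timedelta(days=5), today - timedelta(days=19)))
    # No metric collected yet: the full days_back window is collected.
    print(incremental_min_bucket_start(today, None, None))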