Skip to content

Commit

Permalink
Ele 1694 bugfixes + tests for metric backfill logic (#522)
Browse files Browse the repository at this point in the history
* get_test_buckets_min_and_max: bugfixes + clearer names

* get_config_var: make the min_training_set_size much smaller than days_back

* test_table_anomalies: log fix

* tests: add coverage for metrics backfill logic in various cases

* test-warehouse: remove --pre

* test fixes

* test fixes

* get_buckets_configuration: add cast as timestamp to deal with BQ issue when the value is null

* tests fix: don't use ilike since BQ doesn't have it

* tests: copy the dbt project to a temp dir, to avoid races

* CI: allow running with --pre as a parameter

* CI: bugfix

* CI: bugfix

* tests: only copy the dbt project dir once per process

* CI: add daily workflow for running on pre-releases

* CI: bugfix

* CI: bugfix

* tests: temp test - remove project copy to see if it speeds things up

* temporary test number #2 - don't run incremental query

* return to regular logic

* add HTML report as artifact

* make HTML report self contained

* test-warehouse: make report artifact unique

* tests: use UTC today not date.today()

* test fixes

* tests performance optimization + add force_metrics_backfill var
  • Loading branch information
haritamar authored Sep 12, 2023
1 parent 2ea1c6f commit 8fbc24f
Show file tree
Hide file tree
Showing 16 changed files with 491 additions and 97 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/test-all-warehouses-dbt-pre-releases.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name: Test all warehouse platforms on dbt pre-releases
on:
schedule:
- cron: "0 6 * * *"
workflow_dispatch:

jobs:
test:
uses: ./.github/workflows/test-all-warehouses.yml
secrets: inherit
with:
dbt-version: latest_pre
16 changes: 14 additions & 2 deletions .github/workflows/test-all-warehouses.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,24 @@ on:
required: false
description: Branch or tag to checkout for 'dbt-data-reliability' repository

workflow_call:
inputs:
dbt-version:
type: string
required: false
elementary-ref:
type: string
required: false
dbt-data-reliability-ref:
type: string
required: false

jobs:
test:
strategy:
fail-fast: false
matrix:
dbt-version: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["1.3.0", null]') }}
dbt-version: ${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) || fromJSON('["1.3.0", "latest_official"]') }}
warehouse-type:
[
postgres,
Expand Down Expand Up @@ -58,4 +70,4 @@ jobs:
with:
result: "failure"
run_id: ${{ github.run_id }}
workflow_name: "Test all warehouse platforms"
workflow_name: ${{ github.workflow }}
17 changes: 13 additions & 4 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ on:
dbt-version:
type: string
required: false
default: "latest_official"
description: dbt's version to test with

workflow_call:
Expand All @@ -40,6 +41,7 @@ on:
required: false
dbt-version:
type: string
default: "latest_official"
required: false

env:
Expand Down Expand Up @@ -82,9 +84,10 @@ jobs:
run: sudo apt-get install python-dev libsasl2-dev gcc

- name: Install dbt
run: pip install --pre
"dbt-core${{ inputs.dbt-version && format('=={0}', inputs.dbt-version) }}"
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || inputs.warehouse-type }}${{ inputs.dbt-version && format('<={0}', inputs.dbt-version) }}"
run:
pip install${{ (inputs.dbt-version == 'latest_pre' && ' --pre') || '' }}
"dbt-core${{ (!startsWith(inputs.dbt-version, 'latest') && format('=={0}', inputs.dbt-version)) || '' }}"
"dbt-${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || (inputs.warehouse-type == 'spark' && 'spark[PyHive]') || inputs.warehouse-type }}${{ (!startsWith(inputs.dbt-version, 'latest') && format('<={0}', inputs.dbt-version)) || '' }}"

- name: Install Elementary
run: pip install "./elementary[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]"
Expand All @@ -111,7 +114,7 @@ jobs:
- name: Test
working-directory: "${{ env.TESTS_DIR }}/tests"
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml
run: py.test -n8 -vvv --target "${{ inputs.warehouse-type }}" --junit-xml=test-results.xml --html=detailed_report.html --self-contained-html

- name: Upload test results
if: always()
Expand All @@ -121,3 +124,9 @@ jobs:
summary: true
display-options: fEX
fail-on-empty: true

- name: Upload HTML report
uses: actions/upload-artifact@v3
with:
name: detailed_report_${{ inputs.warehouse-type }}_dbt_${{ inputs.dbt-version }}
path: ${{ env.TESTS_DIR }}/tests/detailed_report.html
1 change: 1 addition & 0 deletions integration_tests/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pytest
pytest-xdist
pytest-parametrization
pytest-html
filelock
urllib3==2.0.4
31 changes: 26 additions & 5 deletions integration_tests/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
import shutil
from pathlib import Path
from tempfile import mkdtemp

import env
import pytest
from dbt.version import __version__ as dbt_version
from dbt_project import DbtProject
from filelock import FileLock
from packaging import version

DBT_PROJECT_PATH = Path(__file__).parent.parent / "dbt_project"


def pytest_addoption(parser):
parser.addoption("--target", action="store", default="postgres")


@pytest.fixture(scope="session")
def project_dir_copy():
dbt_project_copy_dir = mkdtemp(prefix="integration_tests_project_")
try:
shutil.copytree(
DBT_PROJECT_PATH,
dbt_project_copy_dir,
dirs_exist_ok=True,
symlinks=True,
)
yield dbt_project_copy_dir
finally:
shutil.rmtree(dbt_project_copy_dir)


@pytest.fixture(scope="session", autouse=True)
def init_tests_env(target, tmp_path_factory, worker_id: str):
def init_tests_env(target, tmp_path_factory, worker_id: str, project_dir_copy: str):
# Tests are not multi-threaded.
if worker_id == "master":
env.init(target)
env.init(target, project_dir_copy)
return

# Temp dir shared by all workers.
Expand All @@ -24,7 +45,7 @@ def init_tests_env(target, tmp_path_factory, worker_id: str):
if env_ready_indicator_path.is_file():
return
else:
env.init(target)
env.init(target, project_dir_copy)
env_ready_indicator_path.touch()


Expand Down Expand Up @@ -59,8 +80,8 @@ def requires_dbt_version(request):


@pytest.fixture
def dbt_project(target: str) -> DbtProject:
return DbtProject(target)
def dbt_project(target: str, project_dir_copy: str) -> DbtProject:
return DbtProject(target, project_dir_copy)


@pytest.fixture(scope="session")
Expand Down
12 changes: 8 additions & 4 deletions integration_tests/tests/data_seeder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import csv
from pathlib import Path
from typing import List

import dbt_project
from elementary.clients.dbt.dbt_runner import DbtRunner
from logger import get_logger

Expand All @@ -11,14 +11,18 @@


class DbtDataSeeder:
def __init__(self, dbt_runner: DbtRunner):
def __init__(
self, dbt_runner: DbtRunner, dbt_project_path: Path, seeds_dir_path: Path
):
self.dbt_runner = dbt_runner
self.dbt_project_path = dbt_project_path
self.seeds_dir_path = seeds_dir_path

def seed(self, data: List[dict], table_name: str):
seed_path = dbt_project.SEEDS_DIR_PATH.joinpath(f"{table_name}.csv")
seed_path = self.seeds_dir_path.joinpath(f"{table_name}.csv")
try:
with seed_path.open("w") as seed_file:
relative_seed_path = seed_path.relative_to(dbt_project.PATH)
relative_seed_path = seed_path.relative_to(self.dbt_project_path)
writer = csv.DictWriter(seed_file, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
Expand Down
64 changes: 46 additions & 18 deletions integration_tests/tests/dbt_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@
from logger import get_logger
from ruamel.yaml import YAML

PATH = Path(__file__).parent.parent / "dbt_project"
MODELS_DIR_PATH = PATH / "models"
TMP_MODELS_DIR_PATH = MODELS_DIR_PATH / "tmp"
SEEDS_DIR_PATH = PATH / "data"

_DEFAULT_VARS = {
"disable_dbt_invocation_autoupload": True,
"disable_dbt_artifacts_autoupload": True,
Expand All @@ -22,21 +17,36 @@
"collect_metrics": False,
}

DUMMY_MODEL_FILE_PATTERN = """
{{{{
config (
materialized = '{materialization}'
)
}}}}
SELECT 1 AS col
"""

logger = get_logger(__name__)


def get_dbt_runner(target: str) -> DbtRunner:
def get_dbt_runner(target: str, project_dir: str) -> DbtRunner:
return DbtRunner(
str(PATH),
project_dir,
target=target,
vars=_DEFAULT_VARS.copy(),
raise_on_failure=False,
)


class DbtProject:
def __init__(self, target: str):
self.dbt_runner = get_dbt_runner(target)
def __init__(self, target: str, project_dir: str):
self.dbt_runner = get_dbt_runner(target, project_dir)

self.project_dir_path = Path(project_dir)
self.models_dir_path = self.project_dir_path / "models"
self.tmp_models_dir_path = self.models_dir_path / "tmp"
self.seeds_dir_path = self.project_dir_path / "data"

def run_query(self, prerendered_query: str):
results = json.loads(
Expand Down Expand Up @@ -91,6 +101,8 @@ def test(
data: Optional[List[dict]] = None,
as_model: bool = False,
table_name: Optional[str] = None,
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
*,
multiple_results: Literal[False] = False,
) -> Dict[str, Any]:
Expand All @@ -107,6 +119,8 @@ def test(
data: Optional[List[dict]] = None,
as_model: bool = False,
table_name: Optional[str] = None,
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
*,
multiple_results: Literal[True],
) -> List[Dict[str, Any]]:
Expand All @@ -122,6 +136,8 @@ def test(
data: Optional[List[dict]] = None,
as_model: bool = False,
table_name: Optional[str] = None,
materialization: str = "table", # Only relevant if as_model=True
test_vars: Optional[dict] = None,
*,
multiple_results: bool = False,
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
Expand Down Expand Up @@ -151,7 +167,9 @@ def test(
"version": 2,
"models": [table_yaml],
}
temp_table_ctx = self.create_temp_model_for_existing_table(test_id)
temp_table_ctx = self.create_temp_model_for_existing_table(
test_id, materialization
)
else:
props_yaml = {
"version": 2,
Expand All @@ -169,25 +187,35 @@ def test(
self.seed(data, table_name)
with temp_table_ctx:
with NamedTemporaryFile(
dir=TMP_MODELS_DIR_PATH, suffix=".yaml"
dir=self.tmp_models_dir_path,
prefix="integration_tests_",
suffix=".yaml",
) as props_file:
YAML().dump(props_yaml, props_file)
relative_props_path = Path(props_file.name).relative_to(PATH)
self.dbt_runner.test(select=str(relative_props_path))
relative_props_path = Path(props_file.name).relative_to(
self.project_dir_path
)
self.dbt_runner.test(select=str(relative_props_path), vars=test_vars)

if multiple_results:
return self._read_test_results(test_id)
else:
return self._read_single_test_result(test_id)

def seed(self, data: List[dict], table_name: str):
return DbtDataSeeder(self.dbt_runner).seed(data, table_name)
return DbtDataSeeder(
self.dbt_runner, self.project_dir_path, self.seeds_dir_path
).seed(data, table_name)

@contextmanager
def create_temp_model_for_existing_table(self, table_name: str):
model_path = TMP_MODELS_DIR_PATH.joinpath(f"{table_name}.sql")
model_path.write_text("SELECT 1 AS col")
relative_model_path = model_path.relative_to(PATH)
def create_temp_model_for_existing_table(
self, table_name: str, materialization: str
):
model_path = self.tmp_models_dir_path.joinpath(f"{table_name}.sql")
model_path.write_text(
DUMMY_MODEL_FILE_PATTERN.format(materialization=materialization)
)
relative_model_path = model_path.relative_to(self.project_dir_path)
try:
yield relative_model_path
finally:
Expand Down
8 changes: 4 additions & 4 deletions integration_tests/tests/env.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import dbt_project


def init(target: str):
tests_env = Environment(target)
def init(target: str, project_dir: str):
tests_env = Environment(target, project_dir)
tests_env.clear()
tests_env.init()


class Environment:
def __init__(self, target: str):
self.dbt_runner = dbt_project.get_dbt_runner(target)
def __init__(self, target: str, project_dir: str):
self.dbt_runner = dbt_project.get_dbt_runner(target, project_dir)

def clear(self):
self.dbt_runner.run_operation("elementary_tests.clear_env")
Expand Down
Loading

0 comments on commit 8fbc24f

Please sign in to comment.