Skip to content

Commit

Permalink
Add generic test for exposure schema validation
Browse files Browse the repository at this point in the history
[NOTE] Skipped spark mocked integration tests, covered in non mocked tests, was failing on spark 1.3.0
  • Loading branch information
erikzaadi committed Sep 26, 2023
1 parent dd492e0 commit b408673
Show file tree
Hide file tree
Showing 12 changed files with 373 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: "3.8.17"
cache: "pip"

- name: Install Spark requirements
if: inputs.warehouse-type == 'spark'
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/dbt_project/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ profile: "elementary_tests"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["data"]
seed-paths: ["data", "seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

Expand Down
1 change: 1 addition & 0 deletions integration_tests/dbt_project/models/customers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select * from {{ ref('stg_customers') }}
42 changes: 42 additions & 0 deletions integration_tests/dbt_project/models/exposures.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: 2

exposures:
- name: customers
label: CustomersFTW
type: dashboard
maturity: high
url: https://bi.tool/dashboards/1
description: >
Did someone say "exponential growth"?
depends_on:
- ref('customers')

owner:
name: Callum McData
email: [email protected]
meta:
referenced_columns:
- column_name: id
data_type: numeric
node: ref('customers')

- name: orders
label: Returned Orders
type: dashboard
maturity: high
url: https://bi.tool/dashboards/2
description: >
Did someone say "exponential growth"?
depends_on:
- ref('orders')

owner:
name: Callum McData
email: [email protected]
meta:
referenced_columns:
- column_name: "order_id"
data_type: "string"
- column_name: "ZOMG"
1 change: 1 addition & 0 deletions integration_tests/dbt_project/models/orders.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
select order_id, customer_id, amount from {{ ref('stg_orders') }}
35 changes: 35 additions & 0 deletions integration_tests/dbt_project/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
version: 2

models:
- name: customers
description: This table has basic information about a customer, as well as some derived facts based on a customer's orders
tests:
- elementary.exposure_schema_validity:
tags: [exposure_customers]

columns:
- name: id
description: This is a unique identifier for a customer

- name: name
description: Customer's name.

- name: orders
description: This table has basic information about orders, as well as some derived facts based on payments

tests:
- elementary.exposure_schema_validity:
tags: [exposure_orders]

columns:
- name: order_id
description: This is a unique identifier for an order

- name: customer_id
description: Foreign key to the customers table

- name: order_date
description: Date (UTC) that the order was placed

- name: amount
description: Total amount (AUD) of the order
3 changes: 3 additions & 0 deletions integration_tests/dbt_project/seeds/stg_customers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
id,name
1,Erik
2,Zaadi
6 changes: 6 additions & 0 deletions integration_tests/dbt_project/seeds/stg_orders.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
order_id,customer_id,amount
1,1,42
2,1,42
3,1,42
4,1,42
5,2,42
200 changes: 200 additions & 0 deletions integration_tests/tests/test_exposure_schema_validity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
from typing import List

import pytest
from dbt_project import DbtProject

DBT_TEST_NAME = "elementary.exposure_schema_validity"


def seed(dbt_project: DbtProject):
seed_result, seed_output = dbt_project.dbt_runner._run_command(
command_args=["seed", "--full-refresh"],
log_format="text",
capture_output=True,
quiet=True,
log_output=False,
)
if seed_result is False:
assert "" == seed_output
assert seed_result is True


def validate_failing_query_output(
dbt_dir: str,
test_output: str,
table_name: str,
expected_content_strings: List[str],
):
generated_query = f"target/compiled/elementary_tests/models/schema.yml/elementary_exposure_schema_validity_{table_name}_.sql"
assert generated_query in test_output
output_file = f"{dbt_dir}/{generated_query}"
with open(output_file) as f:
generated_query_content = f.read()
for content_string_to_validate in expected_content_strings:
assert content_string_to_validate in generated_query_content


def test_exposure_schema_validity_existing_exposure_yml_invalid(
test_id: str, dbt_project: DbtProject
):
seed(dbt_project)
run_result = dbt_project.dbt_runner.run(
models="orders", full_refresh=True, quiet=True
)
assert run_result is True
test_result, test_output = dbt_project.dbt_runner._run_command(
command_args=["test", "-s", "tag:exposure_orders"],
log_format="text",
capture_output=True,
quiet=True,
log_output=False,
)
assert test_result is False
validate_failing_query_output(
dbt_project.project_dir_path,
test_output,
"orders",
[
"different data type for the column order_id string vs",
"ZOMG column missing in the model",
],
)


def test_exposure_schema_validity_existing_exposure_yml_valid(
test_id: str, dbt_project: DbtProject
):
seed(dbt_project)
run_result = dbt_project.dbt_runner.run(
models="customers", full_refresh=True, quiet=True
)
assert run_result is True
test_result, test_output = dbt_project.dbt_runner._run_command(
command_args=["test", "-s", "tag:exposure_customers"],
capture_output=True,
quiet=True,
log_output=False,
)
assert test_result is True


@pytest.mark.skip_targets(["spark"])
def test_exposure_schema_validity_no_exposures(test_id: str, dbt_project: DbtProject):
test_result = dbt_project.test(test_id, DBT_TEST_NAME)
assert test_result["status"] == "pass"


@pytest.mark.skip_targets(["spark"])
def test_exposure_schema_validity_correct_columns_and_types(
test_id: str, dbt_project: DbtProject
):
explicit_target_for_bigquery = (
"other"
if dbt_project.dbt_runner.target in ["bigquery", "snowflake", ""]
else "string"
)
DBT_TEST_ARGS = {
"node": "models.exposures_test",
"columns": [{"name": "order_id", "dtype": "string"}],
"exposures": {
"ZOMG": {
"meta": {
"referenced_columns": [
{
"column_name": "order_id",
"data_type": explicit_target_for_bigquery,
}
]
},
"url": "http://bla.com",
"name": "ZOMG",
"depends_on": {"nodes": ["models.exposures_test"]},
}
},
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)
assert test_result["status"] == "pass"


@pytest.mark.skip_targets(["spark"])
def test_exposure_schema_validity_correct_columns_and_invalid_type(
test_id: str, dbt_project: DbtProject
):
DBT_TEST_ARGS = {
"node": "models.exposures_test",
"columns": [{"name": "order_id", "dtype": "numeric"}],
"exposures": {
"ZOMG": {
"meta": {
"referenced_columns": [
{"column_name": "order_id", "data_type": "string"}
]
},
"url": "http://bla.com",
"name": "ZOMG",
"depends_on": {"nodes": ["models.exposures_test"]},
}
},
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)

assert (
"different data type for the column order_id string vs"
in test_result["test_results_query"]
)
assert test_result["status"] == "fail"


@pytest.mark.skip_targets(["spark"])
def test_exposure_schema_validity_correct_columns_and_missing_type(
test_id: str, dbt_project: DbtProject
):
DBT_TEST_ARGS = {
"node": "models.exposures_test",
"columns": [{"name": "order_id", "dtype": "numeric"}],
"exposures": {
"ZOMG": {
"meta": {"referenced_columns": [{"column_name": "order_id"}]},
"url": "http://bla.com",
"name": "ZOMG",
"depends_on": {"nodes": ["models.exposures_test"]},
}
},
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)

assert test_result["status"] == "pass"


@pytest.mark.skip_targets(["spark"])
def test_exposure_schema_validity_missing_columns(
test_id: str, dbt_project: DbtProject
):
DBT_TEST_ARGS = {
"node": "models.exposures_test",
"columns": [{"name": "order", "dtype": "numeric"}],
"exposures": {
"ZOMG": {
"meta": {
"referenced_columns": [
{"column_name": "order_id", "data_type": "string"}
]
},
"url": "http://bla.com",
"name": "ZOMG",
"depends_on": {"nodes": ["models.exposures_test"]},
}
},
}
test_result = dbt_project.test(
test_id, DBT_TEST_NAME, DBT_TEST_ARGS, columns=[dict(name="bla")], as_model=True
)

assert "order_id column missing in the model" in test_result["test_results_query"]
assert test_result["status"] == "fail"
72 changes: 72 additions & 0 deletions macros/edr/tests/test_exposure_schema_validity.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
{% test exposure_schema_validity(model, exposures, node, columns) %}
{%- if not execute -%}
{%- do return(none) -%}
{%- endif -%}

{%- if dbt_version <= '1.3.0' -%}
{# attached_node is only available on newer dbt versions #}
{%- set base_node = context['model']['depends_on']['nodes'][0] -%}
{%- else -%}
{%- set base_node = context['model']['attached_node'] -%}
{%- endif -%}

{# Parameters used only for dependency injection in integration tests #}
{%- set node = node or base_node -%}
{%- set exposures = (exposures or graph.exposures).values() -%}
{%- set columns = columns or adapter.get_columns_in_relation(model) -%}

{%- set model_relation = elementary.get_model_relation_for_test(model, context["model"]) -%}
{%- set full_table_name = elementary.relation_to_full_name(model_relation) -%}
{{- elementary.test_log('start', full_table_name, 'exposure validation') -}}

{%- set matching_exposures = [] -%}

{%- for exposure in exposures -%}
{%- if node in exposure.depends_on.nodes and (exposure['meta'] or none) is not none -%}
{%- do matching_exposures.append(exposure) -%}
{%- endif -%}
{%- endfor -%}
{%- if matching_exposures | length > 0 -%}
{%- set columns_dict = {} -%}
{%- for column in columns -%}
{%- do columns_dict.update({ column['name'].strip('"').strip("'") | upper : elementary.normalize_data_type(column['dtype']) }) -%}
{%- endfor -%}
{%- set invalid_exposures = [] -%}
{%- for exposure in matching_exposures -%}
{# Depend on meta since column level info is not available on exposures #}
{%- set meta = exposure['meta'] or none -%}
{%- if meta != none and (meta['referenced_columns'] or none) is iterable -%}
{%- for exposure_column in meta['referenced_columns'] -%}
{%- if matching_exposures | length == 1 or (context['render'](exposure_column['node'] or '')) == node -%}
{%- if exposure_column['column_name'] | upper not in columns_dict.keys() -%}
{%- do invalid_exposures.append({
'exposure': exposure['name'],
'url': exposure['url'],
'error': exposure_column['column_name'] ~ ' column missing in the model'
})
-%}
{%- elif (exposure_column['data_type'] or '') != '' and exposure_column['data_type'] != columns_dict[exposure_column['column_name'] | upper] -%}
{%- do invalid_exposures.append({
'exposure': exposure['name'],
'url': exposure['url'],
'error': 'different data type for the column ' ~ exposure_column['column_name'] ~ ' ' ~ exposure_column['data_type'] ~ ' vs ' ~ columns_dict[exposure_column['name'] | upper]
})
-%}
{%- endif -%}
{%- endif -%}
{%- endfor -%}
{%- endif -%}
{%- endfor -%}
{%- if invalid_exposures | length > 0 -%}
{%- for invalid_exposure in invalid_exposures %}
{{ 'UNION ALL ' if not loop.first }}SELECT '{{ invalid_exposure['exposure'] }}' as exposure, '{{ invalid_exposure['url'] }}' as url, '{{ invalid_exposure['error'] }}' as error
{%- endfor -%}
{%- else -%}
{{ elementary.no_results_query() }}
{%- endif -%}
{%- else -%}
{{ elementary.no_results_query() }}

{%- endif -%}
{{ elementary.test_log('end', full_table_name, 'exposure validation') }}
{% endtest %}
Loading

0 comments on commit b408673

Please sign in to comment.