Commit

Merge commit '79fd56c4aae313ed2643ef5f9e2c08999b863a00' into ele-3465-sync-elementary-schema-dump-batching

dapollak committed Aug 26, 2024
2 parents b1e96a2 + 79fd56c commit a2a8137
Showing 29 changed files with 395 additions and 204 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-warehouse.yml
@@ -74,12 +74,12 @@ jobs:
- name: Start Postgres
if: inputs.warehouse-type == 'postgres'
working-directory: ${{ env.TESTS_DIR }}
-run: docker-compose up -d postgres
+run: docker compose up -d postgres

- name: Start Trino
if: inputs.warehouse-type == 'trino'
working-directory: ${{ env.TESTS_DIR }}
-run: docker-compose -f docker-compose-trino.yml up -d
+run: docker compose -f docker-compose-trino.yml up -d

- name: Setup Python
uses: actions/setup-python@v4
2 changes: 1 addition & 1 deletion README.md
@@ -34,7 +34,7 @@ Available as self-hosted or Cloud service with premium features.
```yml packages.yml
packages:
- package: elementary-data/elementary
-version: 0.15.2
+version: 0.16.0
## Docs: https://docs.elementary-data.com
```

2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,5 +1,5 @@
name: "elementary"
version: "0.15.2"
version: "0.16.0"

require-dbt-version: [">=1.0.0", "<2.0.0"]

10 changes: 9 additions & 1 deletion integration_tests/README.md
@@ -31,7 +31,15 @@ elementary_tests:
pip install -r requirements.txt
```

-4. Run the tests.
+4. Install elementary-data
+
+`elementary-data` is required for testing. Install specific version if latest doesn't fit your needs.
+
+```shell
+pip install elementary-data
+```
+
+5. Run the tests.

```shell
pytest -vvv -n8
54 changes: 37 additions & 17 deletions integration_tests/tests/test_collect_metrics.py
@@ -9,27 +9,27 @@
TIMESTAMP_COLUMN = "updated_at"
DBT_TEST_NAME = "elementary.collect_metrics"

-COL_TO_METRIC_NAMES = {
+COL_TO_METRIC_TYPES = {
None: {"row_count"},
"id": {"average"},
"name": {"average_length"},
"*": {"null_count"},
("id", "name"): {"zero_count"}, # Shouldn't do anything on 'name'.
}
EXPECTED_COL_TO_METRIC_NAMES = {
None: {"row_count"},
"id": {"average", "null_count", "zero_count"},
"name": {"average_length", "null_count"},
"updated_at": {"null_count"},
None: {"custom_row_count"},
"id": {"custom_average", "custom_null_count", "custom_zero_count"},
"name": {"custom_average_length", "custom_null_count"},
"updated_at": {"custom_null_count"},
}


DBT_TEST_ARGS = {
"timestamp_column": TIMESTAMP_COLUMN,
"metrics": [
{"name": metric_name, "columns": col_name}
for col_name, metric_names in COL_TO_METRIC_NAMES.items()
for metric_name in metric_names
{"type": metric_type, "name": f"custom_{metric_type}", "columns": col_name}
for col_name, metric_types in COL_TO_METRIC_TYPES.items()
for metric_type in metric_types
],
}

@@ -54,6 +54,7 @@ def test_collect_metrics(test_id: str, dbt_project: DbtProject):
)
col_to_metric_names = defaultdict(set)
for metric in metrics:
assert metric["metric_type"] is not None
col_name = metric["column_name"].lower() if metric["column_name"] else None
metric_name = metric["metric_name"]
col_to_metric_names[col_name].add(metric_name)
@@ -109,23 +110,22 @@ def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject):
DBT_TEST_NAME,
{
**DBT_TEST_ARGS,
"metrics": [
metric
for metric in DBT_TEST_ARGS["metrics"]
if metric["type"] == "row_count"
],
"dimensions": ["dimension"],
},
data=data,
)

assert test_result["status"] == "pass"

-# Unfortunately, the dimension's metric name is 'dimension' rather than 'row_count'.
-expected_col_to_metric_names = {
-**EXPECTED_COL_TO_METRIC_NAMES,
-"dimension": {"null_count"},
-None: {"dimension"},
-}
expected_dim_to_col_to_metric_names = {
"dim1": expected_col_to_metric_names,
"dim2": expected_col_to_metric_names,
None: {None: {"dimension"}},
"dim1": {None: {"custom_row_count"}},
"dim2": {None: {"custom_row_count"}},
None: {None: {"custom_row_count"}},
}
metrics = dbt_project.read_table(
METRICS_TABLE,
@@ -141,6 +141,26 @@ def test_collect_group_by_metrics(test_id: str, dbt_project: DbtProject):
assert dim_to_col_to_metric_names == expected_dim_to_col_to_metric_names


+def test_collect_metrics_unique_metric_name(test_id: str, dbt_project: DbtProject):
+args = DBT_TEST_ARGS.copy()
+args["metrics"].append(args["metrics"][0])
+test_result = dbt_project.test(
+test_id,
+DBT_TEST_NAME,
+args,
+)
+assert test_result["status"] == "error"
+
+args = DBT_TEST_ARGS.copy()
+args["metrics"][0].pop("name")
+test_result = dbt_project.test(
+test_id,
+DBT_TEST_NAME,
+args,
+)
+assert test_result["status"] == "error"
+
+
def test_collect_metrics_elementary_disabled(test_id: str, dbt_project: DbtProject):
utc_today = datetime.utcnow().date()
data: List[Dict[str, Any]] = [
32 changes: 32 additions & 0 deletions integration_tests/tests/test_dbt_artifacts/test_artifacts.py
@@ -59,6 +59,38 @@ def test_dbt_invocations(dbt_project: DbtProject):
)


+def test_seed_run_results(dbt_project: DbtProject):
+dbt_project.read_table("seed_run_results", raise_if_empty=False)
+
+
+def test_job_run_results(dbt_project: DbtProject):
+dbt_project.read_table("job_run_results", raise_if_empty=False)
+
+
+def test_model_run_results(dbt_project: DbtProject):
+dbt_project.read_table("model_run_results", raise_if_empty=False)
+
+
+def test_snapshot_run_results(dbt_project: DbtProject):
+dbt_project.read_table("snapshot_run_results", raise_if_empty=False)
+
+
+def test_monitors_runs(dbt_project: DbtProject):
+dbt_project.read_table("monitors_runs", raise_if_empty=False)
+
+
+def test_dbt_artifacts_hashes(dbt_project: DbtProject):
+dbt_project.read_table("dbt_artifacts_hashes", raise_if_empty=False)
+
+
+def test_anomaly_threshold_sensitivity(dbt_project: DbtProject):
+dbt_project.read_table("anomaly_threshold_sensitivity", raise_if_empty=False)
+
+
+def test_metrics_anomaly_score(dbt_project: DbtProject):
+dbt_project.read_table("metrics_anomaly_score", raise_if_empty=False)
+
+
@pytest.mark.requires_dbt_version("1.8.0")
def test_source_freshness_results(test_id: str, dbt_project: DbtProject):
source_config = {
@@ -1,4 +1,4 @@
-{% macro get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, monitors, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
+{% macro get_anomaly_scores_query(test_metrics_table_relation, model_relation, test_configuration, metric_names, column_name = none, columns_only = false, metric_properties = none, data_monitoring_metrics_table=none) %}
{%- set model_graph_node = elementary.get_model_graph_node(model_relation) %}
{%- set full_table_name = elementary.model_node_to_full_name(model_graph_node) %}
{%- set test_execution_id = elementary.get_test_execution_id() %}
@@ -53,6 +53,7 @@
full_table_name,
column_name,
metric_name,
+metric_type,
metric_value,
source_value,
bucket_start,
@@ -80,7 +81,7 @@
and updated_at > {{ elementary.edr_cast_as_timestamp(elementary.edr_quote(latest_full_refresh)) }}
{% endif %}
and upper(full_table_name) = upper('{{ full_table_name }}')
-and metric_name in {{ elementary.strings_list_to_tuple(monitors) }}
+and metric_name in {{ elementary.strings_list_to_tuple(metric_names) }}
{%- if column_name %}
and upper(column_name) = upper('{{ column_name }}')
{%- endif %}
@@ -20,7 +20,7 @@
{% endmacro %}


-{% macro get_metric_buckets_min_and_max(model_relation, backfill_days, days_back, detection_delay=none, monitors=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}
+{% macro get_metric_buckets_min_and_max(model_relation, backfill_days, days_back, detection_delay=none, metric_names=none, column_name=none, metric_properties=none, unit_test=false, unit_test_relation=none) %}

{%- set detection_end = elementary.get_detection_end(detection_delay) %}
{%- set detection_end_expr = elementary.edr_cast_as_timestamp(elementary.edr_quote(detection_end)) %}
@@ -29,8 +29,8 @@
{%- set full_table_name = elementary.relation_to_full_name(model_relation) %}
{%- set force_metrics_backfill = elementary.get_config_var('force_metrics_backfill') %}

-{%- if monitors %}
-{%- set monitors_tuple = elementary.strings_list_to_tuple(monitors) %}
+{%- if metric_names %}
+{%- set metric_names_tuple = elementary.strings_list_to_tuple(metric_names) %}
{%- endif %}

{%- if unit_test %}
@@ -71,8 +71,8 @@
and bucket_end <= {{ detection_end_expr }}
and upper(full_table_name) = upper('{{ full_table_name }}')
and metric_properties = {{ elementary.dict_to_quoted_json(metric_properties) }}
-{%- if monitors %}
-and metric_name in {{ monitors_tuple }}
+{%- if metric_names %}
+and metric_name in {{ metric_names_tuple }}
{%- endif %}
{%- if column_name %}
and upper(column_name) = upper('{{ column_name }}')
@@ -1,11 +1,18 @@
-{% macro column_monitoring_query(monitored_table, monitored_table_relation, min_bucket_start, max_bucket_end, days_back, column_obj, column_monitors, metric_properties, dimensions) %}
+{% macro column_monitoring_query(monitored_table, monitored_table_relation, min_bucket_start, max_bucket_end, days_back, column_obj, column_metrics, metric_properties, dimensions) %}
{%- set full_table_name_str = elementary.edr_quote(elementary.relation_to_full_name(monitored_table_relation)) %}
{%- set timestamp_column = metric_properties.timestamp_column %}
{% set prefixed_dimensions = [] %}
{% for dimension_column in dimensions %}
{% do prefixed_dimensions.append("dimension_" ~ dimension_column) %}
{% endfor %}

+{% set metric_types = [] %}
+{% set metric_name_to_type = {} %}
+{% for metric in column_metrics %}
+{% do metric_types.append(metric.type) %}
+{% do metric_name_to_type.update({metric.name: metric.type}) %}
+{% endfor %}
+

with monitored_table as (
select * from {{ monitored_table }}
@@ -38,9 +45,9 @@
),
{% endif %}

-column_monitors as (
+column_metrics as (

-{%- if column_monitors %}
+{%- if column_metrics %}
{%- set column = column_obj.quoted -%}
select
{%- if timestamp_column %}
@@ -53,26 +60,26 @@
{% if dimensions | length > 0 %}
{{ elementary.select_dimensions_columns(prefixed_dimensions) }},
{% endif %}
-{%- if 'null_count' in column_monitors -%} {{ elementary.null_count(column) }} {%- else -%} null {% endif %} as null_count,
-{%- if 'null_percent' in column_monitors -%} {{ elementary.null_percent(column) }} {%- else -%} null {% endif %} as null_percent,
-{%- if 'not_null_percent' in column_monitors -%} {{ elementary.not_null_percent(column) }} {%- else -%} null {% endif %} as not_null_percent,
-{%- if 'max' in column_monitors -%} {{ elementary.max(column) }} {%- else -%} null {% endif %} as max,
-{%- if 'min' in column_monitors -%} {{ elementary.min(column) }} {%- else -%} null {% endif %} as min,
-{%- if 'average' in column_monitors -%} {{ elementary.average(column) }} {%- else -%} null {% endif %} as average,
-{%- if 'zero_count' in column_monitors -%} {{ elementary.zero_count(column) }} {%- else -%} null {% endif %} as zero_count,
-{%- if 'zero_percent' in column_monitors -%} {{ elementary.zero_percent(column) }} {%- else -%} null {% endif %} as zero_percent,
-{%- if 'not_zero_percent' in column_monitors -%} {{ elementary.not_zero_percent(column) }} {%- else -%} null {% endif %} as not_zero_percent,
-{%- if 'standard_deviation' in column_monitors -%} {{ elementary.standard_deviation(column) }} {%- else -%} null {% endif %} as standard_deviation,
-{%- if 'variance' in column_monitors -%} {{ elementary.variance(column) }} {%- else -%} null {% endif %} as variance,
-{%- if 'max_length' in column_monitors -%} {{ elementary.max_length(column) }} {%- else -%} null {% endif %} as max_length,
-{%- if 'min_length' in column_monitors -%} {{ elementary.min_length(column) }} {%- else -%} null {% endif %} as min_length,
-{%- if 'average_length' in column_monitors -%} {{ elementary.average_length(column) }} {%- else -%} null {% endif %} as average_length,
-{%- if 'missing_count' in column_monitors -%} {{ elementary.missing_count(column) }} {%- else -%} null {% endif %} as missing_count,
-{%- if 'missing_percent' in column_monitors -%} {{ elementary.missing_percent(column) }} {%- else -%} null {% endif %} as missing_percent,
-{%- if 'count_true' in column_monitors -%} {{ elementary.count_true(column) }} {%- else -%} null {% endif %} as count_true,
-{%- if 'count_false' in column_monitors -%} {{ elementary.count_false(column) }} {%- else -%} null {% endif %} as count_false,
-{%- if 'not_missing_percent' in column_monitors -%} {{ elementary.not_missing_percent(column) }} {%- else -%} null {% endif %} as not_missing_percent,
-{%- if 'sum' in column_monitors -%} {{ elementary.sum(column) }} {%- else -%} null {% endif %} as sum
+{%- if 'null_count' in metric_types -%} {{ elementary.null_count(column) }} {%- else -%} null {% endif %} as null_count,
+{%- if 'null_percent' in metric_types -%} {{ elementary.null_percent(column) }} {%- else -%} null {% endif %} as null_percent,
+{%- if 'not_null_percent' in metric_types -%} {{ elementary.not_null_percent(column) }} {%- else -%} null {% endif %} as not_null_percent,
+{%- if 'max' in metric_types -%} {{ elementary.max(column) }} {%- else -%} null {% endif %} as max,
+{%- if 'min' in metric_types -%} {{ elementary.min(column) }} {%- else -%} null {% endif %} as min,
+{%- if 'average' in metric_types -%} {{ elementary.average(column) }} {%- else -%} null {% endif %} as average,
+{%- if 'zero_count' in metric_types -%} {{ elementary.zero_count(column) }} {%- else -%} null {% endif %} as zero_count,
+{%- if 'zero_percent' in metric_types -%} {{ elementary.zero_percent(column) }} {%- else -%} null {% endif %} as zero_percent,
+{%- if 'not_zero_percent' in metric_types -%} {{ elementary.not_zero_percent(column) }} {%- else -%} null {% endif %} as not_zero_percent,
+{%- if 'standard_deviation' in metric_types -%} {{ elementary.standard_deviation(column) }} {%- else -%} null {% endif %} as standard_deviation,
+{%- if 'variance' in metric_types -%} {{ elementary.variance(column) }} {%- else -%} null {% endif %} as variance,
+{%- if 'max_length' in metric_types -%} {{ elementary.max_length(column) }} {%- else -%} null {% endif %} as max_length,
+{%- if 'min_length' in metric_types -%} {{ elementary.min_length(column) }} {%- else -%} null {% endif %} as min_length,
+{%- if 'average_length' in metric_types -%} {{ elementary.average_length(column) }} {%- else -%} null {% endif %} as average_length,
+{%- if 'missing_count' in metric_types -%} {{ elementary.missing_count(column) }} {%- else -%} null {% endif %} as missing_count,
+{%- if 'missing_percent' in metric_types -%} {{ elementary.missing_percent(column) }} {%- else -%} null {% endif %} as missing_percent,
+{%- if 'count_true' in metric_types -%} {{ elementary.count_true(column) }} {%- else -%} null {% endif %} as count_true,
+{%- if 'count_false' in metric_types -%} {{ elementary.count_false(column) }} {%- else -%} null {% endif %} as count_false,
+{%- if 'not_missing_percent' in metric_types -%} {{ elementary.not_missing_percent(column) }} {%- else -%} null {% endif %} as not_missing_percent,
+{%- if 'sum' in metric_types -%} {{ elementary.sum(column) }} {%- else -%} null {% endif %} as sum
from filtered_monitored_table
{%- if timestamp_column %}
left join buckets on (edr_bucket_start = start_bucket_in_data)
@@ -88,10 +95,10 @@

),

-column_monitors_unpivot as (
+column_metrics_unpivot as (

-{%- if column_monitors %}
-{% for monitor in column_monitors %}
+{%- if column_metrics %}
+{% for metric_name, metric_type in metric_name_to_type.items() %}
select
{{ elementary.const_as_string(column_obj.name) }} as edr_column_name,
bucket_start,
@@ -108,13 +115,14 @@
{{ elementary.null_string() }} as dimension,
{{ elementary.null_string() }} as dimension_value,
{% endif %}
-{{ elementary.edr_cast_as_float(monitor) }} as metric_value,
-{{ elementary.edr_cast_as_string(elementary.edr_quote(monitor)) }} as metric_name
-from column_monitors where {{ monitor }} is not null
+{{ elementary.edr_cast_as_float(metric_type) }} as metric_value,
+{{ elementary.edr_cast_as_string(elementary.edr_quote(metric_name)) }} as metric_name,
+{{ elementary.edr_cast_as_string(elementary.edr_quote(metric_type)) }} as metric_type
+from column_metrics where {{ metric_type }} is not null
{% if not loop.last %} union all {% endif %}
{%- endfor %}
{%- else %}
-{{ elementary.empty_table([('edr_column_name','string'),('bucket_start','timestamp'),('bucket_end','timestamp'),('bucket_duration_hours','int'),('dimension','string'),('dimension_value','string'),('metric_name','string'),('metric_value','float')]) }}
+{{ elementary.empty_table([('edr_column_name','string'),('bucket_start','timestamp'),('bucket_end','timestamp'),('bucket_duration_hours','int'),('dimension','string'),('dimension_value','string'),('metric_name','string'),('metric_type','string'),('metric_value','float')]) }}
{%- endif %}

),
@@ -125,6 +133,7 @@
{{ elementary.edr_cast_as_string(full_table_name_str) }} as full_table_name,
edr_column_name as column_name,
metric_name,
+metric_type,
{{ elementary.edr_cast_as_float('metric_value') }} as metric_value,
{{ elementary.null_string() }} as source_value,
bucket_start,
@@ -133,7 +142,7 @@
dimension,
dimension_value,
{{elementary.dict_to_quoted_json(metric_properties) }} as metric_properties
-from column_monitors_unpivot
+from column_metrics_unpivot

)

@@ -142,6 +151,7 @@
'full_table_name',
'column_name',
'metric_name',
+'metric_type',
'dimension',
'dimension_value',
'bucket_end',
@@ -150,6 +160,7 @@
full_table_name,
column_name,
metric_name,
+metric_type,
metric_value,
source_value,
bucket_start,
@@ -166,9 +177,6 @@
{% macro select_dimensions_columns(dimension_columns, as_prefix="") %}
{% set select_statements %}
{%- for column in dimension_columns -%}
-{%- if col_prefix -%}
-{{ col_prefix ~ "_" }}
-{%- endif -%}
{{ column }}
{%- if as_prefix -%}
{{ " as " ~ as_prefix ~ "_" ~ column }}