diff --git a/docs/assertions-builtin.md b/docs/assertions-builtin.md deleted file mode 100644 index 8cbf28da8..000000000 --- a/docs/assertions-builtin.md +++ /dev/null @@ -1,227 +0,0 @@ -# Basic Assertions -## assert_column_unique - -- Description: The column values should be unique. -- Assert: `None` - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_unique -``` - -
- -## assert_column_not_null - -- Description: The column values should not be null. -- Assert: `None` - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_not_null -``` -
- - -## assert_column_value - -- Description: The column value should pass the given assertion. - -### range -- Assert: - - `gte`: the value should be greater than or equal to the given value - - `gt`: the value should be greater than the given value - - `lte`: the value should be less than or equal to the given value - - `lt`: the value should be less than the given value - -
- YAML Example: The value should be between [0, 10000) - -``` -world_city: - columns: - population: - tests: - - name: assert_column_value - assert: - gte: 0 - lt: 10000 -``` -
-
- YAML Example: The value of a datetime column should be >= '2022-01-01' - -``` -world_city: - columns: - create_at: - tests: - - name: assert_column_value - assert: - gte: '2022-01-01' -``` -
- -### set -- Assert: - - `in`: the value should be one of the elements of the given set - -
- YAML Example: The value should be one of following values ['US', 'Japan', 'India'] - -``` -world_city: - columns: - country: - tests: - - name: assert_column_value - assert: - in: ['US', 'Japan', 'India'] -``` -
- -# Schema Assertions - -## assert_column_exist - -- Description: The column should exist. -- Assert: None - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_exist -``` -
- -## assert_column_type - -- Description: The column type should be the specified type. -- Assert: - - type: one of `string`, `integer`, `numeric`, `datetime`, `boolean`, `other` - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_type - assert: - type: numeric -``` -
- -## assert_column_schema_type - -- Description: The column schema type should be the specified schema type. -- Assert: - - schema_type: the schema type in the data source. (e.g. `TEXT`, `DATE`, `VARCHAR(128)`, ...) - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_schema_type - assert: - schema_type: TEXT -``` -
- -## assert_column_in_types - -- Description: The column type should be one of the types in the list. -- Assert: - - types: [...], a list of `string`, `integer`, `numeric`, `datetime`, `boolean`, `other` - -
- YAML Example - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_in_types - assert: - types: [string, datetime] -``` -
- - -# Metric-based Assertions - - -- Description: Metric-based assertions assert the value of a metric. -- Assert: - - `gte`: the value should be greater than or equal to the given value - - `gt`: the value should be greater than the given value - - `lte`: the value should be less than or equal to the given value - - `lt`: the value should be less than the given value - - `eq`: the value should be equal to the given value - - `ne`: the value should not be equal to the given value - -
- YAML Example: The row count should be <= 1000000 - -``` -world_city: - tests: - - metric: row_count - assert: - lte: 1000000 -``` -
-
- YAML Example: The missing percentage should be <= 0.01 - -``` -world_city: - columns: - country_code: - tests: - - metric: nulls_p - assert: - lte: 0.01 -``` -
-
- YAML Example: The median should be between [10, 20] - -``` -world_city: - columns: - country_code: - tests: - - metric: p50 - assert: - gte: 10 - lte: 20 -``` -
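The `eq` and `ne` operators follow the same pattern as the examples above. The sketch below is illustrative only: it assumes a `distinct` metric is exposed for the column (check the metric document referenced below) and that 249 is the count you expect for your data.

```
world_city:
  columns:
    country_code:
      tests:
        - metric: distinct
          assert:
            eq: 249
```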
- - -For all available metrics, please see the [metric document](metrics.md) diff --git a/docs/assertions-custom.md b/docs/assertions-custom.md deleted file mode 100644 index 3d086b710..000000000 --- a/docs/assertions-custom.md +++ /dev/null @@ -1,212 +0,0 @@ -# User-defined test functions - -In addition to the built-in assertion functions, PipeRider also supports custom assertion functions. User-defined -test functions live in a regular Python module. - -## How does it work? - -There is a customized assertion, `customized_assertions.assert_nothing_table_example`, in our assertion configuration: - -```yaml - -data: # Table Name - # Test Cases for Table - tests: - - name: assert_nothing_table_example - assert: - param1: value1 -``` - -That means: - -1. You have put a Python module `customized_assertions.py` on the PipeRider plugin search path. -2. There must be an assertion class named `assert_nothing_table_example` in `customized_assertions.py`. - -### Plugin Search Path - -PipeRider searches for plugins in this order: - -* ./piperider/plugins -* environment variable `PIPERIDER_PLUGINS` - -For example, the user-defined test functions could be at: - -* the default search path: `./piperider/plugins/customized_assertions.py` -* anywhere on `PIPERIDER_PLUGINS` - -```bash -export PIPERIDER_PLUGINS=/foo/bar -/foo/bar/customized_assertions.py -``` - -## Implementing a customized assertion - -After you run `piperider init`, you can find two examples in `./piperider/plugins/customized_assertions.py`. - -* assert_nothing_table_example -* assert_nothing_column_example - -A customized assertion must implement `BaseAssertionType` like this: - -```python -from piperider_cli.assertion_engine.assertion import AssertionContext, AssertionResult, ValidationResult -from piperider_cli.assertion_engine.types import BaseAssertionType, register_assertion_function - - -class AssertNothingTableExample(BaseAssertionType): - def name(self): - return 'assert_nothing_table_example' - - def execute(self, context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - table_metrics = metrics.get('tables', {}).get(table) - if table_metrics is None: - # cannot find the table in the metrics - return context.result.fail() - - # 1. Get the metric for the current table - # We support two metrics for table level metrics: ['row_count', 'col_count'] - row_count = table_metrics.get('row_count') - # col_count = table_metrics.get('col_count') - - # 2. Get expectation from assert input - expected = context.asserts.get('something', []) - - # 3. Implement your logic to check requirement between expectation and actual value in the metrics - - # 4. send result - - # 4.1 mark it as failed result - # return context.result.fail('what I saw in the metric') - - # 4.2 mark it as success result - # return context.result.success('what I saw in the metric') - - return context.result.success('what I saw in the metric') - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - result.errors.append('explain to users why this broken') - return result - - -# register new assertions -register_assertion_function(AssertNothingTableExample) -``` - -* `name` method provides the function name that will be used in the assertion yaml. -* `execute` method is your implementation that produces the result of the assertion function. -* `validate` method can help you check `assert` parameters before PipeRider starts profiling. 
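The second shipped example, `assert_nothing_column_example`, is the column-level variant. A minimal sketch of what it might look like, assuming the same context API as the table-level example above (this body is illustrative, not the exact code generated by `piperider init`):

```python
from piperider_cli.assertion_engine.assertion import AssertionContext, AssertionResult, ValidationResult
from piperider_cli.assertion_engine.types import BaseAssertionType, register_assertion_function


class AssertNothingColumnExample(BaseAssertionType):
    def name(self):
        return 'assert_nothing_column_example'

    def execute(self, context: AssertionContext) -> AssertionResult:
        table = context.table
        column = context.column
        metrics = context.profiler_result

        # look up the column-level metrics instead of the table-level ones
        column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column)
        if column_metrics is None:
            # cannot find the column in the metrics
            return context.result.fail()

        # compare a column metric (e.g. 'non_nulls') with the expectation from the yaml
        # expected = context.asserts.get('something', [])

        return context.result.success('what I saw in the metric')

    def validate(self, context: AssertionContext) -> ValidationResult:
        # nothing to check for this no-op example
        return ValidationResult(context)


register_assertion_function(AssertNothingColumnExample)
```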
- -#### Execute details - -When the assertion is called, PipeRider passes the assertion context to the `execute` method for you: - -* `context`: a helper object for assembling the result entry for the report. -* `context.table`: the table name you are asserting. -* `context.column`: the column name you are checking; it could be `null` when the assertion is defined at the table level. -* `context.profiler_result`: the profiler output to assert against. - -### Assertion process - -You can follow these steps to implement the assertion process: - -1. Get the metric by table and column (the fact you see from the metric) -2. Get the assert settings from the context (the expectation written in the assertion yaml) -3. Compare the metric with the assert settings and decide whether the result is `pass` or `failed` - -We will explain each step with concrete examples in `customized_assertions.py` - -#### Get metric - -`metrics` is a Python dict object; we can look it up with `table` and `column`. - -Table level metric: - -```python -table_metrics = metrics.get('tables', {}).get(table) -if table_metrics is None: - # cannot find the table in the metrics - return context.result.fail() -``` - -Column level metric: - -```python -column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) -if column_metrics is None: - # cannot find the column in the metrics - return context.result.fail() -``` - -#### Get assert - -`assert` is the expectation set in the configuration file, like this: - -```yaml -tests: - - name: customized_assertions.assert_nothing_table_example - assert: - param1: value1 -``` - -In the function, you can get the value of `param1` in this way: - -```python -expected = context.asserts.get('param1', '') -``` - -#### Return result - -You can conclude a result for the report. Before that, you need an actual value in the result. Don't forget to set it up: - -```python -context.result.actual = 'something I saw from the metrics' -``` - -Finally, you can return the result: - -```python -return context.result.success() -``` - -```python -return context.result.fail() -``` - -## Profiling Data - -You might want to know what kind of data is in the `metrics` dict. There is a `run.json` in the output path -of the `run` command. Open it and check the content: - -```bash -ls -alh $(MY_PROJECT)/.piperider/outputs/$(DATASOURCE)-$(TIMESTAMP) -# total 48 -# drwxr-xr-x 4 piperider staff 128B 6 1 09:35 . -# drwxr-xr-x 26 piperider staff 832B 6 1 09:35 .. -# -rw-r--r-- 1 piperider staff 6.2K 6 1 09:35 run.json -``` - -#### Validation - -Validation is used by `diagnose`, and before profiling, to verify that the `assert` parameters are well-defined in the assertion file. - -If everything is fine, `validate` can return a ValidationResult without errors: - -```python - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - return result -``` - -If you find something wrong, add the reason to the result object: - -```python - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - result.errors.append('explain to users why this broken') - return result -``` diff --git a/docs/assertions.md b/docs/assertions.md deleted file mode 100644 index e29aa4f3f..000000000 --- a/docs/assertions.md +++ /dev/null @@ -1,33 +0,0 @@ -# Assertion YAML - -PipeRider parses all yaml files under `.piperider/assertions`. Grouping assertions by file is a way to organize your assertions. 
Use `piperider diagnose` to verify the format of assertion yaml files. -Whether for recommended assertions or assertion templates, the format looks like the example below. You can use [built-in assertions](assertions-builtin.md) or [custom assertions](assertions-custom.md) against tables or columns depending on the assertions. - -``` -Table_name_1: - # Test Cases for Table - tests: - - metric: row_count - assert: - lte: 1000000 - columns: - column_name: - tests: - - name: assert_column_not_null - - name: assert_column_unique - - name: assert_column_value - assert: - gte: 0 - lte: 100 - - metric: avg - assert: - gte: 45 - lte: 55 - -Table_name_2: - columns: - column_id: - tests: - - name: assert_column_not_null - - name: assert_column_unique -``` \ No newline at end of file diff --git a/docs/assertions/assert_column_exist.md b/docs/assertions/assert_column_exist.md deleted file mode 100644 index 4cc402244..000000000 --- a/docs/assertions/assert_column_exist.md +++ /dev/null @@ -1,16 +0,0 @@ -# assert_column_exist - -- Description: The column should exist. -- Assert: None -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_exist - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_in_range.md b/docs/assertions/assert_column_in_range.md deleted file mode 100644 index fea13d02a..000000000 --- a/docs/assertions/assert_column_in_range.md +++ /dev/null @@ -1,19 +0,0 @@ -# assert_column_in_range - -- Description: The minimum and maximum values of a column should be between min_value and max_value. -- Assert: - - range: [min_value, max_value] -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_in_range - assert: - range: [10, 20] - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_in_types.md b/docs/assertions/assert_column_in_types.md deleted file mode 100644 index 1d41bf53a..000000000 --- a/docs/assertions/assert_column_in_types.md +++ /dev/null @@ -1,19 +0,0 @@ -# assert_column_in_types - -- Description: The column type should be one of the types in the list. -- Assert: - - types: [`numeric`, `string`, or `datetime`] -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_in_types - assert: - types: [string, datetime] - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_max_in_range.md b/docs/assertions/assert_column_max_in_range.md deleted file mode 100644 index e5a32e45e..000000000 --- a/docs/assertions/assert_column_max_in_range.md +++ /dev/null @@ -1,19 +0,0 @@ -# assert_column_max_in_range - -- Description: The maximum value of the column should be between min_value and max_value. -- Assert: - - max: [min_value, max_value] -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_max_in_range - assert: - max: [10, 20] - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_min_in_range.md b/docs/assertions/assert_column_min_in_range.md deleted file mode 100644 index a9eb0d6a9..000000000 --- a/docs/assertions/assert_column_min_in_range.md +++ /dev/null @@ -1,19 +0,0 @@ -# assert_column_min_in_range - -- Description: The minimum value of the column should be between min_value and max_value. 
- -- Assert: - - min: [min_value, max_value] -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_min_in_range - assert: - min: [10, 20] - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_not_null.md b/docs/assertions/assert_column_not_null.md deleted file mode 100644 index acf13a4f8..000000000 --- a/docs/assertions/assert_column_not_null.md +++ /dev/null @@ -1,16 +0,0 @@ -# assert_column_not_null - -- Description: The column values should not be null. -- Assert: None -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_not_null - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_null.md b/docs/assertions/assert_column_null.md deleted file mode 100644 index 618257f67..000000000 --- a/docs/assertions/assert_column_null.md +++ /dev/null @@ -1,16 +0,0 @@ -# assert_column_null - -- Description: The column values should be null. -- Assert: None -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_null - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_schema_type.md b/docs/assertions/assert_column_schema_type.md deleted file mode 100644 index 2884602ec..000000000 --- a/docs/assertions/assert_column_schema_type.md +++ /dev/null @@ -1,20 +0,0 @@ -# assert_column_schema_type - -- Description: The column schema type should be the specified schema type. -- Assert: - - schema_type: `TEXT`, `DATE`, `VARCHAR(128)`, etc. -- Tags: - -### YAML - -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_schema_type - assert: - schema_type: TEXT - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_type.md b/docs/assertions/assert_column_type.md deleted file mode 100644 index fe9ee9b93..000000000 --- a/docs/assertions/assert_column_type.md +++ /dev/null @@ -1,19 +0,0 @@ -# assert_column_type - -- Description: The column type should be the specified type. -- Assert: - - type: `numeric`, `string`, or `datetime` -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_type - assert: - type: numeric - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_column_unique.md b/docs/assertions/assert_column_unique.md deleted file mode 100644 index 8a2c2f20b..000000000 --- a/docs/assertions/assert_column_unique.md +++ /dev/null @@ -1,16 +0,0 @@ -# assert_column_unique - -- Description: The column values should be unique. -- Assert: None -- Tags: - -### YAML -``` -your_table_name: - columns: - your_column_name: - tests: - - name: assert_column_unique - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_row_count.md b/docs/assertions/assert_row_count.md deleted file mode 100644 index 43a13cdf3..000000000 --- a/docs/assertions/assert_row_count.md +++ /dev/null @@ -1,35 +0,0 @@ -# assert_row_count - -- Description: The row count should be >= min or <= max. If min is not specified, it is assumed to be 0. If max is not - specified, it is assumed to be infinity. If min and max are both specified, min must be less than or equal to max. 
- -- Assert: - - min: min_count - - max: max_count -- Tags: - -### YAML - -Provide the minimum and maximum row count in the following format - -``` -your_table_name: - tests: - - name: assert_row_count - assert: - min: 10 - max: 20 - tags: - - OPTIONAL -``` - -Or only provide the minimum row count in the following format - -``` -your_table_name: - tests: - - name: assert_row_count - assert: - min: 100 - tags: - - OPTIONAL -``` diff --git a/docs/assertions/assert_row_count_in_range.md b/docs/assertions/assert_row_count_in_range.md deleted file mode 100644 index a616d2a55..000000000 --- a/docs/assertions/assert_row_count_in_range.md +++ /dev/null @@ -1,17 +0,0 @@ -# assert_row_count_in_range - -- Description: The row count should be between min_count and max_count. -- Assert: - - count: [min_count, max_count] -- Tags: - -### YAML -``` -your_table_name: - tests: - - name: assert_row_count_in_range - assert: - count: [10, 20] - tags: - - OPTIONAL -``` diff --git a/docs/project.md b/docs/project.md index a52e17034..ab95ba3d4 100644 --- a/docs/project.md +++ b/docs/project.md @@ -7,9 +7,7 @@ The `.piperider/` folder may contain these files Name | Description ------------|--------------- config.yml | The project config file. Generated by `piperider init`. Contains the data source and dbt integration settings. -credentials.yml | The data source connection parameters and credentials to connect to data sources. This is supposed to not be source controlled. -assertions/ | The folder to define assertions. Please see [assertions](./assertions.md) -plugins/ | The folder to define custom defined test function. Please see [user defined test function](./user-defined-test-function.md) +credentials.yml | The data source connection parameters and credentials to connect to data sources. This is supposed to not be source controlled. outputs/ | The piperider run raw result generated by `piperider run` reports/ | The piperider report generated by `piperider generate-report` comparisons/ | The piperider report generated by `piperider compare-report` diff --git a/docs/user-defined-test-function.md b/docs/user-defined-test-function.md deleted file mode 100644 index 7c4546272..000000000 --- a/docs/user-defined-test-function.md +++ /dev/null @@ -1,209 +0,0 @@ -# User-defined test functions - -In addition to the built-in assertion functions, PipeRider also supports custom assertion functions. User-defined -test functions live in a regular Python module. - -## How does it work? - -There is a customized assertion, `customized_assertions.assert_nothing_table_example`, in our assertion configuration: - -```yaml - -data: # Table Name - # Test Cases for Table - tests: - - name: assert_nothing_table_example - assert: - param1: value1 -``` - -That means: - -1. You have put a Python module `customized_assertions.py` on the PipeRider plugin search path. -2. There must be an assertion class named `assert_nothing_table_example` in `customized_assertions.py`. - -### Plugin Search Path - -PipeRider searches for plugins in this order: - -* ./piperider/plugins -* environment variable `PIPERIDER_PLUGINS` - -For example, the user-defined test functions could be at: - -* the default search path: `./piperider/plugins/customized_assertions.py` -* anywhere on `PIPERIDER_PLUGINS` - -```bash -export PIPERIDER_PLUGINS=/foo/bar -/foo/bar/customized_assertions.py -``` - -## Implementing a customized assertion - -After you run `piperider init`, you can find two examples in `./piperider/plugins/customized_assertions.py`. 
- -* assert_nothing_table_example -* assert_nothing_column_example - -The customized assertion must implement `BaseAssertionType` like this: - -```python -from piperider_cli.assertion_engine.assertion import AssertionContext, AssertionResult, ValidationResult -from piperider_cli.assertion_engine.types import BaseAssertionType, register_assertion_function - - -class AssertNothingTableExample(BaseAssertionType): - def name(self): - return 'assert_nothing_table_example' - - def execute(self, context: AssertionContext, table: str, column: str, metrics: dict) -> AssertionResult: - table_metrics = metrics.get('tables', {}).get(table) - if table_metrics is None: - # cannot find the table in the metrics - return context.result.fail() - - # 1. Get the metric for the current table - # We support two metrics for table level metrics: ['row_count', 'col_count'] - row_count = table_metrics.get('row_count') - # col_count = table_metrics.get('col_count') - - # 2. Get expectation from assert input - expected = context.asserts.get('something', []) - - # 3. Implement your logic to check requirement between expectation and actual value in the metrics - - # 4. send result - - # 4.1 mark it as failed result - # return context.result.fail('what I saw in the metric') - - # 4.2 mark it as success result - # return context.result.success('what I saw in the metric') - - return context.result.success('what I saw in the metric') - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - result.errors.append('explain to users why this broken') - return result - - -# register new assertions -register_assertion_function(AssertNothingTableExample) -``` - -* name method provides the function name that will be used in the assertion yaml. -* execute method is your implementation that produces the result of the assertion function. -* validate method can help you check `assert` parameters before PipeRider starts profiling. - -#### execute details - -When the assertion is called, PipeRider passes all arguments to the `execute` method for you: - -* context: a helper object for assembling the result entry for the report. -* table: the table name you are asserting. -* column: the column name you are checking; it could be null when the assertion is defined at the table level. -* metrics: the profiling output to assert against. - -### Assertion process - -You can follow these steps to implement the assertion process: - -1. Get the metric by table and column (the fact you see from the metric) -2. Get the assert settings from the context (the expectation written in the assertion yaml) -3. Compare the metric with the assert settings and decide whether the result is `pass` or `failed` - -We will explain each step with concrete examples in `customized_assertions.py` - -#### Get metric - -`metrics` is a Python dict object; we can look it up with `table` and `column`. 
- -Table level metric: - -```python -table_metrics = metrics.get('tables', {}).get(table) -if table_metrics is None: - # cannot find the table in the metrics - return context.result.fail() -``` - -Column level metric: - -```python -column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) -if column_metrics is None: - # cannot find the column in the metrics - return context.result.fail() -``` - -#### Get assert - -`assert` is the expectation set in the configuration file, like this: - -```yaml -tests: - - name: customized_assertions.assert_nothing_table_example - assert: - param1: value1 -``` - -In the function, you can get the value of `param1` in this way: - -```python -expected = context.asserts.get('param1', '') -``` - -#### Return result - -You can conclude a result for the report. Before that, you need an actual value in the result. Don't forget to set it up: - -```python -context.result.actual = 'something I saw from the metrics' -``` - -Finally, you can return the result: - -```python -return context.result.success() -``` - -```python -return context.result.fail() -``` - -## Profiling Data - -You might want to know what kind of data is in the `metrics` dict. There is a `.profiler.json` in the output path -of the `run` command. Open it and check the content: - -```bash -ls -alh $(MY_PROJECT)/.piperider/outputs/$(DATASOURCE)-$(TIMESTAMP) -# total 48 -# drwxr-xr-x 4 piperider staff 128B 6 1 09:35 . -# drwxr-xr-x 26 piperider staff 832B 6 1 09:35 .. -# -rw-r--r-- 1 piperider staff 6.2K 6 1 09:35 .profiler.json -# -rw-r--r-- 1 piperider staff 15K 6 1 09:35 data.json -``` - -#### Validation - -Validation is used by `diagnose`, and before profiling, to verify that the `assert` parameters are well-defined in the assertion file. 
- -If everything was fine, the `validate` could return an ValidationResult without errors - -```python - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - return result -``` - -If you have found something wrong, tried to add the reason to the result object: - -```python - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - result.errors.append('explain to users why this broken') - return result -``` diff --git a/piperider_cli/assertion_engine/__init__.py b/piperider_cli/assertion_engine/__init__.py deleted file mode 100644 index 92027ed14..000000000 --- a/piperider_cli/assertion_engine/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .assertion import AssertionEngine, AssertionContext, AssertionResult, ValidationResult diff --git a/piperider_cli/assertion_engine/assertion.py b/piperider_cli/assertion_engine/assertion.py deleted file mode 100644 index 01179502a..000000000 --- a/piperider_cli/assertion_engine/assertion.py +++ /dev/null @@ -1,707 +0,0 @@ -import json -import os -import sys -from datetime import datetime, date, time -from importlib import import_module -from typing import List, Dict - -from deepmerge import always_merger -from sqlalchemy import inspect -from sqlalchemy.engine import Engine - -from piperider_cli import yaml as pyml -from piperider_cli.yaml import safe_load_yaml, round_trip_load_yaml -from piperider_cli.configuration import FileSystem -from piperider_cli.error import \ - AssertionError, \ - IllegalStateAssertionError -from .event import AssertionEventHandler, DefaultAssertionEventHandler -from .recommender import AssertionRecommender -from .recommender import RECOMMENDED_ASSERTION_TAG - - -def load_yaml_configs(path, config_path): - passed: List[str] = [] - failed: List[str] = [] - content: Dict = {} - - def _get_content_by_key(source: Dict, key: str, default_value): - result = {} - for t in source: - result[t] = {} - result[t][key] = source[t].get(key, default_value) - result[t]['columns'] = {} - for c in source[t].get('columns', []): - result[t]['columns'][c] = {} - result[t]['columns'][c][key] = source[t]['columns'][c].get(key, default_value) - - return result - - for root, dirs, files in os.walk(path): - for file in files: - if file.endswith('.yml') or file.endswith('.yaml'): - file_path = os.path.join(root, file) - payload = safe_load_yaml(file_path) - if not payload: - failed.append(file_path) - else: - passed.append(file_path) - assertions = _get_content_by_key(payload, 'tests', []) - always_merger.merge(content, assertions) - - if not config_path: - return passed, failed, content - - project_config = safe_load_yaml(config_path) - if project_config: - payload = project_config.get('tables', {}) - descriptions = _get_content_by_key(payload, 'description', '') - always_merger.merge(content, descriptions) - - return passed, failed, content - - -class ValidationResult: - - def __init__(self, context): - self.context = context - self.errors = [] - - def has_errors(self): - return self.errors != [] - - def require(self, name: str, specific_type=None): - configuration: dict = self.context.asserts - if name not in configuration: - self.errors.append(f'{name} parameter is required') - if specific_type is not None: - if not isinstance(configuration.get(name), specific_type): - self.errors.append(f'{name} parameter should be a {specific_type} value') - return self - - def require_metric_consistency(self, *names): - configuration: dict = self.context.asserts - - 
if configuration is None: - return self - - metric_type = set() - err_msg = 'parameter should be a numeric value or a datetime string in ISO 8601 format' - for name in names: - v = configuration.get(name) - if v is None: - continue - if isinstance(v, int) or isinstance(v, float): - metric_type.add('numeric') - elif isinstance(v, str): - metric_type.add('datetime') - try: - datetime.fromisoformat(v) - except ValueError: - self.errors.append(f'\'{name}\' {err_msg}') - else: - self.errors.append(f'\'{name}\' {err_msg}') - - if len(metric_type) > 1: - self.errors.append(f'parameter type should be consistent, found {metric_type}') - - return self - - def _require_numeric_pair(self, name, valid_types: set): - configuration: dict = self.context.asserts - values = configuration.get(name) - if not isinstance(values, list): - self.errors.append(f'{name} parameter should be a list') - return self - - if len(values) != 2: - self.errors.append(f'{name} parameter should contain two values') - return self - - if not set([type(x) for x in values]).issubset(valid_types): - self.errors.append(f'{name} parameter should be one of the types {valid_types}, input: {values}') - return self - - return self - - def require_int_pair(self, name): - return self._require_numeric_pair(name, {int}) - - def require_range_pair(self, name): - return self._require_numeric_pair(name, {int, float, datetime, date, time}) - - def require_same_types(self, name): - values = self.context.asserts.get(name) - - base_type = type(values[0]) - for v in values: - if not isinstance(v, base_type): - self.errors.append(f'{name} parameter should be the same types') - return self - - return self - - def require_one_of_parameters(self, names: list): - found = False - columns = self.context.asserts.keys() - for c in names: - if c in columns: - found = True - break - if not found: - self.errors.append(f'There should contain any parameter names in {names}') - return self - return self - - def allow_only(self, *names): - if self.context.asserts is None: - return self - - for column in self.context.asserts.keys(): - if column not in names: - self.errors.append(f'\'{column}\' is not allowed, only allow {names}') - return self - - def int_if_present(self, name: str): - if name not in self.context.asserts: - return self - - if not isinstance(self.context.asserts.get(name), int): - self.errors.append(f'{name} parameter should be a int value') - return self - - return self - - def keep_no_args(self): - if self.context.asserts is None: - return self - - if self.context.asserts.keys(): - self.errors.append('parameters are not allowed') - return self - return self - - def as_internal_report(self): - def to_str(x: str): - return f'ERROR: {x}' - - header = 'Found assertion syntax problem =>' - if self.context.column is None: - where = f"{header} name: {self.context.name} for table {self.context.table}" - else: - where = f"{header} name: {self.context.name} for table {self.context.table} and column {self.context.column}" - - return "\n".join([where] + [to_str(x) for x in self.errors]) - - def as_user_report(self): - def to_str(x): - return ' ' + x - - msg = f"name: '[bold]{self.context.name}[/bold]'" if self.context.name else f"metric: '[bold]{self.context.metric}[/bold]'" - - where = f'{msg} for [bold yellow]{self.context.table}[/bold yellow]' - if self.context.column is not None: - where = f'{where}.[bold blue]{self.context.column}[/bold blue]' - - return '\n'.join([where] + [to_str(x) for x in self.errors]) - - -class AssertionResult: - - def 
__init__(self): - self.name: str = None - self._success: bool = False - self._exception: Exception = None - self.actual: dict = None - self._expected: dict = None - - def status(self): - return self._success - - @property - def expected(self): - return self._expected - - @expected.setter - def expected(self, value): - self._expected = value - - @property - def exception(self): - return self._exception - - def validate(self): - if self._exception: - return self - - return self - - def success(self, actual=None): - if actual is not None: - self.actual = actual - - self._success = True - return self - - def fail(self, actual=None): - if actual is not None: - self.actual = actual - - self._success = False - return self - - def fail_with_exception(self, exception): - self._success = False - self._exception = exception - return self - - def fail_with_assertion_error(self, message): - self._success = False - self._exception = AssertionError(message) - return self - - def fail_with_metric_not_found_error(self, table, column): - self._success = False - if not column: - self._exception = AssertionError( - f"Table '{table}' metric not found.") - else: - self._exception = AssertionError( - f"Column '{column}' metric not found.") - return self - - def fail_with_profile_metric_not_found_error(self, table, column, metric): - self._success = False - if not column: - self._exception = AssertionError( - f"Metric '{metric}' is not found in Table '{table}' profiling result.") - else: - self._exception = AssertionError( - f"Metric '{metric}' is not found in Column '{table}-{column}' profiling result.") - return self - - def fail_with_no_assert_is_required(self): - self._success = False - self._exception = AssertionError("No assert is required.") - return self - - def fail_with_assertion_implementation_error(self): - self._success = False - self._exception = IllegalStateAssertionError( - "Assertion Function should fill 'actual' and 'expected' fields.") - return self - - def get_internal_error(self): - if isinstance(self._exception, AssertionError): - return self._exception - - def __repr__(self): - return str(dict(success=self._success, - exception=str(self._exception), - actual=self.actual, - expected=self.expected)) - - def _metric_assertion_to_string(self): - if len(self._expected.keys()) == 2: - operators = { - 'lte': ']', - 'lt': ')', - 'gte': '[', - 'gt': '(' - } - # TODO: optimization needed - boundary = [] - for k, v in self._expected.items(): - if k.startswith('lt'): - boundary.append(f'{v}{operators[k]}') - else: - boundary.insert(0, f'{operators[k]}{v}') - - return ', '.join(boundary) - else: - operators = { - 'gt': '>', - 'gte': '≥', - 'eq': '=', - 'ne': '≠', - 'lt': '<', - 'lte': '≤' - } - k, v = list(self._expected.items())[0] - return f'{operators[k]} {v}' - - -class AssertionContext: - def __init__(self, table_name: str, column_name: str, payload: dict, profiler_result=None, engine=None): - self.name: str = payload.get('name') - self.metric: str = payload.get('metric') - self.table: str = table_name - self.column: str = column_name - self.asserts: dict = {} - self.tags: list = [] - self.is_builtin = False - self.profiler_result = profiler_result - self.engine = engine - - # result - self.result: AssertionResult = AssertionResult() - self.result.name = self.name if self.name is not None else self.metric - - self._load(payload) - - def _load(self, payload): - self.asserts = payload.get('assert', {}) - self.tags = payload.get('tags', []) - self.result.expected = payload.get('assert') - - def 
__repr__(self): - return str(self.__dict__) - - def _get_assertion_id(self): - assertion_id = f'piperider.{self.table}' - if self.column: - assertion_id = f'{assertion_id}.{self.column}' - - if self.name: - assertion_id = f'{assertion_id}.{self.name}' - elif self.metric: - assertion_id = f'{assertion_id}.{self.metric}' - else: - assert False - - return assertion_id - - def _get_assertion_message(self): - if self.result.expected is not None and self.result.actual is not None: - if not self.result._success: - return f'expected {self.result.expected} but got {self.result.actual}' - return None - - def to_result_entry(self): - entry = dict(id=self._get_assertion_id()) - if self.metric: - entry['metric'] = self.metric - else: - entry['name'] = self.name - entry.update( - dict( - table=self.table, - column=self.column, - status='passed' if self.result._success is True else 'failed', - expected=self.result.expected, - actual=self.result.actual, - tags=self.tags, - message=self._get_assertion_message(), - display_name=self.result.name, - source='piperider' - ) - ) - - return entry - - -class AssertionEngine: - """ - This class is used to evaluate the assertion. - """ - PIPERIDER_ASSERTION_SUPPORT_METRICS = ['distribution', 'range', 'missing_value'] - - def __init__(self, engine: Engine, assertion_search_path=None, - event_handler: AssertionEventHandler = DefaultAssertionEventHandler()): - self.engine = engine - self.assertion_search_path = assertion_search_path or FileSystem.PIPERIDER_ASSERTION_SEARCH_PATH - self.assertions_content: Dict = {} - self.assertions: List[AssertionContext] = [] - self.recommender: AssertionRecommender = AssertionRecommender() - self.event_handler: AssertionEventHandler = event_handler - - self.default_plugins_dir = FileSystem.PIPERIDER_ASSERTION_PLUGIN_PATH - if not os.path.isdir(self.default_plugins_dir): - self.default_plugins_dir = None - - @staticmethod - def check_assertions_syntax(assertion_search_path=None): - """ - This method is used to check the syntax of the assertion. - :param assertion_search_path: - :return: - """ - assertion_search_path = assertion_search_path or FileSystem.PIPERIDER_ASSERTION_SEARCH_PATH - return load_yaml_configs(assertion_search_path) - - def load_assertions(self, profiler_result=None, config_path=None): - """ - This method is used to load assertions from the specific path. 
- :param assertion_search_path: - :return: - """ - """ - Example: - {'nodes': {'tests': [], 'columns': {'uid': {'tests': []}, 'type': {'tests': []}, 'name': {'tests': []}, 'created_at': {'tests': []}, 'updated_at': {'tests': []}, 'metadata': {'tests': []}}}, 'edges': {'tests': [], 'columns': {'n1_uid': {'tests': []}, 'n2_uid': {'tests': []}, 'created_at': {'tests': []}, 'updated_at': {'tests': []}, 'type': {'tests': []}, 'metadata': {'tests': []}}}} - """ - config_path = config_path or FileSystem.PIPERIDER_CONFIG_PATH - - self.assertions = [] - self.load_assertion_content(config_path) - - # Load assertion context - if profiler_result: - selected_tables = profiler_result.get('tables').keys() - else: - selected_tables = inspect(self.engine).get_table_names() - for t in self.assertions_content: - # only append specified table's assertions - if t in selected_tables: - if self.assertions_content[t] is None: - continue - table_assertions = self.assertions_content[t].get('tests') \ - if self.assertions_content[t].get('tests') else [] - for ta in table_assertions: - self.assertions.append(AssertionContext(t, None, ta, profiler_result, self.engine)) - - columns_content = self.assertions_content[t].get('columns') \ - if self.assertions_content[t].get('columns') else {} - for c in columns_content: - if columns_content[c] is None: - continue - column_assertions = columns_content[c].get('tests') if columns_content[c].get('tests') else [] - for ca in column_assertions: - self.assertions.append(AssertionContext(t, c, ca, profiler_result, self.engine)) - - def load_all_assertions_for_validation(self) -> (List[str], List[str]): - passed_assertion_files, failed_assertion_files = self.load_assertion_content() - - self.assertions = [] - - for t in self.assertions_content: - if self.assertions_content[t] is None: - continue - table_assertions = self.assertions_content[t].get('tests') \ - if self.assertions_content[t].get('tests') else [] - for ta in table_assertions: - self.assertions.append(AssertionContext(t, None, ta)) - - columns_content = self.assertions_content[t].get('columns') \ - if self.assertions_content[t].get('columns') else {} - for c in columns_content: - if columns_content[c] is None: - continue - column_assertions = columns_content[c].get('tests') if columns_content[c].get('tests') else [] - for ca in column_assertions: - self.assertions.append(AssertionContext(t, c, ca)) - - return passed_assertion_files, failed_assertion_files - - def load_assertion_content(self, config_path=None): - config_path = config_path or FileSystem.PIPERIDER_CONFIG_PATH - passed_assertion_files, failed_assertion_files, self.assertions_content = load_yaml_configs( - self.assertion_search_path, config_path) - - return passed_assertion_files, failed_assertion_files - - def generate_template_assertions(self, profiling_result): - self.recommender.prepare_assertion_template(profiling_result) - template_assertions = self.recommender.assertions - return self._dump_assertions_files(template_assertions) - - def generate_recommended_assertions(self, profiling_result): - # Load existing assertions - if not self.assertions_content: - self.load_assertions(profiler_result=profiling_result) - - # Generate recommended assertions based on the profiling result - self.recommender.prepare_assertion_template(profiling_result) - self.recommender.run(profiling_result) - - recommended_assertions = self.recommender.assertions - - # Dump recommended assertions - return self._dump_assertions_files(recommended_assertions, prefix='recommended') - 
- def _is_recommended_assertion(self, assertion): - if RECOMMENDED_ASSERTION_TAG in assertion.get('tags', []): - return True - return False - - def _update_existing_recommended_assertions(self, recommended_assertions): - comment_remove_entire_assertions = 'TODO: Suggest to remove following assertions (no table/column found)' - comment_remove_assertion = 'TODO: Suggest to remove this assertion (no recommended found)' - comment_update_assertion = 'TODO: {recommended_assertion_value} (new recommended assert)' - - def merge_assertions(target: str, existed_items: List, new_generating_items: List): - if new_generating_items.get(target) is None: - # Column or table doesn't exist in the existing assertions - new_generating_items[target] = pyml.CommentedMap(existed_items[target]) - is_generated_by_us = False - for assertion in new_generating_items[target].get('tests', []): - is_generated_by_us = self._is_recommended_assertion(assertion) - if is_generated_by_us: - break - if is_generated_by_us: - new_generating_items.yaml_add_eol_comment(comment_remove_entire_assertions, target) - else: - # Merge with existed - existed_desc = existed_items[target].get('description') - if existed_desc and isinstance(existed_desc, str): - new_generating_items[target]['description'] = existed_desc - for existed_assertion in existed_items[target]['tests']: - is_new_assertion_found = False - for new_assertion in new_generating_items[target]['tests']: - if new_assertion['name'] == existed_assertion['name']: - is_new_assertion_found = True - if existed_assertion.get('assert') is None: - continue - if dict(new_assertion['assert']) != existed_assertion['assert']: - # Update new generating assertion with new assert in comment - recommended_assertion_value = json.dumps(new_assertion['assert']).replace('\"', '') - new_assertion.yaml_add_eol_comment( - comment_update_assertion.format( - recommended_assertion_value=recommended_assertion_value), - 'assert') - new_assertion['assert'] = existed_assertion['assert'] - if is_new_assertion_found is False: - new_generating_items[target]['tests'].append(existed_assertion) - if self._is_recommended_assertion(existed_assertion): - new_generating_items[target]['tests'][-1].yaml_add_eol_comment(comment_remove_assertion, - 'name') - return new_generating_items - - for name, recommended_assertion in recommended_assertions.items(): - existing_assertion_path = os.path.join(self.assertion_search_path, self._recommend_assertion_filename(name)) - if os.path.exists(existing_assertion_path): - # Use round trip loader to load existing assertions with comments - existing_assertion = round_trip_load_yaml(existing_assertion_path) - if existing_assertion: - # Table assertions - recommended_assertion = merge_assertions(name, existing_assertion, recommended_assertion) - - # Column assertions - for ca in existing_assertion[name]['columns']: - recommended_assertion[name]['columns'] = merge_assertions(ca, - existing_assertion[name]['columns'], - recommended_assertion[name][ - 'columns']) - pass - - def _recommend_assertion_filename(self, name): - return f'recommended_{name}.yml' - - def _backup_assertion_file(self, assertions, prefix=''): - for name in assertions.keys(): - filename = f'{prefix}_{name}.yml' if prefix else f'{name}.yml' - file_path = os.path.join(self.assertion_search_path, filename) - if os.path.exists(file_path): - backup_file_name = f'{filename}.bak' - backup_path = os.path.join(self.assertion_search_path, backup_file_name) - os.rename(file_path, backup_path) - - def 
_dump_assertions_files(self, assertions, prefix=''): - paths = [] - os.makedirs(self.assertion_search_path, exist_ok=True) - for name, assertion in assertions.items(): - filename = f'{prefix}_{name}.yml' if prefix else f'{name}.yml' - file_path = os.path.join(self.assertion_search_path, filename) - if assertion.get('skip'): # skip if it already exists user-defined assertions - continue - with open(file_path, 'w') as f: - pyml.dump(assertion, f) - paths.append(file_path) - return paths - - def evaluate(self, assertion: AssertionContext): - """ - This method is used to evaluate the assertion. - :param assertion: - :return: - """ - - """ - example: - - - name: get_outliers # test method used under distribution metric - parameters: - method: method1 # get_outliers's input parameter - window: 3 # get_outliers's input parameter - threshold: [15,100] # get_outliers's input parameter, range from 15 to 100 - assert: - outliers: 5 # in get_outliers's verification logic, check outliers parameter and return true if it's less than 5 - """ - - from piperider_cli.assertion_engine.types import get_assertion - try: - assertion_instance = get_assertion(assertion.name, assertion.metric) - - try: - result = assertion_instance.execute(assertion) - assertion.is_builtin = assertion_instance.__class__.__module__.startswith(get_assertion.__module__) - result.validate() - except Exception as e: - assertion.result.fail_with_exception(e) - return assertion - except ValueError as e: - assertion.result.fail_with_exception(AssertionError(f'Cannot find the assertion: {assertion.name}', e)) - return assertion - - def evaluate_all(self): - results = [] - exceptions = [] - - self.event_handler.handle_assertion_start(self.assertions) - self.load_plugins() - for assertion in self.assertions: - try: - self.event_handler.handle_execution_start(assertion) - assertion_result: AssertionContext = self.evaluate(assertion) - results.append(assertion_result) - - if assertion_result.result.get_internal_error(): - raise assertion_result.result.get_internal_error() - self.event_handler.handle_execution_end(assertion_result) - except AssertionError as e: - exceptions.append((assertion, e)) - except IllegalStateAssertionError as e: - exceptions.append((assertion, e)) - except BaseException as e: - exceptions.append((assertion, e)) - self.event_handler.handle_assertion_end(results, exceptions) - return results, exceptions - - def validate_assertions(self): - from piperider_cli.assertion_engine.types import get_assertion - results = [] - - self.load_plugins() - for assertion in self.assertions: - assertion_instance = get_assertion(assertion.name, assertion.metric) - result = assertion_instance.validate(assertion) - if result and result.has_errors(): - results.append(result) - return results - - def load_plugins(self): - - def to_dirs(path_list: str): - if path_list is None: - return [] - if sys.platform == 'win32': - return [x.strip() for x in path_list.split(';')] - return [x.strip() for x in path_list.split(':')] - - plugin_dirs = [] - plugin_context = os.environ.get('PIPERIDER_PLUGINS') - if plugin_context: - sys.path.append(plugin_context) - plugin_dirs += to_dirs(plugin_context) - - if self.default_plugins_dir: - sys.path.append(self.default_plugins_dir) - plugin_dirs += to_dirs(self.default_plugins_dir) - - for d in plugin_dirs: - module_names = [x.split('.py')[0] for x in os.listdir(d) if x.endswith(".py")] - for m in module_names: - try: - import_module(m) - except BaseException: - print(f"Failed to load module {m} from {d}") - raise 
diff --git a/piperider_cli/assertion_engine/event.py b/piperider_cli/assertion_engine/event.py deleted file mode 100644 index b8a9473ca..000000000 --- a/piperider_cli/assertion_engine/event.py +++ /dev/null @@ -1,48 +0,0 @@ -from rich.color import Color -from rich.progress import Progress, Column, TextColumn, BarColumn, TimeElapsedColumn, MofNCompleteColumn -from rich.style import Style - - -class AssertionEventHandler: - def handle_assertion_start(self, assertions): - pass - - def handle_assertion_end(self, results, exceptions): - pass - - def handle_execution_start(self, assertion_context): - pass - - def handle_execution_end(self, assertion_result): - pass - - -class DefaultAssertionEventHandler(AssertionEventHandler): - - def __init__(self): - subject_column = TextColumn("{task.fields[test_subject]}") - assert_column = TextColumn("{task.fields[display_name]}", style='green') - bar_column = BarColumn(bar_width=60, pulse_style=Style.from_color(Color.from_rgb(244, 164, 96))) - mofn_column = MofNCompleteColumn(table_column=Column(width=5, justify="right")) - time_elapsed_column = TimeElapsedColumn() - self.progress = Progress(subject_column, assert_column, bar_column, mofn_column, time_elapsed_column) - self.task_id = None - - def handle_assertion_start(self, assertions): - self.progress.start() - fields = dict(display_name='', test_subject='') - self.task_id = self.progress.add_task('assertions', total=len(assertions), **fields) - - def handle_assertion_end(self, results, exceptions): - self.progress.update(self.task_id, **dict(display_name='', test_subject='Test completed')) - self.progress.stop() - - def handle_execution_start(self, assertion_context): - display_name = assertion_context.name if assertion_context.name is not None else assertion_context.metric - test_subject = assertion_context.table if assertion_context.table else '-' - if assertion_context.column: - test_subject = f'{test_subject}.{assertion_context.column}' - self.progress.update(self.task_id, **dict(display_name=display_name, test_subject=test_subject)) - - def handle_execution_end(self, assertion_result): - self.progress.update(self.task_id, advance=1, **dict(display_name='')) diff --git a/piperider_cli/assertion_engine/recommended_rules/__init__.py b/piperider_cli/assertion_engine/recommended_rules/__init__.py deleted file mode 100644 index 15249bb1b..000000000 --- a/piperider_cli/assertion_engine/recommended_rules/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .table_assertions import RecommendedRules -from .recommender_assertion import RecommendedAssertion diff --git a/piperider_cli/assertion_engine/recommended_rules/recommender_assertion.py b/piperider_cli/assertion_engine/recommended_rules/recommender_assertion.py deleted file mode 100644 index 778b1eeb6..000000000 --- a/piperider_cli/assertion_engine/recommended_rules/recommender_assertion.py +++ /dev/null @@ -1,19 +0,0 @@ -import json -from typing import Optional - - -class RecommendedAssertion: - def __init__(self, name: Optional[str], metric: Optional[str], asserts: dict): - self.table: str = None - self.column: str = None - self.name: str = name - self.metric: str = metric - self.asserts: dict = asserts - - def __repr__(self): - table = self.table if self.table else '' - column = ('.' 
+ self.column) if self.column else '' - - asserts = json.dumps(self.asserts).replace('\"', '') if self.asserts else '' - - return f'{table}{column}: {self.name} {asserts}' diff --git a/piperider_cli/assertion_engine/recommended_rules/table_assertions.py b/piperider_cli/assertion_engine/recommended_rules/table_assertions.py deleted file mode 100644 index 1861573ab..000000000 --- a/piperider_cli/assertion_engine/recommended_rules/table_assertions.py +++ /dev/null @@ -1,205 +0,0 @@ -import math -from typing import Optional - -from .recommender_assertion import RecommendedAssertion - - -def recommended_table_row_count_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is not None: - return None - - row_count = profiling_result['tables'][table].get('row_count') - if row_count is None: - return None - - test_metric_name = 'row_count' - assertion_values = { - 'gte': int(row_count * 0.9), - } - assertion = RecommendedAssertion(None, test_metric_name, assertion_values) - return assertion - - -def recommended_column_schema_type_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is None: - return None - - schema_type = profiling_result['tables'][table]['columns'][column].get('schema_type') - if schema_type is None: - return None - - test_function_name = 'assert_column_schema_type' - assertion_values = { - 'schema_type': schema_type - } - assertion = RecommendedAssertion(test_function_name, None, assertion_values) - return assertion - - -def recommended_column_min_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is None: - return None - - column_metric = profiling_result['tables'][table]['columns'][column] - column_type = column_metric['type'] - if column_type == 'numeric': - valids = column_metric.get('valids') - if not valids: - return None - - column_min = column_metric['min'] - histogram_counts = column_metric['histogram']['counts'] if column_metric['histogram'] else [] - - count = 0 - for i, v in enumerate(reversed(histogram_counts)): - count = count + v - if i == len(histogram_counts) // 2: - break - - if count / valids > 0.95: - test_function_name = 'assert_column_min_in_range' - assertion_values = { - 'min': sorted([round(column_min * 0.9, 4), round(column_min * 1.1, 4)]) - } - assertion = RecommendedAssertion(test_function_name, None, assertion_values) - return assertion - else: - return None - - -def recommended_column_max_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is None: - return None - - column_metric = profiling_result['tables'][table]['columns'][column] - column_type = column_metric['type'] - if column_type == 'numeric': - valids = column_metric.get('valids') - if not valids: - return None - - column_max = column_metric['max'] - histogram_counts = column_metric['histogram']['counts'] if column_metric['histogram'] else [] - - count = 0 - for i, v in enumerate(histogram_counts): - count = count + v - if i == len(histogram_counts) // 2: - break - - if count / valids > 0.95: - test_function_name = 'assert_column_max_in_range' - assertion_values = { - 'max': sorted([round(column_max * 0.9, 4), round(column_max * 1.1, 4)]) - } - assertion = RecommendedAssertion(test_function_name, None, assertion_values) - return assertion - else: - return None - - -def recommended_column_value_assertion(table, column, profiling_result) -> Optional[RecommendedAssertion]: - if column is None: - return None - - column_metric = profiling_result['tables'][table]['columns'][column] - 
column_type = column_metric['type'] - assert_name = 'assert_column_value' - if column_type == 'numeric': - valids = column_metric.get('valids') - if not valids: - return None - - histogram_counts = column_metric['histogram']['counts'] if column_metric['histogram'] else [] - if not histogram_counts: - return None - - column_min = column_metric.get('min') - column_max = column_metric.get('max') - - if column_min is None or column_max is None: - return None - - count_first_half = sum(histogram_counts[0:len(histogram_counts) // 2]) - count_second_half = sum(histogram_counts[math.ceil(len(histogram_counts) / 2):]) - - if count_first_half / valids > 0.95: - boundary = column_max * 1.1 if column_max > 0 else column_max * 0.9 - assertion_values = { - 'lte': round(boundary, 4) - } - return RecommendedAssertion(assert_name, None, assertion_values) - elif count_second_half / valids > 0.95: - boundary = column_min * 0.9 if column_min > 0 else column_min * 1.1 - assertion_values = { - 'gte': round(boundary, 4) - } - return RecommendedAssertion(assert_name, None, assertion_values) - elif column_type == 'string': - distinct = column_metric.get('distinct') - if distinct is None: - return None - - topk = [] - if column_metric.get('topk'): - topk = column_metric.get('topk').get('values', []) - - if distinct and distinct <= 50 and topk: - assertion_values = { - 'in': topk - } - return RecommendedAssertion(assert_name, None, assertion_values) - else: - return None - - -def recommended_column_unique_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is None: - return None - - column_metric = profiling_result['tables'][table]['columns'][column] - column_type = column_metric.get('type') - if column_type is None: - return None - - if column_type == 'string': - valids = column_metric.get('valids') - distinct = column_metric.get('distinct') - - if valids is None or distinct is None: - return None - - if valids > 0 and distinct == valids: - test_function_name = 'assert_column_unique' - assertion = RecommendedAssertion(test_function_name, None, None) - return assertion - else: - return None - - -def recommended_column_not_null_assertion(table, column, profiling_result) -> RecommendedAssertion: - if column is None: - return None - - column_metric = profiling_result['tables'][table]['columns'][column] - non_nulls = column_metric.get('non_nulls') - if non_nulls is None: - return - - total = column_metric['total'] - - if total > 0 and non_nulls == total: - test_function_name = 'assert_column_not_null' - assertion = RecommendedAssertion(test_function_name, None, None) - return assertion - else: - return None - - -RecommendedRules = [ - recommended_table_row_count_assertion, - recommended_column_schema_type_assertion, - recommended_column_unique_assertion, - recommended_column_not_null_assertion, - recommended_column_value_assertion -] diff --git a/piperider_cli/assertion_engine/recommender.py b/piperider_cli/assertion_engine/recommender.py deleted file mode 100644 index 68cbeee77..000000000 --- a/piperider_cli/assertion_engine/recommender.py +++ /dev/null @@ -1,104 +0,0 @@ -import inspect -from typing import Callable, List, Dict - -from piperider_cli import yaml as pyml - -from .recommended_rules import RecommendedAssertion, RecommendedRules - -recommended_rule_parameter_keys = ['table', 'column', 'profiling_result'] - -RECOMMENDED_ASSERTION_TAG = 'RECOMMENDED' - - -class AssertionRecommender: - def __init__(self): - self.assertions: Dict[pyml.CommentedMap] = {} - 
self.recommended_rule_callbacks = [] - self.load_recommended_rules() - self.generated_assertions: List[RecommendedAssertion] = [] - - def prepare_assertion_template(self, profiling_result): - for name, table in profiling_result.get('tables', {}).items(): - # Generate template of assertions - table_assertions = pyml.CommentedSeq() - columns = pyml.CommentedMap() - - # Generate assertions for columns - for col in table.get('columns', {}).keys(): - column_name = str(col) - column_assertions = pyml.CommentedSeq() - columns[column_name] = pyml.CommentedMap({ - 'tests': column_assertions, - }) - - # Generate assertions for table - recommended_assertion = pyml.CommentedMap({ - name: pyml.CommentedMap({ - 'tests': table_assertions, - 'columns': columns, - })}) - recommended_assertion.yaml_set_start_comment(f'# Auto-generated by Piperider based on table "{name}"') - table_level_comment = f'Test Cases for Table "{name}"' - columns_level_comment = 'Test Cases for Columns' - recommended_assertion[name].yaml_set_comment_before_after_key('tests', indent=2, - before=table_level_comment) - recommended_assertion[name].yaml_set_comment_before_after_key('columns', after_indent=4, - after=columns_level_comment) - self.assertions[name] = recommended_assertion - return self.assertions - - def run(self, profiling_result) -> List[RecommendedAssertion]: - if not self.assertions: - self.prepare_assertion_template(profiling_result) - - for table, ta in self.assertions.items(): - table_assertions: pyml.CommentedSeq = ta[table]['tests'] - for callback in self.recommended_rule_callbacks: - assertion: RecommendedAssertion = callback(table, None, profiling_result) - if assertion: - if assertion.name: - table_assertions.append(pyml.CommentedMap({ - 'name': assertion.name, - 'assert': pyml.CommentedMap(assertion.asserts), - 'tags': [RECOMMENDED_ASSERTION_TAG] - })) - else: - table_assertions.append(pyml.CommentedMap({ - 'metric': assertion.metric, - 'assert': pyml.CommentedMap(assertion.asserts), - 'tags': [RECOMMENDED_ASSERTION_TAG] - })) - assertion.table = table - self.generated_assertions.append(assertion) - for column, col in ta[table]['columns'].items(): - column_assertions = col['tests'] - for callback in self.recommended_rule_callbacks: - assertion: RecommendedAssertion = callback(table, column, profiling_result) - if not assertion: - continue - - assertion.table = table - assertion.column = column - if assertion.asserts: - column_assertions.append(pyml.CommentedMap({ - 'name': assertion.name, - 'assert': pyml.CommentedMap(assertion.asserts), - 'tags': [RECOMMENDED_ASSERTION_TAG] - })) - else: - column_assertions.append(pyml.CommentedMap({ - 'name': assertion.name, - 'tags': [RECOMMENDED_ASSERTION_TAG] - })) - self.generated_assertions.append(assertion) - return self.generated_assertions - - def load_recommended_rules(self): - for callback in RecommendedRules: - if isinstance(callback, Callable): - args = inspect.signature(callback) - parameters = list(args.parameters.keys()) - if parameters == recommended_rule_parameter_keys: - self.recommended_rule_callbacks.append(callback) - pass - pass diff --git a/piperider_cli/assertion_engine/types/__init__.py b/piperider_cli/assertion_engine/types/__init__.py deleted file mode 100644 index 9b8ec17be..000000000 --- a/piperider_cli/assertion_engine/types/__init__.py +++ /dev/null @@ -1,76 +0,0 @@ -from piperider_cli.assertion_engine import AssertionContext, ValidationResult, AssertionResult -from piperider_cli.assertion_engine.types.assert_column_misc import \ - 
AssertColumnNotNull, \ - AssertColumnNull, \ - AssertColumnUnique, \ - AssertColumnExist, \ - AssertColumnValue -from piperider_cli.assertion_engine.types.assert_column_ranges import \ - AssertColumnMinInRange, \ - AssertColumnMaxInRange, \ - AssertColumnInRange -from piperider_cli.assertion_engine.types.assert_column_types import \ - AssertColumnSchemaType, \ - AssertColumnType, \ - AssertColumnInTypes -from piperider_cli.assertion_engine.types.assert_metrics import AssertMetric -from piperider_cli.assertion_engine.types.assert_rows import \ - AssertRowCountInRange, \ - AssertRowCount -from piperider_cli.assertion_engine.types.base import BaseAssertionType - -custom_registry = {} - - -class _NotFoundAssertion(BaseAssertionType): - - def __init__(self, assertion_name): - self.assertion_name = assertion_name - - def name(self): - return self.assertion_name - - def execute(self, context: AssertionContext) -> AssertionResult: - raise BaseException('this is not-found-assertion, cannot be used') - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context) - result.errors.append(f"cannot find an assertion by name '{context.name}'") - return result - - -def register_assertion_function(typename: BaseAssertionType): - instance: BaseAssertionType = typename() - if instance.name() in custom_registry: - raise ValueError(f'{instance.name()} has been registered by {custom_registry.get(instance.name())}') - custom_registry[instance.name()] = instance - - -def get_assertion(name: str, metric: str) -> BaseAssertionType: - if metric: - return AssertMetric() - - if name not in custom_registry: - return _NotFoundAssertion(name) - return custom_registry[name] - - -register_assertion_function(AssertRowCountInRange) -register_assertion_function(AssertRowCount) - -register_assertion_function(AssertColumnSchemaType) -register_assertion_function(AssertColumnType) -register_assertion_function(AssertColumnInTypes) - -register_assertion_function(AssertColumnMinInRange) -register_assertion_function(AssertColumnMaxInRange) -register_assertion_function(AssertColumnInRange) - -register_assertion_function(AssertColumnNotNull) -register_assertion_function(AssertColumnNull) -register_assertion_function(AssertColumnUnique) -register_assertion_function(AssertColumnExist) -register_assertion_function(AssertColumnValue) - -if __name__ == '__main__': - pass diff --git a/piperider_cli/assertion_engine/types/assert_column_misc.py b/piperider_cli/assertion_engine/types/assert_column_misc.py deleted file mode 100644 index ba1b7f361..000000000 --- a/piperider_cli/assertion_engine/types/assert_column_misc.py +++ /dev/null @@ -1,376 +0,0 @@ -from datetime import datetime - -from sqlalchemy import select, MetaData, Table, func - -from piperider_cli.assertion_engine import AssertionContext, AssertionResult -from piperider_cli.assertion_engine.assertion import ValidationResult -from piperider_cli.assertion_engine.types.base import BaseAssertionType -from piperider_cli.assertion_engine.types.assert_metrics import AssertMetric - - -class AssertColumnNotNull(BaseAssertionType): - def name(self): - return "assert_column_not_null" - - def execute(self, context: AssertionContext): - return assert_column_not_null(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - return ValidationResult(context).keep_no_args() - - -class AssertColumnNull(BaseAssertionType): - def name(self): - return "assert_column_null" - - def execute(self, context: AssertionContext): - return 
assert_column_null(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - return ValidationResult(context).keep_no_args() - - -class AssertColumnUnique(BaseAssertionType): - def name(self): - return "assert_column_unique" - - def execute(self, context: AssertionContext): - return assert_column_unique(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - return ValidationResult(context).keep_no_args() - - -class AssertColumnExist(BaseAssertionType): - def name(self): - return "assert_column_exist" - - def execute(self, context: AssertionContext): - return assert_column_exist(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - return ValidationResult(context).keep_no_args() - - -class AssertColumnValue(BaseAssertionType): - def __init__(self): - self.set_op = 'in' - self.range_ops = ['gte', 'lte', 'gt', 'lt'] - - def name(self): - return "assert_column_value" - - def execute(self, context: AssertionContext): - table = context.table - column = context.column - metrics = context.profiler_result - - target_metrics = None - if metrics: - target_metrics = metrics.get('tables', {}).get(table) - if column: - target_metrics = target_metrics.get('columns', {}).get(column) - - if not target_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - assert_ops = list(context.asserts.keys()) - if assert_ops[0] in self.range_ops: - return self._assert_column_value_range(context, target_metrics) - elif assert_ops[0] == self.set_op: - return self._assert_column_value_set(context, target_metrics) - - return context.result.fail() - - def validate(self, context: AssertionContext) -> ValidationResult: - results = ValidationResult(context) - - ops = [self.set_op] + self.range_ops - - if not context.asserts: - results.errors.append(f'At least one of {ops} is needed.') - return results - - if not isinstance(context.asserts, dict): - results.errors.append('assert should be an associative array') - return results - - results = results.allow_only(*ops) - if results.errors: - return results - - assert_ops = list(context.asserts.keys()) - if assert_ops[0] in self.range_ops: - results = results.require_metric_consistency(*self.range_ops) - if results.errors: - return results - - self._assert_value_validation(context.asserts, results) - elif assert_ops[0] == self.set_op: - if not isinstance(context.asserts.get(self.set_op), list): - results.errors.append(f"'{self.set_op}' should specify a list") - return results - results = results.require_same_types(self.set_op) - - return results - - @staticmethod - def _assert_value_validation(value_boundary: dict, results: ValidationResult): - if len(value_boundary.keys()) == 1: - pass - elif len(value_boundary.keys()) == 2: - lower = None - upper = None - for op, v in value_boundary.items(): - if op.startswith('lt'): - upper = v - elif op.startswith('gt'): - lower = v - - if upper is None or lower is None: - results.errors.append('Please specified your metric upper and lower boundary') - return - - if isinstance(upper, str) and isinstance(lower, str): - upper = datetime.fromisoformat(upper) - lower = datetime.fromisoformat(lower) - if upper < lower: - results.errors.append("The 'lt' or 'lte' value should be greater than or equal to " - "the 'gt' or 'gte' value.") - else: - results.errors.append('The number of operator should be 1 or 2.') - - @staticmethod - def _assert_column_value_range(context, target_metrics): - context.result.expected = 
AssertMetric.to_interval_notation(context.asserts) - has_fail = False - - if target_metrics.get('min') is not None: - if not AssertMetric.assert_metric_boundary(target_metrics.get('min'), context.asserts): - has_fail = True - else: - return context.result.fail() - - if target_metrics.get('max') is not None: - if not AssertMetric.assert_metric_boundary(target_metrics.get('max'), context.asserts): - has_fail = True - else: - return context.result.fail() - - context.result.actual = AssertMetric.to_interval_notation({ - 'gte': target_metrics.get('min'), - 'lte': target_metrics.get('max') - }) - - if has_fail: - return context.result.fail() - - return context.result.success() - - @staticmethod - def _assert_column_value_set(context, target_metrics): - assert_set = list(context.asserts.values())[0] - context.result.expected = assert_set - assert_set = set(assert_set) - - samples = None - if target_metrics: - samples = target_metrics.get('samples') - distinct = target_metrics.get('distinct') - topk = [] - if target_metrics.get('topk'): - topk = target_metrics.get('topk').get('values', []) - - if distinct and distinct > len(assert_set): - subtraction = AssertColumnValue._column_value_set_subtraction(context, target_metrics, None) - context.result.actual = AssertColumnValue._get_column_value_set_actual_msg(subtraction) - return context.result.fail() - - # TODO: define topk default max length - if distinct and distinct <= 50 and topk: - if len(assert_set) < len(topk): - subtraction = AssertColumnValue._column_value_set_subtraction(context, None, topk) - context.result.actual = AssertColumnValue._get_column_value_set_actual_msg(subtraction) - return context.result.fail() - - for k in topk: - if k not in assert_set: - subtraction = AssertColumnValue._column_value_set_subtraction(context, None, topk) - context.result.actual = AssertColumnValue._get_column_value_set_actual_msg(subtraction) - return context.result.fail() - - return context.result.success() - - # try to query 4 additional categories - # if there's additional categories, it means the dataset has more data category than assertion - # also we show up to 3 items in the category subtraction - result = AssertColumnValue._query_column_category(context, samples, len(assert_set) + 4) - if len(result) > len(assert_set): - subtraction = AssertColumnValue._column_value_set_subtraction(context, None, result) - context.result.actual = AssertColumnValue._get_column_value_set_actual_msg(subtraction) - return context.result.fail() - - for row in result: - if row not in assert_set: - subtraction = AssertColumnValue._column_value_set_subtraction(context, None, result) - context.result.actual = AssertColumnValue._get_column_value_set_actual_msg(subtraction) - return context.result.fail() - - return context.result.success() - - @staticmethod - def _query_column_category(context, samples, size): - table = context.table - column = context.column - - metadata = MetaData() - Table(table, metadata, autoload_with=context.engine) - t = metadata.tables[table] - c = t.columns[column] - - with context.engine.connect() as conn: - base = select(c.label('c')).select_from(t) - if samples: - base = base.limit(samples) - cte = base.cte() - stmt = select( - func.distinct(cte.c.c) - ).select_from( - cte - ).where( - cte.c.c.isnot(None) - ) - results = conn.execute(stmt).fetchmany(size=size) - - return [r[0] for r in results] - - @staticmethod - def _column_value_set_subtraction(context, target_metrics, actual_set): - assert_set = set(list(context.asserts.values())[0]) - 
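-        # Returns the observed categories NOT covered by the asserted set:
-        # prefer the explicit actual_set, then the profiled top-k values
-        # (when distinct <= 50), and finally fall back to querying distinct
-        # categories directly from the data source.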
samples = None - if target_metrics: - samples = target_metrics.get('samples') - distinct = target_metrics.get('distinct') - topk = [] - if target_metrics.get('topk'): - topk = target_metrics.get('topk').get('values', []) - - if distinct and distinct <= 50 and topk: - actual_set = topk - - if actual_set: - return set(actual_set) - assert_set - - # try to query 4 additional categories - # if there's additional categories, it means the dataset has more data category than assertion - # also we show up to 3 items in the category subtraction - result = AssertColumnValue._query_column_category(context, samples, len(assert_set) + 4) - - return set(result) - assert_set - - @staticmethod - def _get_column_value_set_actual_msg(subtraction): - subtraction = list(subtraction) - if len(subtraction) == 1: - actual_msg = f'{subtraction} is in the set' - elif len(subtraction) <= 3: - actual_msg = f'{subtraction} are in the set' - else: - actual_msg = f'{subtraction[0:3]} and more are in the set' - - return actual_msg - - -def assert_column_not_null(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - if context.asserts: - return context.result.fail_with_no_assert_is_required() - - samples = column_metrics.get('samples') - non_nulls = column_metrics.get('non_nulls') - - success = (samples == non_nulls) - context.result.actual = None - - if success: - return context.result.success() - - return context.result.fail() - - -def assert_column_null(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - # cannot find the column in the metrics - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - if context.asserts: - return context.result.fail_with_no_assert_is_required() - - non_nulls = column_metrics.get('non_nulls') - - success = (non_nulls == 0) - context.result.actual = dict(success=success) - - if success: - return context.result.success() - - return context.result.fail() - - -def assert_column_unique(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - # cannot find the column in the metrics - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - if context.asserts: - return context.result.fail_with_no_assert_is_required() - - valids = column_metrics.get('valids') - distinct = column_metrics.get('distinct') - - success = (valids == distinct) - context.result.actual = None - - if success: - return context.result.success() - - return context.result.fail() - - -def assert_column_exist(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - table_metrics = metrics.get('tables', {}).get(table) - if not table_metrics: - # cannot find the table in the metrics - return context.result.fail_with_metric_not_found_error(context.table, None) - - if context.asserts: - return 
context.result.fail_with_no_assert_is_required() - - column_metrics = table_metrics.get('columns', {}).get(column) - if column_metrics: - context.result.actual = dict(success=True) - return context.result.success() - - context.result.actual = dict(success=False) - return context.result.fail() diff --git a/piperider_cli/assertion_engine/types/assert_column_ranges.py b/piperider_cli/assertion_engine/types/assert_column_ranges.py deleted file mode 100644 index c795a4215..000000000 --- a/piperider_cli/assertion_engine/types/assert_column_ranges.py +++ /dev/null @@ -1,138 +0,0 @@ -from datetime import datetime - -from piperider_cli.assertion_engine import AssertionContext, AssertionResult -from piperider_cli.assertion_engine.assertion import ValidationResult -from piperider_cli.assertion_engine.types.base import BaseAssertionType - - -class AssertColumnMinInRange(BaseAssertionType): - def name(self): - return "assert_column_min_in_range" - - def execute(self, context: AssertionContext): - return assert_column_min_in_range(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context).require('min') - if result.has_errors(): - return result - - return result.require_range_pair('min').require_same_types('min') - - -class AssertColumnMaxInRange(BaseAssertionType): - def name(self): - return "assert_column_max_in_range" - - def execute(self, context: AssertionContext): - return assert_column_max_in_range(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context).require('max') - if result.has_errors(): - return result - - return result.require_range_pair('max').require_same_types('max') - - -class AssertColumnInRange(BaseAssertionType): - def name(self): - return "assert_column_in_range" - - def execute(self, context: AssertionContext): - return assert_column_in_range(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context).require('range') - if result.has_errors(): - return result - - return result.require_range_pair('range').require_same_types('range') - - -def assert_column_min_in_range(context: AssertionContext) -> AssertionResult: - return _assert_column_in_range(context, target_metric='min') - - -def assert_column_max_in_range(context: AssertionContext) -> AssertionResult: - return _assert_column_in_range(context, target_metric='max') - - -def assert_column_in_range(context: AssertionContext) -> AssertionResult: - return _assert_column_in_range(context, target_metric='range') - - -def _assert_column_in_range(context: AssertionContext, **kwargs) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - table_metrics = metrics.get('tables', {}).get(table) - if table_metrics is None: - return context.result.fail_with_metric_not_found_error(context.table, None) - - column_metrics = table_metrics.get('columns', {}).get(column) - if column_metrics is None: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - # Check assertion input - target_metric = kwargs.get('target_metric') - values = context.asserts.get(target_metric) - if values is None or len(values) != 2: - return context.result.fail_with_assertion_error('Expect a range [min_value, max_value].') - - class Observed(object): - def __init__(self, column_metrics: dict, target_metric: str): - self.column_metrics = column_metrics - self.target_metric = target_metric - self.column_type = 
column_metrics.get('type') - self.actual = [] - - if self.target_metric == 'range': - self.actual = [column_metrics.get('min'), column_metrics.get('max')] - else: - self.actual = [column_metrics.get(target_metric)] - - def is_metric_available(self): - return [x for x in self.actual if x is None] == [] - - def check_range(self, min_value, max_value): - for metric in self.actual: - metric = self.to_numeric(metric) - if metric is None: - yield context.result.fail_with_assertion_error('Column not support range.') - else: - yield min_value <= metric <= max_value - - def to_numeric(self, metric): - if self.column_type == 'datetime': - # TODO: check datetime format. Maybe we can leverage the format checking by YAML parser - return datetime.strptime(metric, '%Y-%m-%d %H:%M:%S.%f') - elif self.column_type in ['integer', 'numeric']: - return metric - else: - return None - - def actual_value(self): - if len(self.actual) == 1: - return self.actual[0] - return self.actual - - observed = Observed(column_metrics, target_metric) - if not observed.is_metric_available(): - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - context.result.actual = observed.actual_value() - - results = [] - for result in observed.check_range(values[0], values[1]): - results.append(result) - - non_bools = [x for x in results if not isinstance(x, bool)] - if non_bools: - return non_bools[0] - - bools = [x for x in results if isinstance(x, bool)] - if set(bools) == set([True]): - return context.result.success() - return context.result.fail() diff --git a/piperider_cli/assertion_engine/types/assert_column_types.py b/piperider_cli/assertion_engine/types/assert_column_types.py deleted file mode 100644 index f1aeb81a0..000000000 --- a/piperider_cli/assertion_engine/types/assert_column_types.py +++ /dev/null @@ -1,139 +0,0 @@ -from piperider_cli.assertion_engine import AssertionContext, AssertionResult -from piperider_cli.assertion_engine.assertion import ValidationResult -from piperider_cli.assertion_engine.types.base import BaseAssertionType - -COLUMN_TYPES = ['string', 'integer', 'numeric', 'datetime', 'date', 'time', 'boolean', 'other'] - - -class AssertColumnSchemaType(BaseAssertionType): - def name(self): - return "assert_column_schema_type" - - def execute(self, context: AssertionContext): - return assert_column_schema_type(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - # type list: https://docs.sqlalchemy.org/en/14/core/type_basics.html#sql-standard-and-multiple-vendor-types - return ValidationResult(context).require('schema_type', str) - - -class AssertColumnType(BaseAssertionType): - def name(self): - return "assert_column_type" - - def execute(self, context: AssertionContext): - return assert_column_type(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context).require('type', str) - if result.has_errors(): - return result - - if not set([context.asserts.get("type")]).issubset(set(COLUMN_TYPES)): - result.errors.append( - f'type parameter should be one of {COLUMN_TYPES}, input: {context.asserts.get("type")}') - return result - - -class AssertColumnInTypes(BaseAssertionType): - def name(self): - return "assert_column_in_types" - - def execute(self, context: AssertionContext): - return assert_column_in_types(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - result = ValidationResult(context).require('types', list) - if result.has_errors(): - return result - - if 
not set(context.asserts.get("types")).issubset(set(COLUMN_TYPES)): - result.errors.append(f'types parameter should be one of {COLUMN_TYPES}, ' - f'input: {context.asserts.get("types")}') - - return result - - -def assert_column_schema_type(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - # Check assertion input - assert_schema_type = context.asserts.get('schema_type').upper() - if not assert_schema_type: - return context.result.fail_with_assertion_error('Expect a SQL schema type') - context.result.expected = assert_schema_type - - schema_type = column_metrics.get('schema_type') - context.result.actual = schema_type - - if schema_type == assert_schema_type: - return context.result.success() - - return context.result.fail() - - -def assert_column_type(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - # Check assertion input - assert_type = context.asserts.get('type').lower() - if not assert_type: - return context.result.fail_with_assertion_error(f'Expect a type in {COLUMN_TYPES}') - - if assert_type not in COLUMN_TYPES: - return context.result.fail_with_assertion_error(f'The column type should one of {COLUMN_TYPES}.') - - context.result.expected = assert_type - - column_type = column_metrics.get('type') - - context.result.actual = column_type - - if column_type == assert_type: - return context.result.success() - - return context.result.fail() - - -def assert_column_in_types(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - column_metrics = metrics.get('tables', {}).get(table, {}).get('columns', {}).get(column) - if not column_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - # Check assertion input - assert_types = context.asserts.get('types', []) - assert_types = [x.lower() for x in assert_types] - if not assert_types: - return context.result.fail_with_assertion_error(f'Expect a list of types in {COLUMN_TYPES}') - - invalid_types = [t for t in assert_types if t not in COLUMN_TYPES] - if invalid_types: - return context.result.fail_with_assertion_error( - f'Invalid types {invalid_types}. 
The column type should one of {COLUMN_TYPES}.') - - context.result.expected = assert_types - - column_type = column_metrics.get('type') - - context.result.actual = column_type - - if column_type in set(assert_types): - return context.result.success() - - return context.result.fail() diff --git a/piperider_cli/assertion_engine/types/assert_metrics.py b/piperider_cli/assertion_engine/types/assert_metrics.py deleted file mode 100644 index 0242e0d47..000000000 --- a/piperider_cli/assertion_engine/types/assert_metrics.py +++ /dev/null @@ -1,234 +0,0 @@ -from datetime import datetime -from typing import Union, List - -from piperider_cli.assertion_engine import AssertionContext, ValidationResult -from piperider_cli.assertion_engine.types.base import BaseAssertionType - - -class AssertMetric(BaseAssertionType): - - def __init__(self): - self.mapping = MetricName() - - def name(self): - return '' - - def execute(self, context: AssertionContext): - table = context.table - column = context.column - metrics = context.profiler_result - - target_metrics = metrics.get('tables', {}).get(table) - if column: - target_metrics = target_metrics.get('columns', {}).get(column) - - if not target_metrics: - return context.result.fail_with_metric_not_found_error(context.table, context.column) - - context.result.name = self.mapping.get(context.metric, target_metrics.get('type')) - context.result.expected = self.to_interval_notation(context.asserts) - - if context.metric in target_metrics: - if target_metrics.get(context.metric) is None: - return context.result.fail() - else: - return context.result.fail_with_profile_metric_not_found_error(context.table, context.column, context.metric) - - value = target_metrics.get(context.metric) - context.result.actual = value - if not self.assert_metric_boundary(value, context.asserts): - return context.result.fail() - - return context.result.success() - - def validate(self, context: AssertionContext) -> ValidationResult: - results = ValidationResult(context) - - if not self.mapping.is_exist(context.metric): - results.errors.append(f"cannot find a metric assertion by metric '{context.metric}'") - - names = ['gte', 'lte', 'gt', 'lt', 'eq', 'ne'] - results = results.allow_only(*names) \ - .require_metric_consistency(*names) - - if context.asserts is None: - results.errors.append(f'At least one of {names} is needed.') - - if results.errors: - return results - - self._assert_metric_validation(context.asserts, results) - - return results - - @staticmethod - def to_interval_notation(asserts): - if len(asserts.keys()) == 2: - operators = { - 'lte': ']', - 'lt': ')', - 'gte': '[', - 'gt': '(' - } - boundary = '' - for k, v in asserts.items(): - if k.startswith('lt'): - boundary += f'{v}{operators[k]}' - else: - boundary = f'{operators[k]}{v}, {boundary}' - return boundary - else: - operators = { - 'gt': '>', - 'gte': '≥', - 'eq': '=', - 'ne': '≠', - 'lt': '<', - 'lte': '≤' - } - k, v = list(asserts.items())[0] - return f'{operators[k]} {v}' - - @staticmethod - def assert_metric_boundary(metric: Union[int, float, str], metric_boundary: dict) -> bool: - if isinstance(metric, str): - metric = datetime.fromisoformat(metric) - - for op, v in metric_boundary.items(): - if isinstance(v, str): - v = datetime.fromisoformat(v) - if op == 'gt' and not metric > v: - return False - elif op == 'gte' and not metric >= v: - return False - elif op == 'eq' and not metric == v: - return False - elif op == 'ne' and not metric != v: - return False - elif op == 'lt' and not metric < v: - return False - elif 
op == 'lte' and not metric <= v: - return False - return True - - @staticmethod - def _assert_metric_validation(metric_boundary: dict, results: ValidationResult): - if len(metric_boundary.keys()) == 1: - pass - elif len(metric_boundary.keys()) == 2: - lower = None - upper = None - for op, v in metric_boundary.items(): - if op == 'eq' or op == 'ne': - results.errors.append("Only one operator allowed if the expression contains 'eq' and 'ne'") - return - - if op.startswith('lt'): - upper = v - elif op.startswith('gt'): - lower = v - - if upper is None or lower is None: - results.errors.append('Please specified your metric upper and lower boundary') - return - - if isinstance(upper, str) and isinstance(lower, str): - upper = datetime.fromisoformat(upper) - lower = datetime.fromisoformat(lower) - if upper < lower: - results.errors.append("The 'lt' or 'lte' value should be greater than or equal to " - "the 'gt' or 'gte' value.") - else: - results.errors.append('The number of operator should be 1 or 2.') - - -class MetricName: - def __init__(self): - self.mapping = {} - self.all_type = 'ALL' - - # table metric - self._add('row_count', 'row count') - self._add('bytes', 'volume size') - self._add('freshness', 'freshness') - self._add('duplicate_rows', 'duplicate row count') - self._add('duplicate_rows_p', 'duplicate row percentage') - - self._add('total', 'row count') - self._add('samples', 'sample count') - self._add('samples_p', 'sample percentage') - self._add('nulls', 'missing count') - self._add('nulls_p', 'missing percentage') - self._add('non_nulls', 'non null count') - self._add('non_nulls_p', 'non null percentage') - self._add('invalids', 'invalid count') - self._add('invalids_p', 'invalid percentage') - self._add('valids', 'valid count') - self._add('valids_p', 'valid percentage') - self._add('zeros', 'zero count', ['integer', 'numeric']) - self._add('zeros_p', 'zero percentage', ['integer', 'numeric']) - self._add('negatives', 'negative value count', ['integer', 'numeric']) - self._add('negatives_p', 'negative value percentage', ['integer', 'numeric']) - self._add('positives', 'positive value count', ['integer', 'numeric']) - self._add('positives_p', 'positive value percentage', ['integer', 'numeric']) - self._add('zero_length', 'zero length string count', ['string']) - self._add('zero_length_p', 'zero length string percentage', ['string']) - self._add('non_zero_length', 'non zero length string count', ['string']) - self._add('non_zero_length_p', 'non zero length string percentage', ['string']) - self._add('trues', 'true count', ['boolean']) - self._add('trues_p', 'true percentage', ['boolean']) - self._add('falses', 'false count', ['boolean']) - self._add('falses_p', 'false percentage', ['boolean']) - self._add('min', 'min', ['integer', 'numeric', 'datetime']) - self._add('max', 'max', ['integer', 'numeric', 'datetime']) - self._add('avg', 'average', ['integer', 'numeric']) - self._add('sum', 'sum', ['integer', 'numeric']) - self._add('stddev', 'standard deviation', ['integer', 'numeric']) - self._add('min', 'min length', ['string']) - self._add('max', 'max length', ['string']) - self._add('avg', 'average length', ['string']) - self._add('stddev', 'std. 
deviation of length', ['string']) - self._add('distinct', 'distinct count', ['integer', 'string', 'datetime']) - self._add('distinct_p', 'distinct percentage', ['integer', 'string', 'datetime']) - self._add('duplicates', 'duplicate count', ['integer', 'numeric', 'string', 'datetime']) - self._add('duplicates_p', 'duplicate percentage', ['integer', 'numeric', 'string', 'datetime']) - self._add('non_duplicates', 'non duplicate count', ['integer', 'numeric', 'string', 'datetime']) - self._add('non_duplicates_p', 'non duplicate percentage', ['integer', 'numeric', 'string', 'datetime']) - self._add('min', 'min', ['integer', 'numeric']) - self._add('p5', '5th percentile', ['integer', 'numeric']) - self._add('p25', '25th percentile ', ['integer', 'numeric']) - self._add('p50', 'median', ['integer', 'numeric']) - self._add('p75', '75th percentile', ['integer', 'numeric']) - self._add('p95', '95th percentile', ['integer', 'numeric']) - self._add('max', 'max', ['integer', 'numeric']) - - def _add(self, field, name, col_types: List[str] = None): - if col_types is None or len(col_types) == 0: - if self.all_type not in self.mapping: - self.mapping[self.all_type] = {} - self.mapping[self.all_type][field] = name - else: - for t in col_types: - if t not in self.mapping: - self.mapping[t] = {} - self.mapping[t][field] = name - - def get(self, field, col_type=None): - name = self.mapping[self.all_type].get(field) - - if name is None and col_type is not None: - name = self.mapping[col_type].get(field) - - if name is None: - return field - - return name - - def is_exist(self, field): - found = False - for type_field_mapping in self.mapping.values(): - if field in type_field_mapping.keys(): - found = True - break - - return found diff --git a/piperider_cli/assertion_engine/types/assert_rows.py b/piperider_cli/assertion_engine/types/assert_rows.py deleted file mode 100644 index 8841a4a92..000000000 --- a/piperider_cli/assertion_engine/types/assert_rows.py +++ /dev/null @@ -1,108 +0,0 @@ -from piperider_cli.assertion_engine import AssertionContext, AssertionResult -from piperider_cli.assertion_engine.assertion import ValidationResult -from piperider_cli.assertion_engine.types.base import BaseAssertionType - - -class AssertRowCountInRange(BaseAssertionType): - - def name(self): - return "assert_row_count_in_range" - - def execute(self, context: AssertionContext): - return assert_row_count_in_range(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - return ValidationResult(context).require('count').require_int_pair('count') - - -class AssertRowCount(BaseAssertionType): - def name(self): - return "assert_row_count" - - def execute(self, context: AssertionContext): - return assert_row_count(context) - - def validate(self, context: AssertionContext) -> ValidationResult: - results = ValidationResult(context) \ - .require_one_of_parameters(['min', 'max']) \ - .int_if_present('min') \ - .int_if_present('max') - - if results.errors: - return results - - if context.asserts.get('min') is not None and context.asserts.get('max') is not None: - if context.asserts.get('min') > context.asserts.get('max'): - results.errors.append('The max value should be greater than or equal to the min value.') - - return results - - -def assert_row_count(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - table_metrics = metrics.get('tables', {}).get(table) - if not table_metrics: - return 
context.result.fail_with_metric_not_found_error(table, column) - - # Get the metric for the current table - row_count = table_metrics.get('row_count') - context.result.actual = row_count - - min = context.asserts.get('min', 0) - max = context.asserts.get('max') - - if context.asserts.get('min') is None and context.asserts.get('max') is None: - return context.result.fail_with_assertion_error('Expect a min or max value.') - if not isinstance(min, int): - return context.result.fail_with_assertion_error('The min value should be an integer.') - if min < 0: - return context.result.fail_with_assertion_error('The min value should be greater than or equal to 0.') - - # Only provide min - if max is None: - if min <= row_count: - return context.result.success() - else: - if not isinstance(max, int): - return context.result.fail_with_assertion_error('The max value should be an integer.') - if max < min: - return context.result.fail_with_assertion_error( - 'The max value should be greater than or equal to the min value.') - if min <= row_count <= max: - return context.result.success() - return context.result.fail() - - -def assert_row_count_in_range(context: AssertionContext) -> AssertionResult: - table = context.table - column = context.column - metrics = context.profiler_result - - table_metrics = metrics.get('tables', {}).get(table) - if not table_metrics: - return context.result.fail_with_metric_not_found_error(table, column) - - # Get the metric for the current table - row_count = table_metrics.get('row_count') - context.result.actual = row_count - - between_criteria = context.asserts.get('count', []) - - if len(between_criteria) != 2: - return context.result.fail_with_assertion_error('Expect a range [min_value, max_value].') - - valid_type = isinstance(between_criteria[0], int) and isinstance(between_criteria[1], int) - if not valid_type: - return context.result.fail_with_assertion_error('The range should be integers.') - - if between_criteria[0] > between_criteria[1]: - return context.result.fail_with_assertion_error( - 'The minimum value of the range should be the first number.') - - if between_criteria[0] <= row_count <= between_criteria[1]: - return context.result.success() - - return context.result.fail() diff --git a/piperider_cli/assertion_engine/types/base.py b/piperider_cli/assertion_engine/types/base.py deleted file mode 100644 index 7e7ecff30..000000000 --- a/piperider_cli/assertion_engine/types/base.py +++ /dev/null @@ -1,22 +0,0 @@ -import abc - -from piperider_cli.assertion_engine import AssertionContext, AssertionResult -from piperider_cli.assertion_engine.assertion import ValidationResult - - -class BaseAssertionType(metaclass=abc.ABCMeta): - - @abc.abstractmethod - def name(self): - """ - function name of the assertion - """ - pass - - @abc.abstractmethod - def execute(self, context: AssertionContext) -> AssertionResult: - pass - - @abc.abstractmethod - def validate(self, context: AssertionContext) -> ValidationResult: - pass diff --git a/piperider_cli/assertion_generator.py b/piperider_cli/assertion_generator.py deleted file mode 100644 index 46bfd1218..000000000 --- a/piperider_cli/assertion_generator.py +++ /dev/null @@ -1,64 +0,0 @@ -import json -import os - -from rich.console import Console - -from piperider_cli import raise_exception_when_directory_not_writable, get_run_json_path -from piperider_cli.assertion_engine import AssertionEngine -from piperider_cli.configuration import Configuration -from piperider_cli.error import PipeRiderNoProfilingResultError - -console = 
Console() - - -def _validate_input_result(result): - for f in ['tables', 'id', 'created_at', 'datasource']: - if f not in result: - return False - return True - - -class AssertionGenerator(): - @staticmethod - def exec(input_path=None, report_dir: str = None, no_recommend: bool = False, table: str = None): - console.rule('Deprecated', style='bold red') - console.print( - 'Assertions Generator is deprecated and will be removed in the future. If you have a strong need for assertions, please contact us by "piperider feedback".\n') - filesystem = Configuration.instance().activate_report_directory(report_dir=report_dir) - raise_exception_when_directory_not_writable(report_dir) - - run_json_path = get_run_json_path(filesystem.get_output_dir(), input_path) - if not os.path.isfile(run_json_path): - raise PipeRiderNoProfilingResultError(run_json_path) - - with open(run_json_path) as f: - profiling_result = json.loads(f.read()) - if not _validate_input_result(profiling_result): - console.print(f'[bold red]Error: {run_json_path} is invalid[/bold red]') - return 1 - console.print(f'[bold dark_orange]Generating recommended assertions from:[/bold dark_orange] {run_json_path}') - - if table: - # only keep the profiling result of the specified table - profiling_result['tables'] = {k: v for k, v in profiling_result['tables'].items() if k == table} - if not profiling_result['tables']: - console.print(f'[bold red]Error: {table} is not found from {run_json_path}[/bold red]') - return 1 - - assertion_engine = AssertionEngine(None) - if no_recommend: - template_assertions = assertion_engine.generate_template_assertions(profiling_result) - - # Show the assertion template files - console.rule('Generated Assertions Templates') - for f in template_assertions: - console.print(f'[bold green]Assertion Templates[/bold green]: {f}') - else: - # Generate recommended assertions - assertion_engine.load_assertions(profiler_result=profiling_result) - recommended_assertions = assertion_engine.generate_recommended_assertions(profiling_result) - - # Show the recommended assertions files - console.rule('Generated Recommended Assertions') - for f in recommended_assertions: - console.print(f'[bold green]Recommended Assertion[/bold green]: {f}') diff --git a/piperider_cli/cli.py b/piperider_cli/cli.py index f9b02c2b2..e14b64677 100644 --- a/piperider_cli/cli.py +++ b/piperider_cli/cli.py @@ -201,7 +201,7 @@ def init(**kwargs): @add_options(dbt_related_options) @add_options(debug_option) def diagnose(**kwargs): - 'Check project configuration, datasource, connections, and assertion configuration.' + 'Check project configuration, datasource, and connections configuration.' console = Console() @@ -256,33 +256,13 @@ def diagnose(**kwargs): @add_options(debug_option) def run(**kwargs): """ - Profile data source, run assertions, and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs". + Profile data source and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs". """ from piperider_cli.cli_utils.run_cmd import run as cmd return cmd(**kwargs) -@cli.command(short_help='Generate recommended assertions. 
- Deprecated', cls=TrackCommand) -@click.option('--input', default=None, type=click.Path(exists=True), help='Specify the raw result file.') -@click.option('--no-recommend', is_flag=True, help='Generate assertions templates only.') -@click.option('--report-dir', default=None, type=click.STRING, help='Use a different report directory.') -@click.option('--table', default=None, type=click.STRING, help='Generate assertions for the given table') -@add_options(debug_option) -def generate_assertions(**kwargs): - 'Generate recommended assertions based on the latest result. By default, the profiling result will be loaded from ".piperider/outputs".' - input_path = kwargs.get('input') - report_dir = kwargs.get('report_dir') - no_recommend = kwargs.get('no_recommend') - table = kwargs.get('table') - - from piperider_cli.assertion_generator import AssertionGenerator - ret = AssertionGenerator.exec(input_path=input_path, report_dir=report_dir, no_recommend=no_recommend, table=table) - if ret != 0: - sys.exit(ret) - return ret - - @cli.command(short_help='Generate a report.', cls=TrackCommand) @click.option('--input', default=None, type=RunDataPath(), help='Specify the raw result file.') @click.option('--output', '-o', default=None, type=click.STRING, help='Directory to save the results.') diff --git a/piperider_cli/cli_utils/run_cmd.py b/piperider_cli/cli_utils/run_cmd.py index 2cf092baa..c16c8e5c7 100644 --- a/piperider_cli/cli_utils/run_cmd.py +++ b/piperider_cli/cli_utils/run_cmd.py @@ -9,7 +9,7 @@ def run(**kwargs): - 'Profile data source, run assertions, and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs".' + 'Profile data source and generate report(s). By default, the raw results and reports are saved in ".piperider/outputs".' 
from piperider_cli.cli_utils import DbtUtil from piperider_cli.cli_utils.cloud import CloudConnectorHelper diff --git a/piperider_cli/event/events.py b/piperider_cli/event/events.py index 8c1567ef7..4955f08a4 100644 --- a/piperider_cli/event/events.py +++ b/piperider_cli/event/events.py @@ -9,13 +9,8 @@ def __init__(self): self.tables = 0 self.columns = [] self.rows = [] - self.passed_assertions = 0 - self.failed_assertions = 0 self.passed_dbt_testcases = 0 self.failed_dbt_testcases = 0 - self.build_in_assertions = 0 - self.custom_assertions = 0 - self.recommended_assertions = 0 def to_dict(self): return self.__dict__ diff --git a/piperider_cli/runner.py b/piperider_cli/runner.py index 75c5c61df..32124d26c 100644 --- a/piperider_cli/runner.py +++ b/piperider_cli/runner.py @@ -14,7 +14,6 @@ from rich import box from rich.color import Color from rich.console import Console -from rich.pretty import Pretty from rich.progress import BarColumn, Column, MofNCompleteColumn, Progress, TextColumn, TimeElapsedColumn from rich.style import Style from rich.table import Table @@ -24,8 +23,6 @@ import piperider_cli.dbtutil as dbtutil from piperider_cli import clone_directory, convert_to_tzlocal, datetime_to_str, event, \ raise_exception_when_directory_not_writable -from piperider_cli.assertion_engine import AssertionEngine -from piperider_cli.assertion_engine.recommender import RECOMMENDED_ASSERTION_TAG from piperider_cli.configuration import Configuration, FileSystem, ReportDirectory from piperider_cli.datasource import DataSource from piperider_cli.datasource.unsupported import UnsupportedDataSource @@ -196,19 +193,6 @@ def _filter_subject(name: str, includes: List[str], excludes: List[str]) -> bool return True -def _execute_assertions(console: Console, engine, ds_name: str, output, profiler_result, created_at): - # TODO: Implement running test cases based on profiling result - assertion_engine = AssertionEngine(engine) - assertion_engine.load_assertions(profiler_result) - - results = exceptions = [] - # Execute assertions - if len(assertion_engine.assertions): - console.rule('Testing') - results, exceptions = assertion_engine.evaluate_all() - return results, exceptions - - def _show_dbt_test_result(dbt_test_results, title=None, failed_only=False): console = Console() ascii_table = Table(show_header=True, show_edge=True, header_style='bold magenta', @@ -249,78 +233,6 @@ def _show_dbt_test_result(dbt_test_results, title=None, failed_only=False): console.print(ascii_table) -def _show_assertion_result(results, exceptions, failed_only=False, single_table=None, title=None): - console = Console() - - def _wrap_pretty(obj): - if obj is None: - return '-' - return obj if isinstance(obj, str) else Pretty(obj) - - if results: - ascii_table = Table(show_header=True, show_edge=True, header_style='bold magenta', - box=box.SIMPLE, title=title) - ascii_table.add_column('Status', style='bold white') - ascii_table.add_column('Test Subject', style='bold') - ascii_table.add_column('Assertion', style='bold green') - ascii_table.add_column('Expected', style='bold') - ascii_table.add_column('Actual', style='cyan') - - for assertion in results: - if single_table and single_table != assertion.table: - continue - if failed_only and assertion.result.status(): - continue - table = assertion.table - column = assertion.column - test_function = assertion.result.name - success = assertion.result.status() - target = f'[yellow]{table}[/yellow].[blue]{column}[/blue]' if column else f'[yellow]{table}[/yellow]' - if success: - 
ascii_table.add_row( - '[[bold green] OK [/bold green]]', - target, - test_function, - _wrap_pretty(assertion.result.expected), - _wrap_pretty(assertion.result.actual) - ) - else: - ascii_table.add_row( - '[[bold red]FAILED[/bold red]]', - target, - test_function, - _wrap_pretty(assertion.result.expected), - _wrap_pretty(assertion.result.actual) - ) - if assertion.result.exception: - msg = f'[grey11 on white][purple4]{type(assertion.result.exception).__name__}[/purple4](\'{assertion.result.exception}\')[/grey11 on white]' - ascii_table.add_row('', '', '', msg) - - if ascii_table.rows: - console.print(ascii_table) - # TODO: Handle exceptions - pass - - -def _transform_assertion_result(table: str, results): - tests = [] - columns = {} - if results is None: - return dict(tests=tests, columns=columns) - - for r in results: - if r.table == table: - entry = r.to_result_entry() - if r.column: - if r.column not in columns: - columns[r.column] = [] - columns[r.column].append(entry) - else: - tests.append(entry) - - return dict(tests=tests, columns=columns) - - class PreRunValidatingResult(Enum): OK = 0 ERROR = 1 @@ -329,7 +241,6 @@ class PreRunValidatingResult(Enum): def _pre_run_validating(ds: DataSource) -> (PreRunValidatingResult, Exception): - console = Console() err = ds.verify_connector() if err: return PreRunValidatingResult.FAILED_TO_LOAD_CONNECTOR, err @@ -339,30 +250,9 @@ def _pre_run_validating(ds: DataSource) -> (PreRunValidatingResult, Exception): except Exception as err: return PreRunValidatingResult.FAILED_TO_CONNECT_DATASOURCE, err - stop_runner = _validate_assertions(console) - if stop_runner: - return PreRunValidatingResult.ERROR, None - return PreRunValidatingResult.OK, None -def _validate_assertions(console: Console): - assertion_engine = AssertionEngine(None) - assertion_engine.load_all_assertions_for_validation() - results = assertion_engine.validate_assertions() - # if results - for result in results: - # result - console.print(f' [[bold red]FAILED[/bold red]] {result.as_user_report()}') - - if results: - # stop runner - return True - - # continue to run profiling - return False - - def prepare_default_output_path(filesystem: ReportDirectory, created_at, ds): latest_symlink_path = os.path.join(filesystem.get_output_dir(), 'latest') latest_source = f"{ds.name}-{convert_to_tzlocal(created_at).strftime('%Y%m%d%H%M%S')}" @@ -416,27 +306,7 @@ def _clean_up_profile_null_properties(table_results): del table_results['columns'][r['col']][r['key']] -def _append_descriptions_from_assertion(profile_result): - engine = AssertionEngine(None) - engine.load_assertion_content() - for table_name, table_v in engine.assertions_content.items(): - if table_name not in profile_result['tables'] or table_v is None: - continue - table_desc = table_v.get('description', '') - if table_desc: - profile_result['tables'][table_name]['description'] = f'{table_desc}' - - columns_content = table_v.get('columns') if table_v.get('columns') else {} - for column_name, column_v in columns_content.items(): - if column_name not in profile_result['tables'][table_name]['columns'] or column_v is None: - continue - column_desc = column_v.get('description', '') - if column_desc: - profile_result['tables'][table_name]['columns'][column_name][ - 'description'] = f'{column_desc}' - - -def _analyse_run_event(event_payload: RunEventPayload, profiled_result, assertion_results, dbt_test_results): +def _analyse_run_event(event_payload: RunEventPayload, profiled_result, dbt_test_results): tables = profiled_result.get('tables', []) 
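    # Keep only tables that actually have a profiling payload; the event
    # aggregates the table count plus per-table column and row counts.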
tables = {k: v for k, v in tables.items() if v} event_payload.tables = len(tables) @@ -449,21 +319,6 @@ def _analyse_run_event(event_payload: RunEventPayload, profiled_result, assertio # null row_count when the table is not profiled event_payload.rows.append(table.get('row_count')) - # Count PipeRider assertions - for r in assertion_results or []: - if r.is_builtin: - event_payload.build_in_assertions += 1 - else: - event_payload.custom_assertions += 1 - - if RECOMMENDED_ASSERTION_TAG in r.tags: - event_payload.recommended_assertions += 1 - - if r.result.status(): - event_payload.passed_assertions += 1 - else: - event_payload.failed_assertions += 1 - # Count dbt-test cases if dbt_test_results: for r in dbt_test_results: @@ -486,18 +341,6 @@ def decorate_with_metadata(profile_result: dict): profile_result['metadata_version'] = schema_version() -def _check_assertion_status(assertion_results, assertion_exceptions): - if assertion_exceptions and len(assertion_exceptions) > 0: - return False - - if assertion_results: - for assertion in assertion_results: - if not assertion.result.status(): - return False - - return True - - def check_dbt_manifest_compatibility(ds: DataSource, dbt_state_dir: str): database = ds.get_database() schema = ds.get_schema() @@ -664,10 +507,7 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target return 1 result, err = _pre_run_validating(ds) - if result is PreRunValidatingResult.ERROR: - console.print('\n\n[[bold red]ERROR[/bold red]] Stop profiling, please fix the syntax errors above.') - return 1 - elif result is PreRunValidatingResult.FAILED_TO_LOAD_CONNECTOR: + if result is PreRunValidatingResult.FAILED_TO_LOAD_CONNECTOR: if skip_datasource_connection is False: if isinstance(err, PipeRiderConnectorUnsupportedError): console.print(f'[[bold red]Error[/bold red]] {err}') @@ -798,26 +638,14 @@ def filter_fn(subject: ProfileSubject): ).execute() # TODO: refactor input unused arguments - if skip_datasource_connection: - assertion_results, assertion_exceptions = [], [] - else: - assertion_results, assertion_exceptions = _execute_assertions(console, engine, ds.name, output, - profiler_result, created_at) - # Assertion - event_payload.step = 'assertion' + # DBT Test + event_payload.step = 'dbt test' run_result['tests'] = [] - if assertion_results or dbt_test_results: - console.rule('Assertion Results') - if dbt_test_results: - console.rule('dbt') - _show_dbt_test_result(dbt_test_results) - run_result['tests'].extend(dbt_test_results) - if assertion_results: - console.rule('PipeRider') - if assertion_results: - _show_assertion_result(assertion_results, assertion_exceptions) - run_result['tests'].extend([r.to_result_entry() for r in assertion_results]) + if dbt_test_results: + console.rule('DBT Test Results') + _show_dbt_test_result(dbt_test_results) + run_result['tests'].extend(dbt_test_results) if not table: if dbt_config: @@ -845,7 +673,6 @@ def _slim_dbt_manifest(manifest): if dbt_config: dbtutil.append_descriptions(run_result, dbt_target_path) - _append_descriptions_from_assertion(run_result) # Generate report event_payload.step = 'report' @@ -881,11 +708,7 @@ def _slim_dbt_manifest(manifest): if skip_report: console.print(f'Results saved to {output if output else output_path}') - _analyse_run_event(event_payload, run_result, assertion_results, dbt_test_results) - - # The assertion is deprecated. We should not run failed event the dbt test failed. 
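-    # (presumably: the run should no longer exit non-zero on test failures)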
-    # if not _check_assertion_status(assertion_results, assertion_exceptions):
-    #     return EC_ERR_TEST_FAILED
+    _analyse_run_event(event_payload, run_result, dbt_test_results)

     if len(subjects) == 0 and len(run_result.get('metrics', [])) == 0 and not skip_datasource_connection:
         return EC_WARN_NO_PROFILED_MODULES
diff --git a/piperider_cli/validator.py b/piperider_cli/validator.py
index 01442e9fa..5795e1fa3 100644
--- a/piperider_cli/validator.py
+++ b/piperider_cli/validator.py
@@ -1,12 +1,10 @@
 import io
 import platform
 from abc import ABCMeta, abstractmethod
-from typing import List

 from rich.console import Console, _STD_STREAMS
 from rich.markup import escape

-from piperider_cli.assertion_engine import AssertionEngine, ValidationResult
 from piperider_cli.cloud import PipeRiderCloud
 from piperider_cli.configuration import Configuration, FileSystem
 from piperider_cli.error import PipeRiderError
@@ -152,36 +150,6 @@ def check_function(self, configurator: Configuration) -> (bool, str):
         return all_passed, reason


-class CheckAssertionFiles(AbstractChecker):
-    def check_function(self, configurator: Configuration) -> (bool, str):
-        engine = AssertionEngine(None)
-        passed_files, failed_files = engine.load_all_assertions_for_validation()
-        results: List[ValidationResult] = engine.validate_assertions()
-
-        for file in passed_files:
-            self.console.print(f'  {file}: [[bold green]OK[/bold green]]')
-
-        for file in failed_files:
-            self.console.print(f'  {file}: [[bold red]FAILED[/bold red]]')
-
-        newline_section = False
-        validate_fail = False
-        error_msg = ''
-        for result in results:
-            if result.has_errors():
-                if not newline_section:
-                    self.console.line()
-                    newline_section = True
-                self.console.print(f'  [[bold red]FAILED[/bold red]] {result.as_user_report()}')
-                validate_fail = True
-
-        if validate_fail or len(failed_files):
-            error_msg = 'Syntax problem of PipeRider assertion yaml files'
-            self.console.line()
-
-        return error_msg == '', error_msg
-
-
 class CloudAccountChecker(AbstractChecker):
     def check_function(self, configurator: Configuration) -> (bool, str):
         if not piperider_cloud.available:
@@ -205,7 +173,6 @@ def diagnose(dbt_profile: str = None, dbt_target: str = None):
     handler.set_checker('config files', CheckConfiguration)
     handler.set_checker('format of data sources', CheckDataSources)
     handler.set_checker('connections', CheckConnections)
-    handler.set_checker('assertion files', CheckAssertionFiles)
     if piperider_cloud.has_configured():
         handler.set_checker('cloud account', CloudAccountChecker)
     return handler.execute()
diff --git a/tests/_user_defined_assertion_functions.py b/tests/_user_defined_assertion_functions.py
deleted file mode 100644
index 780b71752..000000000
--- a/tests/_user_defined_assertion_functions.py
+++ /dev/null
@@ -1,20 +0,0 @@
-from piperider_cli.assertion_engine.assertion import AssertionContext, AssertionResult, ValidationResult
-from piperider_cli.assertion_engine.types import BaseAssertionType, register_assertion_function
-
-
-class UserDefinedTests(BaseAssertionType):
-    def name(self):
-        return 'user-defined-test-test'
-
-    def execute(self, context: AssertionContext) -> AssertionResult:
-        context.result.actual = 'I see you'
-        context.result._expected = dict(magic_number=5566)
-        return context.result.success()
-
-    def validate(self, context: AssertionContext) -> ValidationResult:
-        result = ValidationResult(context)
-        result.errors.append('explain to users why this broken')
-        return result
-
-
-register_assertion_function(UserDefinedTests)
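Migration note: with the assertion engine removed from both the runner and `diagnose`, dbt tests are the remaining test path. For teams still relying on the removed built-ins, the closest replacements are dbt's generic tests. A minimal sketch, assuming a dbt project; the model and column names here are placeholders, not part of this change:

```yaml
# models/schema.yml (hypothetical model and column names)
version: 2
models:
  - name: orders_1k
    columns:
      - name: o_orderkey
        tests:
          - not_null        # roughly replaces assert_column_not_null
          - unique          # roughly replaces assert_column_unique
      - name: o_orderstatus
        tests:
          - accepted_values:    # roughly replaces the `in:` set assertion
              values: ['O', 'F', 'P']
```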
diff --git a/tests/test_builtin_assertions.py b/tests/test_builtin_assertions.py
deleted file mode 100644
index 9f6183a3b..000000000
--- a/tests/test_builtin_assertions.py
+++ /dev/null
@@ -1,255 +0,0 @@
-import json
-import os
-import tempfile
-from unittest import TestCase
-
-from sqlalchemy import create_engine, MetaData, Table, Column, Integer
-
-from piperider_cli.assertion_engine import AssertionEngine
-
-
-def build_test_assertions(assertion_config_text: str):
-    assertions_dir = None
-    with tempfile.NamedTemporaryFile() as tmp:
-        assertions_dir = os.path.join(tmp.name, 'assertions')
-    os.makedirs(assertions_dir, exist_ok=True)
-
-    with open(os.path.join(assertions_dir, "assertions.yml"), "w") as fh:
-        fh.write(assertion_config_text)
-    return assertions_dir
-
-
-def build_assertion_engine(table, profiler_result, assertions):
-    db_engine = create_engine('sqlite://')
-    metadata = MetaData()
-    Table(table, metadata, Column('misc', Integer))
-    metadata.create_all(db_engine)
-
-    engine = AssertionEngine(db_engine, build_test_assertions(assertions))
-    engine.load_assertions(profiler_result=profiler_result, config_path=None)
-    return engine
-
-
-class BuiltinAssertionsTests(TestCase):
-
-    def setUp(self) -> None:
-        # load metrics
-        metrics_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "orders_1k.json")
-        with open(metrics_file) as fh:
-            self.metrics = json.loads(fh.read())
-
-    def test_assert_row_count_in_range2(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count_in_range
-            assert:
-              count: [1000, 200000.5]
-            tags:
-            - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-        results = engine.validate_assertions()
-
-        self.assertEqual(1, len(results))
-
-        self.assertEqual('Found assertion syntax problem => name: assert_row_count_in_range for table '
-                         'orders_1k\n'
-                         "ERROR: count parameter should be one of the types {}, input: "
-                         '[1000, 200000.5]',
-                         results[0].as_internal_report())
-
-    def test_assert_row_count_in_range(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count_in_range
-            assert:
-              count: [1000, 200000]
-            tags:
-            - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        assertion_result = results[0].result
-        self.assertEqual(dict(success=True, exceptions=None),
-                         dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_type(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_type
-                assert:
-                  type: numeric
-                tags:
-                - OPTIONAL
-            o_orderdate:
-              tests:
-              - name: assert_column_type
-                assert:
-                  type: datetime
-                tags:
-                - OPTIONAL
-            o_orderpriority:
-              tests:
-              - name: assert_column_type
-                assert:
-                  type: string
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        for result in results:
-            assertion_result = result.result
-            self.assertEqual(dict(success=True, exceptions=None),
-                             dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_in_range(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_min_in_range
-                assert:
-                  min: [1000, 1200]
-                tags:
-                - OPTIONAL
-              - name: assert_column_max_in_range
-                assert:
-                  max: [440000, 450000]
-                tags:
-                - OPTIONAL
-              - name: assert_column_in_range
-                assert:
-                  range: [0, 450000]
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        for result in results:
-            assertion_result = result.result
-            self.assertEqual(dict(success=True, exceptions=None),
-                             dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_not_null(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_not_null
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        for result in results:
-            assertion_result = result.result
-            self.assertEqual(dict(success=True, exceptions=None),
-                             dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_unique(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_unique
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        for result in results:
-            assertion_result = result.result
-            self.assertEqual(dict(success=True, exceptions=None),
-                             dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_exist(self):
-        assertions = """
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_exist
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        for result in results:
-            assertion_result = result.result
-            self.assertEqual(dict(success=True, exceptions=None),
-                             dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_value_range(self):
-        assertions = """
-        orders_1k:  # Table Name
-          columns:
-            o_totalprice:
-              tests:
-              - name: assert_column_value
-                assert:
-                  lte: 440269.51
-                  gte: 1106.99
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        assertion_result = results[0].result
-        self.assertEqual(dict(success=True, exceptions=None),
-                         dict(success=assertion_result._success, exceptions=assertion_result._exception))
-
-    def test_assert_column_value_set(self):
-        assertions = """
-        orders_1k:  # Table Name
-          columns:
-            o_orderstatus:
-              tests:
-              - name: assert_column_value
-                assert:
-                  in: ['O', 'F', 'P']
-                tags:
-                - OPTIONAL
-        """
-        engine = build_assertion_engine('orders_1k', self.metrics, assertions)
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        assertion_result = results[0].result
-        self.assertEqual(dict(success=True, exceptions=None),
-                         dict(success=assertion_result._success, exceptions=assertion_result._exception))
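The suite deleted above also covered range-style checks (`assert_column_min_in_range`, `assert_column_value` with `gte`/`lte`). dbt core has no built-in range test; one possible substitute, assuming the dbt-utils package is installed (an assumption, not introduced by this change), is `dbt_utils.accepted_range`:

```yaml
# hypothetical schema.yml entry; assumes the dbt-utils package
models:
  - name: orders_1k
    columns:
      - name: o_totalprice
        tests:
          - dbt_utils.accepted_range:
              min_value: 1106.99
              max_value: 440269.51
              inclusive: true
```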
diff --git a/tests/test_builtin_validations.py b/tests/test_builtin_validations.py
deleted file mode 100644
index 3fed7fb98..000000000
--- a/tests/test_builtin_validations.py
+++ /dev/null
@@ -1,344 +0,0 @@
-from io import StringIO
-from unittest import TestCase
-
-from piperider_cli import yaml as pyml
-
-from piperider_cli.assertion_engine import AssertionEngine
-
-
-def _(assertion_content: str):
-    def content_provider(self):
-        self.assertions_content = pyml.safe_load(StringIO(assertion_content))
-        return [], []
-
-    return content_provider
-
-
-class BuiltinValidationTests(TestCase):
-
-    def setUp(self) -> None:
-        self.engine = AssertionEngine(None)
-        self.origin_function = AssertionEngine.load_assertion_content
-
-    def tearDown(self) -> None:
-        AssertionEngine.load_assertion_content = self.origin_function
-
-    def test_validation_assert_row_count_in_range(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count_in_range
-            assert:
-              count: [1000, 200000]
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-    def test_validation_assert_row_count_in_range_no_args(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count_in_range
-            assert:
-              # it should be "count"
-              range: [1000, 200000]
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_row_count_in_range for table orders_1k
-ERROR: count parameter is required
-ERROR: count parameter should be a list""", results[0].as_internal_report())
-
-    def test_validation_assert_row_count_in_range_invalid_args(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count_in_range
-            assert:
-              count: [1000, 200000, 2]
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_row_count_in_range for table orders_1k
-ERROR: count parameter should contain two values""", results[0].as_internal_report())
-
-    def test_validation_assert_row_count(self):
-        # test with valid format with min only
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              min: 10
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-        # test with valid format with max only
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              max: 10
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-        # test with valid format with min and max
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              min: 10
-              max: 100
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-    def test_validation_assert_row_count_invalid_args(self):
-        # test with invalid syntax
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              x: 10
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-        self.assertEqual("""Found assertion syntax problem => name: assert_row_count for table orders_1k
-ERROR: There should contain any parameter names in ['min', 'max']""", results[0].as_internal_report())
-
-        # test with invalid syntax: max < min
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              min: 100
-              max: 1
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_row_count for table orders_1k
-ERROR: The max value should be greater than or equal to the min value.""", results[0].as_internal_report())
-
-        # test with invalid syntax: not int
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: assert_row_count
-            assert:
-              min: 1.0
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_row_count for table orders_1k
-ERROR: min parameter should be a int value""", results[0].as_internal_report())
-
-    def test_validation_assert_column_not_null(self):
-        self._test_no_args_assertion('assert_column_not_null')
-
-    def test_validation_assert_column_null(self):
-        self._test_no_args_assertion('assert_column_null')
-
-    def test_validation_assert_column_unique(self):
-        self._test_no_args_assertion('assert_column_unique')
-
-    def test_validation_assert_column_exist(self):
-        self._test_no_args_assertion('assert_column_exist')
-
-    def _test_no_args_assertion(self, function_name):
-        # test with no problem syntax
-        AssertionEngine.load_assertion_content = _(f"""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: {function_name}
-        """)
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-        # test with error syntax
-        AssertionEngine.load_assertion_content = _(f"""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: {function_name}
-                assert:
-                  foo: bar
-        """)
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-        self.assertEqual(f"""Found assertion syntax problem => name: {function_name} for table orders_1k and column foobarbar
-ERROR: parameters are not allowed""", results[0].as_internal_report())
-
-    def test_validation_assert_column_min_in_range(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: assert_column_min_in_range
-                assert:
-                  min: [10, 30]
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-    def test_validation_assert_column_min_in_range_date(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: assert_column_min_in_range
-                assert:
-                  min: [2022-05-20, 2022-05-31]
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-    def test_validation_assert_column_min_in_range_no_args(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: assert_column_min_in_range
-                assert:
-                  # it should be "min"
-                  max: [10, 100]
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_column_min_in_range for table orders_1k and column foobarbar
-ERROR: min parameter is required""", results[0].as_internal_report())
-
-    def test_validation_assert_column_min_in_range_invalid_args(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          columns:
-            foobarbar:
-              tests:
-              - name: assert_column_min_in_range
-                assert:
-                  min: [10, 2022-05-23]
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-
-        self.assertEqual("""Found assertion syntax problem => name: assert_column_min_in_range for table orders_1k and column foobarbar
-ERROR: min parameter should be the same types""", results[0].as_internal_report())
-
-    def test_validation_assert_no_such_assertion(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        orders_1k:  # Table Name
-          # Test Cases for Table
-          tests:
-          - name: there_is_no_such_assertion
-            assert:
-              count: [1000, 200000]
-            tags:
-            - OPTIONAL
-        """)
-
-        # expect no errors and warnings
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(1, len(results))
-        self.assertEqual("""Found assertion syntax problem => name: there_is_no_such_assertion for table orders_1k
-ERROR: cannot find an assertion by name 'there_is_no_such_assertion'""",
-                         results[0].as_internal_report())
diff --git a/tests/test_builtin_validations_column_types.py b/tests/test_builtin_validations_column_types.py
deleted file mode 100644
index 4f7829bca..000000000
--- a/tests/test_builtin_validations_column_types.py
+++ /dev/null
@@ -1,136 +0,0 @@
-from io import StringIO
-from unittest import TestCase
-
-from piperider_cli import yaml as pyml
-from piperider_cli.assertion_engine import AssertionEngine
-
-
-def _(assertion_content: str):
-    def content_provider(self):
-        self.assertions_content = pyml.safe_load(StringIO(assertion_content))
-        return [], []
-
-    return content_provider
-
-
-class BuiltinValidationColumnTypesTests(TestCase):
-
-    def setUp(self) -> None:
-        self.engine = AssertionEngine(None)
-        self.origin_function = AssertionEngine.load_assertion_content
-
-    def tearDown(self) -> None:
-        AssertionEngine.load_assertion_content = self.origin_function
-
-    def test_validation_assert_column_schema_type(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_schema_type
-                assert:
-                  schema_type: JSON
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-        # test with invalid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_schema_type
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(results[0].as_internal_report(), r"""
-Found assertion syntax problem => name: assert_column_schema_type for table column_types and column column_a
-ERROR: schema_type parameter is required
-ERROR: schema_type parameter should be a value
-        """.strip())
-
-    def test_validation_assert_column_type(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_type
-                assert:
-                  type: string
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-        # test with invalid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_type
-                assert:
-                  type: foobarbar
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(results[0].as_internal_report(), r"""
-Found assertion syntax problem => name: assert_column_type for table column_types and column column_a
-ERROR: type parameter should be one of ['string', 'integer', 'numeric', 'datetime', 'date', 'time', 'boolean', 'other'], input: foobarbar
-        """.strip())
-
-    def test_validation_assert_column_in_types(self):
-        # test with valid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_in_types
-                assert:
-                  types: [string]
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertListEqual([], results)
-
-        # test with invalid format
-        AssertionEngine.load_assertion_content = _("""
-        column_types:
-          columns:
-            column_a:
-              tests:
-              - name: assert_column_in_types
-                assert:
-                  types: [foobarbar]
-                tags:
-                - OPTIONAL
-        """)
-
-        self.engine.load_all_assertions_for_validation()
-        results = self.engine.validate_assertions()
-        self.assertEqual(results[0].as_internal_report(), r"""
-Found assertion syntax problem => name: assert_column_in_types for table column_types and column column_a
-ERROR: types parameter should be one of ['string', 'integer', 'numeric', 'datetime', 'date', 'time', 'boolean', 'other'], input: ['foobarbar']
-        """.strip())
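The column-type validations exercised here likewise have no dbt-core counterpart. If equivalent coverage is still wanted, the dbt-expectations package (an assumption, not introduced by this change) offers a comparable generic test:

```yaml
# hypothetical schema.yml entry; assumes the dbt-expectations package
models:
  - name: column_types
    columns:
      - name: column_a
        tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: string
```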
diff --git a/tests/test_docs_existing.py b/tests/test_docs_existing.py
deleted file mode 100644
index 4145970ad..000000000
--- a/tests/test_docs_existing.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import ast
-import glob
-import os
-from unittest import TestCase
-
-
-class DoYouWriteTheDocsTests(TestCase):
-
-    def test_find_all_builtins(self):
-        import piperider_cli
-
-        search_root = os.path.dirname(piperider_cli.__file__)
-        docs_root = os.path.abspath(os.path.join(search_root, '../docs/assertions'))
-        not_founds = []
-
-        from piperider_cli.assertion_engine.types import custom_registry
-        assertion_functions = custom_registry.keys()
-
-        for func in assertion_functions:
-            # skip this due to docs structure change
-            if func == 'assert_column_value':
-                continue
-            # check docs
-            docs_path = os.path.join(docs_root, f'{func}.md')
-            if not os.path.exists(docs_path):
-                not_founds.append((func, docs_path))
-
-        for func, filename in not_founds:
-            print(f'require docs for function [{func}] at {filename}')
-        self.assertEqual([], not_founds)
diff --git a/tests/test_user_defined_assertions.py b/tests/test_user_defined_assertions.py
deleted file mode 100644
index 3a5ccd002..000000000
--- a/tests/test_user_defined_assertions.py
+++ /dev/null
@@ -1,127 +0,0 @@
-import os
-import shutil
-import tempfile
-import uuid
-from unittest import TestCase, skip
-
-from sqlalchemy import create_engine, MetaData, Table, Column, Integer
-
-from piperider_cli.assertion_engine import AssertionEngine
-from piperider_cli.configuration import FileSystem
-
-
-def prepare_project_structure():
-    from piperider_cli.initializer import _generate_piperider_workspace
-
-    with tempfile.NamedTemporaryFile() as tmp:
-        # tmp will be removed after exiting context
-        pass
-    os.makedirs(tmp.name, exist_ok=True)
-    os.chdir(tmp.name)
-    _generate_piperider_workspace()
-
-    return tmp.name
-
-
-def generate_random_directory():
-    with tempfile.NamedTemporaryFile() as tmp:
-        pass
-    os.makedirs(tmp.name, exist_ok=True)
-    return tmp.name
-
-
-def build_assertion_engine(project_dir, table, assertions):
-    assertion_file = f'{uuid.uuid4().hex}.yml'
-    assertion_path = os.path.join(f'{project_dir}/{FileSystem.PIPERIDER_WORKSPACE_NAME}/assertions',
-                                  assertion_file)
-    if not os.path.exists(os.path.dirname(assertion_path)):
-        os.makedirs(os.path.dirname(assertion_path))
-
-    with open(assertion_path, 'w') as fh:
-        fh.write(assertions)
-
-    db_engine = create_engine('sqlite://')
-    metadata = MetaData()
-    Table(table, metadata, Column('misc', Integer))
-    metadata.create_all(db_engine)
-
-    engine = AssertionEngine(db_engine, project_dir)
-    engine.load_assertions(config_path=None)
-    return engine
-
-
-@skip("deprecated, skipping")
-class UserDefinedTestAssertionsTests(TestCase):
-
-    def setUp(self) -> None:
-        self.project_dir = prepare_project_structure()
-
-        # reset the plugin path for testing
-        FileSystem.PIPERIDER_ASSERTION_PLUGIN_PATH = os.path.join(f'{self.project_dir}',
-                                                                  FileSystem.PIPERIDER_WORKSPACE_NAME,
-                                                                  'plugins')
-
-        if not os.path.exists(FileSystem.PIPERIDER_ASSERTION_PLUGIN_PATH):
-            os.makedirs(FileSystem.PIPERIDER_ASSERTION_PLUGIN_PATH)
-
-        self.current_module_name = '_user_defined_assertion_functions.py'
-        self.custom_asertions = os.path.join(os.path.dirname(__file__), self.current_module_name)
-
-    def test_user_defined_test_from_default_plugin_path(self):
-        # put the user defined test function
-        shutil.copyfile(self.custom_asertions, os.path.join(f'{FileSystem.PIPERIDER_ASSERTION_PLUGIN_PATH}',
-                                                            f'{self.current_module_name}'))
-
-        # use the function in the assertion configuration
-        assertions = f"""
-        foobarbar:
-          tests:
-          - name: user-defined-test-test
-            assert:
-              param1: a
-              param2: b
-        """
-        engine = build_assertion_engine(self.project_dir, 'foobarbar', assertions)
-
-        # invoke customized assertion
-        results, exceptions = engine.evaluate_all()
-
-        # check no exceptions
-        self.assertEqual([], exceptions)
-
-        # check result
-        assertion_result = results[0].result
-        self.assertEqual(dict(success=True, exceptions=None),
-                         dict(success=assertion_result._success, exceptions=assertion_result._exception))
-        self.assertEqual(assertion_result.actual, 'I see you')
-        self.assertEqual(assertion_result.expected, {'magic_number': 5566})
-
-    def test_user_defined_test_invoked_from_env_PIPERIDER_PLUGINS(self):
-        random_dir = generate_random_directory()
-
-        os.environ['PIPERIDER_PLUGINS'] = random_dir
-        print("PIPERIDER_PLUGINS => ", os.environ['PIPERIDER_PLUGINS'])
-        # copy this file to PIPERIDER_PLUGINS
-        shutil.copyfile(self.custom_asertions,
-                        os.path.join(random_dir, f'{self.current_module_name}'))
-
-        # use the function in the assertion configuration
-        assertions = f"""
-        foobarbar:
-          tests:
-          - name: user-defined-test-test
-            assert:
-              param1: a
-              param2: b
-        """
-        engine = build_assertion_engine(self.project_dir, 'foobarbar', assertions)
-
-        # invoke customized assertion
-        results, exceptions = engine.evaluate_all()
-        self.assertEqual([], exceptions)
-
-        assertion_result = results[0].result
-        self.assertEqual(dict(success=True, exceptions=None),
-                         dict(success=assertion_result._success, exceptions=assertion_result._exception))
-        self.assertEqual(assertion_result.actual, 'I see you')
-        self.assertEqual(assertion_result.expected, {'magic_number': 5566})
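User-defined assertion plugins (the `BaseAssertionType` mechanism these deleted tests exercised) go away with this change as well. The nearest dbt-side extension point is a custom generic test: a `test_*` macro defined in SQL and Jinja under `macros/`, then referenced from schema YAML. A hypothetical usage sketch, with `magic_number_check` standing in for whatever the custom macro would be named:

```yaml
# hypothetical: assumes a custom generic test macro named `magic_number_check`
# (SQL + Jinja definition under macros/, not shown here)
models:
  - name: foobarbar
    columns:
      - name: misc
        tests:
          - magic_number_check:
              magic_number: 5566
```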