From fe4ad8b8c471109e7765ca81059da413798c2d34 Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 1 Jun 2022 13:55:01 -0300
Subject: [PATCH 1/7] Including failed_rows_processor in sodaspark

- Allowing users to use the FailedRowsProcessor feature by passing it to the
  execute method
---
 src/sodaspark/scan.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/sodaspark/scan.py b/src/sodaspark/scan.py
index 79ce6a6..f8d3d3c 100644
--- a/src/sodaspark/scan.py
+++ b/src/sodaspark/scan.py
@@ -9,6 +9,7 @@ from pyspark.sql import types as T  # noqa: N812
 from sodasql.common.yaml_helper import YamlHelper
 from sodasql.dialects.spark_dialect import SparkDialect
+from sodasql.scan.failed_rows_processor import FailedRowsProcessor
 from sodasql.scan.file_system import FileSystemSingleton
 from sodasql.scan.measurement import Measurement
 from sodasql.scan.scan import Scan
@@ -255,6 +256,7 @@ def create_scan(
     warehouse_name: str = "sodaspark",
     soda_server_client: SodaServerClient | None = None,
     time: str | None = None,
+    failed_rows_processor: FailedRowsProcessor | None = None,
 ) -> Scan:
     """
     Create a scan object.
@@ -285,6 +287,7 @@ def create_scan(
         soda_server_client=soda_server_client,
         variables=variables,
         time=time,
+        failed_rows_processor=failed_rows_processor,
     )
 
     return scan
@@ -430,6 +433,7 @@ def execute(
     soda_server_client: SodaServerClient | None = None,
     as_frames: bool | None = False,
     time: str | None = None,
+    failed_rows_processor: FailedRowsProcessor | None = None,
 ) -> ScanResult:
     """
     Execute a scan on a data frame.
@@ -463,6 +467,7 @@ def execute(
         soda_server_client=soda_server_client,
         time=time,
         warehouse_name=warehouse_name,
+        failed_rows_processor=failed_rows_processor,
     )
 
     scan.execute()

From a23c01f5f35256fdfb2c71f472fd867800d0131b Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 1 Jun 2022 13:56:12 -0300
Subject: [PATCH 2/7] Adding docstring for failed_rows_processor

---
 src/sodaspark/scan.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/sodaspark/scan.py b/src/sodaspark/scan.py
index f8d3d3c..35a187c 100644
--- a/src/sodaspark/scan.py
+++ b/src/sodaspark/scan.py
@@ -270,6 +270,8 @@ def create_scan(
         A soda server client.
     time: Optional[str] (default: None)
         Timestamp date in ISO8601 format. If None, use datetime.now() in ISO8601 format.
+    failed_rows_processor: Optional[FailedRowsProcessor] (default: None)
+        A FailedRowsProcessor implementation

     Returns
     -------
@@ -452,6 +454,8 @@ def execute(
         Flag to return results in Dataframe
     time: str (default : None)
         Timestamp date in ISO8601 format at the start of a scan
+    failed_rows_processor: Optional[FailedRowsProcessor] (default: None)
+        A FailedRowsProcessor implementation

     Returns
     -------

From 0cd3b21692b1b0ada7626ed7c9930e75c77aeda6 Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 1 Jun 2022 14:01:58 -0300
Subject: [PATCH 3/7] Adding missing docstrings

---
 src/sodaspark/scan.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/sodaspark/scan.py b/src/sodaspark/scan.py
index 35a187c..d813d69 100644
--- a/src/sodaspark/scan.py
+++ b/src/sodaspark/scan.py
@@ -265,7 +265,10 @@ def create_scan(
     ----------
     scan_yml : ScanYml
         The scan yml.
-    variables: variables to be substituted in scan yml
+    variables: Optional[dict] (default: None)
+        Variables to be substituted in scan yml
+    warehouse_name: Optional[str] (default: sodaspark)
+        The name of the warehouse
     soda_server_client : Optional[SodaServerClient] (default : None)
         A soda server client.
     time: Optional[str] (default: None)
@@ -448,6 +451,8 @@ def execute(
         The data frame to be scanned.
     variables: Optional[dict] (default : None)
         Variables to be substituted in scan yml
+    warehouse_name: Optional[str] (default: sodaspark)
+        The name of the warehouse
     soda_server_client : Optional[SodaServerClient] (default : None)
         A soda server client.
     as_frames : bool (default : False)
         Flag to return results in Dataframe
     time: str (default : None)
         Timestamp date in ISO8601 format at the start of a scan

From abc73e3a3c0ab6f8788af0bdaf3210c0cdeaa031 Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 22 Jun 2022 08:48:03 -0300
Subject: [PATCH 4/7] Including unit tests for failed_rows_processor

---
 tests/test_scan.py | 75 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/tests/test_scan.py b/tests/test_scan.py
index abb17b8..34209e8 100644
--- a/tests/test_scan.py
+++ b/tests/test_scan.py
@@ -6,10 +6,13 @@
 from typing import BinaryIO
 
 import pytest
+from _pytest.capture import CaptureFixture
 from pyspark.sql import DataFrame, Row, SparkSession
 from pyspark.sql import functions as F  # noqa: N812
 from pyspark.sql import types as T  # noqa: N812
+from pyspark.sql.types import IntegerType, StringType, StructField, StructType
 from sodasql.dialects.spark_dialect import SparkDialect
+from sodasql.scan.failed_rows_processor import FailedRowsProcessor
 from sodasql.scan.group_value import GroupValue
 from sodasql.scan.measurement import Measurement
 from sodasql.scan.scan_error import TestExecutionScanError
@@ -183,6 +186,22 @@ def df(spark_session: SparkSession) -> DataFrame:
     return df
 
 
+class InMemoryFailedRowProcessor(FailedRowsProcessor):
+    def process(self, context: dict) -> dict:
+
+        try:
+            print(context)
+        except Exception:
+            raise Exception
+
+        return {"message": "All failed rows were printed in your terminal"}
+
+
+@pytest.fixture
+def failed_rows_processor() -> FailedRowsProcessor:
+    return InMemoryFailedRowProcessor()
+
+
 def test_create_scan_yml_table_name_is_demodata(
     scan_definition: str,
 ) -> None:
@@ -507,3 +526,59 @@ def test_scan_execute_return_as_data_frame(
         (scan_result[1].count(), len(scan_result[1].columns)),
         (scan_result[2].count(), len(scan_result[2].columns)),
     )
+
+
+def test_failed_row_processor_return_correct_values(
+    spark_session: SparkSession,
+    failed_rows_processor: FailedRowsProcessor,
+    capsys: CaptureFixture,
+) -> None:
+
+    expected_output = [
+        "{'sample_name': 'dataset', 'column_name': None, 'test_ids': None, "
+        "'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', "
+        "'type': 'int'}], 'sample_rows': [['1', 100], ['2', 200], ['3', None], ['4', "
+        "400]], 'sample_description': 'my_table.sample', 'total_row_count': 4}",
+        "{'sample_name': 'missing', 'column_name': 'number', 'test_ids': "
+        '[\'{"column":"number","expression":"missing_count == 0"}\'], '
+        "'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', "
+        "'type': 'int'}], 'sample_rows': [['3', None]], 'sample_description': "
+        "'my_table.number.missing', 'total_row_count': 1}",
+        "",
+    ]
+
+    data = [("1", 100), ("2", 200), ("3", None), ("4", 400)]
+
+    schema = StructType(
+        [
+            StructField("id", StringType(), True),
+            StructField("number", IntegerType(), True),
+        ]
+    )
+
+    df = spark_session.createDataFrame(data=data, schema=schema)
+
+    scan_definition = """
+    table_name: my_table
+    metric_groups:
+      - all
+    samples:
+      table_limit: 5
+      failed_limit: 5
+    tests:
+      - row_count > 0
+    columns:
+      number:
+        tests:
+          - duplicate_count == 0
+          - missing_count == 0
+    """
+
+    scan.execute(
+        scan_definition=scan_definition,
+        df=df,
+        failed_rows_processor=failed_rows_processor,
+    )
+
+    out, err = capsys.readouterr()
+    assert expected_output == out.split("\n")

From 2dcf5b16eb1fdbb1864caffb8c88a568ea8bb248 Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 22 Jun 2022 17:16:48 -0300
Subject: [PATCH 5/7] Removing unused try/except flow

---
 tests/test_scan.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/tests/test_scan.py b/tests/test_scan.py
index 34209e8..10579c8 100644
--- a/tests/test_scan.py
+++ b/tests/test_scan.py
@@ -186,20 +186,17 @@ def df(spark_session: SparkSession) -> DataFrame:
     return df
 
 
-class InMemoryFailedRowProcessor(FailedRowsProcessor):
+class PrintFailedRowProcessor(FailedRowsProcessor):
     def process(self, context: dict) -> dict:
 
-        try:
-            print(context)
-        except Exception:
-            raise Exception
+        print(context)
 
         return {"message": "All failed rows were printed in your terminal"}
 
 
 @pytest.fixture
 def failed_rows_processor() -> FailedRowsProcessor:
-    return InMemoryFailedRowProcessor()
+    return PrintFailedRowProcessor()

From 9254165008f0241ad23c97b06093b446cba3dc6e Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 22 Jun 2022 17:17:35 -0300
Subject: [PATCH 6/7] Adding test description

---
 tests/test_scan.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/test_scan.py b/tests/test_scan.py
index 10579c8..30ea2ac 100644
--- a/tests/test_scan.py
+++ b/tests/test_scan.py
@@ -530,6 +530,7 @@ def test_failed_row_processor_return_correct_values(
     spark_session: SparkSession,
     failed_rows_processor: FailedRowsProcessor,
     capsys: CaptureFixture,
 ) -> None:
+    """We expect the failed rows to show up in the system output."""
 
     expected_output = [

From b7cd8afaddb52240812e3126231139e5ec7f29 Mon Sep 17 00:00:00 2001
From: joaolug
Date: Wed, 22 Jun 2022 17:19:23 -0300
Subject: [PATCH 7/7] Fixing test assert

---
 tests/test_scan.py | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/tests/test_scan.py b/tests/test_scan.py
index 30ea2ac..9fd6702 100644
--- a/tests/test_scan.py
+++ b/tests/test_scan.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import ast
 import datetime as dt
 import json
 from dataclasses import dataclass
@@ -525,25 +526,25 @@ def test_scan_execute_return_as_data_frame(
     )
 
 
-def test_failed_row_processor_return_correct_values(
+def test_failed_rows_processor_return_correct_values(
     spark_session: SparkSession,
     failed_rows_processor: FailedRowsProcessor,
     capsys: CaptureFixture,
 ) -> None:
     """We expect the failed rows to show up in the system output."""
 
-    expected_output = [
-        "{'sample_name': 'dataset', 'column_name': None, 'test_ids': None, "
-        "'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', "
-        "'type': 'int'}], 'sample_rows': [['1', 100], ['2', 200], ['3', None], ['4', "
-        "400]], 'sample_description': 'my_table.sample', 'total_row_count': 4}",
-        "{'sample_name': 'missing', 'column_name': 'number', 'test_ids': "
-        '[\'{"column":"number","expression":"missing_count == 0"}\'], '
-        "'sample_columns': [{'name': 'id', 'type': 'string'}, {'name': 'number', "
-        "'type': 'int'}], 'sample_rows': [['3', None]], 'sample_description': "
-        "'my_table.number.missing', 'total_row_count': 1}",
-        "",
-    ]
+    expected_output = {
+        "sample_name": "missing",
+        "column_name": "number",
+        "test_ids": ['{"column":"number","expression":"missing_count == 0"}'],
+        "sample_columns": [
+            {"name": "id", "type": "string"},
"number", "type": "int"}, + ], + "sample_rows": [["3", None]], + "sample_description": "my_table.number.missing", + "total_row_count": 1, + } data = [("1", 100), ("2", 200), ("3", None), ("4", 400)] @@ -561,7 +562,6 @@ def test_failed_row_processor_return_correct_values( metric_groups: - all samples: - table_limit: 5 failed_limit: 5 tests: - row_count > 0 @@ -579,4 +579,4 @@ def test_failed_row_processor_return_correct_values( ) out, err = capsys.readouterr() - assert expected_output == out.split("\n") + assert expected_output == ast.literal_eval(out)