diff --git a/pyproject.toml b/pyproject.toml index e230c11..530b7e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ ignore = [ "D401", # First line should be in imperative mood "D407", # Missing dashed underline after section "FA100", # Forbidden import + "S101", # Use of assert detected + "G004", # Logging statement uses string formatting + "TCH002", + "TCH003", ] select = ["ALL"] src = ["target_clickhouse"] diff --git a/target_clickhouse/sinks.py b/target_clickhouse/sinks.py index 433525f..adcd516 100644 --- a/target_clickhouse/sinks.py +++ b/target_clickhouse/sinks.py @@ -2,11 +2,13 @@ from __future__ import annotations +import logging from typing import Any, Iterable import jsonschema.exceptions as jsonschema_exceptions import simplejson as json import sqlalchemy +from jsonschema import ValidationError from pendulum import now from singer_sdk.sinks import SQLSink from sqlalchemy.sql.expression import bindparam @@ -150,20 +152,8 @@ def _validate_and_parse(self, record: dict) -> dict: try: self._validator.validate(record) except jsonschema_exceptions.ValidationError as e: - if "is not of type" in e.message and "'string'" in e.message: - self.logger.warning( - "Received non valid record for string type, " - "attempting forced conversion to string", - ) - for key, value in record.items(): - if isinstance(value, dict): - record[key] = json.dumps(value) - elif not isinstance(value, str): - record[key] = str(value) - self.logger.warning("Validating converted record") - self._validator.validate(record) - else: - raise + record = handle_validation_error(record, e, self.logger) + self._validator.validate(record) self._parse_timestamps_in_record( record=record, @@ -171,3 +161,34 @@ def _validate_and_parse(self, record: dict) -> dict: treatment=self.datetime_error_treatment, ) return record + + +def handle_validation_error(record, + e: ValidationError, + logger: logging.Logger | None = None): + if "'string'" in e.message: + if logger: + logger.warning( + f"Received non valid record for types 'string', {e.path}, " + f"attempting conversion for record, {record}", + ) + + # e.path is deque which is iterable, we convert it to list to access by index + key_path = list(e.path) + + # Access the problematic value using the key_path + current_level = record + for key in key_path[:-1]: # Go to the parent level of the problematic key + current_level = current_level[key] + + problem_key = key_path[-1] + problem_value = current_level[problem_key] + + # Convert the problematic value to string only if it's not null + if problem_value is not None: + current_level[problem_key] = str(problem_value) + if logger: + logger.warning("Validating converted record") + return record + return None + return None diff --git a/tests/test_validation.py b/tests/test_validation.py new file mode 100644 index 0000000..abb2158 --- /dev/null +++ b/tests/test_validation.py @@ -0,0 +1,130 @@ +import logging +from typing import Optional + +from jsonschema import Draft7Validator, ValidationError + +# Schema that allows a field to be either a string or null +schema = { + "type": "object", + "properties": { + "name": {"type": ["string", "null"]}, + "age": {"type": "number"}, + }, + "required": ["name", "age"], +} + +nested_schema = { + "type": "object", + "properties": { + "name": {"type": ["string", "null"]}, + "age": {"type": "number"}, + "address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "city": {"type": "string"}, + }, + "required": ["street", "city"], + }, + }, + "required": ["name", "age", "address"], +} + + +# Validator instance +validator = Draft7Validator(schema) + +# Function to handle validation errors +def handle_validation_error(record, + e: ValidationError, + logger: Optional[logging.Logger] = None): + if "'string'" in e.message: + if logger: + logger.warning( + f"Received non valid record for types 'string', {e.path}, " + f"attempting conversion for record, {record}", + ) + + + key_path = list(e.path) + + # Access the problematic value using the key_path + current_level = record + for key in key_path[:-1]: # Go to parent of the problematic key + current_level = current_level[key] + + problem_key = key_path[-1] + problem_value = current_level[problem_key] + + # Convert the problematic value to string only if it's not null. + if problem_value is not None: + current_level[problem_key] = str(problem_value) + if logger: + logger.warning("Validating converted record") + return record + return None + return None + +# Set up the logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Test cases +def test_validation_string_conversion(): + record = {"name": 123, "age": 30} + try: + validator.validate(record) + except ValidationError as e: + updated_record = handle_validation_error(record, e, logger) + assert ( + updated_record["name"] == "123" + ), "The 'name' should have been converted to a string." + validator.validate(updated_record) # This should not raise an error + +def test_validation_no_error_raised(): + record = {"name": "John", "age": 30} + # This should not raise an error, hence no need to handle validation + validator.validate(record) # This should not raise an error + +def test_validation_null_allowed(): + record = {"name": None, "age": 30} + try: + validator.validate(record) + except ValidationError as e: + updated_record = handle_validation_error(record, e, logger) + assert ( + updated_record is None + ), "The 'name' field is null and should be valid." + +def test_validation_non_string_non_null_field(): + record = {"name": {"first": "John", "last": "Doe"}, "age": 30} + try: + validator.validate(record) + except ValidationError as e: + updated_record = handle_validation_error(record, e, logger) + assert ( + isinstance(updated_record["name"], str) + ), "The 'name' should have been converted to a string." + +def test_nested_dict_string_conversion(): + record = {"name": "John", "age": 30, "address": {"street": 123, "city": "New York"}} + try: + validator.validate(record) + except ValidationError as e: + updated_record = handle_validation_error(record, e, logger) + assert ( + updated_record["address"]["street"] == "123" + ), "The 'street' should have been converted to a string." + validator.validate(updated_record) # This should not raise an error + +def test_nested_dict_with_nested_non_string(): + record = {"name": "John", "age": 30, + "address": {"street": "Main", "city": {"name": "New York"}}} + try: + validator.validate(record) + except ValidationError as e: + updated_record = handle_validation_error(record, e, logger) + assert ( + isinstance(updated_record["address"]["city"], str) + ), "The 'city' should have been converted to a string." + validator.validate(updated_record) # This should not raise an error