Skip to content

Commit

Permalink
Rework validation with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
BTheunissen committed Nov 9, 2023
1 parent d0b2aab commit 9a81e9e
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 14 deletions.
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ ignore = [
"D401", # First line should be in imperative mood
"D407", # Missing dashed underline after section
"FA100", # Forbidden import
"S101", # Use of assert detected
"G004", # Logging statement uses string formatting
"TCH002",
"TCH003",
]
select = ["ALL"]
src = ["target_clickhouse"]
Expand Down
49 changes: 35 additions & 14 deletions target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

from __future__ import annotations

import logging
from typing import Any, Iterable

import jsonschema.exceptions as jsonschema_exceptions
import simplejson as json
import sqlalchemy
from jsonschema import ValidationError
from pendulum import now
from singer_sdk.sinks import SQLSink
from sqlalchemy.sql.expression import bindparam
Expand Down Expand Up @@ -150,24 +152,43 @@ def _validate_and_parse(self, record: dict) -> dict:
try:
self._validator.validate(record)
except jsonschema_exceptions.ValidationError as e:
if "is not of type" in e.message and "'string'" in e.message:
self.logger.warning(
"Received non valid record for string type, "
"attempting forced conversion to string",
)
for key, value in record.items():
if isinstance(value, dict):
record[key] = json.dumps(value)
elif not isinstance(value, str):
record[key] = str(value)
self.logger.warning("Validating converted record")
self._validator.validate(record)
else:
raise
record = handle_validation_error(record, e, self.logger)
self._validator.validate(record)

self._parse_timestamps_in_record(
record=record,
schema=self.schema,
treatment=self.datetime_error_treatment,
)
return record


def handle_validation_error(record,
e: ValidationError,
logger: logging.Logger | None = None):
if "'string'" in e.message:
if logger:
logger.warning(
f"Received non valid record for types 'string', {e.path}, "
f"attempting conversion for record, {record}",
)

# e.path is deque which is iterable, we convert it to list to access by index
key_path = list(e.path)

# Access the problematic value using the key_path
current_level = record
for key in key_path[:-1]: # Go to the parent level of the problematic key
current_level = current_level[key]

problem_key = key_path[-1]
problem_value = current_level[problem_key]

# Convert the problematic value to string only if it's not null
if problem_value is not None:
current_level[problem_key] = str(problem_value)
if logger:
logger.warning("Validating converted record")
return record
return None
return None
130 changes: 130 additions & 0 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import logging
from typing import Optional

from jsonschema import Draft7Validator, ValidationError

# Schema that allows a field to be either a string or null
schema = {
"type": "object",
"properties": {
"name": {"type": ["string", "null"]},
"age": {"type": "number"},
},
"required": ["name", "age"],
}

nested_schema = {
"type": "object",
"properties": {
"name": {"type": ["string", "null"]},
"age": {"type": "number"},
"address": {
"type": "object",
"properties": {
"street": {"type": "string"},
"city": {"type": "string"},
},
"required": ["street", "city"],
},
},
"required": ["name", "age", "address"],
}


# Validator instance
validator = Draft7Validator(schema)

# Function to handle validation errors
def handle_validation_error(record,
e: ValidationError,
logger: Optional[logging.Logger] = None):
if "'string'" in e.message:
if logger:
logger.warning(
f"Received non valid record for types 'string', {e.path}, "
f"attempting conversion for record, {record}",
)


key_path = list(e.path)

# Access the problematic value using the key_path
current_level = record
for key in key_path[:-1]: # Go to parent of the problematic key
current_level = current_level[key]

problem_key = key_path[-1]
problem_value = current_level[problem_key]

# Convert the problematic value to string only if it's not null.
if problem_value is not None:
current_level[problem_key] = str(problem_value)
if logger:
logger.warning("Validating converted record")
return record
return None
return None

# Set up the logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Test cases
def test_validation_string_conversion():
record = {"name": 123, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record["name"] == "123"
), "The 'name' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error

def test_validation_no_error_raised():
record = {"name": "John", "age": 30}
# This should not raise an error, hence no need to handle validation
validator.validate(record) # This should not raise an error

def test_validation_null_allowed():
record = {"name": None, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record is None
), "The 'name' field is null and should be valid."

def test_validation_non_string_non_null_field():
record = {"name": {"first": "John", "last": "Doe"}, "age": 30}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["name"], str)
), "The 'name' should have been converted to a string."

def test_nested_dict_string_conversion():
record = {"name": "John", "age": 30, "address": {"street": 123, "city": "New York"}}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
updated_record["address"]["street"] == "123"
), "The 'street' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error

def test_nested_dict_with_nested_non_string():
record = {"name": "John", "age": 30,
"address": {"street": "Main", "city": {"name": "New York"}}}
try:
validator.validate(record)
except ValidationError as e:
updated_record = handle_validation_error(record, e, logger)
assert (
isinstance(updated_record["address"]["city"], str)
), "The 'city' should have been converted to a string."
validator.validate(updated_record) # This should not raise an error

0 comments on commit 9a81e9e

Please sign in to comment.