Skip to content

Commit

Permalink
Force decimal to string type (#79)
Browse files Browse the repository at this point in the history
* Force decimal to str

* Refactor string casting to be much more permissive
  • Loading branch information
BTheunissen authored Nov 22, 2023
1 parent 5650946 commit 42fa89b
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 105 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "shaped-target-clickhouse"
version = "0.1.11"
version = "0.1.13"
description = "`target-clickhouse` is a Singer target for clickhouse, built with the Meltano Singer SDK."
readme = "README.md"
authors = ["Ben Theunissen"]
Expand Down
2 changes: 1 addition & 1 deletion target_clickhouse/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
# so we need to use the FLOAT type.
if type(sql_type) == sqlalchemy.types.DECIMAL:
sql_type = typing.cast(
sqlalchemy.types.TypeEngine, sqlalchemy.types.FLOAT(),
sqlalchemy.types.TypeEngine, sqlalchemy.types.String(),
)
elif type(sql_type) == sqlalchemy.types.INTEGER:
sql_type = typing.cast(
Expand Down
93 changes: 45 additions & 48 deletions target_clickhouse/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from __future__ import annotations

import decimal
from collections.abc import MutableMapping
from logging import Logger
from typing import Any, Iterable

import jsonschema.exceptions as jsonschema_exceptions
Expand Down Expand Up @@ -155,7 +154,7 @@ def _validate_and_parse(self, record: dict) -> dict:
Validated record.
"""
# Pre-validate and correct string type mismatches.
record = self._pre_validate_for_string_type(record)
record = pre_validate_for_string_type(record, self.schema, self.logger)

try:
self._validator.validate(record)
Expand All @@ -171,51 +170,49 @@ def _validate_and_parse(self, record: dict) -> dict:

return record

def _pre_validate_for_string_type(self, record: dict) -> dict:
"""Pre-validate record for string type mismatches and correct them.
Args:
record: Individual record in the stream.
def pre_validate_for_string_type(
record: dict,
schema: dict,
logger: Logger | None = None,
) -> dict:
"""Pre-validate record for string type mismatches and correct them.
Args:
record: Individual record in the stream.
schema: JSON schema for the stream.
logger: Logger to use for logging.
Returns:
Record with corrected string type mismatches.
"""
if schema is None:
if logger:
logger.debug("Schema is None, skipping pre-validation.")
return record

Returns:
Record with corrected string type mismatches.
"""
for key, value in record.items():
# Checking if the schema expects a string for this key.
expected_type = self.schema.get("properties", {}).get(key, {}).get("type")
if expected_type is None:
continue
if not isinstance(expected_type, list):
expected_type = [expected_type]
if "string" in expected_type and not isinstance(value, str):
# Convert the value to string if it's not already a string.
record[key] = (
json.dumps(record[key])
if isinstance(value, (dict, list)) else str(value)
for key, value in record.items():
# Checking if the schema expects a string for this key.
expected_type = schema.get("properties", {}).get(key, {}).get("type")
if expected_type is None:
continue
if not isinstance(expected_type, list):
expected_type = [expected_type]

if "object" in expected_type and isinstance(value, dict):
pre_validate_for_string_type(
value,
schema.get("properties", {}).get(key),
logger,
)
elif "string" in expected_type and not isinstance(value, str):
# Convert the value to string if it's not already a string.
record[key] = (
json.dumps(record[key])
if isinstance(value, (dict, list)) else str(value)
)
if logger:
logger.debug(
f"Converted field {key} to string: {record[key]}",
)
if self.logger:
self.logger.debug(
f"Converted field {key} to string: {record[key]}",
)

return self._convert_decimal_to_float(record)

def _convert_decimal_to_float(self, obj):
"""Recursively convert all Decimal values in a dictionary to floats.
Args:
obj: The input object (dictionary, list, or any other data type).
Returns:
The object with all Decimal values converted to strings.
"""
if isinstance(obj, MutableMapping):
for key, value in obj.items():
obj[key] = self._convert_decimal_to_float(value)
elif isinstance(obj, list):
for i, item in enumerate(obj):
obj[i] = self._convert_decimal_to_float(item)
elif isinstance(obj, decimal.Decimal):
return float(obj)

return obj
return record
83 changes: 28 additions & 55 deletions tests/test_validation.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import json
import logging

from jsonschema import Draft7Validator

from target_clickhouse.sinks import pre_validate_for_string_type

# Schema definitions
schema = {
"type": "object",
Expand All @@ -17,9 +18,16 @@
"type": "object",
"properties": {
"name": {"type": ["string", "null"]},
"age": {"type": "number"},
"age": {"type": "string"},
"address": {
"type": ["string", "null"],
"type": "object",
"properties": {
"street": {"type": ["string", "null"]},
"city": {"type": "string"},
"state": {"type": "string"},
"zip": {"type": "string"},
},
"required": ["street", "city", "state", "zip"],
},
},
"required": ["name", "age", "address"],
Expand All @@ -33,18 +41,6 @@
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def pre_validate_for_string_type(record: dict, schema: dict) -> dict:
for key, value in record.items():
expected_type = schema.get("properties", {}).get(key, {}).get("type")
if "string" in expected_type and not isinstance(value, str):
record[key] = (
json.dumps(value)
if isinstance(value, (dict, list)) else str(value)
)
if logger:
logger.debug(f"Converted field {key} to string: {record[key]}")
return record

# Test cases
def test_validation_string_conversion():
record = {"name": 123, "age": 30}
Expand All @@ -65,69 +61,46 @@ def test_validation_null_allowed():
def test_nested_dict_string_conversion():
record = {
"name": "John", "age": 30,
"address": {"street": 123, "city": "New York"},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
validator.validate(pre_validated_record) # This should not raise an error
assert (
"street" in json.loads(pre_validated_record["address"])
), "The 'address' should have been converted to a JSON string."

def test_nested_dict_with_nested_non_string():
record = {
"name": "John", "age": 30,
"address": {"street": "Main", "city": {"name": "New York"}},
"address": {"street": 123, "city": "New York", "state": "NY", "zip": "10001"},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
validator.validate(pre_validated_record) # This should not raise an error
assert (
"city" in json.loads(pre_validated_record["address"])
), "The 'city' should have been converted to a JSON string."

def test_single_level_schema_nested_dict_to_string():
record = {"name": {"first": "John", "last": "Doe"}, "age": 30, "address": None}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error
assert (
json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
), "The JSON string is not correct."
pre_validated_record["age"] == "30"
), "The 'age' should have been converted to a string."

def test_single_level_schema_deeply_nested_list_of_dicts_to_string():
record = {
"name": "John",
"age": 30,
"address": [
{"street": "Main", "city": {"name": "New York"}},
{"street": "Second", "city": {"name": "Los Angeles"}},
],
"address": {
"street": "Main",
"city": {"name": "New York"},
"state": "NY",
"zip": ["10001", "10002"],
},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error
address_list = json.loads(pre_validated_record["address"])
assert (
all("street" in addr for addr in address_list)
), "The JSON string does not correctly represent the nested list of dicts."
isinstance(pre_validated_record["address"]["zip"], str)
), "The JSON string does not represent a str."

def test_multiple_fields_conversion():
# Test record with multiple fields needing conversion
record = {
"name": {"first": "John", "last": "Doe"}, # Expected to be a string
"age": 30,
"address": {"street": "Main", "city": {"name": "New York"}},
"address": {"street": "Main",
"city": {"name": "New York"}, "state": "NY", "zip": 10001},
}
pre_validated_record = pre_validate_for_string_type(record, nested_schema)
nested_validator.validate(pre_validated_record) # This should not raise an error

# Asserting the conversions
assert (
isinstance(pre_validated_record["name"], str)
), "The 'name' should have been converted to a JSON string."
assert (
isinstance(pre_validated_record["address"], str,
)), "The 'address' should have been converted to a JSON string."
assert (
json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
), "The JSON string for 'name' is not correct."
isinstance(pre_validated_record["address"]["city"], str)
), "The 'city' should have been converted to a string."
assert (
"street" in json.loads(pre_validated_record["address"])
), "The JSON string for 'address' does not correctly represent the nested dict."
isinstance(pre_validated_record["address"]["zip"], str,
)), "The 'zip' should have been converted to a string."

0 comments on commit 42fa89b

Please sign in to comment.