diff --git a/pyproject.toml b/pyproject.toml
index 16864d9..e2fc06d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "shaped-target-clickhouse"
-version = "0.1.11"
+version = "0.1.13"
 description = "`target-clickhouse` is a Singer target for clickhouse, built with the Meltano Singer SDK."
 readme = "README.md"
 authors = ["Ben Theunissen"]
diff --git a/target_clickhouse/connectors.py b/target_clickhouse/connectors.py
index 6c103fe..94f0744 100644
--- a/target_clickhouse/connectors.py
+++ b/target_clickhouse/connectors.py
@@ -63,7 +63,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
         # so we need to use the FLOAT type.
         if type(sql_type) == sqlalchemy.types.DECIMAL:
             sql_type = typing.cast(
-                sqlalchemy.types.TypeEngine, sqlalchemy.types.FLOAT(),
+                sqlalchemy.types.TypeEngine, sqlalchemy.types.String(),
             )
         elif type(sql_type) == sqlalchemy.types.INTEGER:
             sql_type = typing.cast(
diff --git a/target_clickhouse/sinks.py b/target_clickhouse/sinks.py
index a9210d9..03fd9c5 100644
--- a/target_clickhouse/sinks.py
+++ b/target_clickhouse/sinks.py
@@ -2,8 +2,7 @@
 
 from __future__ import annotations
 
-import decimal
-from collections.abc import MutableMapping
+from logging import Logger
 from typing import Any, Iterable
 
 import jsonschema.exceptions as jsonschema_exceptions
@@ -155,7 +154,7 @@ def _validate_and_parse(self, record: dict) -> dict:
             Validated record.
         """
         # Pre-validate and correct string type mismatches.
-        record = self._pre_validate_for_string_type(record)
+        record = pre_validate_for_string_type(record, self.schema, self.logger)
 
         try:
             self._validator.validate(record)
@@ -171,51 +170,49 @@
         return record
 
-    def _pre_validate_for_string_type(self, record: dict) -> dict:
-        """Pre-validate record for string type mismatches and correct them.
-
-        Args:
-            record: Individual record in the stream.
+def pre_validate_for_string_type(
+    record: dict,
+    schema: dict,
+    logger: Logger | None = None,
+) -> dict:
+    """Pre-validate record for string type mismatches and correct them.
+
+    Args:
+        record: Individual record in the stream.
+        schema: JSON schema for the stream.
+        logger: Logger to use for logging.
+
+    Returns:
+        Record with corrected string type mismatches.
+    """
+    if schema is None:
+        if logger:
+            logger.debug("Schema is None, skipping pre-validation.")
+        return record
 
-        Returns:
-            Record with corrected string type mismatches.
-        """
-        for key, value in record.items():
-            # Checking if the schema expects a string for this key.
-            expected_type = self.schema.get("properties", {}).get(key, {}).get("type")
-            if expected_type is None:
-                continue
-            if not isinstance(expected_type, list):
-                expected_type = [expected_type]
-            if "string" in expected_type and not isinstance(value, str):
-                # Convert the value to string if it's not already a string.
-                record[key] = (
-                    json.dumps(record[key])
-                    if isinstance(value, (dict, list)) else str(value)
+    for key, value in record.items():
+        # Checking if the schema expects a string for this key.
+        expected_type = schema.get("properties", {}).get(key, {}).get("type")
+        if expected_type is None:
+            continue
+        if not isinstance(expected_type, list):
+            expected_type = [expected_type]
+
+        if "object" in expected_type and isinstance(value, dict):
+            pre_validate_for_string_type(
+                value,
+                schema.get("properties", {}).get(key),
+                logger,
+            )
+        elif "string" in expected_type and not isinstance(value, str):
+            # Convert the value to string if it's not already a string.
+            record[key] = (
+                json.dumps(record[key])
+                if isinstance(value, (dict, list)) else str(value)
+            )
+            if logger:
+                logger.debug(
+                    f"Converted field {key} to string: {record[key]}",
                 )
-                if self.logger:
-                    self.logger.debug(
-                        f"Converted field {key} to string: {record[key]}",
-                    )
-
-        return self._convert_decimal_to_float(record)
 
-    def _convert_decimal_to_float(self, obj):
-        """Recursively convert all Decimal values in a dictionary to floats.
-
-        Args:
-            obj: The input object (dictionary, list, or any other data type).
-
-        Returns:
-            The object with all Decimal values converted to strings.
-        """
-        if isinstance(obj, MutableMapping):
-            for key, value in obj.items():
-                obj[key] = self._convert_decimal_to_float(value)
-        elif isinstance(obj, list):
-            for i, item in enumerate(obj):
-                obj[i] = self._convert_decimal_to_float(item)
-        elif isinstance(obj, decimal.Decimal):
-            return float(obj)
-
-        return obj
+    return record
diff --git a/tests/test_validation.py b/tests/test_validation.py
index ce09ab3..df82159 100644
--- a/tests/test_validation.py
+++ b/tests/test_validation.py
@@ -1,8 +1,9 @@
-import json
 import logging
 
 from jsonschema import Draft7Validator
 
+from target_clickhouse.sinks import pre_validate_for_string_type
+
 # Schema definitions
 schema = {
     "type": "object",
@@ -17,9 +18,16 @@
     "type": "object",
     "properties": {
         "name": {"type": ["string", "null"]},
-        "age": {"type": "number"},
+        "age": {"type": "string"},
         "address": {
-            "type": ["string", "null"],
+            "type": "object",
+            "properties": {
+                "street": {"type": ["string", "null"]},
+                "city": {"type": "string"},
+                "state": {"type": "string"},
+                "zip": {"type": "string"},
+            },
+            "required": ["street", "city", "state", "zip"],
         },
     },
     "required": ["name", "age", "address"],
@@ -33,18 +41,6 @@
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-def pre_validate_for_string_type(record: dict, schema: dict) -> dict:
-    for key, value in record.items():
-        expected_type = schema.get("properties", {}).get(key, {}).get("type")
-        if "string" in expected_type and not isinstance(value, str):
-            record[key] = (
-                json.dumps(value)
-                if isinstance(value, (dict, list)) else str(value)
-            )
-            if logger:
-                logger.debug(f"Converted field {key} to string: {record[key]}")
-    return record
-
 # Test cases
 def test_validation_string_conversion():
     record = {"name": 123, "age": 30}
@@ -65,69 +61,46 @@ def test_validation_null_allowed():
 def test_nested_dict_string_conversion():
     record = {
         "name": "John", "age": 30,
-        "address": {"street": 123, "city": "New York"},
-    }
-    pre_validated_record = pre_validate_for_string_type(record, nested_schema)
-    validator.validate(pre_validated_record)  # This should not raise an error
-    assert (
-        "street" in json.loads(pre_validated_record["address"])
-    ), "The 'address' should have been converted to a JSON string."
-
-def test_nested_dict_with_nested_non_string():
-    record = {
-        "name": "John", "age": 30,
-        "address": {"street": "Main", "city": {"name": "New York"}},
+        "address": {"street": 123, "city": "New York", "state": "NY", "zip": "10001"},
     }
     pre_validated_record = pre_validate_for_string_type(record, nested_schema)
-    validator.validate(pre_validated_record)  # This should not raise an error
-    assert (
-        "city" in json.loads(pre_validated_record["address"])
-    ), "The 'city' should have been converted to a JSON string."
-
-def test_single_level_schema_nested_dict_to_string():
-    record = {"name": {"first": "John", "last": "Doe"}, "age": 30, "address": None}
-    pre_validated_record = pre_validate_for_string_type(record, nested_schema)
     nested_validator.validate(pre_validated_record)  # This should not raise an error
     assert (
-        json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
-    ), "The JSON string is not correct."
+        pre_validated_record["age"] == "30"
+    ), "The 'age' should have been converted to a string."
 
 def test_single_level_schema_deeply_nested_list_of_dicts_to_string():
     record = {
         "name": "John", "age": 30,
-        "address": [
-            {"street": "Main", "city": {"name": "New York"}},
-            {"street": "Second", "city": {"name": "Los Angeles"}},
-        ],
+        "address": {
+            "street": "Main",
+            "city": {"name": "New York"},
+            "state": "NY",
+            "zip": ["10001", "10002"],
+        },
     }
     pre_validated_record = pre_validate_for_string_type(record, nested_schema)
     nested_validator.validate(pre_validated_record)  # This should not raise an error
-    address_list = json.loads(pre_validated_record["address"])
     assert (
-        all("street" in addr for addr in address_list)
-    ), "The JSON string does not correctly represent the nested list of dicts."
+        isinstance(pre_validated_record["address"]["zip"], str)
+    ), "The JSON string does not represent a str."
 
 def test_multiple_fields_conversion():
     # Test record with multiple fields needing conversion
     record = {
         "name": {"first": "John", "last": "Doe"},  # Expected to be a string
         "age": 30,
-        "address": {"street": "Main", "city": {"name": "New York"}},
+        "address": {"street": "Main",
+                    "city": {"name": "New York"}, "state": "NY", "zip": 10001},
     }
     pre_validated_record = pre_validate_for_string_type(record, nested_schema)
     nested_validator.validate(pre_validated_record)  # This should not raise an error
     # Asserting the conversions
     assert (
-        isinstance(pre_validated_record["name"], str)
-    ), "The 'name' should have been converted to a JSON string."
-    assert (
-        isinstance(pre_validated_record["address"], str,
-    )), "The 'address' should have been converted to a JSON string."
-    assert (
-        json.loads(pre_validated_record["name"]) == {"first": "John", "last": "Doe"}
-    ), "The JSON string for 'name' is not correct."
+        isinstance(pre_validated_record["address"]["city"], str)
+    ), "The 'city' should have been converted to a string."
     assert (
-        "street" in json.loads(pre_validated_record["address"])
-    ), "The JSON string for 'address' does not correctly represent the nested dict."
+        isinstance(pre_validated_record["address"]["zip"], str,
+    )), "The 'zip' should have been converted to a string."
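Note: as a quick sanity check of the refactored helper, here is a minimal usage sketch; the schema and record are invented for illustration, and the import simply mirrors the one added to tests/test_validation.py above. It exercises the behaviour introduced in this diff: scalars under a "string" property are coerced with str(), lists/dicts under a "string" property are serialised with json.dumps, and the function now recurses into "object" typed properties instead of stringifying the whole nested dict.

import json
import logging

from target_clickhouse.sinks import pre_validate_for_string_type

logger = logging.getLogger(__name__)

# Illustrative schema and record, not taken from a real stream.
schema = {
    "type": "object",
    "properties": {
        "age": {"type": "string"},
        "address": {
            "type": "object",
            "properties": {"zip": {"type": "string"}},
        },
    },
}
record = {"age": 30, "address": {"zip": ["10001", "10002"]}}

result = pre_validate_for_string_type(record, schema, logger)

assert result["age"] == "30"  # scalar coerced via str()
assert result["address"]["zip"] == json.dumps(["10001", "10002"])  # list serialised inside the nested "object" property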