diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringUtility.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringUtility.kt new file mode 100644 index 000000000000..d3342329b19b --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/cdk/load/data/TimeStringUtility.kt @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.data + +import java.time.LocalDate +import java.time.LocalDateTime +import java.time.LocalTime +import java.time.OffsetDateTime +import java.time.OffsetTime +import java.time.ZoneOffset +import java.time.ZonedDateTime + +// TODO Copied from load-iceberg-parquet. This class needs to be extracted +object TimeStringUtility { + + fun toLocalDate(dateString: String): LocalDate { + return LocalDate.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER) + } + + fun toLocalDateTime(dateString: String): LocalDateTime { + return LocalDateTime.parse(dateString, TimeStringToInteger.DATE_TIME_FORMATTER) + } + + fun toOffset(timeString: String): LocalTime { + return try { + toMicrosOfDayWithTimezone(timeString) + } catch (e: Exception) { + toMicrosOfDayWithoutTimezone(timeString) + } + } + + private fun toMicrosOfDayWithTimezone(timeString: String): LocalTime { + return OffsetTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER).toLocalTime() + } + + private fun toMicrosOfDayWithoutTimezone(timeString: String): LocalTime { + return LocalTime.parse(timeString, TimeStringToInteger.TIME_FORMATTER) + } + + fun toOffsetDateTime(timestampString: String): OffsetDateTime { + return try { + toOffsetDateTimeWithTimezone(timestampString) + } catch (e: Exception) { + toOffsetDateTimeWithoutTimezone(timestampString) + } + } + + private fun toOffsetDateTimeWithTimezone(timestampString: String): OffsetDateTime { + return ZonedDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER) + .toOffsetDateTime() + } + + private fun toOffsetDateTimeWithoutTimezone(timestampString: String): OffsetDateTime { + return LocalDateTime.parse(timestampString, TimeStringToInteger.DATE_TIME_FORMATTER) + .atOffset(ZoneOffset.UTC) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt new file mode 100644 index 000000000000..0f4a50ac7e6c --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlType.kt @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteType +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.airbyte.integrations.destination.mssql.v2.model.SqlColumn +import io.airbyte.integrations.destination.mssql.v2.model.SqlTable +import java.sql.Types + +/** CDK pipeline [AirbyteType] to SQL [Types] converter. */ +class AirbyteTypeToSqlType { + + /** + * Converts an [AirbyteType] to the associated SQL [Types] value. + * + * @param airbyteSchema The stream's Airbyte schema, represented as an [AirbyteType] + * @return The associated SQL [Types] value. + * @throws IllegalArgumentException if the [AirbyteType] is not supported. + */ + fun convert(airbyteSchema: AirbyteType): Int { + return when (airbyteSchema) { + is ObjectType -> Types.BLOB + is ArrayType -> Types.BLOB + is ArrayTypeWithoutSchema -> Types.BLOB + is BooleanType -> Types.BOOLEAN + is DateType -> Types.DATE + is IntegerType -> Types.BIGINT + is NumberType -> Types.DECIMAL + is ObjectTypeWithEmptySchema -> Types.BLOB + is ObjectTypeWithoutSchema -> Types.BLOB + is StringType -> Types.VARCHAR + is TimeTypeWithTimezone -> Types.TIME_WITH_TIMEZONE + is TimeTypeWithoutTimezone -> Types.TIME + is TimestampTypeWithTimezone -> Types.TIMESTAMP_WITH_TIMEZONE + is TimestampTypeWithoutTimezone -> Types.TIMESTAMP + is UnionType -> Types.BLOB + is UnknownType -> Types.BLOB + } + } +} + +/** + * Extension function that converts an [ObjectType] into a [SqlTable] that can be used to define a + * SQL table. + * + * @param primaryKeys The list of configured primary key properties that should be treated as + * primary keys in the generated [SqlTable] + * @return The [SqlTable] that represents the table to be mapped to the stream represented by the + * [ObjectType]. + */ +fun ObjectType.toSqlTable(primaryKeys: List>): SqlTable { + val identifierFieldNames = primaryKeys.flatten().toSet() + val sqlTypeConverter = AirbyteTypeToSqlType() + val columns = + this.properties.entries.map { (name, field) -> + val isPrimaryKey = identifierFieldNames.contains(name) + val isNullable = !isPrimaryKey && field.nullable + SqlColumn( + name = name, + type = sqlTypeConverter.convert(field.type), + isPrimaryKey = isPrimaryKey, + isNullable = isNullable + ) + } + return SqlTable(columns = columns) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt new file mode 100644 index 000000000000..c3c1fede77e9 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValue.kt @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.AirbyteValue +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.BooleanValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDate +import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDateTime +import io.airbyte.cdk.load.data.TimeStringUtility.toOffset +import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.cdk.load.util.serializeToJsonBytes +import java.sql.Date +import java.sql.Time +import java.sql.Timestamp + +/** CDK pipeline [AirbyteValue] to SQL values converter. */ +class AirbyteValueToSqlValue { + + /** + * Converts an [AirbyteValue] to the associated SQL value. + * + * @param airbyteValue The [AirbyteValue] from an Airbyte record + * @return The corresponding SQL value for the given [AirbyteValue]. + * @throws IllegalArgumentException if the [AirbyteValue] is not supported. + */ + fun convert(airbyteValue: AirbyteValue): Any? { + return when (airbyteValue) { + is ObjectValue -> { + val convertedValues = + airbyteValue.values.entries.associate { (name, value) -> + name to convert(value) + } + convertedValues + } + is ArrayValue -> airbyteValue.values.map { convert(it) } + is BooleanValue -> airbyteValue.value + is DateValue -> Date.valueOf(toLocalDate(airbyteValue.value)) + is IntegerValue -> airbyteValue.value + is NullValue -> null + is NumberValue -> airbyteValue.value.toDouble().toBigDecimal() + is StringValue -> airbyteValue.value + is TimeValue -> Time.valueOf(toOffset(airbyteValue.value)) + is TimestampValue -> Timestamp.valueOf(toLocalDateTime(airbyteValue.value)) + is UnknownValue -> airbyteValue.value.serializeToJsonBytes() + } + } +} + +/** + * Extension function that converts an [ObjectValue] into a row of SQL values. + * + * @param sqlTable The [SqlTable] that contains data type information for each column. This is used + * to filter the [ObjectValue]'s values to only those that exist in the table. + * @return A [SqlTableRow] that contains values converted to their SQL data type equivalents from + * the provided [ObjectValue]. + */ +fun ObjectValue.toSqlValue(sqlTable: SqlTable): SqlTableRow { + val converter = AirbyteValueToSqlValue() + return SqlTableRow( + values = + this.values + .filter { (name, _) -> sqlTable.columns.find { it.name == name } != null } + .map { (name, value) -> + val dataType = sqlTable.columns.find { it.name == name }!!.type + val converted = + when (value) { + is ObjectValue -> + (converter.convert(value) as LinkedHashMap<*, *>) + .serializeToJsonBytes() + is ArrayValue -> + (converter.convert(value) as List<*>).serializeToJsonBytes() + else -> converter.convert(value) + } + SqlTableRowValue(name = name, value = converted, type = dataType) + } + ) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt new file mode 100644 index 000000000000..59f4457b61e1 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/model/SqlModels.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.model + +import java.sql.Types + +/** + * Representation of a colum in a SQL table. + * + * @param name The name of the column + * @param type The data type of the column (see [Types] for values). + * @param isPrimaryKey Whether the column represents a primary key. + * @param isNullable Whether the column's value supports null values. + */ +data class SqlColumn( + val name: String, + val type: Int, + val isPrimaryKey: Boolean = false, + val isNullable: Boolean = false +) + +/** + * Representation of a SQL table. + * + * @param columns The list of columns in the table. + */ +data class SqlTable(val columns: List) + +/** + * Representation of a value in a SQL row/column cell. + * + * @param name The name of the column. + * @param value The value of the row/column cell. + * @param type The SQL type of the row/column cell (see [Types] for values). + */ +data class SqlTableRowValue(val name: String, val value: Any?, val type: Int) + +/** + * Representation of a row of values in a SQL table. + * + * @param values A list of values stored in the row. + */ +data class SqlTableRow(val values: List) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt new file mode 100644 index 000000000000..24ec8aea1da3 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteTypeToSqlTypeTest.kt @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import com.fasterxml.jackson.databind.JsonNode +import io.airbyte.cdk.load.data.ArrayType +import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema +import io.airbyte.cdk.load.data.BooleanType +import io.airbyte.cdk.load.data.DateType +import io.airbyte.cdk.load.data.FieldType +import io.airbyte.cdk.load.data.IntegerType +import io.airbyte.cdk.load.data.NumberType +import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema +import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema +import io.airbyte.cdk.load.data.StringType +import io.airbyte.cdk.load.data.TimeTypeWithTimezone +import io.airbyte.cdk.load.data.TimeTypeWithoutTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone +import io.airbyte.cdk.load.data.TimestampTypeWithoutTimezone +import io.airbyte.cdk.load.data.UnionType +import io.airbyte.cdk.load.data.UnknownType +import io.mockk.mockk +import java.sql.Types +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test + +class AirbyteTypeToSqlTypeTest { + + private val converter = AirbyteTypeToSqlType() + + @Test + fun testConvertObjectType() { + val objectType = + ObjectType( + linkedMapOf( + "id" to FieldType(IntegerType, false), + "name" to FieldType(StringType, true), + ), + ) + val result = converter.convert(objectType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertArrayType() { + val arrayType = ArrayType(FieldType(IntegerType, false)) + val result = converter.convert(arrayType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertArrayTypeWithoutSchema() { + val arrayType = ArrayTypeWithoutSchema + val result = converter.convert(arrayType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertBooleanType() { + val booleanType = BooleanType + val result = converter.convert(booleanType) + assertEquals(Types.BOOLEAN, result) + } + + @Test + fun testConvertDateType() { + val dateType = DateType + val result = converter.convert(dateType) + assertEquals(Types.DATE, result) + } + + @Test + fun testConvertIntegerType() { + val integerType = IntegerType + val result = converter.convert(integerType) + assertEquals(Types.BIGINT, result) + } + + @Test + fun testConvertNumberType() { + val numberType = NumberType + val result = converter.convert(numberType) + assertEquals(Types.DECIMAL, result) + } + + @Test + fun testConvertObjectTypeWithEmptySchema() { + val objectType = ObjectTypeWithEmptySchema + val result = converter.convert(objectType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertObjectTypeWithoutSchema() { + val objectType = ObjectTypeWithoutSchema + val result = converter.convert(objectType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertStringType() { + val stringType = StringType + val result = converter.convert(stringType) + assertEquals(Types.VARCHAR, result) + } + + @Test + fun testConvertTimeTypeWithTimezone() { + val timeType = TimeTypeWithTimezone + val result = converter.convert(timeType) + assertEquals(Types.TIME_WITH_TIMEZONE, result) + } + + @Test + fun testConvertTimeTypeWithoutTimezone() { + val timeType = TimeTypeWithoutTimezone + val result = converter.convert(timeType) + assertEquals(Types.TIME, result) + } + + @Test + fun testConvertTimestampTypeWithTimezone() { + val timestampType = TimestampTypeWithTimezone + val result = converter.convert(timestampType) + assertEquals(Types.TIMESTAMP_WITH_TIMEZONE, result) + } + + @Test + fun testConvertTimestampTypeWithoutTimezone() { + val timestampType = TimestampTypeWithoutTimezone + val result = converter.convert(timestampType) + assertEquals(Types.TIMESTAMP, result) + } + + @Test + fun testConvertUnionType() { + val unionType = UnionType(setOf(StringType, NumberType)) + val result = converter.convert(unionType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testConvertUnknownType() { + val unknownType = UnknownType(mockk()) + val result = converter.convert(unknownType) + assertEquals(Types.BLOB, result) + } + + @Test + fun testToSqlTable() { + val primaryKey = "id" + val nullableColumn = "email" + val objectType = + ObjectType( + linkedMapOf( + primaryKey to FieldType(IntegerType, false), + "age" to FieldType(IntegerType, false), + nullableColumn to FieldType(StringType, true), + ), + ) + val primaryKeys = listOf(listOf(primaryKey)) + val table = objectType.toSqlTable(primaryKeys = primaryKeys) + + assertEquals(objectType.properties.size, table.columns.size) + objectType.properties.forEach { (name, type) -> + val column = table.columns.find { it.name == name } + assertNotNull(column) + assertEquals(converter.convert(type.type), column?.type) + assertEquals(primaryKey == name, column?.isPrimaryKey) + assertEquals(nullableColumn == name, column?.isNullable) + } + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt new file mode 100644 index 000000000000..0fa25c1624f5 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/convert/AirbyteValueToSqlValueTest.kt @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.convert + +import io.airbyte.cdk.load.data.ArrayValue +import io.airbyte.cdk.load.data.DateValue +import io.airbyte.cdk.load.data.IntegerValue +import io.airbyte.cdk.load.data.NullValue +import io.airbyte.cdk.load.data.NumberValue +import io.airbyte.cdk.load.data.ObjectValue +import io.airbyte.cdk.load.data.StringValue +import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDate +import io.airbyte.cdk.load.data.TimeStringUtility.toLocalDateTime +import io.airbyte.cdk.load.data.TimeStringUtility.toOffset +import io.airbyte.cdk.load.data.TimeValue +import io.airbyte.cdk.load.data.TimestampValue +import io.airbyte.cdk.load.data.UnknownValue +import io.airbyte.cdk.load.util.Jsons +import io.airbyte.cdk.load.util.serializeToJsonBytes +import java.math.BigDecimal +import java.math.BigInteger +import java.sql.Date +import java.sql.Time +import java.sql.Timestamp +import java.sql.Types +import java.time.ZoneOffset +import org.junit.jupiter.api.Assertions.assertArrayEquals +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test + +internal class AirbyteValueToSqlValueTest { + + private val converter = AirbyteValueToSqlValue() + + @Test + fun testConvertObjectValue() { + val objectValue = + ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))) + val result = converter.convert(objectValue) + assertEquals(LinkedHashMap::class.java, result?.javaClass) + assertEquals(mapOf("id" to 42.toBigInteger(), "name" to "John Doe"), result) + } + + @Test + fun testConvertArrayValue() { + val arrayValue = ArrayValue(listOf(StringValue("John Doe"), IntegerValue(42L))) + val result = converter.convert(arrayValue) + assertEquals(ArrayList::class.java, result?.javaClass) + assertEquals(listOf("John Doe", 42.toBigInteger()), result) + } + + @Test + fun testConvertDateValue() { + val dateValue = DateValue("2024-11-18") + val result = converter.convert(dateValue) + assertEquals(Date::class.java, result?.javaClass) + assertEquals( + toLocalDate(dateValue.value).atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli(), + (result as Date).time + ) + } + + @Test + fun testConvertIntegerValue() { + val intValue = IntegerValue(42) + val result = converter.convert(intValue) + assertEquals(BigInteger::class.java, result?.javaClass) + assertEquals(42.toBigInteger(), result) + } + + @Test + fun testConvertNullValue() { + val nullValue = NullValue + val result = converter.convert(nullValue) + assertNull(result) + } + + @Test + fun testConvertNumberValue() { + val numberValue = NumberValue(42.5.toBigDecimal()) + val result = converter.convert(numberValue) + assertEquals(BigDecimal::class.java, result?.javaClass) + assertEquals(42.5.toBigDecimal(), result) + } + + @Test + fun testConvertStringValue() { + val stringValue = StringValue("test") + val result = converter.convert(stringValue) + assertEquals(String::class.java, result?.javaClass) + assertEquals("test", result) + } + + @Test + fun testConvertTimeValue() { + val timeValue = TimeValue("12:34:56") + val result = converter.convert(timeValue) + assertEquals(Time::class.java, result?.javaClass) + assertEquals(Time.valueOf(toOffset(timeValue.value)).time, (result as Time).time) + } + + @Test + fun testConvertTimestampValue() { + val timestampValue = TimestampValue("2024-11-18T12:34:56Z") + val result = converter.convert(timestampValue) + assertEquals(Timestamp::class.java, result?.javaClass) + assertEquals( + Timestamp.valueOf(toLocalDateTime(timestampValue.value)).time, + (result as Timestamp).time + ) + } + + @Test + fun testConvertUnknownValue() { + val jsonNode = Jsons.createObjectNode().put("id", "unknownValue") + val unknownValue = UnknownValue(jsonNode) + val result = converter.convert(unknownValue) + assertEquals(ByteArray::class.java, result?.javaClass) + assertArrayEquals(Jsons.writeValueAsBytes(unknownValue.value), result as ByteArray) + } + + @Test + fun testToSqlValue() { + val sqlTable = + SqlTable( + listOf( + SqlColumn( + name = "id", + type = Types.INTEGER, + isPrimaryKey = true, + isNullable = false + ), + SqlColumn( + name = "name", + type = Types.VARCHAR, + isPrimaryKey = false, + isNullable = true + ), + SqlColumn( + name = "meta", + type = Types.BLOB, + isPrimaryKey = false, + isNullable = false + ), + SqlColumn( + name = "items", + type = Types.BLOB, + isPrimaryKey = false, + isNullable = false + ) + ) + ) + val objectValue = + ObjectValue( + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("John Doe"), + "meta" to + ObjectValue( + linkedMapOf( + "sync_id" to IntegerValue(123L), + "changes" to + ObjectValue( + linkedMapOf( + "change" to StringValue("insert"), + "reason" to StringValue("reason"), + ) + ) + ) + ), + "items" to ArrayValue(listOf(StringValue("item1"), StringValue("item2"))) + ) + ) + + val sqlValue = objectValue.toSqlValue(sqlTable) + + assertEquals(sqlTable.columns.size, sqlValue.values.size) + assertEquals( + BigInteger::class.java, + sqlValue.values.find { it.name == "id" }?.value?.javaClass + ) + assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) + assertEquals( + String::class.java, + sqlValue.values.find { it.name == "name" }?.value?.javaClass + ) + assertEquals("John Doe", sqlValue.values.find { it.name == "name" }?.value) + assertEquals( + ByteArray::class.java, + sqlValue.values.find { it.name == "meta" }?.value?.javaClass + ) + assertArrayEquals( + mapOf( + "sync_id" to 123.toBigInteger(), + "changes" to + mapOf( + "change" to "insert", + "reason" to "reason", + ) + ) + .serializeToJsonBytes(), + sqlValue.values.find { it.name == "meta" }?.value as ByteArray + ) + assertEquals( + ByteArray::class.java, + sqlValue.values.find { it.name == "items" }?.value?.javaClass + ) + assertArrayEquals( + listOf("item1", "item2").serializeToJsonBytes(), + sqlValue.values.find { it.name == "items" }?.value as ByteArray + ) + } + + @Test + fun testToSqlValueIgnoresFieldsNotInTable() { + val sqlTable = + SqlTable( + listOf( + SqlColumn( + name = "id", + type = Types.INTEGER, + isPrimaryKey = true, + isNullable = false + ), + ) + ) + val objectValue = + ObjectValue( + linkedMapOf( + "id" to IntegerValue(123L), + "name" to StringValue("Should be ignored"), + ) + ) + + val sqlValue = objectValue.toSqlValue(sqlTable) + assertEquals(sqlTable.columns.size, sqlValue.values.size) + assertEquals( + BigInteger::class.java, + sqlValue.values.find { it.name == "id" }?.value?.javaClass + ) + assertEquals(123.toBigInteger(), sqlValue.values.find { it.name == "id" }?.value) + } + + @Test + fun testObjectMapToJsonBytes() { + val objectValue = + ObjectValue(linkedMapOf("id" to IntegerValue(42L), "name" to StringValue("John Doe"))) + val objectValueMap = converter.convert(objectValue) + val jsonBytes = objectValueMap?.serializeToJsonBytes() + assertNotNull(jsonBytes) + assertArrayEquals(Jsons.writeValueAsBytes(objectValueMap), jsonBytes) + } +}